您的位置:首页 > 其它

zookeeper学习__分布式共享锁简单Demo

2017-07-09 23:13 453 查看
zookeeper是一个分布式协调服务,为用户的分布式应用提供协调服务。使用范围有:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……

文章利用zookeeper实现简单的分布式锁

zookeeper虽然提供的服务很多,但底层只有两个功能

1.管理程序提交的数据;

2.为程序提供数据节点监听服务

分布式锁的简单实现

分布式系统多个应用需要操作同一个资源,为资源操作线程安全问题,创建分布式锁,应用取得锁才能操作资源,否则阻塞直到分布锁到该应用节点时才能操作资源。

思路:多个应用在zookeeper上注册,zookeeper采用排序最小的分配锁,如果是最小就获取锁并操作数据,然后删除该节点。

zookeeper集群安装

zookeeper超过半数以上集群便可存活,因此适合安装奇数节个集群。

官网上下载zookeeper-3.x.x.tar.gz包上传到linux虚拟机中解压,在zoo.cfg文件中配置各集群节点的ip和端口。

# zoo.cfg配置
server.1=192.168.10.121:2888:3888
server.2=192.168.10.122:2888:3888
server.3=192.168.10.123:2888:3888


依次启动各群集节点的zookeeper:
cd /usr/local/zookeeper
./bin zkServer.sh start
启动zookeeper服务

Java实现

通过连接集群的Zookeeper对象操作节点,主要方法有create/delete/exists,创建zookeeper时可创建监听对象及监听回调方法,当节点状态改变时则触发事件(监听仅一次有效),因此可重新通过get(… true)使监听方法激活为true.

public class DistributedClient {

//zookeeper主机
private String hosts = "192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181";

//超时时间
private int SESSION_TIME_OUT = 2000;

private String subNode = "/subNode";
private String groupNode = "/group";

private ZooKeeper zk;
private volatile String thisPath; //当前节点

public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIME_OUT, new Watcher() {
//监听回调
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged
&& groupNode.equals(event.getPath())) {
//获取子节点,并对父节点进行监听
try {
List<String> childrenList = zk.getChildren(groupNode, true);
String thisNode = thisPath.substring((groupNode + "/").length());
//比较自己是否是最小的节点
Collections.sort(childrenList);
if (0 == childrenList.indexOf(thisNode)) {
doSomething();  //特定的业务逻辑,用完锁之后需要删除
//重新注册一次锁, 带序号的锁
thisPath = zk.create(groupNode + subNode,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
} catch (Exception e) {
System.err.println("zookeeper is error");
}

}
}

});

if(null == zk.exists(groupNode, true)) {
zk.create(groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //父节点必须为persistent才可创建子节点
}
//初始化创建锁
thisPath = zk.create(groupNode + subNode,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
Thread.sleep(1000);
List<String> childrenNodes = zk.getChildren(groupNode, true);
if (1 == childrenNodes.size()) {
doSomething();
thisPath = zk.create(groupNode + subNode,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}

/**
* 特写业务实现
* @throws Exception
* @throws InterruptedException
*/
private void doSomething() throws Exception {
System.out.println("get this lock" + thisPath);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("finish use this lock, will to release: " + thisPath);
zk.delete(this.thisPath, -1); //-1表示不指定版本
}

}

@Test
public void testNode() {
try {
for (int i = 0; i < 5; i++)
{
new Thread() {
public void run() {
try {
DistributedClient dl = new DistributedClient();
dl.connectZookeeper();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();

}
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
System.err.println("zookeeper connect error");
e.printStackTrace();
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: