您的位置:首页 > 其它

Zookeeper实现分布式锁

2017-03-14 17:54 239 查看

1.实现步骤

首先要创建一个锁的根节点,比如/mylock。

想要获取锁的客户端在锁的根节点下面创建znode,作为/mylock的子节点,节点的类型要选择 CreateMode.PERSISTENT_SEQUENTIAL,节点的名字”lock-“,假设目前同时有3个客户端想要获得锁,那么/mylock下的目录应该是这个样子的。

lock-0000000001,lock-0000000002,lock-0000000003

“lock-“是前缀 , 0000000001,0000000002,0000000003 是zk服务端自动生成的自增数字。

当前客户端通过getChildren(/mylock)获取所有子节点列表并根据自增数字排序,然后判断一下自己创建的节点的顺序是不是在列表当中最小的,如果是 那么获取到锁,如果不是,那么获取自己的前一个节点,并设置监听这个节点的变化,当节点变化时重新执行步骤3 直到自己是编号最小的一个为止。

举例:假设当前客户端创建的节点是0000000002,因为它的编号不是最小的,所以获取不到锁,那么它就找到它前面的一个节点0000000001 并对它设置监听。

释放锁,当前获得锁的客户端在操作完成后删除自己创建的节点,这样会触发zk的事件给其它客户端知道,这样其它客户端会重新执行(步骤3)。

举例:加入客户端0000000001获取到锁,然后客户端0000000002加入进来获取锁,发现自己不是编号最小的,那么它会监听它前面节点的事件(0000000001的事件)然后执行步骤(3),当客户端0000000001操作完成后删除自己的节点,这时zk服务端会发送事件,这时客户端0000000002会接收到该事件,然后重复步骤3直到获取到锁)

2.实现代码

package com.zookeeper.base;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZookeeperClient {

// 超时时间
private static final int SESSION_TIMEOUT = 5000;
// server列表
//  private static final String HOSTS = "192.168.223.144:2181,192.168.223.145:2181,192.168.223.146:2181";
private static final String HOSTS = "127.0.0.1:2181";
private String groupNode = "mylock";
private String subNode = "lock-";
private ZooKeeper zk;
// 当前client创建的子节点
private String thisPath;
// 当前client等待的子节点
private String waitPath;
// 多线程工具类
private CountDownLatch cdl = new CountDownLatch(1);

public void connectZookeeper() throws Exception {
zk = new ZooKeeper(HOSTS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
try{
if(event.getState() == KeeperState.SyncConnected) {
if(EventType.None == event.getType()) {
System.out.println("建立连接...");
cdl.countDown();
}
// 如果检测到节点删除, 并且是waitPath节点
else if(EventType.NodeDeleted == event.getType() &&
event.getPath().equals(waitPath)) {
// 确认thisPath是否真的是列表中最小节点
List<String> childrenNodes = zk.getChildren("/"+groupNode, false);
String thisNode = thisPath.substring(("/"+groupNode+"/").length());
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if(index == 0) {
// 确定是最小节点,进行业务处理
doSomething();
} else {
// 说明waitPath由于异常挂掉
// 更新waitPath
waitPath = "/" + groupNode + "/" +childrenNodes.get(index-1);
// 如果存在waitPath节点重新注册监听,
// 如果判断waitPath已经被删除就执行业务
if(zk.exists(waitPath, true) == null) {
doSomething();
}
}
}
}
}catch(Exception e) {
e.printStackTrace();
}
}
});
// 阻塞等待连接建立
cdl.await();
// 创建父节点,不能直接创建子节点
if(zk.exists("/"+groupNode, false) == null) {
zk.create("/"+groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建子节点
thisPath = zk.create("/"+groupNode+"/"+subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一会让结果更清晰
Thread.sleep(100);
// 注意没有必要监听"/locks"子节点的变化情况
List<String> childrenNodes = zk.getChildren("/"+groupNode, false);
if(childrenNodes.size() == 1) {
doSomething();
} else {
String thisNode = this.thisPath.substring(("/"+groupNode+"/").length());
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if(index == -1) {
// never happend
} else if (index == 0) {
// 说明thisNode是最小的,当前client获取锁
doSomething();
} else {
// 获取排名比thisPath前一位节点
this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index-1);
// 在waitPath注册监听事件, 当wait被删除, zookeeper会回调监听器process方法
zk.getData(waitPath, true, new Stat());
}
}
}

private void doSomething() throws Exception {
try {
System.out.println("gain lock:"+this.thisPath);
// 模拟做一些业务操作
Thread.sleep(2000);

}catch(Exception e) {
e.printStackTrace();
}finally { // 业务操作之后释放节点,否则后续节点不会被提醒
System.out.println("finshed:"+thisPath);
// 将thisPath删除, 监听thisPath的client将获得通知
// 相当于释放锁
zk.delete(this.thisPath, -1);
}
}

public static void main(String[] args) throws Exception {
for(int i=0;i<10;i++) {
new Thread(){
@Override
public void run() {
try{
ZookeeperClient zc = new ZookeeperClient();
zc.connectZookeeper();
}catch(Exception e){
e.printStackTrace();
}
}
}.start();
}
Thread.sleep(Integer.MAX_VALUE);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper 分布式