您的位置:首页 > 运维架构

zookeeper的分布式锁的实现

2016-11-03 14:05 239 查看
分布式锁能够在一组进程之间提供互斥机制,使得在任何时刻都只有一个进程可以持有锁,分布式锁可以用在大型分布式系统中实现领导者选举,在任何时间点,持有锁的那个进程就是系统的领导者

分布式锁的思路:首先需要指定一个groupZnode作为锁,通常用它来描述被锁定的实体,称为/lock;然后希望获得锁的客户端创建一些短暂顺序znode,作为锁znode的子节点。作为子节点,在任何时刻,顺序号最小的客户端持有锁。例如:两个客户端同时创建znode,分别为/lock/sub_01,/lock/sub_02,那么创建/lock/sub_01的客户端将会持有锁,因为他的znode顺序号最小。zookeeper服务是顺序的仲裁者,因为他负责分配顺序号。

通过删除znode  /lock/sub_01即可简单地将锁释放;另外,如果客户端进程死亡,对应的短暂znode也会被删除。接下来,创建/lock/sub_02的客户端持有锁,因为他的顺序号紧跟前一个。通过在创建客户端的时候添加观察,可以使得客户端在获得锁的时候得到通知。

下面是实现分布式锁的样例代码:

emptypackage com.wangl.hadoop.zookeeper.learn;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public abstract class DistributeLock {
private static final int SESSION_TIMEOUT = 5000;
private static final String HOSTS = "host1:2181,host2:2181,host3:2181";
protected String groupNode = "lock";
private String subNode = "sub";
protected ZooKeeper zk;
protected String thisPath;
private String waitPath;
private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect() throws IOException, InterruptedException, KeeperException{
zk = new ZooKeeper(HOSTS, SESSION_TIMEOUT, new Watcher(){

public void process(WatchedEvent event) {
if(event.getState() == KeeperState.SyncConnected){
connectedSignal.countDown();
}
if(event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)){
//如果发生非人为的异常而导致客户端中断连接从而使得其下一个客户端会过早的获得锁
//为了防止这种情况的发生,在节点删除事件被触发的时候,在进行一次检测,如果该节点的角标不为0,则重新设置waitPath
//这样就避免了这种冲突的发生。
try {
List children = zk.getChildren("/"+groupNode, false);
if(children.isEmpty()){
System.out.println(groupNode + "is empty");
return;
}
Collections.sort(children);
String thisNode = thisPath.substring(("/"+groupNode+"/").length());
int index = children.indexOf(thisNode);
if(index == 0){
doSomething();
}
else{
//如果角标不为0,说明其上个节点发生了异常导致触发该事件,则重新设置waitPath
waitPath = "/" + groupNode +"/" + children.get(index - 1);
//对新设置的waitPath添加观察
Stat stat = zk.exists(waitPath, true);
if(stat == null){
doSomething();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

});

connectedSignal.await();

thisPath = zk.create("/"+groupNode+"/"+subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List children = zk.getChildren("/"+groupNode, false);
Collections.sort(children);
String thisNode = thisPath.substring(("/"+groupNode+"/").length());
int index = children.indexOf(thisNode);
if(index == -1){}
if(index == 0){
doSomething();
}else{
waitPath = "/" + groupNode +"/" + children.get(index - 1);
zk.exists(waitPath, true);
}

}
//可以定义成为抽象方法,子类可以设置客户端要做的事
protected abstract void doSomething();

public static void main(String args[]) throws InterruptedException{
for(int i=0;i<10;i++){
new Thread(new Runnable(){

public void run() {
try {
DistributeLock lock = new SetDataDistributeLock();
lock.connect();
} catch (Exception e) {
e.printStackTrace();
}

}

}).start();
}
Thread.sleep(Long.MAX_VALUE);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  分布式 zookeeper hadoop