您的位置:首页 > 其它

ZooKeeper实现分布式锁

2015-05-08 10:01 281 查看
共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。



图 4. Zookeeper 实现 Locks 的流程图

实现原理的另外一种描述:

客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。

客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。

客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。

如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。

问题所在

上面这个分布式锁的实现中,大体能够满足了一般的分布式集群竞争锁的需求。这里说的一般性场景是指集群规模不大,一般在10台机器以内。

不过,细想上面的实现逻辑,我们很容易会发现一个问题,步骤4,“即获取所有的子点,判断自己创建的节点是否已经是序号最小的节点”,这个过程,在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知——这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。而这个问题的根源在于,没有找准客户端真正的关注点。

我们再来回顾一下上面的分布式锁竞争过程,它的核心逻辑在于:判断自己是否是所有节点中序号最小的。于是,很容易可以联想的到的是,每个节点的创建者只需要关注比自己序号小的那个节点。

改进后的分布式锁实现

下面是改进后的分布式锁实现,和之前的实现方式唯一不同之处在于,这里设计成每个锁竞争者,只需要关注”locknode”节点下序号比自己小的那个节点是否存在即可。实现如下:

客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。

客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。

客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。

如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。

之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“locknode”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。

下面是改进后的一种实现:

public class DistributedLock implements Watcher {
private static final String hostPort = "localhost:2181";
private ZooKeeper zk = null;
private String root;
private Object mutex = null;

public DistributedLock(String root) {
this.root = root;
try {
this.zk = new ZooKeeper(hostPort, 10 * 1000, this);
Stat stat = zk.exists(root, true);
mutex = new Date();
if (null == stat) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void process(WatchedEvent event) {
//释放锁的时候会删除临时节点,所以此处只监听删除节点的事件,以此减少客户端线程被notify的次数。
if (event.getType().equals(Event.EventType.NodeDeleted)) {
System.out.println(Thread.currentThread().getName() + "-- event -- " + JSON.toJSONString(event));
try {
synchronized (mutex) {
mutex.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

public String lock() throws Exception {
String name = "default";
String response = null;
for (;;) {
synchronized(mutex) {
//获取所有的子节点并排序
List<String> children = zk.getChildren(root, false);
String[] nodes = children.toArray(new String[children.size()]);
Arrays.sort(nodes);
System.out.println(JSON.toJSONString(nodes) + "--" + Thread.currentThread().getName());
if (nodes.length > 0) {
//如果当前节点是子节点中最小的节点,那么当前节点获得锁。
if ((root + "/" + nodes[0]).equals(response)) {
System.out.println(Thread.currentThread().getName() + "-- get lock");
return response;
} else {
//如果当前节点不存在,则创建
if (null == response) {
response = zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "-- create znode response " + response);
}

//此处监听所有小于当前节点的节点中最大的那个,以此减少客户端收到watcher event的次数,也可以减轻服务器端压力,避免羊群效应。(此处是一个优化,不然的话都监听当前持有锁的节点。)
String responseNode = response.substring(root.length() + 1 );
String lowerNode = null;
for (int i = 0; i < nodes.length; i++) {
if (nodes[i].equals(responseNode)) {
lowerNode = nodes[i - 1];
}
}

Stat stat = zk.exists(root + "/" + lowerNode, true);
if (null == stat) {
//如果监听的节点不存在,继续!
System.out.println(Thread.currentThread().getName() + "-- null " + lowerNode);
continue;
} else {
System.out.println(Thread.currentThread().getName() + "-- listener " + lowerNode);
mutex.wait();
System.out.println(Thread.currentThread().getName() + "-- continue ");
}
}
} else {
//如果当前没有任何子节点,那么创建。
response = zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "-- create znode");
System.out.println(Thread.currentThread().getName() + "-- create znode response " + response);
}
}
}
}

public void unlock(String name) throws Exception {
synchronized (mutex) {
zk.delete(name, -1);
System.out.println(Thread.currentThread().getName() + "-- unlock -- " + name );
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: