Zookeeper实现分布式锁
2017-03-14 17:54
239 查看
1.实现步骤
首先要创建一个锁的根节点,比如/mylock。想要获取锁的客户端在锁的根节点下面创建znode,作为/mylock的子节点,节点的类型要选择 CreateMode.PERSISTENT_SEQUENTIAL,节点的名字”lock-“,假设目前同时有3个客户端想要获得锁,那么/mylock下的目录应该是这个样子的。
lock-0000000001,lock-0000000002,lock-0000000003
“lock-“是前缀 , 0000000001,0000000002,0000000003 是zk服务端自动生成的自增数字。
当前客户端通过getChildren(/mylock)获取所有子节点列表并根据自增数字排序,然后判断一下自己创建的节点的顺序是不是在列表当中最小的,如果是 那么获取到锁,如果不是,那么获取自己的前一个节点,并设置监听这个节点的变化,当节点变化时重新执行步骤3 直到自己是编号最小的一个为止。
举例:假设当前客户端创建的节点是0000000002,因为它的编号不是最小的,所以获取不到锁,那么它就找到它前面的一个节点0000000001 并对它设置监听。
释放锁,当前获得锁的客户端在操作完成后删除自己创建的节点,这样会触发zk的事件给其它客户端知道,这样其它客户端会重新执行(步骤3)。
举例:加入客户端0000000001获取到锁,然后客户端0000000002加入进来获取锁,发现自己不是编号最小的,那么它会监听它前面节点的事件(0000000001的事件)然后执行步骤(3),当客户端0000000001操作完成后删除自己的节点,这时zk服务端会发送事件,这时客户端0000000002会接收到该事件,然后重复步骤3直到获取到锁)
2.实现代码
package com.zookeeper.base; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class ZookeeperClient { // 超时时间 private static final int SESSION_TIMEOUT = 5000; // server列表 // private static final String HOSTS = "192.168.223.144:2181,192.168.223.145:2181,192.168.223.146:2181"; private static final String HOSTS = "127.0.0.1:2181"; private String groupNode = "mylock"; private String subNode = "lock-"; private ZooKeeper zk; // 当前client创建的子节点 private String thisPath; // 当前client等待的子节点 private String waitPath; // 多线程工具类 private CountDownLatch cdl = new CountDownLatch(1); public void connectZookeeper() throws Exception { zk = new ZooKeeper(HOSTS, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { try{ if(event.getState() == KeeperState.SyncConnected) { if(EventType.None == event.getType()) { System.out.println("建立连接..."); cdl.countDown(); } // 如果检测到节点删除, 并且是waitPath节点 else if(EventType.NodeDeleted == event.getType() && event.getPath().equals(waitPath)) { // 确认thisPath是否真的是列表中最小节点 List<String> childrenNodes = zk.getChildren("/"+groupNode, false); String thisNode = thisPath.substring(("/"+groupNode+"/").length()); // 排序 Collections.sort(childrenNodes); int index = childrenNodes.indexOf(thisNode); if(index == 0) { // 确定是最小节点,进行业务处理 doSomething(); } else { // 说明waitPath由于异常挂掉 // 更新waitPath waitPath = "/" + groupNode + "/" +childrenNodes.get(index-1); // 如果存在waitPath节点重新注册监听, // 如果判断waitPath已经被删除就执行业务 if(zk.exists(waitPath, true) == null) { doSomething(); } } } } }catch(Exception e) { e.printStackTrace(); } } }); // 阻塞等待连接建立 cdl.await(); // 创建父节点,不能直接创建子节点 if(zk.exists("/"+groupNode, false) == null) { zk.create("/"+groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 创建子节点 thisPath = zk.create("/"+groupNode+"/"+subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // wait一会让结果更清晰 Thread.sleep(100); // 注意没有必要监听"/locks"子节点的变化情况 List<String> childrenNodes = zk.getChildren("/"+groupNode, false); if(childrenNodes.size() == 1) { doSomething(); } else { String thisNode = this.thisPath.substring(("/"+groupNode+"/").length()); // 排序 Collections.sort(childrenNodes); int index = childrenNodes.indexOf(thisNode); if(index == -1) { // never happend } else if (index == 0) { // 说明thisNode是最小的,当前client获取锁 doSomething(); } else { // 获取排名比thisPath前一位节点 this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index-1); // 在waitPath注册监听事件, 当wait被删除, zookeeper会回调监听器process方法 zk.getData(waitPath, true, new Stat()); } } } private void doSomething() throws Exception { try { System.out.println("gain lock:"+this.thisPath); // 模拟做一些业务操作 Thread.sleep(2000); }catch(Exception e) { e.printStackTrace(); }finally { // 业务操作之后释放节点,否则后续节点不会被提醒 System.out.println("finshed:"+thisPath); // 将thisPath删除, 监听thisPath的client将获得通知 // 相当于释放锁 zk.delete(this.thisPath, -1); } } public static void main(String[] args) throws Exception { for(int i=0;i<10;i++) { new Thread(){ @Override public void run() { try{ ZookeeperClient zc = new ZookeeperClient(); zc.connectZookeeper(); }catch(Exception e){ e.printStackTrace(); } } }.start(); } Thread.sleep(Integer.MAX_VALUE); } }
相关文章推荐
- 关于如何用Zookeeper实现分布式锁机制
- Zookeeper实现分布式共享锁
- 利用curator实现的zookeeper分布式锁服务
- zookeeper实现分布式的思路
- zookeeper 实现分布式锁
- 巧用zookeeper实现分布式并行计算 - J2EE企业应用 顾问/咨询 Java传教士 -H.E.'s Blog
- Dubbo框架应用之(四)--Dubbo基于Zookeeper实现分布式实例
- 使用zookeeper实现分布式锁
- 使用 ZooKeeper 实现分布式锁
- Zookeeper实现分布式锁 分类: hadoop Java 2015-06-25 22:38 68人阅读 评论(0) 收藏
- 基于zookeeper实现的分布式锁
- ZooKeeper实现分布式队列Queue
- 巧用zookeeper实现分布式并行计算
- 用zookeeper实现分布式session
- ZooKeeper实现分布式队列Queue
- Zookeeper分布式锁(多进程竞争)实现的代码示例分享
- 使用zookeeper实现分布式共享锁
- ZooKeeper实现分布式队列Queue
- 利用curator实现的zookeeper分布式锁服务
- Zookeeper实现分布式锁