zookeeper实现分布式锁
2017-12-19 15:20
344 查看
Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
利用临时顺序节点实现共享锁的一般做法
首先介绍一下,Zookeeper中有一种节点叫做顺序节点,故名思议,假如我们在/lock/目录下创建节3个点,ZooKeeper集群会按照提起创建的顺序来创建节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。
ZooKeeper中还有一种名为临时节点的节点,临时节点由某个客户端创建,当客户端与ZooKeeper集群断开连接,则开节点自动被删除。
利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:
客户端调用create()方法创建名为“locknode/”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。
客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。
释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可
测试类:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.11.0</version> </dependency>
private static final String ROOT_LOCKS="/LOCKS";//根节点 private static int number = 10; private static Random random=new Random(); public static void main(String[] args) throws InterruptedException { sharedLock(); //testReadWriterLock(); } private static int getNumber() throws InterruptedException { //TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); return number--; } /** * 全局同步分布式锁, 同一时间多台机器只有一台能获得同一把锁. * InterProcessMutex、InterProcessSemaphoreMutex * @throws InterruptedException */ private static void sharedLock() throws InterruptedException { InterProcessMutex interProcessMutex = new InterProcessMutex(CuratorClientUtils.getCuratorFramework(), ROOT_LOCKS); final CountDownLatch countdown = new CountDownLatch(1); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { countdown.await(50,TimeUnit.MILLISECONDS); //加锁 interProcessMutex.acquire(); //-------------业务处理开始 int number = getNumber(); System.out.println(number); //-------------业务处理结束 } catch (Exception e) { e.printStackTrace(); } finally { try { //释放 System.out.println(Thread.currentThread().getName() + " 是否获取锁 " + interProcessMutex.isAcquiredInThisProcess()); if(interProcessMutex.isAcquiredInThisProcess()) {//判断是否持有锁 进而进行锁是否释放的操作 interProcessMutex.release(); } } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start(); } Thread.sleep(100); countdown.countDown(); }
public class CuratorClientUtils { private static String ZK_URL = "192.168.1.45:2181,192.168.1.46:2181,192.168.1.41:2181"; private static CuratorFramework curatorFramework = null; public static CuratorFramework getCuratorFramework(){ if (curatorFramework == null){ curatorFramework = CuratorFrameworkFactory.newClient(ZK_URL, 5000,5000, new ExponentialBackoffRetry(1000,5)); } curatorFramework.start(); return curatorFramework; } }
利用临时顺序节点实现共享锁的一般做法
首先介绍一下,Zookeeper中有一种节点叫做顺序节点,故名思议,假如我们在/lock/目录下创建节3个点,ZooKeeper集群会按照提起创建的顺序来创建节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。
ZooKeeper中还有一种名为临时节点的节点,临时节点由某个客户端创建,当客户端与ZooKeeper集群断开连接,则开节点自动被删除。
利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:
客户端调用create()方法创建名为“locknode/”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。
客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。
释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
public class ZookeeperClient { private static String ZK_URL = "192.168.1.45:2181,192.168.1.46:2181,192.168.1.41:2181"; private static CountDownLatch countDownLatch=new CountDownLatch(1); private static ZooKeeper zooKeeper = null; private static int sessionTimeout=6000; private ZookeeperClient(){} private static class MyZookeeperClient{ public static ZooKeeper getZooKeeper(){ try { if (zooKeeper==null){ zooKeeper = new ZooKeeper(ZK_URL, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected){ countDownLatch.countDown(); } } }); countDownLatch.await(); } }catch (Exception e){ e.printStackTrace(); } return zooKeeper; } } public static ZooKeeper getInstance(){ return MyZookeeperClient.getZooKeeper(); } /** * Getter method for property <tt>sessionTimeout</tt> * * @return property value of sessionTimeout */ public static int getSessionTimeout() { return sessionTimeout; } }
public class LockWatcher implements Watcher { private CountDownLatch countDownLatch; public LockWatcher(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted){ countDownLatch.countDown(); } } }
public class DistributeLock { private ZooKeeper zooKeeper; private static final String ROOT_LOCKS="/LOCKS";//根节点 private final static byte[] data={1,2,3}; //节点的数据 private CountDownLatch countDownLatch = new CountDownLatch(1); private String lockId; public DistributeLock(){ this.zooKeeper = ZookeeperClient.getInstance(); } public boolean getLock(){ try { //创建临时有序节点 lockId = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+"->成功创建了lock节点["+lockId+"], 开始去竞争锁"); //获取根节点所有子节点 List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true); TreeSet<String> orderNodes = new TreeSet<String>(); for (String childrenNode : childrenNodes) { orderNodes.add(ROOT_LOCKS+"/"+childrenNode); } //取出最小的节点 String first = orderNodes.first(); //对比,如果跟lockId 相等,当前对象拿到锁 if (lockId.equals(first)){ System.out.println(Thread.currentThread().getName()+"->成功获得锁,lock节点为:["+lockId+"]"); return true; } SortedSet<String> headSet = orderNodes.headSet(lockId); if (!CollectionUtils.isEmpty(headSet)){ //获取当前节点的 前面的节点 String last = headSet.last(); //注册删除事件 zooKeeper.exists(last,new LockWatcher(countDownLatch)); countDownLatch.await(ZookeeperClient.getSessionTimeout(), TimeUnit.MILLISECONDS); System.out.println(Thread.currentThread().getName()+" 成功获取锁:["+lockId+"]"); return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public boolean unlock(){ System.out.println(Thread.currentThread().getName()+"->开始释放锁:["+lockId+"]"); try { zooKeeper.delete(lockId,-1); System.out.println("节点["+lockId+"]成功被删除"); return true; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return false; } }
测试类:
public class DistributeLockTest { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); Random random=new Random(); for(int i=0;i<10;i++){ new Thread(()->{ DistributeLock lock=null; try { lock=new DistributeLock(); boolean lock1 = lock.getLock(); System.out.println("*****************"+lock1); Thread.sleep(random.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); }finally { if(lock!=null){ lock.unlock(); } } }).start(); } } }
相关文章推荐
- Dubbo框架应用之(四)--Dubbo基于Zookeeper实现分布式实例
- [分布式] 使用 Dubbo、ZooKeeper 实现集群负载均衡
- 基于zookeeper简单实现分布式锁
- 【JEECG Dubbo专题】Dubbo+Zookeeper+Spring整合应用篇-Dubbo基于Zookeeper实现分布式服务(二)
- AAA【Dubbo实战】 Dubbo+Zookeeper+Spring整合应用篇-Dubbo基于Zookeeper实现分布式服务(二)
- zookeeper第三方库curator即可方便地实现分布式锁
- ZooKeeper 实现分布式队列
- zookeeper-分布式锁的代码实现-【每日五分钟搞定大数据】
- 使用 ZooKeeper 实现分布式锁
- 使用ZooKeeper实现Java跨JVM的分布式锁
- Dubbo基于Zookeeper实现分布式服务:Dubbo+Zookeeper+Spring整合应用
- zookeeper实现分布式锁
- 【转载】zookeeper 分布式锁 实现
- 利用ZooKeeper简单实现分布式锁
- zookeeper适用场景:分布式锁实现
- 【Dubbo实战】 Dubbo+Zookeeper+Spring整合应用篇-Dubbo基于Zookeeper实现分布式服务(二)
- zookeeper实现分布式的思路
- 使用zookeeper实现分布式锁
- Dubbo框架应用之(四)--Dubbo基于Zookeeper实现分布式实例
- Zookeeper系列四:Zookeeper在大型分布式系统中的应用、Zookeeper实现分布式锁