使用Zookeeper实现分布式锁
2016-12-21 00:16
495 查看
Zookeeper的一个典型应用场景就是分布式锁,锁的实现是利用Zookeeper创建的临时时序节点(创建的时候CreateMode为EPHEMERAL_SEQUENTIAL)和节点变动的监听器实现的。时序节点保证了节点的创建在分布式系统情况下还是有先后顺序的,监听器使得客户端能感受到节点的变动情况。
具体步骤
1、创建一个永久性节点,作锁的根目录2、当要获取一个锁时,在锁目录下创建一个临时有序列的节点
3、检查锁目录的子节点是否有序列比它小,若有则监听比它小的上一个节点,当前锁处于等待状态
4、当等待时间超过Zookeeper session的连接时间(sessionTimeout)时,当前session过期,Zookeeper自动删除此session创建的临时节点,等待状态结束,获取锁失败
5、当监听器触发时,等待状态结束,获得锁
流程图
详细代码
ZookeeperClient工具类
为了方便取得Zookeeper实例,先实行一个获取Zookeeper工具类 ZookeeperClient:package org.massive.lock; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * Created by Massive on 2016/12/18. */ public class ZookeeperClient { private static String connectionString = "localhost:2181"; private static int sessionTimeout = 10000; public static ZooKeeper getInstance() throws IOException, InterruptedException { //-------------------------------------------------------------- // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss) // 这里等Zookeeper的连接完成才返回实例 //-------------------------------------------------------------- final CountDownLatch connectedSignal = new CountDownLatch(1); ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(sessionTimeout,TimeUnit.MILLISECONDS); return zk; } public static String getConnectionString() { return connectionString; } public static void setConnectionString(String connectionString) { ZookeeperClient.connectionString = connectionString; } public static int getSessionTimeout() { return sessionTimeout; } public static void setSessionTimeout(int sessionTimeout) { ZookeeperClient.sessionTimeout = sessionTimeout; } }
DistributedLock
package org.massive.lock; import org.apache.commons.lang3.RandomUtils; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by massive on 2016-12-15. */ public class DistributedLock { private String lockId; private static final String LOCK_ROOT = "/LOCKS"; //-------------------------------------------------------------- // data为存储的节点数据内容 // 由于锁机制用的是序列功能的特性,data的值不重要,只要利于网络传输即可 //-------------------------------------------------------------- private final static byte[] data = {0x12, 0x34}; private final CountDownLatch latch = new CountDownLatch(1); private ZooKeeper zk; private int sessionTimeout; public DistributedLock(ZooKeeper zk,int sessionTimeout) { this.zk = zk; this.sessionTimeout = sessionTimeout; } public DistributedLock() throws IOException, KeeperException, InterruptedException { this.zk = ZookeeperClient.getInstance(); this.sessionTimeout = ZookeeperClient.getSessionTimeout(); } class LockWatcher implements Watcher { @Override public void process(WatchedEvent event) { //-------------------------------------------------------------- // 监控节点变化(本程序为序列的上一节点) // 若为节点删除,证明序列的上一节点已删除,此时释放阀门让当前的lock获得锁 //-------------------------------------------------------------- if (event.getType() == Event.EventType.NodeDeleted) latch.countDown(); } } /** * @return * @throws KeeperException * @throws InterruptedException */ public synchronized boolean lock() { //-------------------------------------------------------------- // 保证锁根节点存在,若不存在则创建它 //-------------------------------------------------------------- createLockRootIfNotExists(); try { lockId = zk.create(LOCK_ROOT + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("thread " + Thread.currentThread().getName() + " create the lock node: " + lockId + ", trying to get lock now"); //-------------------------------------------------------------- // 获得锁根节点下的各锁子节点,并排序 //-------------------------------------------------------------- List<String> nodes = zk.getChildren(LOCK_ROOT, true); SortedSet<String> sortedNode = new TreeSet<String>(); for (String node : nodes) { sortedNode.add(LOCK_ROOT + "/" + node); } String first = sortedNode.first(); SortedSet<String> lessThanMe = sortedNode.headSet(lockId); //-------------------------------------------------------------- // 检查是否有比当前锁节点lockId更小的节点,若有则监控当前节点的前一节点 //-------------------------------------------------------------- if (lockId.equals(first)) { System.out.println("thread " + Thread.currentThread().getName() + " has get the lock, lockId is " + lockId); return true; } else if (!lessThanMe.isEmpty()) { String prevLockId = lessThanMe.last(); zk.exists(prevLockId, new LockWatcher()); //-------------------------------------------------------------- // 阀门等待sessionTimeout的时间 // 当等待sessionTimeout的时间过后,上一个lock的Zookeeper连接会过期,删除所有临时节点,触发监听器 //-------------------------------------------------------------- latch.await(sessionTimeout, TimeUnit.MILLISECONDS); System.out.println("thread " + Thread.currentThread().getName() + " has get the lock, lockId is " + lockId); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return true; } public synchronized boolean unlock() { //-------------------------------------------------------------- // 删除lockId节点以释放锁 //-------------------------------------------------------------- try { System.out.println("thread " + Thread.currentThread().getName() + " unlock the lock: " + lockId + ", the node: " + lockId + " had been deleted"); zk.delete(lockId, -1); return true; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } finally { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } return false; } /** * 保证锁根节点存在,若不存在则创建它 */ public void createLockRootIfNotExists() { try { Stat stat = zk.exists(LOCK_ROOT, false); if (stat == null) { zk.create(LOCK_ROOT, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { final CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { DistributedLock lock = null; try { lock = new DistributedLock(); latch.countDown(); latch.await(); lock.lock(); Thread.sleep(RandomUtils.nextInt(200, 500)); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (lock != null) { lock.unlock(); } } } }).start(); } } }
测试
运行main方法,本机器的某次输出结果thread Thread-8 create the lock node: /LOCKS/0000000090, trying to get lock now thread Thread-9 create the lock node: /LOCKS/0000000092, trying to get lock now thread Thread-3 create the lock node: /LOCKS/0000000093, trying to get lock now thread Thread-2 create the lock node: /LOCKS/0000000091, trying to get lock now thread Thread-5 create the lock node: /LOCKS/0000000094, trying to get lock now thread Thread-7 create the lock node: /LOCKS/0000000095, trying to get lock now thread Thread-1 create the lock node: /LOCKS/0000000098, trying to get lock now thread Thread-6 create the lock node: /LOCKS/0000000096, trying to get lock now thread Thread-4 create the lock node: /LOCKS/0000000097, trying to get lock now thread Thread-0 create the lock node: /LOCKS/0000000099, trying to get lock now thread Thread-8 has get the lock, lockId is /LOCKS/0000000090 thread Thread-8 unlock the lock: /LOCKS/0000000090, the node: /LOCKS/0000000090 had been deleted thread Thread-2 has get the lock, lockId is /LOCKS/0000000091 thread Thread-2 unlock the lock: /LOCKS/0000000091, the node: /LOCKS/0000000091 had been deleted thread Thread-9 has get the lock, lockId is /LOCKS/0000000092 thread Thread-9 unlock the lock: /LOCKS/0000000092, the node: /LOCKS/0000000092 had been deleted thread Thread-3 has get the lock, lockId is /LOCKS/0000000093 thread Thread-3 unlock the lock: /LOCKS/0000000093, the node: /LOCKS/0000000093 had been deleted thread Thread-5 has get the lock, lockId is /LOCKS/0000000094 thread Thread-5 unlock the lock: /LOCKS/0000000094, the node: /LOCKS/0000000094 had been deleted thread Thread-7 has get the lock, lockId is /LOCKS/0000000095 thread Thread-7 unlock the lock: /LOCKS/0000000095, the node: /LOCKS/0000000095 had been deleted thread Thread-6 has get the lock, lockId is /LOCKS/0000000096 thread Thread-6 unlock the lock: /LOCKS/0000000096, the node: /LOCKS/0000000096 had been deleted thread Thread-4 has get the lock, lockId is /LOCKS/0000000097 thread Thread-4 unlock the lock: /LOCKS/0000000097, the node: /LOCKS/0000000097 had been deleted thread Thread-1 has get the lock, lockId is /LOCKS/0000000098 thread Thread-1 unlock the lock: /LOCKS/0000000098, the node: /LOCKS/0000000098 had been deleted thread Thread-0 has get the lock, lockId is /LOCKS/0000000099 thread Thread-0 unlock the lock: /LOCKS/0000000099, the node: /LOCKS/0000000099 had been deleted
注:测试的结果证明了在同一JVM下,锁是正确的。由于本文锁的实现使用的是Zookeeper的监听机制,理论上在分布式系统下获取锁的顺序也是序列的顺序。但真正验证结果的正确性须在分布式系统下测试需同时运行几个JVM实例,在同一时刻去获取锁,再看输出结果。
参考文章:
http://zookeeper.apache.org/doc/r3.4.9/recipes.htmlhttp://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
本文为原创,转载请注明出处http://blog.csdn.net/massivestars/article/details/53771532
相关文章推荐
- 使用zookeeper实现分布式锁
- java使用zookeeper实现的分布式锁示例
- 使用ZooKeeper实现Java跨JVM的分布式锁
- 使用ZooKeeper实现Java跨JVM的分布式锁
- 使用ZooKeeper实现Java跨JVM的分布式锁
- [分布式] 使用 Dubbo、ZooKeeper 实现集群负载均衡
- 使用java反射和zookeeper实现的分布式服务
- 使用zookeeper实现分布式master选举(c 接口版本)
- 使用zookeeper实现分布式锁
- 使用ZooKeeper实现Java跨JVM的分布式锁
- java使用zookeeper实现的分布式锁示例
- 使用 ZooKeeper 实现分布式锁
- 使用ZooKeeper实现Java跨JVM的分布式锁
- 使用redis和zookeeper实现分布式锁
- 使用zookeeper实现分布式锁
- zookeeper 实现分布式锁zookeeper 使用 Curator 示例监听、分布式锁
- 使用Zookeeper实现分布式锁
- 使用redis和zookeeper实现分布式锁
- 使用zookeeper实现分布式锁
- 使用zookeeper实现分布式master选举(c 接口版本)