您的位置:首页 > 其它

zookeeper实现分布式锁

2017-12-19 15:20 344 查看
Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>

private static final String ROOT_LOCKS="/LOCKS";//根节点
private static int number = 10;
private static Random random=new Random();
public static void main(String[] args) throws InterruptedException {
sharedLock();
//testReadWriterLock();
}
private static int getNumber() throws InterruptedException {
//TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
return number--;
}

/**
* 全局同步分布式锁, 同一时间多台机器只有一台能获得同一把锁.
* InterProcessMutex、InterProcessSemaphoreMutex
* @throws InterruptedException
*/
private static void sharedLock() throws InterruptedException {
InterProcessMutex interProcessMutex = new InterProcessMutex(CuratorClientUtils.getCuratorFramework(), ROOT_LOCKS);
final CountDownLatch countdown = new CountDownLatch(1);

for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
countdown.await(50,TimeUnit.MILLISECONDS);
//加锁
interProcessMutex.acquire();
//-------------业务处理开始
int number = getNumber();
System.out.println(number);
//-------------业务处理结束
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//释放
System.out.println(Thread.currentThread().getName() + "  是否获取锁 " + interProcessMutex.isAcquiredInThisProcess());
if(interProcessMutex.isAcquiredInThisProcess()) {//判断是否持有锁 进而进行锁是否释放的操作
interProcessMutex.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"t" + i).start();
}
Thread.sleep(100);
countdown.countDown();
}

public class CuratorClientUtils {
private static String ZK_URL = "192.168.1.45:2181,192.168.1.46:2181,192.168.1.41:2181";
private static CuratorFramework curatorFramework = null;
public static CuratorFramework getCuratorFramework(){
if (curatorFramework == null){
curatorFramework = CuratorFrameworkFactory.newClient(ZK_URL,
5000,5000, new ExponentialBackoffRetry(1000,5));
}
curatorFramework.start();
return curatorFramework;
}
}

利用临时顺序节点实现共享锁的一般做法

首先介绍一下,Zookeeper中有一种节点叫做顺序节点,故名思议,假如我们在/lock/目录下创建节3个点,ZooKeeper集群会按照提起创建的顺序来创建节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。

ZooKeeper中还有一种名为临时节点的节点,临时节点由某个客户端创建,当客户端与ZooKeeper集群断开连接,则开节点自动被删除。

利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:

客户端调用create()方法创建名为“locknode/”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。

客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。

客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。

如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>

public class ZookeeperClient {
private static String ZK_URL = "192.168.1.45:2181,192.168.1.46:2181,192.168.1.41:2181";
private static CountDownLatch countDownLatch=new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
private static int sessionTimeout=6000;
private ZookeeperClient(){}
private static class MyZookeeperClient{
public static ZooKeeper getZooKeeper(){
try {
if (zooKeeper==null){
zooKeeper = new ZooKeeper(ZK_URL, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
countDownLatch.countDown();
}
}
});
countDownLatch.await();
}
}catch (Exception e){
e.printStackTrace();
}
return zooKeeper;
}
}
public static ZooKeeper getInstance(){
return MyZookeeperClient.getZooKeeper();
}

/**
* Getter method for property <tt>sessionTimeout</tt>
*
* @return property value of sessionTimeout
*/
public static int getSessionTimeout() {
return sessionTimeout;
}

}

public class LockWatcher implements Watcher {
private CountDownLatch countDownLatch;
public LockWatcher(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted){
countDownLatch.countDown();
}
}
}

public class DistributeLock {
private ZooKeeper zooKeeper;
private static final String ROOT_LOCKS="/LOCKS";//根节点
private final static byte[] data={1,2,3}; //节点的数据
private CountDownLatch countDownLatch = new CountDownLatch(1);
private String lockId;
public DistributeLock(){
this.zooKeeper = ZookeeperClient.getInstance();
}
public boolean getLock(){
try {
//创建临时有序节点
lockId = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"->成功创建了lock节点["+lockId+"], 开始去竞争锁");
//获取根节点所有子节点
List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);
TreeSet<String> orderNodes = new TreeSet<String>();
for (String childrenNode : childrenNodes) {
orderNodes.add(ROOT_LOCKS+"/"+childrenNode);
}
//取出最小的节点
String first = orderNodes.first();
//对比,如果跟lockId 相等,当前对象拿到锁
if (lockId.equals(first)){
System.out.println(Thread.currentThread().getName()+"->成功获得锁,lock节点为:["+lockId+"]");
return true;
}
SortedSet<String> headSet = orderNodes.headSet(lockId);
if (!CollectionUtils.isEmpty(headSet)){
//获取当前节点的  前面的节点
String last = headSet.last();
//注册删除事件
zooKeeper.exists(last,new LockWatcher(countDownLatch));
countDownLatch.await(ZookeeperClient.getSessionTimeout(), TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName()+" 成功获取锁:["+lockId+"]");
return true;
}

} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean unlock(){
System.out.println(Thread.currentThread().getName()+"->开始释放锁:["+lockId+"]");
try {
zooKeeper.delete(lockId,-1);
System.out.println("节点["+lockId+"]成功被删除");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
}


测试类:

public class DistributeLockTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Random random=new Random();
for(int i=0;i<10;i++){
new Thread(()->{
DistributeLock lock=null;
try {
lock=new DistributeLock();
boolean lock1 = lock.getLock();
System.out.println("*****************"+lock1);
Thread.sleep(random.nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock!=null){
lock.unlock();
}
}
}).start();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: