您的位置:首页 > 编程语言 > Java开发

zookeeper学习

2015-09-06 00:02 525 查看
在去年准备面试的时候,有一个问题被问到过两次:如何实现一个跨进程、跨主机的分布式锁。这个问题我当时有查过,一种是通过Redis,另一种是通过zookeeper,因为当时还没有学习java,所以没有研究过zookeeper。

但是上周因为项目需要,开始学习kafka——一种消息中间件,里面有用到zookeeper,所以开始了解这个。

从解决上面的问题开始:如何通过zookeeper实现一个分布式锁,贴代码,引用自学良的博客

package zk.lock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
DistributedLock lock = null;
try {
lock = new DistributedLock("127.0.0.1:2182","test");
lock.lock();
//do something...
} catch (Exception e) {
e.printStackTrace();
}
finally {
if(lock != null)
lock.unlock();
}
* @author xueliang
*
*/
public class DistributedLock implements Lock, Watcher{
private ZooKeeper zk;
private String root = "/locks";//根
private String lockName;//竞争资源的标志
private String waitNode;//等待前一个锁
private String myZnode;//当前锁
private CountDownLatch latch;//计数器
private int sessionTimeout = 30000;
private List<Exception> exception = new ArrayList<Exception>();

/**
* 创建分布式锁,使用前请确认config配置的zookeeper服务可用
* @param config 127.0.0.1:2181
* @param lockName 竞争资源标志,lockName中不能包含单词lock
*/
public DistributedLock(String config, String lockName){
this.lockName = lockName;
// 创建一个与服务器的连接
try {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
// 创建根节点
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
exception.add(e);
} catch (KeeperException e) {
exception.add(e);
} catch (InterruptedException e) {
exception.add(e);
}
}

/**
* zookeeper节点的监视器
*/
public void process(WatchedEvent event) {
if(this.latch != null) {
this.latch.countDown();
}
}

public void lock() {
if(exception.size() > 0){
throw new LockException(exception.get(0));
}
try {
if(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
return;
}
else{
waitForLock(waitNode, sessionTimeout);//等待锁
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public boolean tryLock() {
try {
String splitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName can not contains \\u000B");
//创建临时子节点
myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " is created ");
//取出所有子节点
List<String> subNodes = zk.getChildren(root, false);
//取出所有lockName的锁
List<String> lockObjNodes = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}

public boolean tryLock(long time, TimeUnit unit) {
try {
if(this.tryLock()){
return true;
}
return waitForLock(waitNode,time);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if(stat != null){
System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}

public void unlock() {
try {
System.out.println("unlock " + myZnode);
zk.delete(myZnode,-1);
myZnode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

public void lockInterruptibly() throws InterruptedException {
this.lock();
}

public Condition newCondition() {
return null;
}

public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}

}


下面是测试代码,这部分是自己写的:

/**
* Created by cheng on 2015/9/5.
*/
import org.junit.Test;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;

import zk.lock.DistributedLock;
public class DistributedLockTest {
static final String FILE_PATH = "D:\\testDistributedLock.txt";
@Test
public void testDistributedLock() throws Exception{
DistributedLock lock = new DistributedLock("192.168.10.108:2181","filelock");
try {
//加锁的代码
lock.lock();
for(int i=0;i<10;i++){
writeIntToFile(i);
Thread.sleep(1000);
}
}
finally {
//解锁的代码
lock.unlock();
}

}
/**
* 在文件中写入一个数字
* @param i 需要在文件中写入的数字
*/
private void writeIntToFile(int i){
File file = new File(FILE_PATH);
Writer writer = null;
try{
writer = new FileWriter(file,true);
writer.write(i+"\n");
}
catch (Exception e){
System.out.println(e);
}
finally {
if(writer!=null){
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

}


上面的测试代码很简单,流程如下:

1. 创建一个分布式锁;

2. 加锁,在一个文件中写入0~9的数,需要注意的是,每次打开文件只写入一个数字,之后关闭文件;

3. 解锁;

同时启动两个实例执行上面的测试代码,最后文件中的内容是:01234567890123456789。

再看对比测试,把测试代码中解锁和解锁部分的代码注释,也就是
lock.lock()
lock.unlock()
这两句,同样是两个实例,得到的结果是:01021324354657687989

很明显,分布式的锁起作用了,不加锁时两个实例的循环可以被打断,所以输出会比较混乱;而加锁后,只有一个实例执行完以后另一个实例开始执行,输出就比较规律。

zookeeper会维护一个具有层次关系的数据结构,比较类似一个文件系统,如图所示:



Zookeeper 这种数据结构有如下这些特点:

- 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1

- znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录

- znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据

- znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了

- znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2

- znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,不止是跨进程的锁是基于这个特性实现的,像统一配置管理、集群Leader选举、同步队列等都是利用它实现的。

讲完这些理论,说说在linux上zookeeper安装,能够百度到,只说流程:

1. 安装JDK(),如果不装,运行服务端的时候不会有任何失败提示,但是客户端会连接不上;

2. 解压,写配置文件;

3. 运行服务端、客户端;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息