zookeeper学习
2015-09-06 00:02
525 查看
在去年准备面试的时候,有一个问题被问到过两次:如何实现一个跨进程、跨主机的分布式锁。这个问题我当时有查过,一种是通过Redis,另一种是通过zookeeper,因为当时还没有学习java,所以没有研究过zookeeper。
但是上周因为项目需要,开始学习kafka——一种消息中间件,里面有用到zookeeper,所以开始了解这个。
从解决上面的问题开始:如何通过zookeeper实现一个分布式锁,贴代码,引用自学良的博客
下面是测试代码,这部分是自己写的:
上面的测试代码很简单,流程如下:
1. 创建一个分布式锁;
2. 加锁,在一个文件中写入0~9的数,需要注意的是,每次打开文件只写入一个数字,之后关闭文件;
3. 解锁;
同时启动两个实例执行上面的测试代码,最后文件中的内容是:01234567890123456789。
再看对比测试,把测试代码中解锁和解锁部分的代码注释,也就是
很明显,分布式的锁起作用了,不加锁时两个实例的循环可以被打断,所以输出会比较混乱;而加锁后,只有一个实例执行完以后另一个实例开始执行,输出就比较规律。
zookeeper会维护一个具有层次关系的数据结构,比较类似一个文件系统,如图所示:
Zookeeper 这种数据结构有如下这些特点:
- 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
- znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
- znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
- znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
- znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
- znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,不止是跨进程的锁是基于这个特性实现的,像统一配置管理、集群Leader选举、同步队列等都是利用它实现的。
讲完这些理论,说说在linux上zookeeper安装,能够百度到,只说流程:
1. 安装JDK(),如果不装,运行服务端的时候不会有任何失败提示,但是客户端会连接不上;
2. 解压,写配置文件;
3. 运行服务端、客户端;
但是上周因为项目需要,开始学习kafka——一种消息中间件,里面有用到zookeeper,所以开始了解这个。
从解决上面的问题开始:如何通过zookeeper实现一个分布式锁,贴代码,引用自学良的博客
package zk.lock; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** DistributedLock lock = null; try { lock = new DistributedLock("127.0.0.1:2182","test"); lock.lock(); //do something... } catch (Exception e) { e.printStackTrace(); } finally { if(lock != null) lock.unlock(); } * @author xueliang * */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//竞争资源的标志 private String waitNode;//等待前一个锁 private String myZnode;//当前锁 private CountDownLatch latch;//计数器 private int sessionTimeout = 30000; private List<Exception> exception = new ArrayList<Exception>(); /** * 创建分布式锁,使用前请确认config配置的zookeeper服务可用 * @param config 127.0.0.1:2181 * @param lockName 竞争资源标志,lockName中不能包含单词lock */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // 创建一个与服务器的连接 try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 创建根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper节点的监视器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);//等待锁 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //创建临时子节点 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); //取出所有子节点 List<String> subNodes = zk.getChildren(root, false); //取出所有lockName的锁 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的节点,则表示取得锁 return true; } //如果不是最小的节点,找到比自己小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
下面是测试代码,这部分是自己写的:
/** * Created by cheng on 2015/9/5. */ import org.junit.Test; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.Writer; import zk.lock.DistributedLock; public class DistributedLockTest { static final String FILE_PATH = "D:\\testDistributedLock.txt"; @Test public void testDistributedLock() throws Exception{ DistributedLock lock = new DistributedLock("192.168.10.108:2181","filelock"); try { //加锁的代码 lock.lock(); for(int i=0;i<10;i++){ writeIntToFile(i); Thread.sleep(1000); } } finally { //解锁的代码 lock.unlock(); } } /** * 在文件中写入一个数字 * @param i 需要在文件中写入的数字 */ private void writeIntToFile(int i){ File file = new File(FILE_PATH); Writer writer = null; try{ writer = new FileWriter(file,true); writer.write(i+"\n"); } catch (Exception e){ System.out.println(e); } finally { if(writer!=null){ try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
上面的测试代码很简单,流程如下:
1. 创建一个分布式锁;
2. 加锁,在一个文件中写入0~9的数,需要注意的是,每次打开文件只写入一个数字,之后关闭文件;
3. 解锁;
同时启动两个实例执行上面的测试代码,最后文件中的内容是:01234567890123456789。
再看对比测试,把测试代码中解锁和解锁部分的代码注释,也就是
lock.lock()和
lock.unlock()这两句,同样是两个实例,得到的结果是:01021324354657687989
很明显,分布式的锁起作用了,不加锁时两个实例的循环可以被打断,所以输出会比较混乱;而加锁后,只有一个实例执行完以后另一个实例开始执行,输出就比较规律。
zookeeper会维护一个具有层次关系的数据结构,比较类似一个文件系统,如图所示:
Zookeeper 这种数据结构有如下这些特点:
- 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
- znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
- znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
- znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
- znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
- znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,不止是跨进程的锁是基于这个特性实现的,像统一配置管理、集群Leader选举、同步队列等都是利用它实现的。
讲完这些理论,说说在linux上zookeeper安装,能够百度到,只说流程:
1. 安装JDK(),如果不装,运行服务端的时候不会有任何失败提示,但是客户端会连接不上;
2. 解压,写配置文件;
3. 运行服务端、客户端;
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树
- [原创]java局域网聊天系统