zkClient的并发控制
2014-01-13 20:29
369 查看
1.ZkConnection实现IZkConnection,是org.apache.zookeeper.ZooKeeper的代理类,通过ReentrantLock实现connect和close的并发控制。因为connect方法和close方法对成员变量_zk进行了赋值和判空操作,所有要进行并发控制。代码如下:
@Override public void connect(Watcher watcher) { _zookeeperLock.lock(); try { if (_zk != null) { throw new IllegalStateException("zk client has already been started"); } try { LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + "."); _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher); } catch (IOException e) { throw new ZkException("Unable to connect to " + _servers, e); } } finally { _zookeeperLock.unlock(); } } public void close() throws InterruptedException { _zookeeperLock.lock(); try { if (_zk != null) { LOG.debug("Closing ZooKeeper connected to " + _servers); _zk.close(); _zk = null; } } finally { _zookeeperLock.unlock(); } }
2.org.I0Itec.zkclient.ZkLock继承jdk自旋锁ReentrantLock,内部定义三个java.util.concurrent.locks.Condition的成员变量,分别实现节点数据改变,节点状态改变和节点改变三种condition,java的condition介绍可以参考 http://blog.sina.com.cn/s/blog_87fc744801018q7l.html,在ZkClient里,包含一个ZkLock的成员变量,用于实现zkClient的并发控制,比如zkClient的waitUntilExists方法:
public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) throws ZkInterruptedException { Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time)); LOG.debug("Waiting until znode '" + path + "' becomes available."); if (exists(path)) { return true; } acquireEventLock(); try { while (!exists(path, true)) { boolean gotSignal = getEventLock().getZNodeEventCondition().awaitUntil(timeout); if (!gotSignal) { return false; } } return true; } catch (InterruptedException e) { throw new ZkInterruptedException(e); } finally { getEventLock().unlock(); } }
如果path未创建成功,zkNodeEventCondition会wait一段时间,然后process方法负责释放condition,如下:
public void process(WatchedEvent event) { LOG.debug("Received event: " + event); _zookeeperEventThread = Thread.currentThread(); boolean stateChanged = event.getPath() == null; boolean znodeChanged = event.getPath() != null; boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeChildrenChanged; getEventLock().lock(); try { // We might have to install child change event listener if a new node was created if (getShutdownTrigger()) { LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered"); return; } if (stateChanged) { processStateChanged(event); } if (dataChanged) { processDataOrChildChange(event); } } finally { if (stateChanged) { getEventLock().getStateChangedCondition().signalAll(); // If the session expired we have to signal all conditions, because watches might have been removed and // there is no guarantee that those // conditions will be signaled at all after an Expired event // TODO PVo write a test for this if (event.getState() == KeeperState.Expired) { getEventLock().getZNodeEventCondition().signalAll(); getEventLock().getDataChangedCondition().signalAll(); // We also have to notify all listeners that something might have changed fireAllEvents(); } } if (znodeChanged) { getEventLock().getZNodeEventCondition().signalAll(); } if (dataChanged) { getEventLock().getDataChangedCondition().signalAll(); } getEventLock().unlock(); LOG.debug("Leaving process event"); } }
阅读更多
相关文章推荐
- Spring控制任务并发
- JDBC高级特性(二)事务、并发控制和行集
- 2017.4.12 开涛shiro教程-第十八章-并发登录人数控制
- 对apache中并发控制参数prefork理解和调优
- SQLite剖析之锁和并发控制
- PHP多进程并发控制的测试用例
- 数据库并发事务控制 一:综述
- 并发控制
- linux设备驱动之并发控制
- 对高并发流量控制的一点思考 推荐
- Linux内核开发之并发控制(五)
- Android深度探索:HAL与驱动开发学习笔记--并发控制之顺序锁
- 并发控制
- EF+MySQL乐观锁控制电商并发下单扣减库存,在高并发下的问题
- 事务——恢复和并发控制的基本单位
- 八张图教你彻底理解数据库并发控制之隔离级别(下)
- 数据库 并发控制 常见锁
- 数据库并发控制及SQL Server的并发控制机制
- LINQ体验(10)——LINQ to SQL语句之开放式并发控制和事务
- EF+SQLSERVER控制并发下抢红包减余额(改进)