
Concurrency Control in zkClient

2014-01-13 20:29

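1. ZkConnection implements IZkConnection and acts as a proxy for org.apache.zookeeper.ZooKeeper. It uses a ReentrantLock to control concurrent calls to connect and close. Both methods null-check and assign the member variable _zk, so without the lock two threads could interleave between the check and the assignment. The code is as follows: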

    @Override
    public void connect(Watcher watcher) {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                throw new IllegalStateException("zk client has already been started");
            }
            try {
                LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
            } catch (IOException e) {
                throw new ZkException("Unable to connect to " + _servers, e);
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }

    public void close() throws InterruptedException {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                LOG.debug("Closing ZooKeeper connected to " + _servers);
                _zk.close();
                _zk = null;
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }
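The same check-then-assign pattern can be tried out without a ZooKeeper server. The following is a minimal sketch, not zkclient code: GuardedConnection, GuardedConnectionDemo, and the handle field are hypothetical names, and a plain Object stands in for the ZooKeeper instance.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ZkConnection; a plain Object plays the role of _zk.
class GuardedConnection {
    private final ReentrantLock lock = new ReentrantLock();
    private Object handle; // guarded by lock

    void connect() {
        lock.lock();
        try {
            // The null check and the assignment happen under the same lock,
            // so no other thread can slip in between them.
            if (handle != null) {
                throw new IllegalStateException("already started");
            }
            handle = new Object();
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            handle = null;
        } finally {
            lock.unlock();
        }
    }

    boolean isConnected() {
        lock.lock();
        try {
            return handle != null;
        } finally {
            lock.unlock();
        }
    }
}

class GuardedConnectionDemo {
    // Calls connect() from several threads and counts how many calls succeed.
    static int racingConnects(GuardedConnection connection, int threads) {
        AtomicInteger succeeded = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    connection.connect();
                    succeeded.incrementAndGet();
                } catch (IllegalStateException alreadyStarted) {
                    // Another thread connected first.
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        System.out.println("successful connects: "
                + racingConnects(new GuardedConnection(), 8));
    }
}
```

Because the check and the assignment share one critical section, exactly one of the racing connect() calls succeeds and the others fail with IllegalStateException.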

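2. org.I0Itec.zkclient.ZkLock extends the JDK's reentrant lock ReentrantLock and defines three java.util.concurrent.locks.Condition member variables, one each for data changes, session state changes, and znode events. For an introduction to Java's Condition, see http://blog.sina.com.cn/s/blog_87fc744801018q7l.html. ZkClient holds a ZkLock member variable and uses it for its own concurrency control, for example in the waitUntilExists method: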

    public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) throws ZkInterruptedException {
        Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
        LOG.debug("Waiting until znode '" + path + "' becomes available.");
        if (exists(path)) {
            return true;
        }
        acquireEventLock();
        try {
            while (!exists(path, true)) {
                boolean gotSignal = getEventLock().getZNodeEventCondition().awaitUntil(timeout);
                if (!gotSignal) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            throw new ZkInterruptedException(e);
        } finally {
            getEventLock().unlock();
        }
    }
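The shape of such a lock, and the deadline loop used by waitUntilExists, can be sketched in isolation. This is an illustration under assumptions rather than the zkclient source: EventLock, EventLockDemo, and awaitPredicate are hypothetical names, and a BooleanSupplier stands in for the exists(path, true) check.

```java
import java.util.Date;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Sketch of a ReentrantLock subclass that owns three conditions.
// The getter names mirror the ones called in the article's code.
class EventLock extends ReentrantLock {
    private static final long serialVersionUID = 1L;

    private final Condition dataChanged = newCondition();
    private final Condition stateChanged = newCondition();
    private final Condition znodeEvent = newCondition();

    Condition getDataChangedCondition() { return dataChanged; }
    Condition getStateChangedCondition() { return stateChanged; }
    Condition getZNodeEventCondition() { return znodeEvent; }
}

class EventLockDemo {
    // Waits until the predicate holds or the deadline passes.
    // Returns true if the predicate became true, false on timeout or interrupt.
    static boolean awaitPredicate(EventLock lock, BooleanSupplier predicate, long millis) {
        Date deadline = new Date(System.currentTimeMillis() + millis);
        lock.lock();
        try {
            // Re-check the predicate after every wakeup: a signal only says
            // "something changed", not "the thing you wanted happened".
            while (!predicate.getAsBoolean()) {
                boolean beforeDeadline = lock.getZNodeEventCondition().awaitUntil(deadline);
                if (!beforeDeadline) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        EventLock lock = new EventLock();
        System.out.println(awaitPredicate(lock, () -> true, 50));
        System.out.println(awaitPredicate(lock, () -> false, 50));
    }
}
```

All three conditions belong to the same lock, so a thread must hold that lock before it waits on or signals any of them. awaitUntil releases the lock while waiting and reacquires it before returning, which is why the finally block can always unlock.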

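If the path has not been created yet, the caller waits on the znode event condition (getZNodeEventCondition()) until it is signaled or the deadline passes. The process method is responsible for signaling the condition, as follows: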

    public void process(WatchedEvent event) {
        LOG.debug("Received event: " + event);
        _zookeeperEventThread = Thread.currentThread();

        boolean stateChanged = event.getPath() == null;
        boolean znodeChanged = event.getPath() != null;
        boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
                || event.getType() == EventType.NodeChildrenChanged;

        getEventLock().lock();
        try {

            // We might have to install child change event listener if a new node was created
            if (getShutdownTrigger()) {
                LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
                return;
            }
            if (stateChanged) {
                processStateChanged(event);
            }
            if (dataChanged) {
                processDataOrChildChange(event);
            }
        } finally {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                // TODO PVo write a test for this
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
            getEventLock().unlock();
            LOG.debug("Leaving process event");
        }
    }
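The interaction between a waiting caller and the event thread can be reproduced with plain JDK classes. The sketch below is a simplified model and not zkclient code: SignalDemo, onCreated, and runOnce are hypothetical names, and a local boolean replaces the server-side existence check that zkclient performs with exists(path, true).

```java
import java.util.Date;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition znodeEvent = lock.newCondition();
    private boolean exists; // guarded by lock; stands in for "the znode exists"

    // Caller side: wait until the flag is set or the deadline passes.
    boolean waitUntilExists(long millis) {
        Date deadline = new Date(System.currentTimeMillis() + millis);
        lock.lock();
        try {
            while (!exists) {
                if (!znodeEvent.awaitUntil(deadline)) {
                    return false; // deadline passed without the flag being set
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Event side: like process(), change state and signal while holding the lock.
    void onCreated() {
        lock.lock();
        try {
            exists = true;
            znodeEvent.signalAll(); // wakes every thread waiting on this condition
        } finally {
            lock.unlock();
        }
    }

    // Starts a simulated event thread that fires after delayMillis, then waits.
    static boolean runOnce(long delayMillis, long timeoutMillis) {
        SignalDemo demo = new SignalDemo();
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            demo.onCreated();
        });
        eventThread.start();
        boolean result = demo.waitUntilExists(timeoutMillis);
        try {
            eventThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("signaled in time: " + runOnce(50, 5000));
        System.out.println("without any event: " + new SignalDemo().waitUntilExists(50));
    }
}
```

The signal is sent before the lock is released, which matches the order in the finally block of process. Calling signalAll on a ReentrantLock condition without holding the lock throws IllegalMonitorStateException, so the unlock has to come last.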

