您的位置:首页 > 其它

ZooKeeper源码阅读(二):客户端

2013-07-23 12:43 375 查看
源代码:
http://svn.apache.org/repos/asf/zookeeper/trunk/
导入eclipse:

在包含build.xml目录下执行ant eclipse将产生.classpath文件

目录结构:

src/recipes:提供了各种Zookeeper应用例子

src/c:提供了c版客户端。zookeeper_st,zookeeper_mt两个library

src/contrib:别人贡献的代码?

src/generated:由jute生成的java实体类

客户端入口:org.apache.zookeeper.ZooKeeperMain

//读取命令行输入,用MyCommandOptions解析。

//内部类MyCommandOptions包含成员命令名command、参数列表cmdArgs

-option value –option value command cmdArgs

//根据以上解析的ip、端口,连接到ZooKeeper

zk = newZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
newMyWatcher(), readOnly);

//执行命令,在ZooKeeperMain.run()

//ZooKeeperMain只是一个外壳,使用jline实现了命令提示功能。

//commandMapCli将提供的命令命令名与执行体CliCommand关联

//execute from commandMap
CliCommandcliCmd = commandMapCli.get(cmd);
if(cliCmd!=null) {
cliCmd.setZk(zk);
watch =cliCmd.parse(args).exec();
}

//最终转到调用ZooKeeper方法

//提供的命令:

quit:Zk.close()关闭zk连接,调用cnxn.close()

history:列出历史记录

redo index:重新执行历史记录

printwatches [on]:查看/设置watche开关状态

connect:connectToZK(host)连接zk

//ZooKeeper内部连接

cnxn = newClientCnxn(connectStringParser.getChrootPath(),
hostProvider,sessionTimeout,this,watchManager,
getClientCnxnSocket(),canBeReadOnly);
cnxn.start();

ClientCnxn包含SendThread和EventThread两个线程

SendThread将事件添加到waitEvents队列中,EventThread线程消费该队列。

//下面以ls命令为例

//调用zk.getChildren

public
boolean
exec() throwsKeeperException, InterruptedException {
String path= args[1];
boolean watch =cl.hasOption("w");
List<String> children = zk.getChildren(path, watch);
out.println(children);
return watch;
}

//getChildren生成request

RequestHeader h = newRequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = newGetChildrenRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildrenResponse response = newGetChildrenResponse();
ReplyHeader r =
cnxn.submitRequest(h, request,response, wcb);

//submitRequest调用queuePacket,阻塞直到收到response。

publicReplyHeadersubmitRequest(RequestHeaderh, Record request,
Recordresponse, WatchRegistration watchRegistration)
throwsInterruptedException {
ReplyHeaderr = new ReplyHeader();
Packetpacket = queuePacket(h,r, request, response,null,null,null,
null, watchRegistration);
synchronized(packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}

//queuePacket将Packet添加到outgoingQueue队列中

packet= new Packet(h, r, request, response,watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath =clientPath;
packet.serverPath =serverPath;

outgoingQueue.add(packet);

//然后唤醒selector
sendThread.getClientCnxnSocket().wakeupCnxn();

//sendThread.run消费outgoingQueue

clientCnxnSocket.doTransport(to,pendingQueue,outgoingQueue,ClientCnxn.this);

//selector判断读/写事件

//doTransport调用doIO,doIO解析Response

//读事件

int rc =sock.read(incomingBuffer);
sendThread.readResponse(incomingBuffer);
//写事件

sock.write(p.bb);

//readResponse

//收到的package包括header,token, response字段

// 当header中xid是-1时,收到的notification,response字段解析得到WatchedEvent

// event将放入eventThread事件队列watingEvents

WatcherEventevent = new WatcherEvent();
event.deserialize(bbia, "response");



eventThread.queueEvent(we );

//在finally块中调用finishPacket

try {
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if(replyHdr.getZxid() > 0) {
lastZxid =replyHdr.getZxid();
}
if(packet.response !=
null&& replyHdr.getErr() == 0) {
packet.response.deserialize(bbia,
"response");
}
} finally {
finishPacket(packet);
}

//finishPacket

//如果有回调,finishPacket也将packet放到eventThread的队列中

//否则设置packet.finish,此时submitRequest返回response。

if (p.cb ==
null) {
synchronized (p) {
p.finished =
true
;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}

//EventThread.run线程消费waintingEvents

Objectevent = waitingEvents.take();
processEvent(event);

以下图片转自:http://www.spnguru.com/2010/08/zookeeper%E5%85%A8%E8%A7%A3%E6%9E%90%E2%80%94%E2%80%94client%E7%AB%AF/

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: