您的位置:首页 > 其它

ZooKeeper源码(3)server包非核心类

2017-10-05 19:49 363 查看

这个是最主要的包,内容很多,可能会分好几次来写

admin包



JettyAdminServer和DummyAdminServer是AdminServer接口的实现类。其中DummyAdmin是空的实现,主要还是看JettyAdminServer的实现。AdminServerFactory是一个工厂方法,一般就是直接返回JettyAdminServer,也不用看了。其中要把ZooKeeperServer加入重点关注。



这个包是3.5新增的,用于监控状态的,不是核心组件,不看了。

auth包:各自验证授权,也不看,因为看不懂。。。

command包:



四字指令,AbstractFourLetterCommand是各种四字指令的基类,用到模板方法模式,并且在FourLetterCommands做一个集中管理 ,稍微看一下。

其实这种模式的最好读了,看一个等于看一堆

它告诉我要关注ServerCnxn,ServerCnxnFactory,ZooKeeperServer这3个类

public abstract class AbstractFourLetterCommand {
private static final Logger LOG = LoggerFactory
.getLogger(AbstractFourLetterCommand.class);
protected static final String ZK_NOT_SERVING = "This ZooKeeper instance is not currently serving requests";
protected PrintWriter pw;
protected ServerCnxn serverCnxn;
protected ZooKeeperServer zkServer;
protected ServerCnxnFactory factory;

public AbstractFourLetterCommand(PrintWriter pw, ServerCnxn serverCnxn) {
this.pw = pw;
this.serverCnxn = serverCnxn;
}

public void start() {
run();
}

public void run() {
try {
//调用子类的commandRun()
commandRun();
} catch (IOException ie) {
LOG.error("Error in running command ", ie);
} finally {
serverCnxn.cleanupWriterSocket(pw);
}
}

public void setZkServer(ZooKeeperServer zkServer) {
this.zkServer = zkServer;
}

/**
* @return true if the server is running, false otherwise.
*/
boolean isZKServerRunning() {
return zkServer != null && zkServer.isRunning();
}

public void setFactory(ServerCnxnFactory factory) {
this.factory = factory;
}

public abstract void commandRun() throws IOException;
}

//某一个子类
public class StatCommand extends AbstractFourLetterCommand {
private static final Logger LOG = LoggerFactory
.getLogger(AbstractFourLetterCommand.class);
private int len;
public StatCommand(PrintWriter pw, ServerCnxn serverCnxn, int len) {
super(pw, serverCnxn);
this.len = len;
}

@Override
public void commandRun() {
if (!isZKServerRunning()) {
pw.println(ZK_NOT_SERVING);
} else {
pw.print("Zookeeper version: ");
pw.println(Version.getFullVersion());
if (zkServer instanceof ReadOnlyZooKeeperServer) {
pw.println("READ-ONLY mode; serving only read-only clients");
}
if (len == FourLetterCommands.statCmd) {
LOG.info("Stat command output");
pw.println("Clients:");
for(ServerCnxn c : factory.getConnections()){
c.dumpConnectionInfo(pw, true);
pw.println();
}
pw.println();
}
pw.print(zkServer.serverStats().toString());
pw.print("Node count: ");
pw.println(zkServer.getZKDatabase().getNodeCount());
}
}
}




persist包



看名字和内容是写文件的,有一个类的注释写的很好,来看一下。

* The format of a Transactional log is as follows:
* <blockquote><pre>
* LogFile:
*     FileHeader TxnList ZeroPad
*
* FileHeader: {
*     magic 4bytes (ZKLG)
*     version 4bytes
*     dbid 8bytes
*   }
*
* TxnList:
*     Txn || Txn TxnList
*
* Txn:
*     checksum Txnlen TxnHeader Record 0x42
*
* checksum: 8bytes Adler32 is currently used
*   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
*     len 4bytes
*
* TxnHeader: {
*     sessionid 8bytes
*     cxid 4bytes
*     zxid 8bytes
*     time 8bytes
*     type 4bytes
*   }
*
* Record:
*     See Jute definition file for details on the various record types
*
* ZeroPad:
*     0 padded to EOF (filled during preallocation stage)
* </pre></blockquote>




protected void serialize(DataTree dt,Map<Long, Integer> sessions,
OutputArchive oa, FileHeader header) throws IOException {
// this is really a programmatic error and not something that can
// happen at runtime
if(header==null)
throw new IllegalStateException(
"Snapshot's not open for writing: uninitialized header");
header.serialize(oa, "fileheader");
SerializeUtils.serializeSnapshot(dt,oa,sessions);
}


通过源码发现,完成工作的是SerializeUtils

Util类



ConfigUtil中有,创建服务器的,记录一下

QuorumPeer.QuorumServer qs;
try {
qs = new QuorumPeer.QuorumServer(-1, value);
} catch (ConfigException e) {
e.printStackTrace();
continue;
}


SerializeUtils

public static void deserializeSnapshot(DataTree dt,InputArchive ia,
Map<Long, Integer> sessions) throws IOException {
int count = ia.readInt("count");
while (count > 0) {
long id = ia.readLong("id");
int to = ia.readInt("timeout");
sessions.put(id, to);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"loadData --- session in archive: " + id
+ " with timeout: " + to);
}
count--;
}
dt.deserialize(ia, "tree");
}

public static void serializeSnapshot(DataTree dt,OutputArchive oa,
Map<Long, Integer> sessions) throws IOException {
HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
oa.writeInt(sessSnap.size(), "count");
for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
oa.writeLong(entry.getKey().longValue(), "id");
oa.writeInt(entry.getValue().intValue(), "timeout");
}
dt.serialize(oa, "tree");
}


可以看到,有dt.serialize(oa, “tree”);委派给了DataTree自己序列化。

其他类比较扯,不看了

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: