ZooKeeper源码(3)server包非核心类
2017-10-05 19:49
363 查看
这个是最主要的包,内容很多,可能会分好几次来写
admin包
JettyAdminServer和DummyAdminServer是AdminServer接口的实现类。其中DummyAdmin是空的实现,主要还是看JettyAdminServer的实现。AdminServerFactory是一个工厂方法,一般就是直接返回JettyAdminServer,也不用看了。其中要把ZooKeeperServer加入重点关注。
这个包是3.5新增的,用于监控状态的,不是核心组件,不看了。
auth包:各自验证授权,也不看,因为看不懂。。。
command包:
四字指令,AbstractFourLetterCommand是各种四字指令的基类,用到模板方法模式,并且在FourLetterCommands做一个集中管理 ,稍微看一下。
其实这种模式的最好读了,看一个等于看一堆
它告诉我要关注ServerCnxn,ServerCnxnFactory,ZooKeeperServer这3个类
public abstract class AbstractFourLetterCommand { private static final Logger LOG = LoggerFactory .getLogger(AbstractFourLetterCommand.class); protected static final String ZK_NOT_SERVING = "This ZooKeeper instance is not currently serving requests"; protected PrintWriter pw; protected ServerCnxn serverCnxn; protected ZooKeeperServer zkServer; protected ServerCnxnFactory factory; public AbstractFourLetterCommand(PrintWriter pw, ServerCnxn serverCnxn) { this.pw = pw; this.serverCnxn = serverCnxn; } public void start() { run(); } public void run() { try { //调用子类的commandRun() commandRun(); } catch (IOException ie) { LOG.error("Error in running command ", ie); } finally { serverCnxn.cleanupWriterSocket(pw); } } public void setZkServer(ZooKeeperServer zkServer) { this.zkServer = zkServer; } /** * @return true if the server is running, false otherwise. */ boolean isZKServerRunning() { return zkServer != null && zkServer.isRunning(); } public void setFactory(ServerCnxnFactory factory) { this.factory = factory; } public abstract void commandRun() throws IOException; } //某一个子类 public class StatCommand extends AbstractFourLetterCommand { private static final Logger LOG = LoggerFactory .getLogger(AbstractFourLetterCommand.class); private int len; public StatCommand(PrintWriter pw, ServerCnxn serverCnxn, int len) { super(pw, serverCnxn); this.len = len; } @Override public void commandRun() { if (!isZKServerRunning()) { pw.println(ZK_NOT_SERVING); } else { pw.print("Zookeeper version: "); pw.println(Version.getFullVersion()); if (zkServer instanceof ReadOnlyZooKeeperServer) { pw.println("READ-ONLY mode; serving only read-only clients"); } if (len == FourLetterCommands.statCmd) { LOG.info("Stat command output"); pw.println("Clients:"); for(ServerCnxn c : factory.getConnections()){ c.dumpConnectionInfo(pw, true); pw.println(); } pw.println(); } pw.print(zkServer.serverStats().toString()); pw.print("Node count: "); pw.println(zkServer.getZKDatabase().getNodeCount()); } } }
persist包
看名字和内容是写文件的,有一个类的注释写的很好,来看一下。
* The format of a Transactional log is as follows: * <blockquote><pre> * LogFile: * FileHeader TxnList ZeroPad * * FileHeader: { * magic 4bytes (ZKLG) * version 4bytes * dbid 8bytes * } * * TxnList: * Txn || Txn TxnList * * Txn: * checksum Txnlen TxnHeader Record 0x42 * * checksum: 8bytes Adler32 is currently used * calculated across payload -- Txnlen, TxnHeader, Record and 0x42 * * Txnlen: * len 4bytes * * TxnHeader: { * sessionid 8bytes * cxid 4bytes * zxid 8bytes * time 8bytes * type 4bytes * } * * Record: * See Jute definition file for details on the various record types * * ZeroPad: * 0 padded to EOF (filled during preallocation stage) * </pre></blockquote>
protected void serialize(DataTree dt,Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException { // this is really a programmatic error and not something that can // happen at runtime if(header==null) throw new IllegalStateException( "Snapshot's not open for writing: uninitialized header"); header.serialize(oa, "fileheader"); SerializeUtils.serializeSnapshot(dt,oa,sessions); }
通过源码发现,完成工作的是SerializeUtils
Util类
ConfigUtil中有,创建服务器的,记录一下
QuorumPeer.QuorumServer qs; try { qs = new QuorumPeer.QuorumServer(-1, value); } catch (ConfigException e) { e.printStackTrace(); continue; }
SerializeUtils
public static void deserializeSnapshot(DataTree dt,InputArchive ia, Map<Long, Integer> sessions) throws IOException { int count = ia.readInt("count"); while (count > 0) { long id = ia.readLong("id"); int to = ia.readInt("timeout"); sessions.put(id, to); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "loadData --- session in archive: " + id + " with timeout: " + to); } count--; } dt.deserialize(ia, "tree"); } public static void serializeSnapshot(DataTree dt,OutputArchive oa, Map<Long, Integer> sessions) throws IOException { HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions); oa.writeInt(sessSnap.size(), "count"); for (Entry<Long, Integer> entry : sessSnap.entrySet()) { oa.writeLong(entry.getKey().longValue(), "id"); oa.writeInt(entry.getValue().intValue(), "timeout"); } dt.serialize(oa, "tree"); }
可以看到,有dt.serialize(oa, “tree”);委派给了DataTree自己序列化。
其他类比较扯,不看了
相关文章推荐
- ZooKeeper源码(4)server包核心:Election类
- 【Zookeeper】源码分析之网络通信(二)之NIOServerCnxn
- Ambari-server源码分析:核心类agent.rest.AgentResource
- Kafka源码深度解析-序列14 -Server核心组件之3-SocketServer与NIO- 1+N+M 模型
- Ambari-server源码分析:核心类-心跳处理agent.HeartBeatHandler
- zookeeper源码分析-server-util
- Kafka源码深度解析-序列13 -Server核心组件之2(续)- TimingWheel本质与DelayedOperationPurgatory核心结构
- 【Zookeeper】源码分析之服务器(五)之ObserverZooKeeperServer
- SPServer源码分析(四): 核心服务器类SP_Server分析
- Kafka源码深度解析-序列11 -Server核心组件之1-KafkaController选举过程/Failover与Resignation
- 【Zookeeper】源码分析之服务器(二)之ZooKeeperServer
- 【Zookeeper】源码分析之网络通信(三)之NettyServerCnxn
- Kafka源码深度解析-序列12 -Server核心组件之2-ReplicaManager核心数据结构与Replica同步原理
- 【Zookeeper】源码分析之服务器(四)之FollowerZooKeeperServer
- ZooKeeper服务端单机版 ZooKeeperServer初始化源码分析
- 【Zookeeper】源码分析之服务器(三)之LeaderZooKeeperServer
- ZooKeeper源码解析(四):client如何和server连接
- Zookeeper源码之ZookeeperServer
- Zookeeper源码分析之一Server启动
- Zookeeper源码解析之Server启动