您的位置:首页 > 其它

zookeeper源码学习(一)

2016-08-14 23:47 239 查看
最近在学习paxos和zookeeper,有收获也有很多不懂的地方,写些笔记,希望和对paxos感兴趣的同学一起讨论哈,不懂的问题还希望能得到大牛们的指点~~呵呵

学习过程中,受教于多位前辈的文章:

淘宝核心技术团队 : http://rdc.taobao.com/blog/cs/?p=162

CODEDUMP博客 : http://www.codedump.info/?p=224

xhh198781的专栏 : http://blog.csdn.net/xhh198781/article/details/6619203

 

Zookeeper的一致性的实现分为两个部分:leader选举,数据同步。

 

zookeeper集群的运行过程:

Zookeeper集群的基本运行过程是,server启动时首先从配置文件中读取数据,之后各个server发送自己的投票进行选举,选出leader之后,各个server接收/监听客户端请求,leader与followers同步数据,当数据同步完成后就可以回复客户端的请求。读请求可以由接受请求的server直接回复,写请求要发送到leader进行处理。当leader宕机时。Follower发现leader失效则发起新一轮的选举(是发现leader失效便进行选举还是租约超时时进行选举?),选出新的leader后,leader与followers同步数据……如此进行

 

zookeeper集群主要数据的含义:

Server启动时从磁盘中读取的数据:

Id  :server id,这个id唯一标识一台机器,存储在myid文件中,配置集群时手动写入

Zxid:标识了本机想要选举谁为leader,是本机目前所见到的最大的id值(在代码里面这个zxid是怎么增长的?)

投票时发送的消息包括四种信息:

Id    :server id,这个id唯一标识一台机器,存储在myid文件中,配置集群时手动写入

Zxid  :标识了本机想要选举谁为leader,是本机目前所见到的最大的id值

epoch:标识leader的变更,表示每个算法instance的一个round

state :本机的状态信息(包括looking,leading,following,observing)

 

zookeeper集群的leader选举:

Zookeeper用于选举leader有三个类:FastLeaderElection类,LeaderElection类,AuthFastLeaderElection类。默认使用FastLeaderElection类。FastLeaderElection选举算法用到的类是FastLeaderElection类,在org.apache.zookeeper.server.quorum包中。

 

FastLeaderElection类的主要属性成员:

FinalizeWait:这是一个server得知他完成/赢得了选举后需要等待的时间。在zookeeper中,只有收到了所有server的回复并且由这些回复信息可以选出leader,那么选举过程可以结束。如果仅收到了一个多数派的回复,即使这个多数派选出了一个leader,也要等待一段时间即FinalizeWait,如果在这段时间内没有新消息到来才可以确定选举结束。这么做是为了保证不会有哪个server因为启动较晚(在其他server完成了选举之后才启动)或者其他原因没有参与到选举中来而导致id更大的server没有成为leader。

MaxNotificationInterval:两个连续的通知检查之间的时间上限,默认60秒,它是指数退避算法的界限值(这个应该是paxoslease中用到的)

QuorumCnxManager对象manager:管理TCP连接,FastLeaderElection消息是使用TCP数据报的。

linkedBlockingQueue<ToSend> sendqueue : 存储tosend消息,本server要向其他server发送的消息

linkedBlockingQueue<Notification> recvqueue :存储notification消息,本server从其他server接收到的回复消息。这个消息队列是本地消息队列,经过WorkerReceiver线程处理后发过来的。不同于recvQueue队列,该队列是外部消息队列,接受所有外部发来的消息。

WorkerSender ws:发送消息线程类的对象,作为后台线程运行(Daemon)

WorkerReceiver wr:接收消息线程类的对象,作为后台线程运行(Daemon)

QuorumPeer self:

Logicalclock:我的理解是标识一个实例中的一个round

ProposedLeader:选举的leader的id

ProposedZxid:选举的leader的zxid      (这两个数据算作是持久化的数据吧?)

 

该类中主要的内部类:

1.表示数据结构的类:

Notification类:消息结构,当本server改变了所选举的leader时向其他server发送消息用的数据结构。该类包括了5个主要数据:

   Leader – 本server想要选举的leader的id(server id)

   Zxid  -- 本server想要选举的leader的zxid

   Epoch – 标识算法instance中的一个round

   State – 本server的当前状态(包括looking,leading,following,observing)

   Sid  – 发送方server的ip地址

ToSend类:消息结构,server传递消息的数据结构

   Leader – 本server想要选举的leader的id(server id)

   Zxid  -- 本server想要选举的leader的zxid

   Epoch – 标识算法instance中的一个round

   State – 本server的当前状态(包括looking,leading,following,observing)

   Sid  – 接收方server的ip地址

2.线程池Messenger类

Messenger类中有两种线程:

   WorkerReceiver线程类:接收线程,接收来自其他server的消息并处理这些消息,在选举过程没有停止的时候,接收线程就循环处理接收到的消息。处理消息的过程是:在外部消息队列recvQueue中取得一个消息,如果这个消息的发送端是个observer,那么立刻回复,将本server要选举的leader、当前状态、epoch信息发送过去。如果这个消息来自于一个participate,那么按本server的状态讨论,如果本server处于looking状态,那么将该消息放到本地接收队列中,同时判断发送方的状态,如果发送方也是looking状态并且其epoch值比较小,那么回复它一个消息告知本server的选举的leader信息。如果本server不是looking状态,而对方是looking状态的话,就回复一个消息告知本server所知道的leader信息。

WorkerSender线程类:发送线程,负责发送消息到其他server。发送消息的过程是:只要选举没有停止就从本地发送队列sendqueue中取一个消息并将它发送到指定的server。

 

发送线程:

逐个将sendqueue队列中的消息发出去。

 

接收线程:

在选举过程没有停止时,循环处理队列中的消息。

如果消息来自observer,立即回复,相应代码段:





如果消息来自另一个participate,进行判断:

若本server是looking状态,则将该消息放入到本地队列recvqueue中以备lookForLeader()方法使用。如果同时对方也是looking状态并且提案号比本server小,则回复一个消息告知本server选举的leader等信息。(如果本server和对方都是looking状态,但对方的提案号比本server的提案号大,这种情况会在选举线程中做处理,此时该消息已经放入了本地消息队列)。对应代码段:





若本server不是looking状态(leading状态或者following状态),对方是looking状态,则向它告知本节点目前所知的leader信息。(这种情况很可能是应为一个多数派已经通过一个leader,选举完成,但是由于一些原因有些server还不知道选举已经完成了,在选举线程lookforLeader()方法中的最有一个case语句处理的就是这种情况)。对应的代码段:





 

选举线程:

选举由LookForLeader ()方法完成。

两个局部变量:

Hashmap  recvset -- 用来存储本轮选举中从participate server接收到的合法消息。每当有更高编号的提案被发起时,recvset都被清空(旧消息是上一轮选举的信息,新轮开始时它们已经没有用了)。

Hashmap  outofelection -- (这个不理解哎)

选举过程:

1.

【我不是很理解zookeeper是怎么用的fast paxos,leader选举是没有leader要选leader,而fast paxos是需要leader在开始的阶段发送一个any message的。我的理解是这一步是fast
paxos中省略了prepare阶段后直接执行的P2(a)段,将本server所选的值和提案号一并发给所有其他的participate
server。这是站在proposer角度所做的动作 – 提案】

因为该线程是发起选举的线程,所以首先增加logicalclock(算法的提案编号),表示进入了下一轮选举。同时初始化/更新提案信息:本serverid和目前本server看到的最大的zxid。用这个两个数据来初始化/更新提案信息,集群启动时zxid都是0,各个server都选举自己做leader。向所有participate server发送这个提案消息(以Notifications消息的形式,因为此时leader改变了)。

对应的代码段:





2.选举没有结束时,循环处理本地消息队列recvqueue中的消息

如果本地消息队列recvqueue中已经没有消息,那么检查queueSendMap队列,如果其中没有消息,则重新向所有的participate server发送notification消息;如果其中还有未发送的消息,则这些剩余消息的目的server重新建立连接,并将消息发送给它们。使用指数退避算法更改notTimeout值,notTimeout是在消息队列中取消息没有取到时,距下次取所间隔/等待的时间。对应的代码段:





 

如果本地消息队列中有消息,则取出下一个消息,并按其发送端server的状态来处理:

1)发送端server是looking状态

  【这个地方我觉得是进行的P2(b)阶段,针对号和值的大小分别对其他participate的回复/提案消息进行处理。该是站在accepter角度的动作。sendNotifications的时候,更多的可能是发给learner的批准消息,这时候他把所有participate当成leaner而自己是accepter。】

如果发送端server是looking状态,先判断该消息的提案号是否大于本server的提案号,如果是,说明收到了有更新的选举被发起,则更新本server的提案编号为接收到的这个最新的提案号,同时清空记录合法消息的集合recvset,更新本server的提案信息并向所有participate server发送notification消息,告知本server提案信息的改变。对应的代码段:





 

如果消息发送端的提案号小于本server的提案号,则对该消息置之不理。

如果提案号相等,判断本server所选举的leader与消息该消息所选的leader编号的大小,如果消息中所选的leader编号更大,则更新本server的提案值,重新向所有节点发送notification消息。对应的代码段:





 

【这个时候,是站在learner的角度,来判断是否收到了足够的消息结束这次选举】

如果该消息是来自participate server的合法消息,将其放入到集合recvset中。处理完该消息之后,判断此时是否已经收到了所有participate server的消息,或者收到了一个多数派的消息。代码走到这里,如果已经接收到了所有participate server的消息,说明所有这些回复消息中没有比proposedLeader(本server所选举的leader)更大的提案了,那么说明proposedLeader是目前公认最大的,所以proposedLeader赢得了选举成为leader,本server根据选举结果设置自己的状态。对应的代码段:





 

如果没有收到所有participate的回复消息,但是本server选举的leader(proposedLeader)得到了一个多数派的认可,这个时候不是立刻认定proposedLeader赢得了选举,而是观察一段时间(finalizeWait),如果在这段时间内没有新的更高编号的提案到达,就可以确定proposedLeader赢得了选举。对应的代码段:





 

 

2)发送端server是observing状态

不做任何处理

3)其他状态(leading,following)

这里不太理解。Outofelection存储的应该是该算法instance中本轮选举之外的消息,为什么要用它来做判断呢,所有n.state != ServerState.LEADING的情况都放在一起处理?不管这两个提案号之间相差多少?相关代码段:



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper