MIT6.824-lab2 raft
2017-10-22 21:25
543 查看
Raft- Part A
本次的实验为MIT6.824 Distribute System 中 Lab2 raft part A,通过课程所给的代码的骨架,要求实验raft中选举部分的功能。由于我的水平较渣,觉得课程的难度较大,与其说是实验比如说是对Github上的大神的代码进行理解,并结合论文进行思考。希望有朝一日能够自己做出一个实验啊!Implement leader election and heartbeats (
AppendEntriesRPCs with no log entries). The goal for Part 2A is for a single leader to be elected, for the leader to remain the leader if there are no failures, and for a new leader to take over if the old leader fails or if packets to/from the old leader are lost. Run
go test -run 2Ato test your 2A code.
Part A 部分要求实现raft的选举功能,其中raft中的每一个server都会维护自己的一个raft的结构体,其中只有两种RPC:
AppendEntries RPC和
RequestVote RPC。其中当
AppendEntres RPC中的log为空时用作
HeartBeat维护server 和 follower之间的通信,而
RequestVote RPC则用作当follower转变成candidate时,向其他的server请求投票时的通信。整个选举的过程可以概括为下面的这张图,而其中又有许多的细节需要结合论文中的Figure 2中的每一个条件实现;
可以概括为如下几个步骤
系统初始化时,所有的server都是FLLOWER状态,并维护一个计时器,计时结束会成为CANDIDATE并开始选举;FOLLOWER每当收到一次Herat Beat或
RequestVote RPC时重置计时器;
CANDIDATE在选举的时候依旧会维护一个计时器向所有的SERVER发送
Request Vote,在结束时仍未选举完,重新开始选举。当CANDIDATE获得大部分的投票后变为LEADER;在选举的过程中若收到来自更新的term的
AppendEntries RPC或者自己的term小于其他server的term时,转变为FOLLOWER;
LEADER开始广播
Heart Beat包,广播自己成为LEADER的事实,并且维护和FOLLOWER之间的通信。当发现FOLLOWER的reply中有更高的term时转变为FOLLOWER;
代码解析
定义一些常量const ( FOLLOWER = iota CANDIDATE LEADER HEARTBEAT_INTERVAL = 100 MIN_ELECTION_INTERVAL = 400 MAX_ELECTION_INTERVAL = 500 )
创建必要的结构体
raft相当于server中的state machine
type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] // Your data here (2A, 2B, 2C). // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. votedFor int //投票给某个CANDIDATE id voteAcquired int //收到的票数总数,用于判断能否成为LEADER state int32 //server当前的状态 currentTerm int32 //server当前的term voteCh chan struct{} //由于是并行发送用于指示server收到VoteRequest后的操作 appendCh chan struct{} //收到Append Entries后的操作 electionTimer *time.Timer //用于FOLLOWER和CANDIDATE的操作 }
发送
RequestVote RPC包中的内容
type RequestVoteArgs struct { Term int32 CandidateId int }
server对于
RequestVote RPC包的回复
type RequestVoteReply struct { // Your data here (2A). Term int32 VoteGranted bool }
发送
AppendEntries RPC包中的内容
//此处只涉及选举,不考虑replication及log中的内容 type AppendEntriesArgs struct { Term int32 LeaderID int }
接收
AppendEntries RPC包,并回复
同样这里不考虑CANDIDATE中的log是否是最新的,只要有合适的
VoteReuest PRC就投票
type AppendEntriesReply struct { Term int32 Success bool }
一些重要的工具函数
func (rf *Raft) getTerm() int32 { return atomic.LoadInt32(&rf.currentTerm) } func (rf *Raft) incrementTerm() int32 { return atomic.AddInt32(&rf.currentTerm, 1) } func (rf *Raft) isState(state int32) bool { return atomic.LoadInt32(&rf.state) == state } func (rf *Raft) GetState() (int, bool) { var term int var isleader bool // Your code here (2A). term = int(rf.getTerm()) isleader = rf.isState(LEADER) return term, isleader } func randElectionDuration() time.Duration { r := rand.New(rand.NewSource(time.Now().UnixNano())) return time.Millisecond * time.Duration( r.Int63n(MAX_ELECTION_INTERVAL-MIN_ELECTION_INTERVAL)+MIN_ELECTION_INTERVAL ) }
CANDIDATE发送RequestVote RPC
包
用于CANDIDATE发送包和对reply的处理遍历每一个server,若投票了则voteGranted自增1
若发现reply中的term大于自身当前的term,则转变为FOLLOWER
func (rf *Raft) broadcastVoteReq() { args := RequestVoteArgs{Term: atomic.LoadInt32(&rf.currentTerm), CandidateId: rf.me} for i, _ := range rf.peers { if i == rf.me { continue } // is it is candidate then send vote req go func(server int) { var reply RequestVoteReply if rf.isState(CANDIDATE) && rf.sendRequestVote(server, &args, &reply) { rf.mu.Lock() defer rf.mu.Unlock() //deal with the reply if reply.VoteGranted { rf.voteAcquired += 1 } else { if reply.Term > rf.currentTerm { rf.currentTerm = reply.Term rf.updateStateTo(FOLLOWER) d6cd } } } else { fmt.Printf("Server %d send vote req failed.\n", rf.me) } }(i) } }
server对RequestVote RPC
的处理
依据论文Figure 2中的要求,有以下几个处理的要点:- 判断自身的term是否大于CANDIDATE的term,若是则拒绝投票,并更新reply中的term
- 若当前的term小于CANDIDATE发送的term,更新自身的term,并将自己的状态变为follower(当前状态为CANDIDATE或LEADER)
- 当两者的term相等时,必须是初始状态,否则拒绝投票(FOLLOWER转变为CANDIDATE时候term自增1,因此必须大于其他server)
- 投票成功后,利用信道通知主循环(整个操作是并行的)
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (2A, 2B). rf.mu.Lock() defer rf.mu.Unlock() if args.Term < rf.currentTerm { reply.VoteGranted = false reply.Term = rf.currentTerm } else if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.updateStateTo(FOLLOWER) rf.votedFor = args.CandidateId reply.VoteGranted = true } else { if rf.votedFor == -1 { rf.votedFor = args.CandidateId reply.VoteGranted = true } else { reply.VoteGranted = false } } if reply.VoteGranted == true { go func() { rf.voteCh <- struct{}{} }() } }
LEADER广播AppendEntries RPC
,和对reply的处理
由于这里只是单纯的选举所以处理的逻辑很简单,只处理失败的回复:如果reply中的term大于当前LEADER中的term,则LEADER将自己的状态变成FOLLOWER
func (rf *Raft) broadcastAppendEnrties() { args := AppendEntriesArgs{Term: atomic.LoadInt32(&rf.currentTerm), LeaderID: rf.me} for i, _ := range rf.peers { if i == rf.me { continue } go func(server int) { var reply AppendEntriesReply if rf.isState(LEADER) && rf.sendAppendEntries(server, &args, &reply) { rf.mu.Lock() defer rf.mu.Unlock() if reply.Success { } else { if reply.Term > rf.currentTerm { rf.currentTerm = reply.Term rf.updateStateTo(FOLLOWER) } } } }(i) } }
server对AppendEntries RPC
的处理
同样,raft中的term可以作为判断是不是最新的依据,根据Figure 2,server在处理AppendEntries RPC,有如下注意点:
- 如果
args.Term < rf.currentTerm,
reply.Term = rf.currentTerm;
- 如果
args.Term > rf.currentTerm,更新当前的term并且将状态改变为FOLLOWER(针对当前状态为CANDIDATE的server)
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() //deal with the msg and reply if args.Term < rf.currentTerm { reply.Success = false reply.Term = rf.currentTerm } else if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.updateStateTo(FOLLOWER) reply.Success = true } else { reply.Success = true } go func() { rf.appendCh <- struct{}{} }() }
状态转变的函数
需要注意的是当转变成FOLLOWER时,要将自己的VoteFor初始化为-1,当转变成CANDIDATE时候要注意开始选举LEADER
func (rf *Raft) updateStateTo(state int32) { if rf.isState(state) { return } stateDesc := []string{"FOLLOWER", "CANDIDATE", "LEADER"} preState := rf.state switch state { case FOLLOWER: rf.state = FOLLOWER rf.votedFor = -1 case CANDIDATE: rf.state = CANDIDATE rf.startElection() case LEADER: rf.state = LEADER default: fmt.Printf("Warning: invalid state %d, do nothing.\n", state) } fmt.Printf("In term %d: Server %d transfer from %s to %s\n", rf.currentTerm, rf.me, stateDesc[preState], stateDesc[rf.state]) }
开始新一轮选举的函数
按照论文的要求在开始选举新的LEADER的时候要重置计时器,因为CANDIDATE会为自己投票所以票数初始置为1func (rf *Raft) startElection() { rf.incrementTerm() //first increase current term rf.votedFor = rf.me //vote for self rf.voteAcquired = 1 //acquire self's vote rf.electionTimer.Reset(randElectionDuration()) rf.broadcastVoteReq() }
整个状态的循环实现
不断的循环,并根据当前server的角色进行不同的操作FOLLOWER
当收到CANDIDATE发送的
VoteRequest RPC时,重置计时器
当收到LEADER发送的
AppendEntries RPC时,重置计时器
当计时器结束时,说明当前的server断绝联系,于是FOLLOWER转变为CANDIDATE开始新一轮的选举
CANDIDATE
在收到
AppendEntries RPC时,说明LEADER依旧存在,停止选举转变为FOLLOWER
当计时器结束时,重新开始选举
当投票数大于总数的一半时,当选LEADER
LEADER
间隔一段时间发送
HeartBeat心跳包维持LEADER和其他的server的通信
func (rf *Raft) startLoop() { rf.electionTimer = time.NewTimer(randElectionDuration()) for { switch atomic.LoadInt32(&rf.state) { case FOLLOWER: select { case <-rf.voteCh: rf.electionTimer.Reset(randElectionDuration()) case <-rf.appendCh: rf.electionTimer.Reset(randElectionDuration()) case <-rf.electionTimer.C: //time out rf.mu.Lock() rf.updateStateTo(CANDIDATE) rf.mu.Unlock() } case CANDIDATE: rf.mu.Lock() select { case <-rf.appendCh: rf.updateStateTo(FOLLOWER) case <-rf.electionTimer.C: rf.electionTimer.Reset(randElectionDuration()) rf.startElection() default: if rf.voteAcquired > len(rf.peers)/2 { rf.updateStateTo(LEADER) } } rf.mu.Unlock() case LEADER: rf.broadcastAppendEnrties() time.Sleep(HEARTBEAT_INTERVAL * time.Millisecond) } }
初始化并开始循环
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me // Your initialization code here (2A, 2B, 2C). rf.state = FOLLOWER rf.votedFor = -1 rf.voteCh = make(chan struct{}) rf.appendCh = make(chan struct{}) // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) go rf.startLoop() return rf }
总结
第一次将论文中的算法“实现”,虽然是参考了大神的代码,还是觉得收获满满。首先,纠正了我看论文一目十行的方法,以前总觉得论文这东西太虚,只是所谓的想法,经过这次无数遍的研读之后,发现论文中的每一句话对于整个算法的执行都有至关重要的作用。因此,以后在看论文的时候,需要逐句理解,最好能够抽象出整个模型,并且思考论文这么做对于整个系统的实现有什么好处
其次,自己的代码的能力实在是弱的不行,我能看得懂这些代码,并能在脑中把每一个函数链接起来,但是距离完全靠自己的能力去实现一个这样的系统还有相当长的路要走。
希望能坚持将自己所看的论文、方法都能用自己的话去说出来,加深下理解。
我的新建的博客欢迎大家访问。
本博客基于Hexo和Next主题搭建
本代码参考链接
相关文章推荐
- MIT6.824 Lab2 Raft(1)
- MIT6.824 Lab2 Raft(2)
- MIT6.824 Lab2 Raft(3)
- MIT 6.824 : Spring 2015 lab2 训练笔记
- MIT 6.824 lab2 启动流程以及raft算法实现
- MIT 6.824 Spring 2015
- MIT6.824 Lab1 MapReduce
- 分布式系统MIT6.824设计详解
- MIT 6.824 Lab1 mapreduce
- [MIT6.828]LAB2 Exercise & Question总结
- [MIT 6.824 Distributed System] Google File System
- 分布式系统MIT 6.824学习资源
- [MIT6.828]LAB2 Challenge 总结
- MIT6.824 课程学习初步
- MIT 6.828 Lab2
- MIT6.828 Lab2:第1部分 Physical Page Management
- MIT 6.824 : Spring 2015 lab1 训练笔记
- MIT 分布式系统 6.824 2012 LAB4
- MIT6.828 Lab2: 第3部分 Kernel Address Space
- mit 6.824 mapreduce