Raft
Raft is a consensus algorithm that maintains a shared, replicated log in a distributed environment. In a cluster of servers running Raft, one server is the leader and the others are followers. The leader receives requests from clients, makes sure a quorum (majority) of servers has agreed on each request, then applies the request and replies to the client.
When a follower stops hearing from the leader within its election timeout (for example because the leader has failed or is unreachable), it starts a leader election to choose a new leader.
Leader Election
We start by initializing the Raft struct on each instance; the upper-layer application calls Make to do this. Following Figure 2 of the Raft paper, the struct needs state, currentTerm, votedFor, and log. We also add electionTimer and majority for leader election.
// ApplyMsg carries one committed log entry from a Raft peer to the service
// (or tester) in the same server, per the API outline above Make. Make
// receives the channel for these messages (applyCh); nothing in this
// section sends on it yet.
type ApplyMsg struct {
// presumably true when Command holds a real committed entry — confirm against the 3B tester
CommandValid bool
// the client command carried by the committed entry
Command interface{}
// log index of the committed entry
CommandIndex int
}
// Entry is one slot of the replicated log. At this stage it records only the
// entry's term, which RequestVote compares when deciding whether a
// candidate's log is at least as up-to-date. Make seeds the log with one
// zero-valued Entry so that index 0 always exists.
type Entry struct {
Term int
}
// ServerState is the role a Raft peer currently plays.
type ServerState int
// The three Raft roles. Follower is the zero value (iota == 0); Make also
// sets it explicitly.
const (
Follower ServerState = iota
Candidate
Leader
)
// A Go object implementing a single Raft peer.
//
// peers, persister and me are set once in Make. The Raft-state fields
// (state, currentTerm, votedFor, log, electionTimer, majority) are read and
// written with mu held by the methods in this section; Make initializes them
// before starting the ticker goroutine.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// current role: Follower, Candidate or Leader
state ServerState
// persistent state on all servers
currentTerm int
// peer index this server voted for in currentTerm; -1 means no vote yet
votedFor int
// log[0] is a placeholder entry (Term 0) created by Make, so the last
// index/term are always defined
log []Entry
// // volatile state on all servers
// commitIndex int
// lastApplied int
// // volatile state on leaders (reinitialized after election)
// nextIndex []int
// matchIndex []int
// time of the last election-timer reset; ticker starts an election once
// 1000ms have passed since then and this peer is not the leader
electionTimer time.Time
// votes needed to win an election: len(peers)/2 + 1
majority int
}
//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
// create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
// start agreement on a new log entry
// rf.GetState() (term, isLeader)
// ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
//
// Make creates and starts a Raft peer.
//
// peers holds the RPC endpoints of every peer (including this one), and me
// is this peer's index into peers. persister supplies previously saved
// state. applyCh is where committed entries will be delivered (not used
// yet). The peer starts as a Follower with no vote and a log holding a
// single placeholder entry. Make restores any persisted state, then launches
// the ticker goroutine that drives elections and heartbeats.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{
		peers:     peers,
		persister: persister,
		me:        me,

		state:       Follower,
		currentTerm: 0,
		votedFor:    -1, // -1 stands for "no vote cast in currentTerm"
		// A single zero-valued entry at index 0 means the last log
		// index/term can always be read without a length check.
		log: make([]Entry, 1),

		electionTimer: time.Now(),
		majority:      len(peers)/2 + 1,
	}

	// Overlay anything persisted before a crash.
	rf.readPersist(persister.ReadRaftState())

	// Elections and heartbeats are driven from this background loop.
	go rf.ticker()
	return rf
}
// Start asks this peer to begin agreement on command.
//
// It returns (index, term, isLeader). Log replication is not implemented
// yet (3B), so no index is assigned and index is always -1. The term and
// leadership are reported truthfully, so callers see isLeader == false on
// non-leaders. Previously the stub claimed every peer was the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	// Your code here (3B).
	term, isLeader := rf.GetState()
	return index, term, isLeader
}
// GetState reports this peer's current term and whether it believes it is
// the leader. Both values are read together under rf.mu, so they describe
// one consistent snapshot.
func (rf *Raft) GetState() (int, bool) {
	// Your code here (3A).
	rf.mu.Lock()
	term, leading := rf.currentTerm, rf.state == Leader
	rf.mu.Unlock()
	return term, leading
}
Notice that we start the ticker goroutine when we initialize the Raft struct.
While the instance is alive, the ticker checks whether a leader election should be started: its election timer has expired and it is not the leader. If so, the instance increments its term, votes for itself, changes its state to Candidate, and starts the leaderElection goroutine.
While the instance is alive and is the Leader, the ticker instead starts a heartbeat goroutine that sends heartbeat messages to all followers.
Between checks, the ticker pauses for a random amount of time (roughly 50–350 ms).
// ticker is the background loop started by Make. Until the peer is killed,
// each iteration does the following under rf.mu:
//   - if this peer is not the leader and its election timer was last reset
//     at least 1000ms ago, it starts a new election: bump the term, vote for
//     itself, become Candidate, reset the timer, and launch leaderElection;
//   - otherwise, if this peer is the leader, it launches one heartbeat round.
//
// It then sleeps for a random 50-349ms before checking again.
func (rf *Raft) ticker() {
	for !rf.killed() {
		// Your code here (3A)
		rf.mu.Lock()
		switch {
		case rf.state != Leader && time.Since(rf.electionTimer) >= 1000*time.Millisecond:
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.state = Candidate
			rf.electionTimer = time.Now()

			lastIndex := len(rf.log) - 1
			voteArgs := RequestVoteArgs{
				Term:         rf.currentTerm,
				CandidateId:  rf.me,
				LastLogIndex: lastIndex,
				LastLogTerm:  rf.log[lastIndex].Term,
			}
			go rf.leaderElection(voteArgs)
		case rf.state == Leader:
			// The args are built here, while rf.mu is held.
			go rf.heartbeat(AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me})
		}
		rf.mu.Unlock()

		// The random pause also staggers election starts across peers.
		pauseMs := 50 + rand.Int63()%300
		time.Sleep(time.Duration(pauseMs) * time.Millisecond)
	}
}
leaderElection calls RequestVote via RPC on every other instance. Each granted vote increments the vote count. If the instance gains a majority of the votes while still in the Candidate state for that term, it becomes the Leader.
reply := <-ch receives each per-peer goroutine's RPC result over a channel. This is how those goroutines report back to leaderElection when they finish.
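In RequestVote, we apply the rule from the paper to decide whether to grant the vote. The receiver votes for the requester if it has not voted in this term, or has already voted for this same candidate, and the requester's log is at least as up-to-date as its own. "Up-to-date" means a higher last-entry term, or the same last-entry term with an equal or greater last index.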
// leaderElection runs one election round for the term in args. It sends
// RequestVote to every other peer in parallel and tallies the replies. The
// candidate already counts its own vote (cast in ticker). If it collects
// rf.majority votes while still a Candidate in args.Term, it becomes Leader.
// A reply carrying a higher term makes it step down to Follower.
//
// Failed RPCs are not retried. Retrying in a loop would keep a goroutine
// alive forever for an unreachable or killed peer and block the tally below
// indefinitely. If this round fails to elect a leader, the next election
// timeout in ticker starts a new round.
func (rf *Raft) leaderElection(args RequestVoteArgs) {
	type voteResult struct {
		ok    bool
		reply RequestVoteReply
	}
	// Buffered so a sender goroutine can always deposit its result and exit.
	results := make(chan voteResult, len(rf.peers))
	for idx := range rf.peers {
		if idx == rf.me {
			continue
		}
		go func(server int) {
			var reply RequestVoteReply
			ok := rf.sendRequestVote(server, &args, &reply)
			results <- voteResult{ok: ok, reply: reply}
		}(idx)
	}

	votes := 1 // the candidate's vote for itself
	rf.mu.Lock()
	// In a single-peer cluster the self-vote is already a majority and the
	// receive loop below never runs.
	if rf.state == Candidate && rf.currentTerm == args.Term && votes >= rf.majority {
		rf.state = Leader
	}
	rf.mu.Unlock()

	for i := 0; i < len(rf.peers)-1; i++ {
		res := <-results
		if !res.ok {
			continue // lost RPC: treat as no vote
		}
		rf.mu.Lock()
		// see higher term, convert to follower
		if res.reply.Term > rf.currentTerm {
			rf.currentTerm = res.reply.Term
			rf.state = Follower
			rf.votedFor = -1
		}
		// Count only votes for the election still in progress. This peer may
		// have moved to a newer term, stepped down, or already won.
		if res.reply.VoteGranted && rf.state == Candidate && rf.currentTerm == args.Term {
			votes++
			if votes >= rf.majority {
				rf.state = Leader
			}
		}
		rf.mu.Unlock()
	}
}
// RequestVoteArgs is the argument of the RequestVote RPC, which a candidate
// sends to ask for a peer's vote (built in ticker).
type RequestVoteArgs struct {
// Your data here (3A, 3B).
// candidate's current term
Term int
// index of the candidate in peers[]
CandidateId int
// index of the candidate's last log entry
LastLogIndex int
// term of the candidate's last log entry
LastLogTerm int
}
// RequestVoteReply is the result of the RequestVote RPC.
type RequestVoteReply struct {
// Your data here (3A).
// the voter's currentTerm after handling the request, so a stale
// candidate can step down
Term int
// true if the voter granted its vote to the candidate
VoteGranted bool
}
// sendRequestVote invokes the Raft.RequestVote handler on peers[server],
// filling *reply. It returns labrpc's Call result (presumably false when the
// request or reply was lost or timed out — see labrpc). reply should be
// trusted only when it returns true.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	return rf.peers[server].Call("Raft.RequestVote", args, reply)
}
// RequestVote is the RPC handler a candidate calls to ask for this peer's
// vote.
//
// A higher candidate term first makes this peer adopt that term as a
// Follower with no vote cast. A request from an older term is refused.
// Otherwise the vote is granted when this peer has not voted in the term (or
// already voted for this candidate) and the candidate's log is at least as
// up-to-date as its own. Granting a vote also resets the election timer.
// reply.Term always carries this peer's term after any update.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (3A, 3B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term > rf.currentTerm {
		rf.currentTerm, rf.state, rf.votedFor = args.Term, Follower, -1
	}

	reply.Term = rf.currentTerm
	reply.VoteGranted = false
	if args.Term < rf.currentTerm {
		return // stale candidate
	}

	myLastIndex := len(rf.log) - 1
	myLastTerm := rf.log[myLastIndex].Term
	canVote := rf.votedFor == -1 || rf.votedFor == args.CandidateId
	candidateUpToDate := args.LastLogTerm > myLastTerm ||
		(args.LastLogTerm == myLastTerm && args.LastLogIndex >= myLastIndex)
	if !canVote || !candidateUpToDate {
		return
	}

	rf.votedFor = args.CandidateId
	rf.electionTimer = time.Now()
	reply.VoteGranted = true
}
Heartbeat messages (AppendEntries RPCs with no entries) will later be used to replicate the log. At this stage we only use them to tell followers that the leader is alive and to keep everyone's term up to date. After sending a heartbeat to every follower, the leader waits for their replies with reply := <-ch.
When a follower's AppendEntries handler accepts a heartbeat (the leader's term is not stale), the follower resets its election timer, since the leader is evidently still alive.
// heartbeat sends one round of AppendEntries RPCs built from args (empty
// heartbeats at this stage) to every other peer in parallel, then processes
// the replies. A reply carrying a higher term makes this peer step down to
// Follower; other replies need no action yet.
//
// Failed RPCs are not retried. ticker starts a fresh round on every tick, so
// a retry would only resend a stale heartbeat. Retrying in a loop against an
// unreachable or killed peer would also leak goroutines on every round.
func (rf *Raft) heartbeat(args AppendEntriesArgs) {
	type appendResult struct {
		ok    bool
		reply AppendEntriesReply
	}
	// Buffered so a sender goroutine can always deposit its result and exit.
	results := make(chan appendResult, len(rf.peers))
	for idx := range rf.peers {
		if idx == rf.me {
			continue
		}
		go func(server int) {
			var reply AppendEntriesReply
			ok := rf.sendAppendEntries(server, &args, &reply)
			results <- appendResult{ok: ok, reply: reply}
		}(idx)
	}

	for i := 0; i < len(rf.peers)-1; i++ {
		res := <-results
		if !res.ok {
			continue // lost RPC: the next round will try again
		}
		rf.mu.Lock()
		// see higher term, convert to follower
		if res.reply.Term > rf.currentTerm {
			rf.currentTerm = res.reply.Term
			rf.state = Follower
			rf.votedFor = -1
		}
		rf.mu.Unlock()
	}
}
// AppendEntries RPC arguments
//
// AppendEntriesArgs is sent by the leader. At this stage only Term and
// LeaderId are filled in (see ticker); the remaining fields are zero and
// reserved for log replication.
type AppendEntriesArgs struct {
// leader's current term
Term int
// index of the leader in peers[]
LeaderId int
// presumably the index of the entry preceding Entries — not used yet
PrevLogIndex int
// presumably the term of the PrevLogIndex entry — not used yet
PrevLogTerm int
// entries to append; empty for heartbeats
Entries []interface{}
// presumably the leader's commit index — not used yet
LeaderCommit int
}
// AppendEntries RPC reply
//
// AppendEntriesReply is the follower's answer to AppendEntries.
type AppendEntriesReply struct {
// the follower's currentTerm after handling the request, so a stale
// leader can step down
Term int
// false when the request's term was older than the follower's
Success bool
}
// sendAppendEntries invokes the Raft.AppendEntries handler on peers[server],
// filling *reply. It returns labrpc's Call result (presumably false when the
// request or reply was lost or timed out — see labrpc). reply should be
// trusted only when it returns true.
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}
// AppendEntries is the RPC handler the leader calls (currently only as a
// heartbeat).
//
// A higher leader term makes this peer adopt it as a Follower with no vote
// cast. A request from an older term is refused with Success = false.
// Otherwise the request comes from the leader of this peer's current term,
// so a Candidate gives up its election, the election timer is reset, and
// Success = true. reply.Term always carries this peer's term after any
// update.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term > rf.currentTerm {
		rf.currentTerm, rf.state, rf.votedFor = args.Term, Follower, -1
	}

	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		reply.Success = false // stale leader
		return
	}

	// Here args.Term == rf.currentTerm, so another peer already leads this
	// term and any election we are running for it is lost.
	if rf.state == Candidate {
		rf.state = Follower
	}
	rf.electionTimer = time.Now()
	reply.Success = true
}