Raft

Raft is a consensus algorithm that maintains a shared log in a distributed environment. Among a group of servers running Raft, one is the leader and the others are followers. The leader reads requests from clients, makes sure a quorum of servers has agreed on each request, then applies the request and replies to the client.

When the leader times out or fails, the followers start a leader election to elect a new leader.

Leader Election

We start by initializing the Raft struct on each instance. The upper-layer application calls Make to initialize it. According to Figure 2 of the Raft paper, we need state, currentTerm, votedFor, and log in the Raft struct. We also need electionTimer and majority for leader election.

// ApplyMsg is the element type of the applyCh channel handed to Make. Per the
// API outline in this file, a peer should send one ApplyMsg to the service
// (or tester) each time a new log entry is committed.
type ApplyMsg struct {
	// NOTE(review): presumably true when Command and CommandIndex describe a
	// committed log entry — confirm against the service/tester.
	CommandValid bool
	// The command payload; its concrete type is chosen by the caller.
	Command      interface{}
	// NOTE(review): presumably the log index of Command — confirm.
	CommandIndex int
}

// Entry is one element of a peer's log. At this stage it records only a term;
// no command payload is stored yet.
type Entry struct {
	// Term associated with this entry. RequestVote compares the Term of the
	// last entry when deciding whether a candidate's log is up to date.
	Term int
}

// ServerState identifies the role a Raft peer is currently playing.
type ServerState int

// The three roles a peer can be in. Follower is the zero value (iota starts
// at 0), and Make also sets it explicitly as the initial state.
const (
	Follower ServerState = iota
	Candidate
	Leader
)

// Raft is a Go object implementing a single Raft peer.
//
// The fields below mu are read and written by the ticker goroutine, the
// election and heartbeat goroutines, and the RPC handlers; those code paths
// take mu before touching state, currentTerm, votedFor, log or electionTimer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (3A, 3B, 3C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// Current role of this peer: Follower, Candidate or Leader.
	state ServerState
	// Persistent state on all servers.
	// currentTerm starts at 0 in Make and is raised whenever a higher term is
	// observed or this peer starts an election. votedFor is the index of the
	// peer voted for in currentTerm, or -1 for "nobody". log is seeded by Make
	// with a single zero-valued entry, so log[len(log)-1] is always valid.
	currentTerm int
	votedFor    int
	log         []Entry
	// The remaining Figure 2 fields are not used yet and stay commented out:
	// // volatile state on all servers
	// commitIndex int
	// lastApplied int
	// // volatile state on leaders (reinitialized after election)
	// nextIndex  []int
	// matchIndex []int

	// electionTimer is the instant of the last event that postponed an
	// election: set in Make, when this peer starts an election, when it grants
	// a vote, and when it accepts an AppendEntries call. ticker compares the
	// time elapsed since then against the election timeout.
	electionTimer time.Time
	// majority is len(peers)/2 + 1, the number of votes needed to become leader.
	majority      int
}

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

// Make creates a Raft peer and starts its background ticker.
//
// peers holds the RPC end points of all peers and me is this peer's index
// into it. persister supplies any state saved before a crash, which is fed to
// readPersist after the defaults below are installed. applyCh is accepted as
// part of the API but is not stored or used at this stage.
//
// The peer starts as a Follower in term 0 with no vote cast (votedFor == -1),
// a log containing a single zero-valued placeholder entry, and an election
// timer that starts counting from now. The commitIndex, lastApplied,
// nextIndex and matchIndex fields are not initialized yet because they are
// still commented out in the Raft struct.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{
		peers:         peers,
		persister:     persister,
		me:            me,
		state:         Follower,
		currentTerm:   0,
		votedFor:      -1,
		log:           make([]Entry, 1),
		electionTimer: time.Now(),
		majority:      len(peers)/2 + 1,
	}

	// Restore whatever was persisted before a crash.
	rf.readPersist(persister.ReadRaftState())

	// The ticker goroutine drives elections and heartbeats from here on.
	go rf.ticker()
	return rf
}

// Start is called by the service to begin agreement on a new log entry.
//
// It returns (index, term, isLeader). term is this peer's current term and
// isLeader reports whether this peer currently believes it is the leader, so
// a caller talking to a follower is told so instead of being misled into
// waiting on it. Appending command to the log is not implemented yet (3B), so
// index is always the placeholder -1.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	index := -1
	term := rf.currentTerm
	isLeader := rf.state == Leader

	// Your code here (3B).

	return index, term, isLeader
}

// GetState returns this peer's current term and whether it currently
// believes it is the leader. Both values are read under rf.mu so they form a
// consistent snapshot.
func (rf *Raft) GetState() (int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	return rf.currentTerm, rf.state == Leader
}

Notice that we start the ticker goroutine when we initialize the Raft struct.

While the instance is alive, we check whether a leader election should be started. If so, the instance increments its term, votes for itself, changes its state to Candidate, and starts the leaderElection goroutine.

While the instance is alive and is the Leader, we start a heartbeat goroutine to send a heartbeat message to all followers.

We pause for a random amount of time between checks.

// ticker is the peer's background loop; it runs until rf.killed() reports
// true. On every pass it does one of the following while holding rf.mu:
//
//   - If this peer is the leader, it starts a heartbeat round carrying the
//     current term and this peer's id.
//   - Otherwise, if at least one second has passed since electionTimer was
//     last reset, it starts an election: it bumps the term, votes for itself,
//     becomes a Candidate, resets electionTimer and launches leaderElection
//     with a snapshot of the arguments.
//
// Between passes it sleeps for a random 50–349 ms.
func (rf *Raft) ticker() {
	const electionTimeout = 1000 * time.Millisecond

	for !rf.killed() {
		rf.mu.Lock()
		switch {
		case rf.state == Leader:
			beat := AppendEntriesArgs{
				Term:     rf.currentTerm,
				LeaderId: rf.me,
			}
			go rf.heartbeat(beat)

		case time.Since(rf.electionTimer) >= electionTimeout:
			// No leader has been heard from in time: stand for election.
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.state = Candidate
			rf.electionTimer = time.Now()

			lastIndex := len(rf.log) - 1
			ballot := RequestVoteArgs{
				Term:         rf.currentTerm,
				CandidateId:  rf.me,
				LastLogIndex: lastIndex,
				LastLogTerm:  rf.log[lastIndex].Term,
			}
			go rf.leaderElection(ballot)
		}
		rf.mu.Unlock()

		// Randomized pause so that peers do not all wake up in lockstep.
		pauseMs := 50 + rand.Int63()%300
		time.Sleep(time.Duration(pauseMs) * time.Millisecond)
	}
}

Leader election uses RPC to run RequestVote on all other instances. Each time the instance is granted a vote, it increments the vote count. If it collects a majority of the votes while it is still in the Candidate state, the instance becomes Leader.

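reply := <-ch blocks until one of the RPC goroutines sends its result on the channel; this is the idiomatic Go way for a goroutine to signal that it has finished.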

In RequestVote, we use the condition described in the paper (the receiver has not yet voted in this term or has already voted for this candidate, and the candidate's log is at least as up to date as the receiver's) to decide whether to grant the vote to the requester.

// leaderElection runs one election round for the term recorded in args.
//
// It sends RequestVote to every other peer in parallel and tallies the
// results. The peer becomes Leader once it holds a majority of votes,
// provided it is still a Candidate in args.Term at that moment. A reply that
// carries a higher term makes the peer step down to Follower.
//
// A failed RPC is retried only while the round is still meaningful, i.e. the
// peer has not been killed and is still a Candidate in args.Term. Every
// sender therefore terminates and delivers exactly one result (nil when it
// gave up), so neither the senders nor this collector can block forever on
// an unreachable peer.
func (rf *Raft) leaderElection(args RequestVoteArgs) {
	// roundLive reports whether votes for args.Term can still matter.
	roundLive := func() bool {
		rf.mu.Lock()
		defer rf.mu.Unlock()
		return rf.state == Candidate && rf.currentTerm == args.Term
	}

	others := len(rf.peers) - 1
	// Buffered to the number of senders so that none of them ever blocks.
	ch := make(chan *RequestVoteReply, others)
	for idx := range rf.peers {
		if idx == rf.me {
			continue
		}
		go func(idx int) {
			for !rf.killed() && roundLive() {
				// Use a fresh reply per attempt so a failed call cannot
				// leave partial data behind.
				reply := RequestVoteReply{}
				if rf.sendRequestVote(idx, &args, &reply) {
					ch <- &reply
					return
				}
			}
			ch <- nil // gave up: the round is over or the peer was killed
		}(idx)
	}

	count := 1 // the candidate's own vote

	// With no other peers the self-vote is already a majority; the tally
	// loop below would never run, so decide here.
	rf.mu.Lock()
	if rf.state == Candidate && rf.currentTerm == args.Term && count >= rf.majority {
		rf.state = Leader
	}
	rf.mu.Unlock()

	for i := 0; i < others; i++ {
		reply := <-ch
		if reply == nil {
			continue
		}
		rf.mu.Lock()
		switch {
		case reply.Term > rf.currentTerm:
			// see higher term, convert to follower
			rf.currentTerm = reply.Term
			rf.state = Follower
			rf.votedFor = -1
		case reply.Term < rf.currentTerm:
			// see lower term, ignore
		case reply.VoteGranted && rf.state == Candidate && rf.currentTerm == args.Term:
			// Only count votes while still campaigning in the term the
			// votes were requested for; otherwise a tally from an old
			// round could win a later term.
			count++
			if count >= rf.majority {
				rf.state = Leader
			}
		}
		rf.mu.Unlock()
	}
}

// RequestVoteArgs is the argument of the RequestVote RPC. ticker fills it in
// from the candidate's state when an election starts.
type RequestVoteArgs struct {
	// Your data here (3A, 3B).
	// Term is the candidate's term for this election.
	Term         int
	// CandidateId is the candidate's index into peers.
	CandidateId  int
	// LastLogIndex is the index of the candidate's last log entry
	// (len(log) - 1 at the time the election started).
	LastLogIndex int
	// LastLogTerm is the Term of the candidate's last log entry.
	LastLogTerm  int
}

// RequestVoteReply is the result of the RequestVote RPC, filled in by the
// RequestVote handler on the receiving peer.
type RequestVoteReply struct {
	// Your data here (3A).
	// Term is the receiver's current term, after it has adopted the
	// candidate's term if that was higher.
	Term        int
	// VoteGranted is true when the receiver gave its vote to the candidate.
	VoteGranted bool
}

// sendRequestVote issues the "Raft.RequestVote" RPC to peers[server] with the
// given args and reply, and returns whatever Call reports.
// NOTE(review): Call presumably returns false when no reply was obtained —
// confirm against labrpc.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	return rf.peers[server].Call("Raft.RequestVote", args, reply)
}

// RequestVote is the RPC handler a candidate invokes to ask for this peer's
// vote.
//
// A request from an older term is rejected. A request from a newer term
// first makes this peer adopt that term, become a Follower and forget any
// earlier vote. The vote is then granted only if this peer has not voted for
// a different candidate in the current term and the candidate's log is at
// least as up to date as this peer's: a higher last term wins, and equal last
// terms are decided by the last index. Granting a vote also resets this
// peer's election timer. reply.Term always carries this peer's current term.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// Stale candidate: refuse and report our newer term.
	if args.Term < rf.currentTerm {
		reply.Term, reply.VoteGranted = rf.currentTerm, false
		return
	}

	// Newer term: adopt it and fall back to follower with no vote cast.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.state = Follower
		rf.votedFor = -1
	}

	reply.Term, reply.VoteGranted = rf.currentTerm, false

	myLastIndex := len(rf.log) - 1
	myLastTerm := rf.log[myLastIndex].Term

	freeToVote := rf.votedFor == -1 || rf.votedFor == args.CandidateId
	logUpToDate := args.LastLogTerm > myLastTerm ||
		(args.LastLogTerm == myLastTerm && args.LastLogIndex >= myLastIndex)
	if !freeToVote || !logUpToDate {
		return
	}

	rf.votedFor = args.CandidateId
	rf.electionTimer = time.Now()
	reply.VoteGranted = true
}

Heartbeat messages will later be used to synchronize the log, but at this stage we only use them to keep followers up to date. After sending the heartbeat message to all followers, we wait for their replies with reply := <-ch.

When a follower's AppendEntries RPC handler is called and succeeds, the follower resets its election timer, indicating that the leader is still alive.


// heartbeat performs one heartbeat round: it sends args to every other peer
// in parallel via AppendEntries and waits for each call to finish. A reply
// that carries a higher term makes this peer step down to Follower; all other
// replies are ignored at this stage.
//
// A failed RPC is not retried. ticker starts a fresh heartbeat round on every
// pass while this peer is leader, so a retry would only resend data the next
// round sends anyway, and retrying without bound would leave one more
// goroutine stuck per round for as long as a peer stays unreachable.
func (rf *Raft) heartbeat(args AppendEntriesArgs) {
	others := len(rf.peers) - 1
	// Buffered to the number of senders so that none of them ever blocks.
	ch := make(chan *AppendEntriesReply, others)
	for idx := range rf.peers {
		if idx == rf.me {
			continue
		}
		go func(idx int) {
			reply := AppendEntriesReply{}
			if !rf.sendAppendEntries(idx, &args, &reply) {
				ch <- nil // no reply obtained from this peer
				return
			}
			ch <- &reply
		}(idx)
	}

	for i := 0; i < others; i++ {
		reply := <-ch
		if reply == nil {
			continue
		}
		rf.mu.Lock()
		// see higher term, convert to follower; equal or lower terms need
		// no action
		if reply.Term > rf.currentTerm {
			rf.currentTerm = reply.Term
			rf.state = Follower
			rf.votedFor = -1
		}
		rf.mu.Unlock()
	}
}

// AppendEntriesArgs is the argument of the AppendEntries RPC. At this stage
// it is used only as a heartbeat: ticker sets Term and LeaderId and leaves
// the remaining fields at their zero values, and the handler reads only Term.
type AppendEntriesArgs struct {
	// Term is the sender's current term.
	Term         int
	// LeaderId is the sender's index into peers.
	LeaderId     int
	// PrevLogIndex, PrevLogTerm, Entries and LeaderCommit are neither set nor
	// read anywhere in the code shown here.
	// NOTE(review): Entries is []interface{} while Raft.log is []Entry —
	// confirm which element type is intended before using it.
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []interface{}
	LeaderCommit int
}

// AppendEntriesReply is the result of the AppendEntries RPC, filled in by the
// AppendEntries handler on the receiving peer.
type AppendEntriesReply struct {
	// Term is the receiver's current term, after it has adopted the sender's
	// term if that was higher.
	Term    int
	// Success is false when the sender's term was older than the receiver's,
	// and true otherwise.
	Success bool
}

// sendAppendEntries issues the "Raft.AppendEntries" RPC to peers[server] with
// the given args and reply, and returns whatever Call reports.
// NOTE(review): Call presumably returns false when no reply was obtained —
// confirm against labrpc.
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}

// AppendEntries is the RPC handler a leader invokes on this peer. At this
// stage it only processes the heartbeat aspect of the call.
//
// A call from an older term is rejected with Success == false. A call from a
// newer term makes this peer adopt that term, become a Follower and forget
// any earlier vote. In every accepted call a Candidate yields to the sender
// by becoming a Follower, the election timer is reset, and the reply reports
// Success == true. reply.Term always carries this peer's current term.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	switch {
	case args.Term < rf.currentTerm:
		// Stale sender: refuse and report our newer term.
		reply.Term, reply.Success = rf.currentTerm, false
		return
	case args.Term > rf.currentTerm:
		// Newer term: adopt it and fall back to follower.
		rf.currentTerm = args.Term
		rf.state = Follower
		rf.votedFor = -1
	}

	// From here on args.Term equals rf.currentTerm, so a candidate that was
	// still waiting for votes has just heard from this term's leader.
	if rf.state == Candidate {
		rf.state = Follower
	}

	rf.electionTimer = time.Now()
	reply.Term, reply.Success = rf.currentTerm, true
}

References