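// Page metadata captured with this listing (kept as comments so the file
// remains valid Go): raft exercise | unknown | golang | 2 years ago | 16 kB |
// 121 | No Index
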
package raft // // this is an outline of the API that raft must expose to // the service (or tester). see comments below for // each of these functions for more details. // // rf = Make(...) // create a new Raft server. // rf.Start(command interface{}) (index, term, isleader) // start agreement on a new log entry // rf.GetState() (term, isLeader) // ask a Raft for its current term, and whether it thinks it is leader // ApplyMsg // each time a new entry is committed to the log, each Raft peer // should send an ApplyMsg to the service (or tester) // in the same server. // import ( "math/rand" "mit_distributed_systems/labrpc" "sync" "sync/atomic" "time" ) // import "bytes" // import "../labgob" // as each Raft peer becomes aware that successive log entries are // committed, the peer should send an ApplyMsg to the service (or // tester) on the same server, via the applyCh passed to Make(). set // CommandValid to true to indicate that the ApplyMsg contains a newly // committed log entry. // // in Lab 3 you'll want to send other kinds of messages (e.g., // snapshots) on the applyCh; at that point you can add fields to // ApplyMsg, but set CommandValid to false for these other uses. type ApplyMsg struct { CommandValid bool Command interface{} CommandIndex int } const ( FOLLOWER = 0 CANDIDATE = 1 LEADER = 2 ) type LogEntry struct { Command interface{} Term int } // A Go object implementing a single Raft peer. type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() // Your data here (2A, 2B, 2C). // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. 
state int currentTerm int votedFor int numVotes int commitIndex int lastApplied int nextIndex []int matchIndex []int log []LogEntry // Channels votedChan chan bool heartBeatChan chan bool electionWonChan chan bool stepDownChan chan bool } // return currentTerm and whether this server // believes it is the leader. func (rf *Raft) GetState() (int, bool) { rf.mu.Lock() defer rf.mu.Unlock() return rf.currentTerm, rf.state == LEADER } // save Raft's persistent state to stable storage, // where it can later be retrieved after a crash and restart. // see paper's Figure 2 for a description of what should be persistent. func (rf *Raft) persist() { // Your code here (2C). // Example: // w := new(bytes.Buffer) // e := labgob.NewEncoder(w) // e.Encode(rf.xxx) // e.Encode(rf.yyy) // data := w.Bytes() // rf.persister.SaveRaftState(data) } // restore previously persisted state. func (rf *Raft) readPersist(data []byte) { if data == nil || len(data) < 1 { // bootstrap without any state? return } // Your code here (2C). // Example: // r := bytes.NewBuffer(data) // d := labgob.NewDecoder(r) // var xxx // var yyy // if d.Decode(&xxx) != nil || // d.Decode(&yyy) != nil { // error... // } else { // rf.xxx = xxx // rf.yyy = yyy // } } // example RequestVote RPC arguments structure. // field names must start with capital letters! type RequestVoteArgs struct { // Your data here (2A, 2B). Term int CandidateId int LastLogIndex int LastLogTerm int } // example RequestVote RPC reply structure. // field names must start with capital letters! type RequestVoteReply struct { // Your data here (2A). Term int VoteGranted bool } // example RequestVote RPC handler. func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (2A, 2B). 
rf.mu.Lock() defer rf.mu.Unlock() reply.Term = rf.currentTerm if args.Term < rf.currentTerm { // if candidate's term lower reply.VoteGranted = false // don't grant vote return } if args.Term > rf.currentTerm { // if the request vote has a higher term, become a follower rf.toFollower(args.Term) } reply.VoteGranted = false // initially don't give vote if rf.votedFor == -1 || rf.votedFor == args.CandidateId { // Could possibly do this in one if statement, but it seems more readable this way if args.LastLogTerm > rf.getLastTerm() { // candidate's last log term is higher reply.VoteGranted = true rf.votedFor = args.CandidateId rf.sendToNonBlockChan(rf.votedChan, true) return } if args.LastLogTerm == rf.getLastTerm() { // if the logs end with the same term if args.LastLogIndex >= rf.getLastIndex() { // if candidate's log is longer reply.VoteGranted = true // give it the vote rf.votedFor = args.CandidateId rf.sendToNonBlockChan(rf.votedChan, true) return } } } } // example code to send a RequestVote RPC to a server. // server is the index of the target server in rf.peers[]. // expects RPC arguments in args. // fills in *reply with RPC reply, so caller should // pass &reply. // the types of the args and reply passed to Call() must be // the same as the types of the arguments declared in the // handler function (including whether they are pointers). // // The labrpc package simulates a lossy network, in which servers // may be unreachable, and in which requests and replies may be lost. // Call() sends a request and waits for a reply. If a reply arrives // within a timeout interval, Call() returns true; otherwise // Call() returns false. Thus Call() may not return for a while. // A false return can be caused by a dead server, a live server that // can't be reached, a lost request, or a lost reply. // // Call() is guaranteed to return (perhaps after a delay) *except* if the // handler function on the server side does not return. 
Thus there // is no need to implement your own timeouts around Call(). // // look at the comments in ../labrpc/labrpc.go for more details. // // if you're having trouble getting RPC to work, check that you've // capitalized all field names in structs passed over RPC, and // that the caller passes the address of the reply struct with &, not // the struct itself. func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) { ok := rf.peers[server].Call("Raft.RequestVote", args, reply) if !ok { // if we didn't get a reply return } rf.mu.Lock() if rf.state != CANDIDATE { // if we're not the candidate return // skip the vote } if args.Term != rf.currentTerm { // if the term we sent is different from our term now return // skip the vote } if reply.Term > rf.currentTerm { // if the follower's term is higher than ours (there's another leader) return // skip the vote } if reply.VoteGranted { rf.numVotes++ if rf.numVotes == len(rf.peers)/2+1 { rf.sendToNonBlockChan(rf.electionWonChan, true) } } rf.mu.Unlock() } type AppendEntriesArgs struct { Term int LeaderId int PrevLogIndex int PrevLogTerm int Entries []LogEntry LeaderCommit int } type AppendEntriesReply struct { Term int Success bool } func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() if args.Term < rf.currentTerm { reply.Term = rf.currentTerm reply.Success = false return } if args.Term > rf.currentTerm { reply.Term = args.Term rf.toFollower(args.Term) } rf.sendToNonBlockChan(rf.heartBeatChan, true) } func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) { ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) if !ok { return } rf.mu.Lock() defer rf.mu.Unlock() if rf.state != LEADER { return } if args.Term != rf.currentTerm { return } if reply.Term < rf.currentTerm { return } if reply.Term > rf.currentTerm { rf.sendToNonBlockChan(rf.stepDownChan, true) return } } // the 
service using Raft (e.g. a k/v server) wants to start // agreement on the next command to be appended to Raft's log. if this // server isn't the leader, returns false. otherwise start the // agreement and return immediately. there is no guarantee that this // command will ever be committed to the Raft log, since the leader // may fail or lose an election. even if the Raft instance has been killed, // this function should return gracefully. // // the first return value is the index that the command will appear at // if it's ever committed. the second return value is the current // term. the third return value is true if this server believes it is // the leader. func (rf *Raft) Start(command interface{}) (int, int, bool) { index := -1 term := -1 isLeader := true // Your code here (2B). return index, term, isLeader } // the tester doesn't halt goroutines created by Raft after each test, // but it does call the Kill() method. your code can use killed() to // check whether Kill() has been called. the use of atomic avoids the // need for a lock. // // the issue is that long-running goroutines use memory and may chew // up CPU time, perhaps causing later tests to fail and generating // confusing debug output. any goroutine with a long-running loop // should call killed() to check whether it should stop. func (rf *Raft) Kill() { atomic.StoreInt32(&rf.dead, 1) // Your code here, if desired. 
} func (rf *Raft) killed() bool { z := atomic.LoadInt32(&rf.dead) return z == 1 } // this function func (rf *Raft) handleServer() { for !rf.killed() { rf.mu.Lock() serverState := rf.state rf.mu.Unlock() switch serverState { // Followers: // reset election timer on heartbeat // reset election timer on vote // handle election timeouts case FOLLOWER: select { case <-rf.votedChan: // skip re-election if we voted case <-rf.heartBeatChan: // skip re-election timer if we got a heart-beat case <-time.After(rf.getElectionTimeout()): // convert from follower to candidate and start election rf.toCandidate(FOLLOWER) } // Candidates need to handle: case CANDIDATE: select { case <-rf.stepDownChan: // we are already a follower, so next select iteration it will go to the follower case case <-rf.electionWonChan: //time.Sleep(1 * time.Millisecond) rf.toLeader() // There is a lock conflict here. If this function acquires to lock instantly, some other crucial part of the program doesn't, and it fails. case <-time.After(rf.getElectionTimeout()): rf.toCandidate(CANDIDATE) } // Leaders need to handle: case LEADER: select { case <-rf.stepDownChan: // we are already a follower, so next select iteration it won't send the heartbeat case <-time.After(120 * time.Millisecond): rf.mu.Lock() rf.heartBeat() rf.mu.Unlock() } } } } func (rf *Raft) toLeader() { //time.Sleep(1 * time.Millisecond) rf.mu.Lock() defer rf.mu.Unlock() if rf.state != CANDIDATE { // if we're not the candidate (maybe somebody else won the election before us) return // return } rf.cleanUpChans() rf.state = LEADER rf.nextIndex = make([]int, len(rf.peers)) rf.matchIndex = make([]int, len(rf.peers)) lastLogIndex := rf.getLastIndex() + 1 for i := 0; i < len(rf.peers); i++ { rf.nextIndex[i] = lastLogIndex } rf.heartBeat() } func (rf *Raft) toCandidate(originalState int) { rf.mu.Lock() defer rf.mu.Unlock() if rf.state != originalState { return } rf.cleanUpChans() // change state rf.state = CANDIDATE // increase current term 
rf.currentTerm++ // vote for self rf.votedFor = rf.me rf.numVotes = 1 // clean-up all the channels rf.startElection() } // always check that lock is being held before calling this function! func (rf *Raft) toFollower(term int) { initialState := rf.state // need to change state before asking leader (if server is the leader) to step down // so that in the next switch iteration it will go right to the follower case rf.state = FOLLOWER rf.currentTerm = term rf.votedFor = -1 // now that our state is the follower, we can check the initial state before the change if initialState != FOLLOWER { // if we are the leader rf.sendToNonBlockChan(rf.stepDownChan, true) // step down } } // always check that lock is being held before calling this function! func (rf *Raft) startElection() { if rf.state != CANDIDATE { return } args := RequestVoteArgs{ Term: rf.currentTerm, CandidateId: rf.me, LastLogIndex: rf.getLastIndex(), LastLogTerm: rf.getLastTerm(), } for server := range rf.peers { if server == rf.me { continue } go rf.sendRequestVote(server, &args, &RequestVoteReply{}) } } // always check that lock is being held before calling this function! func (rf *Raft) heartBeat() { if rf.state != LEADER { return } for server := range rf.peers { if server == rf.me { continue } entries := rf.log[rf.nextIndex[server]:] prevLogIndex := rf.nextIndex[server] - 1 prevLogTerm := -1 if prevLogIndex > -1 { prevLogTerm = rf.log[prevLogIndex].Term } args := AppendEntriesArgs{ Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: prevLogIndex, PrevLogTerm: prevLogTerm, Entries: make([]LogEntry, len(entries)), LeaderCommit: rf.commitIndex, } copy(args.Entries, entries) go rf.sendAppendEntries(server, &args, &AppendEntriesReply{}) } } // always check that lock is being held before calling this function! func (rf *Raft) getLastIndex() int { return len(rf.log) - 1 } // always check that lock is being held before calling this function! 
func (rf *Raft) getLastTerm() int { if rf.getLastIndex() == -1 { return -1 } return rf.log[rf.getLastIndex()].Term } // always check that lock is being held before calling this function! func (rf *Raft) cleanUpChans() { rf.heartBeatChan = make(chan bool) rf.votedChan = make(chan bool) rf.stepDownChan = make(chan bool) rf.electionWonChan = make(chan bool) } func (rf *Raft) getElectionTimeout() time.Duration { time := time.Duration(350+rand.Intn(250)) * time.Millisecond return time } // this allows us to send a message to the channel without blocking // it avoids a lot of stalling and slowing down of servers which would lead to // timing problems func (rf *Raft) sendToNonBlockChan(c chan bool, x bool) { select { case c <- x: default: } } // the service or tester wants to create a Raft server. the ports // of all the Raft servers (including this one) are in peers[]. this // server's port is peers[me]. all the servers' peers[] arrays // have the same order. persister is a place for this server to // save its persistent state, and also initially holds the most // recent saved state, if any. applyCh is a channel on which the // tester or service expects Raft to send ApplyMsg messages. // Make() must return quickly, so it should start goroutines // for any long-running work. func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me // Your initialization code here (2A, 2B, 2C). rf.state = FOLLOWER rf.currentTerm = 0 rf.log = make([]LogEntry, 0) rf.currentTerm = 0 rf.heartBeatChan = make(chan bool) rf.votedChan = make(chan bool) rf.electionWonChan = make(chan bool) rf.stepDownChan = make(chan bool) rf.votedFor = -1 rf.commitIndex = 0 rf.lastApplied = 0 rf.matchIndex = make([]int, len(rf.peers)) rf.nextIndex = make([]int, len(rf.peers)) // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) go rf.handleServer() return rf }
// (page residue captured with this listing: "Editor is loading...")