// Status note: this version passes all tests up to Basic Agree (2B),
// but fails Fail Agree (2B): after a reconnection, agreement is not
// re-established.
//
// raft.go
// =======
// Write your code in this file
// We will use the original version of all other
// files for testing
//

package raft

//
// API
// ===
// This is an outline of the API that your raft implementation should
// expose.
//
// rf = NewPeer(...)
//   Create a new Raft server.
//
// rf.PutCommand(command interface{}) (index, term, isleader)
//   Start agreement on a new log entry.
//
// rf.GetState() (me, term, isLeader)
//   Ask a Raft peer for "me", its current term, and whether it thinks it
//   is a leader.
//
// ApplyCommand
//   Each time a new entry is committed to the log, each Raft peer
//   should send an ApplyCommand to the service (e.g. tester) on the
//   same server, via the applyCh channel passed to NewPeer().
//

import (
	"fmt"
	"io/ioutil"
	"log"
	"math"
	"math/rand"
	"os"
	"sync"
	"time"

	"github.com/cmu440/rpc"
)

// Set to false to disable debug logs completely.
// Make sure to set kEnableDebugLogs to false before submitting.
const kEnableDebugLogs = false

// Set to true to log to stdout instead of to a file.
const kLogToStdout = true

// Change this to output logs to a different directory.
const kLogOutputDir = "./raftlogs/"

//
// ApplyCommand
// ============
//
// As each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyCommand to the service (or
// tester) on the same server, via the applyCh passed to NewPeer().
//
type ApplyCommand struct {
	Index   int
	Command interface{}
}

// LogEntry is a single entry in a peer's log.
type LogEntry struct {
	Index     int
	Term      int
	Command   interface{}
	Committed bool // currently unused bookkeeping flag
}

//
// Raft struct
// ===========
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mux   sync.Mutex       // Lock to protect shared access to this peer's state
	peers []*rpc.ClientEnd // RPC end points of all peers
	me    int              // this peer's index into peers[]

	// You are expected to create reasonably clear log files before asking a
	// debugging question on Piazza or OH. Use of this logger is optional, and
	// you are free to remove it completely.
	logger *log.Logger // We provide you with a separate logger per peer.

	// Your data here (2A, 2B).
	// Look at the Raft paper's Figure 2 for a description of what
	// state a Raft peer should maintain.
	role              int // 1 for leader, 2 for follower, 3 for candidate
	currentTerm       int
	votedFor          int // -1 if this peer has not voted in the current term
	receivedVoteCount int // how many vote grants this peer has received
	becomesLeader     chan bool
	othersElected     chan bool
	log               []LogEntry
	commitIndex       int
	lastApplied       int
	nextIndex         []int // per follower: index of the next log entry the leader will send to it
	matchIndex        []int // per follower: highest log index known to be replicated on it
	closeRoutine      chan bool
	votedResetTicker  chan bool
	applyCh           chan ApplyCommand
}

//
// GetState()
// ==========
//
// Return "me", current term and whether this peer
// believes it is the leader.
//
func (rf *Raft) GetState() (int, int, bool) {
	// Your code here (2A)
	rf.mux.Lock()
	defer rf.mux.Unlock()
	return rf.me, rf.currentTerm, rf.role == 1
}
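// The sketch below shows one way a service might drive the API
// outlined above. It is illustrative only: clientEnds, me, and the
// single-shot apply read are assumed here, not taken from the tester.
//
//	applyCh := make(chan ApplyCommand)
//	rf := NewPeer(clientEnds, me, applyCh) // clientEnds []*rpc.ClientEnd (assumed)
//	if index, term, isLeader := rf.PutCommand("x"); isLeader {
//		fmt.Println("proposed at", index, "in term", term)
//	}
//	msg := <-applyCh // blocks until some entry commits
//	fmt.Println("applied", msg.Index, msg.Command)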
//
// RequestVoteArgs
// ===============
//
// Example RequestVote RPC arguments structure.
//
// Please note
// ===========
// Field names must start with capital letters!
//
type RequestVoteArgs struct {
	// Your data here (2A, 2B)
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// RequestVoteReply
// ================
//
// Example RequestVote RPC reply structure.
//
// Please note
// ===========
// Field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A)
	Term        int
	VoteGranted bool
}

//
// RequestVote
// ===========
//
// RequestVote RPC handler: this peer handles an incoming vote request.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B)
	rf.mux.Lock()

	if args.Term < rf.currentTerm {
		// The candidate's term is out of date: reject.
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		rf.mux.Unlock()
		return
	}

	if args.Term > rf.currentTerm {
		// This peer is behind: adopt the newer term and become a follower.
		rf.votedFor = -1
		rf.currentTerm = args.Term
		rf.role = 2
	}

	// Election restriction (Raft section 5.4.1): grant the vote only if
	// the candidate's log is at least as up-to-date as this peer's log.
	lastEntry := rf.log[len(rf.log)-1]
	upToDate := args.LastLogTerm > lastEntry.Term ||
		(args.LastLogTerm == lastEntry.Term && args.LastLogIndex >= lastEntry.Index)

	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && upToDate {
		reply.Term = args.Term
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		rf.role = 2
		rf.mux.Unlock()
		// Reset the election timeout. The send is non-blocking so the
		// RPC handler cannot deadlock when the main routine is not
		// currently waiting on this channel.
		select {
		case rf.votedResetTicker <- true:
		default:
		}
	} else {
		reply.Term = args.Term
		reply.VoteGranted = false
		rf.mux.Unlock()
	}
}
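// A worked example of the up-to-date comparison above, with assumed
// values: a candidate whose last entry is (index 5, term 3) is
// up-to-date relative to a voter whose last entry is (index 7, term 2),
// because term 3 beats term 2 regardless of index; the same candidate
// is NOT up-to-date relative to a voter whose last entry is
// (index 7, term 3), because the terms tie and index 5 < index 7.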
// sendRequestVote
// ===============
//
// Example code to send a RequestVote RPC to a server.
//
// server int -- index of the target server in rf.peers[]
//
// args *RequestVoteArgs -- RPC arguments in args
//
// reply *RequestVoteReply -- RPC reply
//
// The types of args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The rpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
//
// Call() sends a request and waits for a reply.
//
// If a reply arrives within a timeout interval, Call() returns true;
// otherwise Call() returns false.
//
// Thus Call() may not return for a while.
//
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay)
// *except* if the handler function on the server side does not return.
//
// Thus there is no need to implement your own timeouts around Call().
//
// Please look at the comments and documentation in ../rpc/rpc.go
// for more details.
//
// If you are having trouble getting RPC to work, check that you have
// capitalized all field names in the struct passed over RPC, and
// that the caller passes the address of the reply struct with "&",
// not the struct itself.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	// The candidate sends a vote request to one other server and
	// tallies the reply.
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	if !ok {
		return ok
	}

	rf.mux.Lock()
	if reply.VoteGranted && rf.currentTerm == reply.Term {
		// The grant belongs to the current term (it is not stale).
		rf.receivedVoteCount++
		if rf.receivedVoteCount > len(rf.peers)/2 && rf.role == 3 {
			// Won the election: notify the main routine.
			rf.role = 1
			rf.mux.Unlock()
			rf.becomesLeader <- true
			return ok
		}
	} else if reply.Term > rf.currentTerm {
		// The vote was not granted and this peer's term is out of date:
		// fall back to follower.
		rf.currentTerm = reply.Term
		rf.role = 2
		rf.votedFor = -1
	}
	rf.mux.Unlock()
	return ok
}

// AppendEntriesArgs is the arguments structure for the AppendEntries RPC.
type AppendEntriesArgs struct {
	// Your data here (2A, 2B)
	Term              int
	LeaderId          int
	PrevLogIndex      int
	PrevLogTerm       int
	Entries           []LogEntry
	LeaderCommitIndex int
}

// AppendEntriesReply is the reply structure for the AppendEntries RPC.
type AppendEntriesReply struct {
	// Your data here (2A, 2B)
	Term    int
	Success bool
}
// AppendEntries is the AppendEntries RPC handler, covering both empty
// heartbeats and log replication.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mux.Lock()
	if args.Term < rf.currentTerm {
		// The sender is a stale leader: reject.
		reply.Term = rf.currentTerm
		reply.Success = false
		rf.mux.Unlock()
		return
	}
	if rf.currentTerm < args.Term {
		// This peer is behind: adopt the newer term and become a follower.
		rf.currentTerm = args.Term
		rf.role = 2
		rf.votedFor = -1
	}

	if len(args.Entries) == 0 {
		// Empty heartbeat. Advance commitIndex from the leader's commit
		// index, capped at this peer's last log index (Figure 2,
		// receiver rule 5). The original code assigned
		// args.PrevLogIndex here, but heartbeats are sent with
		// PrevLogIndex == 0, which reset commitIndex.
		reply.Term = args.Term
		if args.LeaderCommitIndex > rf.commitIndex {
			if args.LeaderCommitIndex <= rf.log[len(rf.log)-1].Index {
				rf.commitIndex = args.LeaderCommitIndex
			} else {
				rf.commitIndex = rf.log[len(rf.log)-1].Index
			}
		}
		rf.mux.Unlock()
		reply.Success = true
		rf.othersElected <- true
		return
	}

	// Ordinary (non-empty) AppendEntries.
	if rf.log[len(rf.log)-1].Index != args.PrevLogIndex ||
		rf.log[len(rf.log)-1].Term != args.PrevLogTerm {
		// The logs do not match at PrevLogIndex: reject, and drop the
		// last local entry so the leader's retry can back up further.
		reply.Term = args.Term
		reply.Success = false
		if len(rf.log) > 1 { // never delete the sentinel entry at index 0
			rf.log = rf.log[:len(rf.log)-1]
		}
	} else {
		// The previous entries match: append the new entries directly.
		rf.log = append(rf.log, args.Entries...)
		reply.Term = args.Term
		reply.Success = true
	}

	// Receiver rule 5 (Figure 2): advance commitIndex.
	if args.LeaderCommitIndex > rf.commitIndex {
		if args.LeaderCommitIndex <= rf.log[len(rf.log)-1].Index {
			rf.commitIndex = args.LeaderCommitIndex
		} else {
			rf.commitIndex = rf.log[len(rf.log)-1].Index
		}
	}
	rf.mux.Unlock()
}
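// Note: backing up nextIndex one entry per rejection (see
// sendAppendEntries below) recovers slowly after a long disconnection,
// which is the scenario the status note at the top of this file calls
// out. A common refinement, sketched here with assumed fields --
// ConflictTerm, ConflictIndex, and firstIndexOfTerm are NOT part of
// this implementation -- lets the leader skip a whole term at a time:
//
//	// follower, on a mismatch:
//	reply.ConflictTerm = rf.log[len(rf.log)-1].Term
//	reply.ConflictIndex = rf.firstIndexOfTerm(reply.ConflictTerm) // hypothetical helper
//	// leader, on !reply.Success:
//	rf.nextIndex[server] = reply.ConflictIndex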
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	rf.mux.Lock()
	if rf.role != 1 {
		// No longer the leader: do not send.
		rf.mux.Unlock()
		return true
	}
	rf.mux.Unlock()

	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	if !ok {
		return ok
	}

	rf.mux.Lock()
	if rf.role != 1 {
		// Lost leadership while the RPC was in flight.
		rf.mux.Unlock()
		return ok
	}
	if reply.Term > args.Term || reply.Term > rf.currentTerm {
		// This peer's term is out of date: fall back to follower.
		rf.currentTerm = reply.Term
		rf.role = 2
		rf.votedFor = -1
		rf.mux.Unlock()
		return false
	}
	if rf.currentTerm != args.Term {
		// The term changed since this RPC was sent: the reply is stale.
		rf.role = 2
		rf.votedFor = -1
		rf.mux.Unlock()
		return false
	}

	if !reply.Success {
		// The follower's log does not match at PrevLogIndex: back up
		// nextIndex and retry with more entries. (matchIndex is NOT
		// decremented; it should only ever move forward.) The retry
		// arguments are built while still holding the lock.
		if rf.nextIndex[server] > 1 {
			rf.nextIndex[server]--
		}
		var retryArgs AppendEntriesArgs
		retryArgs.Term = rf.currentTerm
		retryArgs.LeaderId = rf.me
		retryArgs.PrevLogIndex = rf.log[rf.nextIndex[server]-1].Index
		retryArgs.PrevLogTerm = rf.log[rf.nextIndex[server]-1].Term
		retryArgs.Entries = rf.log[rf.nextIndex[server]:]
		retryArgs.LeaderCommitIndex = rf.commitIndex
		rf.mux.Unlock()
		var retryReply AppendEntriesReply
		go rf.sendAppendEntries(server, &retryArgs, &retryReply)
		return false
	}

	// The append succeeded.
	if len(args.Entries) != 0 { // not a heartbeat
		rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
		rf.nextIndex[server] = args.PrevLogIndex + len(args.Entries) + 1

		// Leader's last rule in Figure 2: advance commitIndex to the
		// highest index replicated on a majority of peers, provided
		// that entry is from the current term.
		for i := rf.commitIndex + 1; i <= rf.log[len(rf.log)-1].Index; i++ {
			count := 0
			for j := 0; j < len(rf.peers); j++ {
				if rf.matchIndex[j] >= i {
					count++
				}
			}
			if count > len(rf.peers)/2 && rf.log[i].Term == rf.currentTerm {
				rf.commitIndex = i
			} else {
				break
			}
		}
	}
	rf.mux.Unlock()
	return true
}

//
// PutCommand
// ==========
//
// The service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log.
//
// If this server is not the leader, return false.
//
// Otherwise start the agreement and return immediately.
//
// There is no guarantee that this command will ever be committed to
// the Raft log, since the leader may fail or lose an election.
//
// The first return value is the index that the command will appear at
// if it is ever committed.
//
// The second return value is the current term.
//
// The third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) PutCommand(command interface{}) (int, int, bool) {
	// Your code here (2B)
	rf.mux.Lock()
	index := len(rf.log)
	term := rf.currentTerm
	isLeader := rf.role == 1
	if !isLeader {
		rf.mux.Unlock()
		return index, term, isLeader
	}

	// Append the command to the leader's own log. The lock is held from
	// reading index through the append, so concurrent PutCommand calls
	// cannot claim the same index.
	var newEntry LogEntry
	newEntry.Index = len(rf.log)
	newEntry.Term = rf.currentTerm
	newEntry.Command = command
	rf.log = append(rf.log, newEntry)

	// Send AppendEntries to all followers.
	for i := 0; i < len(rf.peers); i++ {
		if i == rf.me {
			// The leader does not send to itself; just advance its own
			// bookkeeping.
			rf.nextIndex[rf.me]++
			rf.matchIndex[rf.me]++
			continue
		}
		var args AppendEntriesArgs
		args.Term = rf.currentTerm
		args.LeaderId = rf.me
		args.PrevLogIndex = rf.log[rf.nextIndex[i]-1].Index
		args.PrevLogTerm = rf.log[rf.nextIndex[i]-1].Term
		args.Entries = rf.log[rf.nextIndex[i]:]
		args.LeaderCommitIndex = rf.commitIndex
		var reply AppendEntriesReply
		go rf.sendAppendEntries(i, &args, &reply) // replicate in a separate goroutine
	}
	rf.mux.Unlock()

	return index, term, isLeader
}
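// A minimal usage sketch for PutCommand. Only the leader accepts
// commands, so a client typically tries each peer until one reports
// isLeader == true; the rafts slice here is assumed, not part of this
// file:
//
//	for _, peer := range rafts { // rafts []*Raft (assumed)
//		if _, _, isLeader := peer.PutCommand(cmd); isLeader {
//			break
//		}
//	}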
//
// Stop
// ====
//
// The tester calls Stop() when a Raft instance will not
// be needed again.
//
// You are not required to do anything
// in Stop(), but it might be convenient to (for example)
// turn off debug output from this instance.
//
func (rf *Raft) Stop() {
	// Your code here, if desired.
	rf.closeRoutine <- true
}

//
// NewPeer
// =======
//
// The service or tester wants to create a Raft server.
//
// The port numbers of all the Raft servers (including this one)
// are in peers[].
//
// This server's port is peers[me].
//
// All the servers' peers[] arrays have the same order.
//
// applyCh
// =======
//
// applyCh is a channel on which the tester or service expects
// Raft to send ApplyCommand messages.
//
// NewPeer() must return quickly, so it should start Goroutines
// for any long-running work.
//
func NewPeer(peers []*rpc.ClientEnd, me int, applyCh chan ApplyCommand) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.me = me

	if kEnableDebugLogs {
		peerName := peers[me].String()
		logPrefix := fmt.Sprintf("%s ", peerName)
		if kLogToStdout {
			rf.logger = log.New(os.Stdout, peerName, log.Lmicroseconds|log.Lshortfile)
		} else {
			err := os.MkdirAll(kLogOutputDir, os.ModePerm)
			if err != nil {
				panic(err.Error())
			}
			logOutputFile, err := os.OpenFile(fmt.Sprintf("%s/%s.txt", kLogOutputDir, logPrefix),
				os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
			if err != nil {
				panic(err.Error())
			}
			rf.logger = log.New(logOutputFile, logPrefix, log.Lmicroseconds|log.Lshortfile)
		}
		rf.logger.Println("logger initialized")
	} else {
		rf.logger = log.New(ioutil.Discard, "", 0)
	}

	// Your initialization code here (2A, 2B).
	rf.role = 2 // every peer starts as a follower
	rf.currentTerm = 0
	rf.votedFor = -1 // not yet voted
	rf.receivedVoteCount = 0
	rf.becomesLeader = make(chan bool)
	rf.othersElected = make(chan bool)
	// The log starts with a sentinel entry at index 0, so real entries
	// are 1-indexed as in the Raft paper.
	rf.log = []LogEntry{{Index: 0, Term: 0}}
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.nextIndex = make([]int, len(rf.peers))
	rf.matchIndex = make([]int, len(rf.peers))
	rf.closeRoutine = make(chan bool)
	rf.votedResetTicker = make(chan bool)
	rf.applyCh = applyCh

	go rf.main_routine()
	go rf.commit_routine()
	return rf
}
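// Timing note for the routines below: with the constants used here,
// leaders heartbeat every 100ms, while follower election timeouts are
// randomized as 250ms + 100ms*|N(0,1)|, i.e. usually within roughly
// 250-450ms. Keeping the heartbeat interval well under the minimum
// election timeout is what prevents needless elections: at 100ms vs.
// 250ms, a follower tolerates one or two lost heartbeats before it
// becomes a candidate.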
// main_routine drives the role state machine: heartbeats while leader,
// election timeouts while follower, and elections while candidate.
func (rf *Raft) main_routine() {
	for {
		rf.mux.Lock()
		curRole := rf.role
		rf.mux.Unlock()

		if curRole == 1 { // leader
			select {
			case <-rf.closeRoutine:
				return
			default:
				// Broadcast an empty heartbeat to every other peer.
				for i := 0; i < len(rf.peers); i++ {
					if i == rf.me { // no need to send to itself
						continue
					}
					rf.mux.Lock()
					var args AppendEntriesArgs
					args.Term = rf.currentTerm
					args.LeaderId = rf.me
					args.PrevLogIndex = 0
					args.PrevLogTerm = 0
					args.Entries = make([]LogEntry, 0)
					args.LeaderCommitIndex = rf.commitIndex
					rf.mux.Unlock()
					var reply AppendEntriesReply
					go rf.sendAppendEntries(i, &args, &reply)
				}
				time.Sleep(100 * time.Millisecond) // heartbeat interval
			}
		} else if curRole == 2 { // follower
			// Randomized election timeout: 250ms + 100ms * |N(0,1)|.
			randomVal := math.Abs(rand.NormFloat64())
			electionTimer := time.NewTicker(250*time.Millisecond + 100*time.Millisecond*time.Duration(randomVal))
			select {
			case <-rf.othersElected:
				// Heard a heartbeat: loop around and restart the timeout.
				electionTimer.Stop()
			case <-rf.votedResetTicker:
				// Granted a vote: loop around and restart the timeout.
				electionTimer.Stop()
			case <-electionTimer.C:
				// Timed out without hearing from a leader: become a candidate.
				electionTimer.Stop()
				rf.mux.Lock()
				rf.role = 3
				rf.mux.Unlock()
			case <-rf.closeRoutine:
				return
			}
		} else if curRole == 3 { // candidate
			rf.mux.Lock()
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.receivedVoteCount = 1 // votes for itself
			var args RequestVoteArgs
			args.Term = rf.currentTerm
			args.CandidateId = rf.me
			args.LastLogIndex = rf.log[len(rf.log)-1].Index
			args.LastLogTerm = rf.log[len(rf.log)-1].Term
			rf.mux.Unlock()

			candidateTicker := time.NewTicker(250 * time.Millisecond)
			for i := 0; i < len(rf.peers); i++ {
				if i == rf.me { // no need to send to itself
					continue
				}
				var reply RequestVoteReply
				go rf.sendRequestVote(i, &args, &reply)
			}

			stop := false
			for !stop {
				select {
				case <-rf.becomesLeader:
					// This candidate won the election.
					rf.mux.Lock()
					rf.role = 1
					// Reinitialize nextIndex[] and matchIndex[] (Figure 2).
					for i := 0; i < len(rf.peers); i++ {
						rf.nextIndex[i] = rf.log[len(rf.log)-1].Index + 1
						rf.matchIndex[i] = 0
					}
					rf.mux.Unlock()
					stop = true
				case <-rf.othersElected:
					// Another peer won the election.
					rf.mux.Lock()
					rf.role = 2
					rf.votedFor = -1
					rf.mux.Unlock()
					stop = true
				case <-candidateTicker.C:
					// The election timed out (e.g. a split vote): return
					// to follower and retry after another timeout.
					rf.mux.Lock()
					rf.role = 2
					rf.votedFor = -1
					rf.mux.Unlock()
					stop = true
				case <-rf.closeRoutine:
					return
				}
			}
			candidateTicker.Stop()
		}
	}
}

// commit_routine applies committed entries to applyCh in log order.
// Note that only main_routine watches closeRoutine, so this goroutine
// keeps running after Stop().
func (rf *Raft) commit_routine() {
	for {
		rf.mux.Lock()
		if rf.commitIndex > rf.lastApplied && len(rf.log) > rf.lastApplied+1 {
			var commitCommand ApplyCommand
			commitCommand.Index = rf.log[rf.lastApplied+1].Index
			commitCommand.Command = rf.log[rf.lastApplied+1].Command
			rf.lastApplied++
			rf.mux.Unlock()
			rf.applyCh <- commitCommand
		} else {
			rf.mux.Unlock()
		}
		time.Sleep(50 * time.Millisecond)
	}
}