// This version passes every test up to Basic Agree 2B, but fails Fail Agree 2B:
// after a disconnected follower reconnects, agreement is not re-established.

//
// raft.go
// =======
// Write your code in this file
// We will use the original version of all other
// files for testing
//

package raft

//
// API
// ===
// This is an outline of the API that your raft implementation should
// expose.
//
// rf = NewPeer(...)
//   Create a new Raft server.
//
// rf.PutCommand(command interface{}) (index, term, isleader)
//   Start agreement on a new log entry
//
// rf.GetState() (me, term, isLeader)
//   Ask a Raft peer for "me", its current term, and whether it thinks it
//   is a leader
//
// ApplyCommand
//   Each time a new entry is committed to the log, each Raft peer
//   should send an ApplyCommand to the service (e.g. tester) on the
//   same server, via the applyCh channel passed to NewPeer()
//
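
// A hedged usage sketch (not from the original file): roughly how a service
// would drive this API. The apply loop shown here is an assumption for
// illustration only; the real tester does its own bookkeeping.
//
//	applyCh := make(chan ApplyCommand)
//	rf := NewPeer(peers, me, applyCh)
//	go func() {
//		for msg := range applyCh {
//			// apply msg.Command at msg.Index to the state machine
//		}
//	}()
//	if index, term, isLeader := rf.PutCommand("x"); isLeader {
//		// the command may commit at index during term, or never
//		_, _ = index, term
//	}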

import (
	"fmt"
	"io/ioutil"
	"log"
	"math"      
	"math/rand" 
	"os"
	"sync"
	"time" 

	"github.com/cmu440/rpc"
)

// Set to false to disable debug logs completely
// Make sure to set kEnableDebugLogs to false before submitting
const kEnableDebugLogs = false

// Set to true to log to stdout instead of file
const kLogToStdout = true

// Change this to output logs to a different directory
const kLogOutputDir = "./raftlogs/"

//
// ApplyCommand
// ========
//
// As each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyCommand to the service (or
// tester) on the same server, via the applyCh passed to NewPeer()
//
type ApplyCommand struct {
	Index   int
	Command interface{}
}


type LogEntry struct {
	Index     int
	Term      int
	Command   interface{}
	Committed bool // currently unused
}
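
// lastLogInfo is a small helper added for illustration (the original code
// inlines rf.log[len(rf.log)-1] everywhere): it returns the index and term of
// the last log entry. It assumes the caller holds rf.mux and that the log
// always contains the sentinel entry appended in NewPeer.
func (rf *Raft) lastLogInfo() (index int, term int) {
	last := rf.log[len(rf.log)-1]
	return last.Index, last.Term
}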


//
// Raft struct
// ===========
//
// A Go object implementing a single Raft peer
//
type Raft struct {
	mux   sync.Mutex       // Lock to protect shared access to this peer's state
	peers []*rpc.ClientEnd // RPC end points of all peers
	me    int              // this peer's index into peers[]
	// You are expected to create reasonably clear log files before asking a
	// debugging question on Piazza or OH. Use of this logger is optional, and
	// you are free to remove it completely.
	logger *log.Logger // We provide you with a separate logger per peer.

	// Your data here (2A, 2B).
	// Look at the Raft paper's Figure 2 for a description of what
	// state a Raft peer should maintain

	role              int // 1 for leader, 2 for follower, 3 for candidate
	currentTerm       int
	votedFor          int // -1 if not yet voted in the current term
	receivedVoteCount int // votes granted to this peer in the current election
	becomesLeader     chan bool
	othersElected     chan bool
	log               []LogEntry // the replicated log; index 0 holds a sentinel entry
	commitIndex       int
	lastApplied       int
	nextIndex         []int // per follower: index of the next log entry the leader will send it
	matchIndex        []int // per follower: highest log index known to be replicated on it
	closeRoutine      chan bool
	votedResetTicker  chan bool
	applyCh           chan ApplyCommand
}
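
// Named role values, added for readability; the original code compares
// rf.role against the raw integers 1, 2 and 3 directly.
const (
	roleLeader    = 1
	roleFollower  = 2
	roleCandidate = 3
)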

//
// GetState()
// ==========
//
// Return "me", current term and whether this peer
// believes it is the leader
//
func (rf *Raft) GetState() (int, int, bool) {
	var me int
	var term int
	var isleader bool
	// Your code here (2A)

	// fmt.Println("In GetState!")

	rf.mux.Lock() // deadlock here!
	me = rf.me
	term = rf.currentTerm

	if rf.role == 1 {
		isleader = true
	} else {
		isleader = false
	}
	rf.mux.Unlock()

	// fmt.Println("GetState returns! me = ", me, ", term = ", term, ", isleader = ", isleader)

	return me, term, isleader
}

//
// RequestVoteArgs
// ===============
//
// Example RequestVote RPC arguments structure
//
// Please note
// ===========
// Field names must start with capital letters!
//
type RequestVoteArgs struct {
	// Your data here (2A, 2B)
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// RequestVoteReply
// ================
//
// Example RequestVote RPC reply structure.
//
// Please note
// ===========
// Field names must start with capital letters!
//
//
type RequestVoteReply struct {
	// Your data here (2A)
	Term        int
	VoteGranted bool
}

//
// RequestVote
// ===========
//
// Example RequestVote RPC handler
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // follower-side handler for an incoming vote request
	// Your code here (2A, 2B)
	rf.mux.Lock()

	if args.Term < rf.currentTerm { // candidate is behind: reject
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		rf.mux.Unlock()
		return
	}

	if args.Term > rf.currentTerm { // this peer is behind: adopt the candidate's term
		rf.votedFor = -1
		rf.currentTerm = args.Term
		rf.role = 2
	}

	// Election restriction (Raft paper §5.4.1): grant the vote only if the
	// candidate's log is at least as up-to-date as this peer's.
	lastEntry := rf.log[len(rf.log)-1]
	upToDate := args.LastLogTerm > lastEntry.Term ||
		(args.LastLogTerm == lastEntry.Term && args.LastLogIndex >= lastEntry.Index)

	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && upToDate {
		reply.Term = rf.currentTerm
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		rf.role = 2
		rf.mux.Unlock()
		// Reset the election timer; this may block briefly until the main
		// routine re-enters its follower select.
		rf.votedResetTicker <- true
	} else {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		rf.mux.Unlock()
	}
}
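
// candidateUpToDate factors the election restriction (Raft paper §5.4.1)
// into a helper for illustration; RequestVote above inlines the same check.
// The caller must hold rf.mux.
func (rf *Raft) candidateUpToDate(lastLogIndex, lastLogTerm int) bool {
	myLast := rf.log[len(rf.log)-1]
	if lastLogTerm != myLast.Term {
		return lastLogTerm > myLast.Term
	}
	return lastLogIndex >= myLast.Index
}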

// sendRequestVote
// ===============
//
// Example code to send a RequestVote RPC to a server
//
// server int -- index of the target server in
// rf.peers[]
//
// args *RequestVoteArgs -- RPC arguments in args
//
// reply *RequestVoteReply -- RPC reply
//
// The types of args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers)
//
// The rpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost
//
// Call() sends a request and waits for a reply
//
// If a reply arrives within a timeout interval, Call() returns true;
// otherwise Call() returns false
//
// Thus Call() may not return for a while
//
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply
//
// Call() is guaranteed to return (perhaps after a delay)
// *except* if the handler function on the server side does not return
//
// Thus there
// is no need to implement your own timeouts around Call()
//
// Please look at the comments and documentation in ../rpc/rpc.go
// for more details
//
// If you are having trouble getting RPC to work, check that you have
// capitalized all field names in the struct passed over RPC, and
// that the caller passes the address of the reply struct with "&",
// not the struct itself
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { // candidate side: send one vote request and tally the reply
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)

	rf.mux.Lock()
	if !ok {
		rf.mux.Unlock()
		return ok
	}

	if reply.VoteGranted && rf.currentTerm == reply.Term { // a grant for the current term
		rf.receivedVoteCount++
		if rf.receivedVoteCount > len(rf.peers)/2 && rf.role == 3 { // won the election
			rf.role = 1
			rf.mux.Unlock()
			rf.becomesLeader <- true // notify the main routine
			return ok
		}
	} else if reply.Term > rf.currentTerm { // this peer is behind: step down
		rf.currentTerm = reply.Term
		rf.role = 2
		rf.votedFor = -1
	}
	rf.mux.Unlock()

	return ok
}

// self-added
type AppendEntriesArgs struct {
	// Your data here (2A, 2B)
	Term              int
	LeaderId          int
	PrevLogIndex      int
	PrevLogTerm       int
	Entries           []LogEntry 
	LeaderCommitIndex int
}

// self-added
type AppendEntriesReply struct {
	// Your data here (2A, 2B)
	Term    int
	Success bool
}
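
// AppendEntriesReplyFast is a sketch, not used by this implementation, of the
// accelerated log backtracking suggested at the end of §5.3 of the Raft paper:
// on a mismatch the follower reports the conflicting term and the first index
// it stores for that term, so the leader can skip a whole term per round trip
// instead of decrementing nextIndex one entry at a time. The field names here
// are assumptions.
type AppendEntriesReplyFast struct {
	Term          int
	Success       bool
	ConflictTerm  int // term of the conflicting entry, if any
	ConflictIndex int // first index the follower stores for ConflictTerm
}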

// self-added
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	// 1. Reject a leader that is behind this peer's term.
	rf.mux.Lock()
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		rf.mux.Unlock()
		return
	}

	if rf.currentTerm < args.Term { // this peer is behind: adopt the leader's term
		rf.currentTerm = args.Term
		rf.role = 2
		rf.votedFor = -1
	}

	if len(args.Entries) == 0 { // empty heartbeat
		reply.Term = args.Term
		reply.Success = true
		// Receiver rule 5: advance commitIndex toward the leader's,
		// capped at this peer's last log index.
		if args.LeaderCommitIndex > rf.commitIndex {
			if args.LeaderCommitIndex <= rf.log[len(rf.log)-1].Index {
				rf.commitIndex = args.LeaderCommitIndex
			} else {
				rf.commitIndex = rf.log[len(rf.log)-1].Index
			}
		}
		rf.mux.Unlock()
		rf.othersElected <- true // reset the election timer
		return
	}

	// Ordinary AppendEntries.
	last := rf.log[len(rf.log)-1]
	if last.Index != args.PrevLogIndex || last.Term != args.PrevLogTerm { // log does not match
		reply.Term = args.Term
		reply.Success = false
		// Drop the last entry so the leader can back up one step per RPC,
		// but never delete the sentinel entry at index 0.
		if len(rf.log) > 1 {
			rf.log = rf.log[:len(rf.log)-1]
		}
	} else { // previous entries match: append the new entries
		rf.log = append(rf.log, args.Entries...)
		reply.Term = args.Term
		reply.Success = true
	}

	// Receiver rule 5: update commitIndex.
	if args.LeaderCommitIndex > rf.commitIndex {
		if args.LeaderCommitIndex <= rf.log[len(rf.log)-1].Index {
			rf.commitIndex = args.LeaderCommitIndex
		} else {
			rf.commitIndex = rf.log[len(rf.log)-1].Index
		}
	}
	rf.mux.Unlock()
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	rf.mux.Lock()
	if rf.role != 1 { // only the leader sends AppendEntries
		rf.mux.Unlock()
		return true
	}
	rf.mux.Unlock()

	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)

	rf.mux.Lock()
	if !ok {
		rf.mux.Unlock()
		return ok
	}

	if rf.role != 1 { // lost leadership while the RPC was in flight
		rf.mux.Unlock()
		return ok
	}

	if reply.Term > args.Term || reply.Term > rf.currentTerm { // this peer is behind: step down
		rf.currentTerm = reply.Term
		rf.role = 2
		rf.votedFor = -1
		rf.mux.Unlock()
		return false
	}

	if rf.currentTerm != args.Term { // this reply is for an old term
		rf.role = 2
		rf.votedFor = -1
		rf.mux.Unlock()
		return false
	}

	if !reply.Success { // log mismatch: back up nextIndex and retry
		if rf.nextIndex[server] > 1 { // never back up past the sentinel entry
			rf.nextIndex[server]--
		}

		// Build the retry while still holding the lock, so it reads a
		// consistent snapshot of the log and indices.
		var retryArgs AppendEntriesArgs
		retryArgs.Term = rf.currentTerm
		retryArgs.LeaderId = rf.me
		retryArgs.PrevLogIndex = rf.log[rf.nextIndex[server]-1].Index
		retryArgs.PrevLogTerm = rf.log[rf.nextIndex[server]-1].Term
		retryArgs.Entries = rf.log[rf.nextIndex[server]:]
		retryArgs.LeaderCommitIndex = rf.commitIndex
		rf.mux.Unlock()

		var retryReply AppendEntriesReply
		go rf.sendAppendEntries(server, &retryArgs, &retryReply)
		return false
	}

	// Success. For a real append (not a heartbeat), advance this follower's
	// indices and try to advance commitIndex.
	if len(args.Entries) != 0 {
		rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
		rf.nextIndex[server] = rf.matchIndex[server] + 1

		// Leader rule (Figure 2): commit every index replicated on a
		// majority whose entry is from the current term.
		for i := rf.commitIndex + 1; i <= rf.log[len(rf.log)-1].Index; i++ {
			count := 0
			for j := 0; j < len(rf.peers); j++ {
				if rf.matchIndex[j] >= i {
					count++
				}
			}
			if count > len(rf.peers)/2 && rf.log[i].Term == rf.currentTerm {
				rf.commitIndex = i
			} else {
				break
			}
		}
	}
	rf.mux.Unlock()
	return true
}

//
// PutCommand
// =====
//
// The service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log
//
// If this server is not the leader, return false
//
// Otherwise start the agreement and return immediately
//
// There is no guarantee that this command will ever be committed to
// the Raft log, since the leader may fail or lose an election
//
// The first return value is the index that the command will appear at
// if it is ever committed
//
// The second return value is the current term
//
// The third return value is true if this server believes it is
// the leader
//
func (rf *Raft) PutCommand(command interface{}) (int, int, bool) {
	// Your code here (2B)
	rf.mux.Lock()
	index := len(rf.log)
	term := rf.currentTerm
	isLeader := rf.role == 1

	if !isLeader {
		rf.mux.Unlock()
		return index, term, isLeader
	}

	// Append the command to the leader's own log.
	var newEntry LogEntry
	newEntry.Index = len(rf.log)
	newEntry.Term = rf.currentTerm
	newEntry.Command = command
	rf.log = append(rf.log, newEntry)

	// The leader trivially matches its own log.
	rf.matchIndex[rf.me] = newEntry.Index
	rf.nextIndex[rf.me] = newEntry.Index + 1
	rf.mux.Unlock()

	// Send AppendEntries to every follower.
	for i := 0; i < len(rf.peers); i++ {
		if i == rf.me { // no need to send to itself
			continue
		}
		rf.mux.Lock()
		var args AppendEntriesArgs
		args.Term = rf.currentTerm
		args.LeaderId = rf.me
		args.PrevLogIndex = rf.log[rf.nextIndex[i]-1].Index
		args.PrevLogTerm = rf.log[rf.nextIndex[i]-1].Term
		args.Entries = rf.log[rf.nextIndex[i]:]
		args.LeaderCommitIndex = rf.commitIndex
		rf.mux.Unlock()

		var reply AppendEntriesReply
		go rf.sendAppendEntries(i, &args, &reply) // one goroutine per follower
	}

	return index, term, isLeader
}
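
// A hedged caller-side sketch (an assumption, not part of the tested API):
// since PutCommand returns before agreement completes, a service must watch
// applyCh for the command rather than trust the returned index unconditionally.
//
//	index, _, isLeader := rf.PutCommand(cmd)
//	if isLeader {
//		// wait for an ApplyCommand with Index == index on applyCh;
//		// if a different command arrives at that index, the leader
//		// lost its term and cmd may never commit.
//	}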

//
// Stop
// ====
//
// The tester calls Stop() when a Raft instance will not
// be needed again
//
// You are not required to do anything
// in Stop(), but it might be convenient to (for example)
// turn off debug output from this instance
//
func (rf *Raft) Stop() {
	// Your code here, if desired
	// This stops main_routine; commit_routine keeps running (see
	// stoppableCommitRoutine below for a sketch of a full shutdown).
	rf.closeRoutine <- true
}

//
// NewPeer
// ====
//
// The service or tester wants to create a Raft server
//
// The port numbers of all the Raft servers (including this one)
// are in peers[]
//
// This server's port is peers[me]
//
// All the servers' peers[] arrays have the same order
//
// applyCh
// =======
//
// applyCh is a channel on which the tester or service expects
// Raft to send ApplyCommand messages
//
// NewPeer() must return quickly, so it should start Goroutines
// for any long-running work
//
func NewPeer(peers []*rpc.ClientEnd, me int, applyCh chan ApplyCommand) *Raft {
	rf := &Raft{}
	rf.mux.Lock() 
	rf.peers = peers
	rf.me = me

	if kEnableDebugLogs {
		peerName := peers[me].String()
		logPrefix := fmt.Sprintf("%s ", peerName)
		if kLogToStdout {
			rf.logger = log.New(os.Stdout, peerName, log.Lmicroseconds|log.Lshortfile)
		} else {
			err := os.MkdirAll(kLogOutputDir, os.ModePerm)
			if err != nil {
				panic(err.Error())
			}
			logOutputFile, err := os.OpenFile(fmt.Sprintf("%s/%s.txt", kLogOutputDir, logPrefix), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
			if err != nil {
				panic(err.Error())
			}
			rf.logger = log.New(logOutputFile, logPrefix, log.Lmicroseconds|log.Lshortfile)
		}
		rf.logger.Println("logger initialized")
	} else {
		rf.logger = log.New(ioutil.Discard, "", 0)
	}

	// Your initialization code here (2A, 2B)
	rf.role = 2        // default should be a follower
	rf.currentTerm = 0 // init to 0
	rf.votedFor = -1   // not yet voted
	rf.receivedVoteCount = 0
	rf.becomesLeader = make(chan bool)
	rf.othersElected = make(chan bool)
	rf.log = []LogEntry{}

	// Start the log with a sentinel entry at index 0, so a "previous"
	// entry always exists; its Command is left nil.
	var emptyEntry LogEntry
	emptyEntry.Index = 0
	emptyEntry.Term = 0
	rf.log = append(rf.log, emptyEntry)

	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.nextIndex = make([]int, len(rf.peers))
	rf.matchIndex = make([]int, len(rf.peers))
	rf.closeRoutine = make(chan bool)
	rf.votedResetTicker = make(chan bool)
	rf.applyCh = applyCh
	rf.mux.Unlock()

	go rf.main_routine()

	go rf.commit_routine()

	return rf 
}

func (rf *Raft) main_routine() {
	for {
		rf.mux.Lock()
		curRole := rf.role
		rf.mux.Unlock()

		if curRole == 1 { // leader
			select {
			default:
				// Broadcast heartbeats: empty Entries and a PrevLogIndex/Term
				// of 0, so followers skip the log consistency check.
				for i := 0; i < len(rf.peers); i++ {
					if i == rf.me { // no need to send to itself
						continue
					}
					rf.mux.Lock()
					var args AppendEntriesArgs
					args.Term = rf.currentTerm
					args.LeaderId = rf.me
					args.PrevLogIndex = 0
					args.PrevLogTerm = 0
					args.Entries = make([]LogEntry, 0)
					args.LeaderCommitIndex = rf.commitIndex
					rf.mux.Unlock()

					var reply AppendEntriesReply
					go rf.sendAppendEntries(i, &args, &reply) // one goroutine per follower
				}

				time.Sleep(100 * time.Millisecond) // heartbeat interval

			case <-rf.closeRoutine: // ready only while Stop() is blocked sending
				return
			}

		} else if curRole == 2 { // follower
			randomVal := math.Abs(rand.NormFloat64())
			electionTimer := time.NewTicker(250*time.Millisecond + 100*time.Millisecond*time.Duration(randomVal))

			select {
			case <-rf.othersElected: // heartbeat received: restart the election timer
				electionTimer.Stop()

			case <-electionTimer.C: // timed out: stand for election
				electionTimer.Stop()
				rf.mux.Lock()
				rf.role = 3
				rf.mux.Unlock()

			case <-rf.votedResetTicker: // granted a vote: restart the election timer
				electionTimer.Stop()

			case <-rf.closeRoutine:
				electionTimer.Stop()
				return
			}

		} else if curRole == 3 { // candidate
			rf.mux.Lock()
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.receivedVoteCount = 1 // this candidate votes for itself

			var args RequestVoteArgs
			args.Term = rf.currentTerm
			args.CandidateId = rf.me
			args.LastLogIndex = rf.log[len(rf.log)-1].Index
			args.LastLogTerm = rf.log[len(rf.log)-1].Term
			rf.mux.Unlock()

			candidateTicker := time.NewTicker(250 * time.Millisecond)

			for i := 0; i < len(rf.peers); i++ {
				if i == rf.me { // no need to send to itself
					continue
				}
				var reply RequestVoteReply
				go rf.sendRequestVote(i, &args, &reply) // one goroutine per peer
			}

			stop := false
			for !stop {
				select {
				case <-rf.becomesLeader: // this candidate won the election
					rf.mux.Lock()
					rf.logger.Println(rf.me, "becomes leader")
					rf.role = 1

					// Reinitialize nextIndex[] and matchIndex[] (Figure 2).
					for i := 0; i < len(rf.peers); i++ {
						rf.nextIndex[i] = rf.log[len(rf.log)-1].Index + 1
						rf.matchIndex[i] = 0
					}
					rf.mux.Unlock()
					stop = true

				case <-rf.othersElected: // another peer won the election
					rf.mux.Lock()
					rf.role = 2 // back to follower
					rf.votedFor = -1
					rf.mux.Unlock()
					stop = true

				case <-candidateTicker.C: // election timed out (e.g. split vote): retry
					rf.mux.Lock()
					rf.role = 2
					rf.votedFor = -1
					rf.mux.Unlock()
					stop = true

				case <-rf.closeRoutine:
					candidateTicker.Stop()
					return
				}
			}
			candidateTicker.Stop()
		}
	}
}
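
// electionTimeout is a sketch of the more conventional uniformly-random
// election timeout (for example 250-400ms); it is not wired into
// main_routine, which instead scales its base timeout by |rand.NormFloat64()|,
// a distribution that is unbounded above and clustered near zero.
func electionTimeout() time.Duration {
	return 250*time.Millisecond + time.Duration(rand.Intn(150))*time.Millisecond
}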


func (rf *Raft) commit_routine() { // apply committed entries to applyCh in log order
	for {
		rf.mux.Lock()
		if rf.commitIndex > rf.lastApplied && len(rf.log) > rf.lastApplied+1 {
			var commitCommand ApplyCommand
			commitCommand.Index = rf.log[rf.lastApplied+1].Index
			commitCommand.Command = rf.log[rf.lastApplied+1].Command
			rf.lastApplied++
			rf.mux.Unlock()
			rf.applyCh <- commitCommand // may block until the service reads it
			rf.logger.Println(rf.me, "applied index", commitCommand.Index)
		} else {
			rf.mux.Unlock()
		}

		time.Sleep(50 * time.Millisecond)
	}
}
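
// stoppableCommitRoutine is a sketch, not wired up, of how the apply loop
// could observe shutdown: Stop() would close(rf.closeRoutine) instead of
// sending one value, so every goroutine, including this one, sees it. As
// written above, commit_routine never returns after Stop().
func (rf *Raft) stoppableCommitRoutine() {
	for {
		select {
		case <-rf.closeRoutine: // a closed channel is always ready
			return
		default:
		}

		rf.mux.Lock()
		if rf.commitIndex > rf.lastApplied && len(rf.log) > rf.lastApplied+1 {
			var cmd ApplyCommand
			cmd.Index = rf.log[rf.lastApplied+1].Index
			cmd.Command = rf.log[rf.lastApplied+1].Command
			rf.lastApplied++
			rf.mux.Unlock()
			rf.applyCh <- cmd
		} else {
			rf.mux.Unlock()
		}

		time.Sleep(50 * time.Millisecond)
	}
}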
