// Untitled
// unknown
// golang
// 3 years ago
// 24 kB
// 27
// Indexable
// Status: this version passes every test up to and including basic agree (2B),
// but it does not pass fail agree (2B): it fails after a peer reconnects.
//
// raft.go
// =======
// Write your code in this file
// We will use the original version of all other
// files for testing
//
package raft
//
// API
// ===
// This is an outline of the API that your raft implementation should
// expose.
//
// rf = NewPeer(...)
// Create a new Raft server.
//
// rf.PutCommand(command interface{}) (index, term, isleader)
// Start agreement on a new log entry
//
// rf.GetState() (me, term, isLeader)
// Ask a Raft peer for "me", its current term, and whether it thinks it
// is a leader
//
// ApplyCommand
// Each time a new entry is committed to the log, each Raft peer
// should send an ApplyCommand to the service (e.g. tester) on the
// same server, via the applyCh channel passed to NewPeer()
//
import (
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"os"
"sync"
"time"
"github.com/cmu440/rpc"
)
// Debug-logging configuration for the per-peer logger created in NewPeer.
const (
	// kEnableDebugLogs switches debug logging on or off as a whole; false
	// disables debug logs completely.
	// Make sure it is set to false before submitting.
	kEnableDebugLogs = false

	// kLogToStdout sends logs to stdout instead of a file when true.
	kLogToStdout = true

	// kLogOutputDir is the directory log files are written to; change it to
	// output logs to a different directory.
	kLogOutputDir = "./raftlogs/"
)
//
// ApplyCommand
// ========
//
// As each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyCommand to the service (or
// tester) on the same server, via the applyCh passed to NewPeer()
//
// ApplyCommand is the message a peer sends on applyCh for a log entry it
// applies; commit_routine builds one per entry from the entry's Index and
// Command.
type ApplyCommand struct {
	Index   int         // Index field of the log entry being applied
	Command interface{} // the entry's command, passed through unchanged
}
// LogEntry is one slot of a peer's replicated log. NewPeer seeds every log
// with a placeholder entry at Index 0 / Term 0, so real commands start at
// Index 1.
type LogEntry struct {
	Index   int         // position in the log; PutCommand sets it to len(log) at append time
	Term    int         // leader term under which the entry was created
	Command interface{} // the command handed to PutCommand
	// AppendCount int
	// NOTE(review): no code visible in this file writes Committed (the only
	// assignment is commented out in commit_routine) — confirm it is still needed.
	Committed bool
}
//
// Raft struct
// ===========
//
// A Go object implementing a single Raft peer
//
type Raft struct {
	mux   sync.Mutex       // Lock to protect shared access to this peer's state
	peers []*rpc.ClientEnd // RPC end points of all peers
	me    int              // this peer's index into peers[]
	// You are expected to create reasonably clear log files before asking a
	// debugging question on Piazza or OH. Use of this logger is optional, and
	// you are free to remove it completely.
	logger *log.Logger // We provide you with a separate logger per peer.
	// Your data here (2A, 2B).
	// Look at the Raft paper's Figure 2 for a description of what
	// state a Raft peer should maintain
	role              int       // 1 for leader, 2 for follower, and 3 for candidate
	currentTerm       int       // latest term this peer has seen; starts at 0
	votedFor          int       // peer index voted for, or -1 if not yet voted
	receivedVoteCount int       // votes granted to this peer in its current election (including its own)
	becomesLeader     chan bool // sendRequestVote sends here once a majority of votes is reached
	othersElected     chan bool // the AppendEntries handler sends here; main_routine receives as follower or candidate
	log               []LogEntry // the replicated log; index 0 holds the placeholder entry created in NewPeer
	commitIndex       int       // highest log index this peer considers committed
	lastApplied       int       // highest log index commit_routine has already sent on applyCh
	nextIndex         []int     // leader only: per peer, index of the next log entry to send to that peer
	matchIndex        []int     // leader only: per peer, highest log index recorded as replicated on that peer
	closeRoutine      chan bool // Stop sends here; main_routine returns when it receives
	votedResetTicker  chan bool // the RequestVote handler sends here after granting a vote; main_routine receives as follower
	applyCh           chan ApplyCommand // channel passed to NewPeer; commit_routine sends applied entries on it
}
//
// GetState()
// ==========
//
// Return "me", current term and whether this peer
// believes it is the leader
//
// GetState reports this peer's index, its current term, and whether it
// currently believes it is the leader. All three values are read under the
// peer's mutex so they form one consistent snapshot.
func (rf *Raft) GetState() (int, int, bool) {
	// Your code here (2A)
	rf.mux.Lock()
	defer rf.mux.Unlock()

	// Role 1 is the leader role.
	return rf.me, rf.currentTerm, rf.role == 1
}
//
// RequestVoteArgs
// ===============
//
// Example RequestVote RPC arguments structure
//
// Please note
// ===========
// Field names must start with capital letters!
//
// RequestVoteArgs carries a candidate's vote request. main_routine fills it
// in from the candidate's own state when it starts an election.
type RequestVoteArgs struct {
	// Your data here (2A, 2B)
	Term         int // candidate's current term
	CandidateId  int // index of the candidate in peers[]
	LastLogIndex int // Index of the candidate's last log entry
	LastLogTerm  int // Term of the candidate's last log entry
}
//
// RequestVoteReply
// ================
//
// Example RequestVote RPC reply structure.
//
// Please note
// ===========
// Field names must start with capital letters!
//
//
// RequestVoteReply is the voter's answer to a RequestVote RPC.
type RequestVoteReply struct {
	// Your data here (2A)
	Term        int  // term reported by the voter; the candidate steps down if it exceeds its own
	VoteGranted bool // true when the voter gave its vote to the candidate
}
//
// RequestVote
// ===========
//
// Example RequestVote RPC handler
//
// RequestVote is the RPC handler a peer runs when a candidate asks for its
// vote.
//
// The reply carries this peer's term. The vote is granted only when the
// candidate's term is not older than ours, we have not already voted for a
// different candidate in this term, and the candidate's log is at least as
// up-to-date as ours (higher last term, or same last term and an index that
// is not smaller).
//
// The whole decision is made in one critical section. Releasing the lock
// between the term update and the vote decision would let a concurrent RPC
// change currentTerm or votedFor in between, so the vote could be granted on
// stale state.
//
// After a vote is granted the handler notifies main_routine through
// votedResetTicker so the follower's election timer restarts. That send
// happens after the lock is released and blocks until main_routine is
// receiving on the channel.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B)
	rf.mux.Lock()

	// A candidate from an older term is rejected outright.
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		rf.mux.Unlock()
		return
	}

	// A newer term means this peer is behind: adopt the term, forget any
	// vote cast in the old term, and fall back to follower.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.role = 2
	}
	reply.Term = rf.currentTerm

	// Is the candidate's log at least as up-to-date as ours?
	last := rf.log[len(rf.log)-1]
	upToDate := args.LastLogTerm > last.Term ||
		(args.LastLogTerm == last.Term && args.LastLogIndex >= last.Index)

	canVote := rf.votedFor == -1 || rf.votedFor == args.CandidateId
	if !canVote || !upToDate {
		reply.VoteGranted = false
		rf.mux.Unlock()
		return
	}

	reply.VoteGranted = true
	rf.votedFor = args.CandidateId
	rf.role = 2
	rf.mux.Unlock()

	// Restart the follower's election timer. Must not be done while holding
	// the lock: main_routine takes the lock between selects.
	rf.votedResetTicker <- true
}
// sendRequestVote
// ===============
//
// Example code to send a RequestVote RPC to a server
//
// server int -- index of the target server in
// rf.peers[]
//
// args *RequestVoteArgs -- RPC arguments in args
//
// reply *RequestVoteReply -- RPC reply
//
// The types of args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers)
//
// The rpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost
//
// Call() sends a request and waits for a reply
//
// If a reply arrives within a timeout interval, Call() returns true;
// otherwise Call() returns false
//
// Thus Call() may not return for a while
//
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply
//
// Call() is guaranteed to return (perhaps after a delay)
// *except* if the handler function on the server side does not return
//
// Thus there
// is no need to implement your own timeouts around Call()
//
// Please look at the comments and documentation in ../rpc/rpc.go
// for more details
//
// If you are having trouble getting RPC to work, check that you have
// capitalized all field names in the struct passed over RPC, and
// that the caller passes the address of the reply struct with "&",
// not the struct itself
// sendRequestVote sends one RequestVote RPC to the given peer and folds the
// answer into this candidate's election state. It returns what Call returned,
// i.e. whether a reply arrived.
//
// A granted vote for the current term is counted; once the count exceeds
// half of the peers while this peer is still a candidate, the role is
// switched to leader and main_routine is notified through becomesLeader
// (after the lock has been released). Any other reply whose term is newer
// than ours makes this peer adopt that term and step down to follower.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	delivered := rf.peers[server].Call("Raft.RequestVote", args, reply)

	rf.mux.Lock()
	if !delivered {
		// No reply: nothing to record.
		rf.mux.Unlock()
		return delivered
	}

	wonElection := false
	switch {
	case reply.VoteGranted && reply.Term == rf.currentTerm:
		// The grant belongs to the election we are running right now.
		rf.receivedVoteCount++
		if rf.role == 3 && rf.receivedVoteCount > len(rf.peers)/2 {
			rf.role = 1
			wonElection = true
		}
	case reply.Term > rf.currentTerm:
		// Someone is ahead of us: adopt the term and go back to follower.
		rf.currentTerm = reply.Term
		rf.role = 2
		rf.votedFor = -1
	}
	rf.mux.Unlock()

	if wonElection {
		// Notify the main routine that this peer won the election.
		rf.becomesLeader <- true
	}
	return delivered
}
// self-added
// AppendEntriesArgs is the leader's AppendEntries request. It is used both
// for heartbeats (empty Entries, built in main_routine) and for replicating
// log entries (built in PutCommand and on retry in sendAppendEntries).
type AppendEntriesArgs struct {
	// Your data here (2A, 2B)
	Term              int        // leader's current term
	LeaderId          int        // index of the leader in peers[]
	PrevLogIndex      int        // Index of the log entry immediately preceding Entries (0 in heartbeats)
	PrevLogTerm       int        // Term of the entry at PrevLogIndex (0 in heartbeats)
	Entries           []LogEntry // entries to append; empty for a heartbeat
	LeaderCommitIndex int        // leader's commitIndex at the time the request was built
}
// self-added
// AppendEntriesReply is a follower's answer to an AppendEntries RPC.
type AppendEntriesReply struct {
	// Your data here (2A, 2B)
	Term    int  // term reported by the follower; the leader steps down if it is newer than its own
	Success bool // true when the follower accepted the request
}
// self-added
// AppendEntries is the RPC handler a peer runs for a leader's heartbeat or
// log-replication request. A nil args is ignored.
//
// Steps, all performed in one critical section:
//  1. Reject (Success=false, Term=our term) a request from an older term.
//  2. Adopt a newer term and fall back to follower.
//  3. Consistency check: the request is accepted only if our log has an
//     entry at PrevLogIndex whose term is PrevLogTerm. A mismatch is answered
//     with Success=false and leaves the log untouched; the leader retries
//     with an earlier PrevLogIndex.
//  4. On a match, entries we already hold (same index and term) are skipped,
//     the first conflicting entry and everything after it are dropped, and
//     the remaining new entries are appended. This makes a duplicated or
//     reordered request harmless.
//  5. commitIndex is raised to min(LeaderCommitIndex, highest index known to
//     match the leader's log). It is never lowered.
//
// Log positions are used as indices; this relies on log[i].Index == i, which
// holds because NewPeer seeds index 0 and PutCommand assigns Index = len(log).
//
// After the lock is released the handler signals main_routine through
// othersElected so that a follower restarts its election timer and a
// candidate steps down. The send blocks until main_routine receives it.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	if args == nil {
		return
	}

	rf.mux.Lock()

	// 1. The sender is a leader from an older term.
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		rf.mux.Unlock()
		return
	}

	// 2. We are behind: adopt the leader's term.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.role = 2
		rf.votedFor = -1
	}
	reply.Term = rf.currentTerm

	// matched is the highest local index known to be identical to the
	// leader's log, or -1 if nothing could be established.
	matched := -1

	prev := args.PrevLogIndex
	if prev >= 0 && prev < len(rf.log) && rf.log[prev].Term == args.PrevLogTerm {
		// 3./4. Logs agree through prev; merge the new entries.
		for i := range args.Entries {
			pos := prev + 1 + i
			if pos < len(rf.log) && rf.log[pos].Term == args.Entries[i].Term {
				continue // already stored
			}
			if pos < len(rf.log) {
				// Conflict: drop it and everything after. Capping the
				// capacity forces the append below to allocate, so slices
				// that still alias the old backing array are not mutated.
				rf.log = rf.log[:pos:pos]
			}
			rf.log = append(rf.log, args.Entries[i:]...)
			break
		}
		matched = prev + len(args.Entries)
		reply.Success = true
	} else {
		reply.Success = false
	}

	// Heartbeats in this file carry PrevLogIndex 0 and therefore verify
	// nothing beyond the placeholder entry. The log can still be trusted up
	// to its end when its last entry was created in the leader's own term:
	// only that leader creates entries for the term, so by Raft's Log
	// Matching property everything up to that entry equals the leader's log.
	if last := len(rf.log) - 1; rf.log[last].Term == args.Term && last > matched {
		matched = last
	}

	// 5. Advance (never lower) commitIndex, bounded by what is verified.
	if matched >= 0 {
		newCommit := args.LeaderCommitIndex
		if newCommit > matched {
			newCommit = matched
		}
		if newCommit > rf.commitIndex {
			rf.commitIndex = newCommit
		}
	}
	rf.mux.Unlock()

	// A valid leader exists for this term: reset the election timer.
	rf.othersElected <- true
}
// sendAppendEntries sends one AppendEntries RPC to the given peer and, while
// this peer is still the leader of the term the request was built in,
// processes the reply.
//
// Return value: true when this peer is not the leader at call time (nothing
// is sent) or when the follower accepted the request; false when the RPC got
// no reply, the reply was stale, the follower rejected the request, or this
// peer had to step down. If leadership was lost while the RPC was in flight
// the result of Call is returned. All callers in this file ignore the result.
//
// Reply handling:
//   - A reply term newer than ours: adopt it and step down. The term is only
//     ever raised, never lowered.
//   - A reply to a request from an earlier term of ours is ignored; it says
//     nothing about the current term and must not demote the leader.
//   - Rejection: if the reply refers to the position we are currently
//     probing, move nextIndex back by one (never below 1) and retry in a new
//     goroutine with arguments built under the lock from a copy of the log.
//     matchIndex is left alone, since a rejection does not undo replication.
//   - Success with entries: raise matchIndex/nextIndex, then commit the
//     highest index of the current term that is stored on a majority. The
//     leader always counts itself.
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	rf.mux.Lock()
	if rf.role != 1 {
		// Not the leader (any more): do not send.
		rf.mux.Unlock()
		return true
	}
	rf.mux.Unlock()

	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	if !ok {
		return false
	}

	rf.mux.Lock()
	defer rf.mux.Unlock()

	if rf.role != 1 {
		// Leadership was lost while the RPC was in flight.
		return ok
	}
	if reply.Term > rf.currentTerm {
		// The follower is ahead of us: step down.
		rf.currentTerm = reply.Term
		rf.role = 2
		rf.votedFor = -1
		return false
	}
	if rf.currentTerm != args.Term {
		// Stale reply to a request from an earlier term; ignore it.
		return false
	}

	if !reply.Success {
		// Only a reply about the position currently being probed may move
		// nextIndex; otherwise a retry is already in flight or the follower
		// has caught up in the meantime.
		if rf.nextIndex[server] != args.PrevLogIndex+1 {
			return false
		}
		if rf.nextIndex[server] > 1 {
			rf.nextIndex[server]--
		}
		next := rf.nextIndex[server]
		if next < 1 {
			next = 1
		}
		if next > len(rf.log) {
			next = len(rf.log)
		}
		retry := &AppendEntriesArgs{
			Term:         rf.currentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: rf.log[next-1].Index,
			PrevLogTerm:  rf.log[next-1].Term,
			// Copy so the RPC layer never reads the live log unlocked.
			Entries:           append([]LogEntry(nil), rf.log[next:]...),
			LeaderCommitIndex: rf.commitIndex,
		}
		go rf.sendAppendEntries(server, retry, &AppendEntriesReply{})
		return false
	}

	if len(args.Entries) == 0 {
		// Heartbeat acknowledged; nothing to record.
		return true
	}

	// Record replication progress; a late reply must not move it backwards.
	if replicated := args.PrevLogIndex + len(args.Entries); replicated > rf.matchIndex[server] {
		rf.matchIndex[server] = replicated
		rf.nextIndex[server] = replicated + 1
	}

	// Commit the highest index N > commitIndex that is stored on a majority
	// and was created in the current term. Terms never decrease along the
	// log, so the scan can stop at the first entry from an older term.
	for n := len(rf.log) - 1; n > rf.commitIndex; n-- {
		if rf.log[n].Term != rf.currentTerm {
			break
		}
		count := 0
		for j := range rf.peers {
			if j == rf.me || rf.matchIndex[j] >= n {
				count++
			}
		}
		if count > len(rf.peers)/2 {
			rf.commitIndex = n
			break
		}
	}
	return true
}
//
// PutCommand
// =====
//
// The service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log
//
// If this server is not the leader, return false
//
// Otherwise start the agreement and return immediately
//
// There is no guarantee that this command will ever be committed to
// the Raft log, since the leader may fail or lose an election
//
// The first return value is the index that the command will appear at
// if it is ever committed
//
// The second return value is the current term
//
// The third return value is true if this server believes it is
// the leader
//
// PutCommand asks this peer to start agreement on command.
//
// It returns the log index the command will occupy if it is ever committed,
// the current term, and whether this peer believes it is the leader. When the
// peer is not the leader nothing is appended and the third result is false.
//
// The leader check, the append and the construction of the outgoing requests
// happen in one critical section, so the returned index and term always
// describe the entry that was actually appended, even with concurrent
// callers or a concurrent step-down. The RPCs themselves run in their own
// goroutines; PutCommand returns immediately.
func (rf *Raft) PutCommand(command interface{}) (int, int, bool) {
	rf.mux.Lock()
	defer rf.mux.Unlock()

	index := len(rf.log)
	term := rf.currentTerm
	if rf.role != 1 {
		return index, term, false
	}

	// Your code here (2B)
	// Append the command to the leader's own log.
	rf.log = append(rf.log, LogEntry{Index: index, Term: term, Command: command})

	// The leader trivially holds its own entry. Set the values absolutely
	// rather than incrementing, so they are right whatever they were before.
	rf.matchIndex[rf.me] = index
	rf.nextIndex[rf.me] = index + 1

	// Send every follower whatever it is still missing, including the new entry.
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		next := rf.nextIndex[i]
		if next < 1 {
			next = 1 // index 0 is the placeholder entry and is never sent
		}
		if next > index {
			next = index // always include at least the new entry
		}
		args := &AppendEntriesArgs{
			Term:         term,
			LeaderId:     rf.me,
			PrevLogIndex: rf.log[next-1].Index,
			PrevLogTerm:  rf.log[next-1].Term,
			// Copy so the RPC layer never reads the live log unlocked.
			Entries:           append([]LogEntry(nil), rf.log[next:]...),
			LeaderCommitIndex: rf.commitIndex,
		}
		go rf.sendAppendEntries(i, args, &AppendEntriesReply{})
	}
	return index, term, true
}
//
// Stop
// ====
//
// The tester calls Stop() when a Raft instance will not
// be needed again
//
// You are not required to do anything
// in Stop(), but it might be convenient to (for example)
// turn off debug output from this instance
//
// Stop tells main_routine to exit by sending on closeRoutine.
// NOTE(review): closeRoutine is created unbuffered in NewPeer, so this send
// blocks until main_routine receives it; commit_routine does not listen on
// this channel.
func (rf *Raft) Stop() {
	// Your code here, if desired
	rf.closeRoutine <- true
}
//
// NewPeer
// ====
//
// The service or tester wants to create a Raft server
//
// The port numbers of all the Raft servers (including this one)
// are in peers[]
//
// This server's port is peers[me]
//
// All the servers' peers[] arrays have the same order
//
// applyCh
// =======
//
// applyCh is a channel on which the tester or service expects
// Raft to send ApplyCommand messages
//
// NewPeer() must return quickly, so it should start Goroutines
// for any long-running work
//
// NewPeer builds a Raft peer and starts its background goroutines.
//
// peers holds the RPC end points of all peers (this one included), me is this
// peer's index into peers, and applyCh is the channel on which applied log
// entries are delivered. The peer starts as a follower in term 0 with no vote
// cast and a log containing only a placeholder entry at index 0. All
// signalling channels are unbuffered.
//
// When kEnableDebugLogs is set, a logger writing to stdout or to a file under
// kLogOutputDir is created (panicking if the directory or file cannot be
// created); otherwise log output is discarded.
func NewPeer(peers []*rpc.ClientEnd, me int, applyCh chan ApplyCommand) *Raft {
	// Your initialization code here (2A, 2B)
	rf := &Raft{
		peers:             peers,
		me:                me,
		role:              2,  // default should be a follower
		currentTerm:       0,  // init to 0
		votedFor:          -1, // not yet voted
		receivedVoteCount: 0,
		becomesLeader:     make(chan bool),
		othersElected:     make(chan bool),
		// The log starts with one placeholder entry at index 0.
		log:              []LogEntry{{Index: 0, Term: 0}},
		commitIndex:      0,
		lastApplied:      0,
		nextIndex:        make([]int, len(peers)),
		matchIndex:       make([]int, len(peers)),
		closeRoutine:     make(chan bool),
		votedResetTicker: make(chan bool),
		applyCh:          applyCh,
	}

	rf.mux.Lock()
	if !kEnableDebugLogs {
		rf.logger = log.New(ioutil.Discard, "", 0)
	} else {
		const flags = log.Lmicroseconds | log.Lshortfile
		peerName := peers[me].String()
		logPrefix := fmt.Sprintf("%s ", peerName)
		if kLogToStdout {
			rf.logger = log.New(os.Stdout, peerName, flags)
		} else {
			if err := os.MkdirAll(kLogOutputDir, os.ModePerm); err != nil {
				panic(err.Error())
			}
			logPath := fmt.Sprintf("%s/%s.txt", kLogOutputDir, logPrefix)
			logOutputFile, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
			if err != nil {
				panic(err.Error())
			}
			rf.logger = log.New(logOutputFile, logPrefix, flags)
		}
		rf.logger.Println("logger initialized")
	}
	rf.mux.Unlock()

	// Long-running work happens in the background so NewPeer returns quickly.
	go rf.main_routine()
	go rf.commit_routine()
	return rf
}
// main_routine is the peer's role state machine. It loops until a value is
// received on closeRoutine, acting according to the role read at the top of
// each iteration:
//
//   - Leader (1): every 100ms, send an empty AppendEntries heartbeat to every
//     other peer.
//   - Follower (2): wait for a heartbeat (othersElected) or a granted vote
//     (votedResetTicker); if neither arrives within a randomized timeout of
//     250ms plus jitter, become a candidate.
//   - Candidate (3): start a new term, vote for itself, request votes from
//     all other peers, then wait up to 250ms for the outcome.
//
// Timers are stopped as soon as they are no longer needed.
func (rf *Raft) main_routine() {
	for {
		rf.mux.Lock()
		curRole := rf.role
		rf.mux.Unlock()

		switch curRole {
		case 1: // leader
			select {
			case <-rf.closeRoutine:
				return
			default:
			}

			rf.mux.Lock()
			me := rf.me
			term := rf.currentTerm
			commit := rf.commitIndex
			rf.mux.Unlock()

			for i := range rf.peers {
				if i == me { // do not need to send to itself
					continue
				}
				heartbeat := &AppendEntriesArgs{ // empty heartbeat
					Term:              term,
					LeaderId:          me,
					PrevLogIndex:      0,
					PrevLogTerm:       0,
					Entries:           make([]LogEntry, 0),
					LeaderCommitIndex: commit,
				}
				go rf.sendAppendEntries(i, heartbeat, &AppendEntriesReply{})
			}
			time.Sleep(100 * time.Millisecond) // heartbeat interval

		case 2: // follower
			// Scale the random factor BEFORE converting to a Duration.
			// Converting the float first truncates it to a whole number, so
			// the jitter would be a multiple of 100ms and usually 0, making
			// peers time out together and split the vote.
			jitter := time.Duration(math.Abs(rand.NormFloat64()) * float64(100*time.Millisecond))
			electionTimer := time.NewTimer(250*time.Millisecond + jitter)
			select {
			case <-rf.othersElected:
				// Heard from a leader; the next iteration starts a fresh timer.
			case <-rf.votedResetTicker:
				// Granted a vote; the next iteration starts a fresh timer.
			case <-electionTimer.C:
				rf.mux.Lock()
				rf.role = 3
				rf.mux.Unlock()
			case <-rf.closeRoutine:
				electionTimer.Stop()
				return
			}
			electionTimer.Stop()

		case 3: // candidate
			// Bump the term and snapshot the request in ONE critical section
			// so an RPC handler cannot change the term in between.
			rf.mux.Lock()
			if rf.role != 3 {
				// Stepped down since the role was read above.
				rf.mux.Unlock()
				continue
			}
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.receivedVoteCount = 1 // reset: our own vote
			lastEntry := rf.log[len(rf.log)-1]
			args := &RequestVoteArgs{
				Term:         rf.currentTerm,
				CandidateId:  rf.me,
				LastLogIndex: lastEntry.Index,
				LastLogTerm:  lastEntry.Term,
			}
			me := rf.me
			rf.mux.Unlock()

			candidateTimer := time.NewTimer(250 * time.Millisecond)
			for i := range rf.peers {
				if i == me { // do not need to send to itself
					continue
				}
				go rf.sendRequestVote(i, args, &RequestVoteReply{}) // a separate go routine
			}

			waiting := true
			for waiting {
				select {
				case <-rf.becomesLeader: // this candidate wins the election
					rf.mux.Lock()
					if rf.role != 1 {
						// sendRequestVote sets role to 1 before signalling.
						// Any other role means the signal is left over from
						// an election this peer already abandoned; it must
						// not make the peer leader now.
						rf.mux.Unlock()
						continue
					}
					fmt.Println(rf.me, " ******* becomes leader")
					rf.role = 1 // becomes the leader
					lastIndex := rf.log[len(rf.log)-1].Index
					for i := range rf.peers {
						rf.nextIndex[i] = lastIndex + 1
						rf.matchIndex[i] = 0
					}
					// The leader already holds its whole log.
					rf.matchIndex[rf.me] = lastIndex
					rf.mux.Unlock()
					waiting = false
				case <-rf.othersElected: // others win the election
					rf.mux.Lock()
					rf.role = 2 // change back to followers state
					rf.votedFor = -1
					rf.mux.Unlock()
					waiting = false
				case <-candidateTimer.C: // election timed out
					rf.mux.Lock()
					rf.role = 2
					rf.votedFor = -1
					rf.mux.Unlock()
					waiting = false
				case <-rf.closeRoutine:
					candidateTimer.Stop()
					return
				}
			}
			candidateTimer.Stop()
		}
	}
}
// commit_routine delivers committed log entries to applyCh, strictly in log
// order, one ApplyCommand per entry.
//
// Each time it wakes it applies EVERY entry that is committed but not yet
// applied, then sleeps 50ms. Applying a single entry per wake-up would cap
// throughput at one entry per 50ms and let a backlog grow.
//
// The lock is released before the channel send so a slow receiver cannot
// block the RPC handlers. lastApplied is advanced under the lock before the
// send.
func (rf *Raft) commit_routine() {
	for {
		for {
			rf.mux.Lock()
			next := rf.lastApplied + 1
			if rf.commitIndex < next || next >= len(rf.log) {
				// Nothing (more) to apply right now.
				rf.mux.Unlock()
				break
			}
			commitCommand := ApplyCommand{
				Index:   rf.log[next].Index,
				Command: rf.log[next].Command,
			}
			rf.lastApplied = next
			rf.mux.Unlock()

			rf.applyCh <- commitCommand
			fmt.Println(rf.me, " committed, Index = ", commitCommand.Index, ", Command = ", commitCommand.Command)
		}
		time.Sleep(50 * time.Millisecond)
	}
}
// Editor is loading...