mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: leader only sends append to the unsynced peer when get a bad appResp
This commit is contained in:
35
raft/raft.go
35
raft/raft.go
@@ -139,21 +139,26 @@ func (sm *stateMachine) send(m Message) {
|
||||
sm.msgs = append(sm.msgs, m)
|
||||
}
|
||||
|
||||
// sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
|
||||
func (sm *stateMachine) sendAppend() {
|
||||
// sendAppend sends RRPC, with entries to the given peer
|
||||
func (sm *stateMachine) sendAppend(to int) {
|
||||
in := sm.ins[to]
|
||||
m := Message{}
|
||||
m.Type = msgApp
|
||||
m.To = to
|
||||
m.Index = in.next - 1
|
||||
m.LogTerm = sm.log.term(in.next - 1)
|
||||
m.Entries = sm.log.entries(in.next)
|
||||
m.Commit = sm.log.committed
|
||||
sm.send(m)
|
||||
}
|
||||
|
||||
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
|
||||
func (sm *stateMachine) bcastAppend() {
|
||||
for i := 0; i < sm.k; i++ {
|
||||
if i == sm.addr {
|
||||
continue
|
||||
}
|
||||
in := sm.ins[i]
|
||||
m := Message{}
|
||||
m.Type = msgApp
|
||||
m.To = i
|
||||
m.Index = in.next - 1
|
||||
m.LogTerm = sm.log.term(in.next - 1)
|
||||
m.Entries = sm.log.entries(in.next)
|
||||
m.Commit = sm.log.committed
|
||||
sm.send(m)
|
||||
sm.sendAppend(i)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,7 +248,7 @@ func (sm *stateMachine) Step(m Message) {
|
||||
switch sm.lead {
|
||||
case sm.addr:
|
||||
sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data})
|
||||
sm.sendAppend()
|
||||
sm.bcastAppend()
|
||||
case none:
|
||||
panic("msgProp given without leader")
|
||||
default:
|
||||
@@ -275,11 +280,11 @@ func (sm *stateMachine) Step(m Message) {
|
||||
case msgAppResp:
|
||||
if m.Index < 0 {
|
||||
sm.ins[m.From].decr()
|
||||
sm.sendAppend()
|
||||
sm.sendAppend(m.From)
|
||||
} else {
|
||||
sm.ins[m.From].update(m.Index)
|
||||
if sm.maybeCommit() {
|
||||
sm.sendAppend()
|
||||
sm.bcastAppend()
|
||||
}
|
||||
}
|
||||
case msgVote:
|
||||
@@ -297,7 +302,7 @@ func (sm *stateMachine) Step(m Message) {
|
||||
switch sm.q() {
|
||||
case gr:
|
||||
sm.becomeLeader()
|
||||
sm.sendAppend()
|
||||
sm.bcastAppend()
|
||||
case len(sm.votes) - gr:
|
||||
sm.becomeFollower(sm.term, none)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user