Merge pull request #1962 from xiang90/raft

raft: leader waits for the reply of previous message
This commit is contained in:
Xiang Li 2014-12-18 15:05:41 -08:00
commit 0a40e18f68
2 changed files with 117 additions and 15 deletions

View File

@ -56,9 +56,13 @@ func (st StateType) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("%q", st.String())), nil
}
// progress holds the match and next indexes tracked for a peer.
type progress struct{ match, next uint64 }
// progress holds the per-peer replication state: the match and next indexes
// plus a wait counter that sendAppend consults before sending again.
type progress struct {
	match uint64
	next  uint64
	// wait is set when a snapshot is sent, or when an append is sent while
	// match is still zero. It is decreased on each heartbeat broadcast and
	// cleared by update and maybeDecrTo.
	wait int
}
func (pr *progress) update(n uint64) {
pr.waitReset()
if pr.match < n {
pr.match = n
}
@ -72,6 +76,7 @@ func (pr *progress) optimisticUpdate(n uint64) { pr.next = n + 1 }
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index and returns true.
func (pr *progress) maybeDecrTo(to uint64) bool {
pr.waitReset()
if pr.match != 0 {
// the rejection must be stale if the progress has matched and "to"
// is smaller than "match".
@ -94,7 +99,19 @@ func (pr *progress) maybeDecrTo(to uint64) bool {
return true
}
// String renders the next and match indexes, e.g. for use in log output.
func (pr *progress) String() string { return fmt.Sprintf("next = %d, match = %d", pr.next, pr.match) }
// waitDecr lowers the wait counter by i, never letting it drop below zero.
func (pr *progress) waitDecr(i int) {
	remaining := pr.wait - i
	if remaining < 0 {
		// Clamp so the counter bottoms out at zero.
		remaining = 0
	}
	pr.wait = remaining
}
// waitSet overwrites the wait counter with w.
func (pr *progress) waitSet(w int) {
	pr.wait = w
}
// waitReset clears the wait counter back to zero.
func (pr *progress) waitReset() {
	pr.wait = 0
}
// shouldWait reports whether sending should be held back: true only when
// nothing has been matched yet (match is zero) and the wait counter is
// still positive.
func (pr *progress) shouldWait() bool {
	if pr.match != 0 {
		return false
	}
	return pr.wait > 0
}
// String renders the next, match and wait values of the progress, e.g. for
// use in log output.
func (pr *progress) String() string {
	const layout = "next = %d, match = %d, wait = %v"
	text := fmt.Sprintf(layout, pr.next, pr.match, pr.wait)
	return text
}
type raft struct {
pb.HardState
@ -203,6 +220,10 @@ func (r *raft) send(m pb.Message) {
// sendAppend sends RRPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
if pr.shouldWait() {
log.Printf("raft: %x ignored sending %s to %x [%s]", r.id, pb.MsgApp, to, pr)
return
}
m := pb.Message{}
m.To = to
if r.needSnapshot(pr.next) {
@ -218,6 +239,7 @@ func (r *raft) sendAppend(to uint64) {
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
pr.waitSet(r.electionTimeout)
} else {
m.Type = pb.MsgApp
m.Index = pr.next - 1
@ -228,6 +250,11 @@ func (r *raft) sendAppend(to uint64) {
// has been matched.
if n := len(m.Entries); pr.match != 0 && n != 0 {
pr.optimisticUpdate(m.Entries[n-1].Index)
} else if pr.match == 0 {
// TODO (xiangli): better way to find out if the follower is in good path or not
// a follower might be in bad path even if match != 0, since we optimistically
// increase the next.
pr.waitSet(r.heartbeatTimeout)
}
}
r.send(m)
@ -268,6 +295,7 @@ func (r *raft) bcastHeartbeat() {
continue
}
r.sendHeartbeat(i)
r.prs[i].waitDecr(r.heartbeatTimeout)
}
}

View File

@ -70,10 +70,10 @@ func TestProgressUpdate(t *testing.T) {
}
p.update(tt.update)
if p.match != tt.wm {
t.Errorf("#%d: match=%d, want %d", i, p.match, tt.wm)
t.Errorf("#%d: match= %d, want %d", i, p.match, tt.wm)
}
if p.next != tt.wn {
t.Errorf("#%d: next=%d, want %d", i, p.next, tt.wn)
t.Errorf("#%d: next= %d, want %d", i, p.next, tt.wn)
}
}
}
@ -132,17 +132,85 @@ func TestProgressMaybeDecr(t *testing.T) {
next: tt.n,
}
if g := p.maybeDecrTo(tt.to); g != tt.w {
t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w)
t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
}
if gm := p.match; gm != tt.m {
t.Errorf("#%d: match=%d, want %d", i, gm, tt.m)
t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
}
if gn := p.next; gn != tt.wn {
t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn)
t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
}
}
}
// TestProgressShouldWait checks that shouldWait is true only when match is
// zero and wait is positive.
func TestProgressShouldWait(t *testing.T) {
	cases := []struct {
		m    uint64
		wait int
		w    bool
	}{
		// a non-zero match never waits, whatever the wait value
		{m: 1, wait: 0, w: false},
		{m: 1, wait: 1, w: false},
		// a zero match waits only while wait is positive
		{m: 0, wait: 1, w: true},
		{m: 0, wait: 0, w: false},
	}
	for i := range cases {
		c := cases[i]
		p := progress{match: c.m, wait: c.wait}
		if got := p.shouldWait(); got != c.w {
			t.Errorf("#%d: shouldwait = %t, want %t", i, got, c.w)
		}
	}
}
// TestProgressWaitReset ensures that both progress.maybeDecrTo and
// progress.update clear progress.wait.
func TestProgressWaitReset(t *testing.T) {
	p := &progress{wait: 1}

	// A decrement attempt clears the wait counter.
	p.maybeDecrTo(1)
	if got := p.wait; got != 0 {
		t.Errorf("wait= %d, want 0", got)
	}

	// Re-arm the counter and confirm an update clears it too.
	p.wait = 1
	p.update(2)
	if got := p.wait; got != 0 {
		t.Errorf("wait= %d, want 0", got)
	}
}
// TestProgressDecr ensures that handling a MsgBeat lowers progress.wait by
// one heartbeatTimeout.
func TestProgressDecr(t *testing.T) {
	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	r.becomeCandidate()
	r.becomeLeader()

	// Start peer 2 with two heartbeat intervals of wait.
	r.prs[2].wait = 2 * r.heartbeatTimeout

	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})

	// One beat later exactly one interval should remain.
	got, want := r.prs[2].wait, r.heartbeatTimeout*(2-1)
	if got != want {
		t.Errorf("wait = %d, want %d", got, want)
	}
}
// TestProgressWait ensures that three back-to-back proposals stepped into a
// freshly elected leader leave exactly one outgoing message.
func TestProgressWait(t *testing.T) {
	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	r.becomeCandidate()
	r.becomeLeader()

	const proposals = 3
	for n := 0; n < proposals; n++ {
		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
	}

	if sent := len(r.readMessages()); sent != 1 {
		t.Errorf("len(ms) = %d, want 1", sent)
	}
}
func TestLeaderElection(t *testing.T) {
tests := []struct {
*network
@ -269,7 +337,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
// avoid committing ChangeTerm proposal
tt.ignore(pb.MsgApp)
// elect 1 as the new leader with term 2
// elect 2 as the new leader with term 2
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
// no log entries from previous term should be committed
@ -279,10 +347,11 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
}
tt.recover()
// still be able to append a entry
// send heartbeat; reset wait
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
// append an entry at current term
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
// expect the committed to be advanced
if sm.raftLog.committed != 5 {
t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
}
@ -378,6 +447,8 @@ func TestCandidateConcede(t *testing.T) {
// heal the partition
tt.recover()
// send heartbeat; reset wait
tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
data := []byte("force follower")
// send a proposal to 2 to flush out a MsgApp to 0
@ -425,18 +496,21 @@ func TestOldMessages(t *testing.T) {
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
// pretend we're an old leader trying to make progress
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
// pretend we're an old leader trying to make progress; this entry is expected to be ignored.
tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}})
// commit a new entry
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
l := &raftLog{
storage: &MemoryStorage{
ents: []pb.Entry{
{}, {Data: nil, Term: 1, Index: 1},
{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
{Data: []byte("somedata"), Term: 3, Index: 4},
},
},
unstable: unstable{offset: 4},
committed: 3,
unstable: unstable{offset: 5},
committed: 4,
}
base := ltoa(l)
for i, p := range tt.peers {