diff --git a/etcdserver/server.go b/etcdserver/server.go index fad9136f0..19a61b342 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -338,6 +338,8 @@ func (s *EtcdServer) run() { appliedi = rd.Snapshot.Index } + s.node.Advance() + if appliedi-snapi > s.snapCount { s.snapshot(appliedi, nodes) snapi = appliedi diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1e7c06a4c..142dc213f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -814,6 +814,7 @@ func TestTriggerSnap(t *testing.T) { ctx := context.Background() n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1) <-n.Ready() + n.Advance() n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0}) n.Campaign(ctx) st := &storeRecorder{} @@ -1252,6 +1253,7 @@ func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChang } func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } +func (n *readyNode) Advance() {} func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {} func (n *readyNode) Stop() {} func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {} @@ -1260,9 +1262,8 @@ type nodeRecorder struct { recorder } -func (n *nodeRecorder) Tick() { - n.record(action{name: "Tick"}) -} +func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) } + func (n *nodeRecorder) Campaign(ctx context.Context) error { n.record(action{name: "Campaign"}) return nil @@ -1280,6 +1281,7 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } +func (n *nodeRecorder) Advance() {} func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) { n.record(action{name: "ApplyConfChange", params: []interface{}{conf}}) } diff --git a/raft/doc.go b/raft/doc.go index 4032c45fd..8e3c6b759 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -51,8 +51,9 @@ The total state machine handling loop will look something like this: n.Tick() case rd := <-s.Node.Ready(): saveToStable(rd.State, rd.Entries) - process(rd.CommittedEntries) send(rd.Messages) + process(rd.CommittedEntries) + s.Node.Advance() case <-s.done: return } diff --git a/raft/log.go b/raft/log.go index 2f302c48f..13a763db7 100644 --- a/raft/log.go +++ b/raft/log.go @@ -18,6 +18,7 @@ package raft import ( "fmt" + "log" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -132,6 +133,23 @@ func (l *raftLog) resetNextEnts() { } } +func (l *raftLog) appliedTo(i uint64) { + if i == 0 { + return + } + if l.committed < i || i < l.applied { + log.Panicf("applied[%d] is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) + } + l.applied = i +} + +func (l *raftLog) stableTo(i uint64) { + if i == 0 { + return + } + l.unstable = i + 1 +} + func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset } func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) } diff --git a/raft/node.go b/raft/node.go index 85fce43ed..95d0b14a5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -116,7 +116,11 @@ type Node interface { // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state + // Users of the Node must call Advance after applying the state returned by Ready Ready() <-chan Ready + // Advance notifies the Node that the application has applied and saved progress up to the last Ready. + // It prepares the node to return the next available Ready. + Advance() // ApplyConfChange applies config change to the local node. // TODO: reject existing node when add node // TODO: reject non-existant node when remove node @@ -181,6 +185,7 @@ type node struct { compactc chan compact confc chan pb.ConfChange readyc chan Ready + advancec chan struct{} tickc chan struct{} done chan struct{} } @@ -192,6 +197,7 @@ func newNode() node { compactc: make(chan compact), confc: make(chan pb.ConfChange), readyc: make(chan Ready), + advancec: make(chan struct{}), tickc: make(chan struct{}), done: make(chan struct{}), } @@ -204,6 +210,9 @@ func (n *node) Stop() { func (n *node) run(r *raft) { var propc chan pb.Message var readyc chan Ready + var advancec chan struct{} + var prevLastUnstablei uint64 + var rd Ready lead := None prevSoftSt := r.softState() @@ -211,26 +220,30 @@ func (n *node) run(r *raft) { prevSnapi := r.raftLog.snapshot.Index for { - rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi) - if rd.containsUpdates() { - readyc = n.readyc - } else { + if advancec != nil { readyc = nil - } - - if rd.SoftState != nil && lead != rd.SoftState.Lead { - if r.hasLeader() { - if lead == None { - log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term) - } else { - log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term) - } - propc = n.propc + } else { + rd = newReady(r, prevSoftSt, prevHardSt, prevSnapi) + if rd.containsUpdates() { + readyc = n.readyc } else { - log.Printf("raft: lost leader %x at term %d", lead, r.Term) - propc = nil + readyc = nil + } + + if rd.SoftState != nil && lead != rd.SoftState.Lead { + if r.hasLeader() { + if lead == None { + log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term) + } else { + log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term) + } + propc = n.propc + } else { + log.Printf("raft: lost leader %x at term %d", lead, r.Term) + propc = nil + } + lead = rd.SoftState.Lead } - lead = rd.SoftState.Lead } select { @@ -263,19 +276,28 @@ func (n *node) run(r *raft) { if rd.SoftState != nil { prevSoftSt = rd.SoftState } + if len(rd.Entries) > 0 { + prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index + } if !IsEmptyHardState(rd.HardState) { prevHardSt = rd.HardState } if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Index + if prevSnapi > prevLastUnstablei { + prevLastUnstablei = prevSnapi + } } - // TODO(yichengq): we assume that all committed config - // entries will be applied to make things easy for now. - // TODO(yichengq): it may have race because applied is set - // before entries are applied. - r.raftLog.resetNextEnts() - r.raftLog.resetUnstable() r.msgs = nil + advancec = n.advancec + case <-advancec: + if prevHardSt.Commit != 0 { + r.raftLog.appliedTo(prevHardSt.Commit) + } + if prevLastUnstablei != 0 { + r.raftLog.stableTo(prevLastUnstablei) + } + advancec = nil case <-n.done: return } @@ -338,6 +360,13 @@ func (n *node) Ready() <-chan Ready { return n.readyc } +func (n *node) Advance() { + select { + case n.advancec <- struct{}{}: + case <-n.done: + } +} + func (n *node) ApplyConfChange(cc pb.ConfChange) { select { case n.confc <- cc: diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index 96270be2b..a48b7529a 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -10,12 +10,16 @@ func BenchmarkOneNode(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n := StartNode(1, []Peer{{ID: 1}}, 0, 0) + n := newNode() + r := newRaft(1, []uint64{1}, 10, 1) + go n.run(r) + defer n.Stop() n.Campaign(ctx) for i := 0; i < b.N; i++ { <-n.Ready() + n.Advance() n.Propose(ctx, []byte("foo")) } rd := <-n.Ready() diff --git a/raft/node_test.go b/raft/node_test.go index f2cf637bc..e25513f67 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -195,11 +195,15 @@ func TestNode(t *testing.T) { n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) + } else { + n.Advance() } n.Propose(ctx, []byte("foo")) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) { t.Errorf("#%d: g = %+v,\n w %+v", 2, g, wants[1]) + } else { + n.Advance() } select { @@ -226,6 +230,8 @@ func TestNodeRestart(t *testing.T) { n := RestartNode(1, 10, 1, nil, st, entries) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) + } else { + n.Advance() } select { @@ -256,6 +262,7 @@ func TestNodeCompact(t *testing.T) { testutil.ForceGosched() select { case <-n.Ready(): + n.Advance() default: t.Fatalf("unexpected proposal failure: unable to commit entry") } @@ -267,6 +274,7 @@ func TestNodeCompact(t *testing.T) { if !reflect.DeepEqual(rd.Snapshot, w) { t.Errorf("snap = %+v, want %+v", rd.Snapshot, w) } + n.Advance() default: t.Fatalf("unexpected compact failure: unable to create a snapshot") } @@ -285,6 +293,28 @@ func TestNodeCompact(t *testing.T) { } } +func TestNodeAdvance(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + n := StartNode(1, []Peer{{ID: 1}}, 10, 1) + n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}) + n.Campaign(ctx) + <-n.Ready() + n.Propose(ctx, []byte("foo")) + select { + case rd := <-n.Ready(): + t.Fatalf("unexpected Ready before Advance: %+v", rd) + default: + } + n.Advance() + select { + case <-n.Ready(): + default: + t.Errorf("expect Ready after Advance, but there is no Ready available") + } +} + func TestSoftStateEqual(t *testing.T) { tests := []struct { st *SoftState