From 0d7c43d885192b0e088abe81b1ae71219dc5b396 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 3 Nov 2014 12:26:13 -0800 Subject: [PATCH] *: add a Advance interface to raft.Node Node set the applied to committed right after it sends out Ready to application. This is not correct since the application has not actually applied the entries at that point. We add a Advance interface to Node. Application needs to call Advance to tell raft Node its progress. Also this change can avoid unnecessary copying when application is still applying entires but there are more entries to be applied. --- etcdserver/server.go | 2 ++ etcdserver/server_test.go | 8 +++-- raft/doc.go | 3 +- raft/log.go | 18 ++++++++++ raft/node.go | 75 +++++++++++++++++++++++++++------------ raft/node_bench_test.go | 6 +++- raft/node_test.go | 30 ++++++++++++++++ 7 files changed, 114 insertions(+), 28 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index fad9136f0..19a61b342 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -338,6 +338,8 @@ func (s *EtcdServer) run() { appliedi = rd.Snapshot.Index } + s.node.Advance() + if appliedi-snapi > s.snapCount { s.snapshot(appliedi, nodes) snapi = appliedi diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1e7c06a4c..142dc213f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -814,6 +814,7 @@ func TestTriggerSnap(t *testing.T) { ctx := context.Background() n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1) <-n.Ready() + n.Advance() n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0}) n.Campaign(ctx) st := &storeRecorder{} @@ -1252,6 +1253,7 @@ func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChang } func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } +func (n *readyNode) Advance() {} func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {} func (n *readyNode) Stop() {} func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {} @@ -1260,9 +1262,8 @@ type nodeRecorder struct { recorder } -func (n *nodeRecorder) Tick() { - n.record(action{name: "Tick"}) -} +func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) } + func (n *nodeRecorder) Campaign(ctx context.Context) error { n.record(action{name: "Campaign"}) return nil @@ -1280,6 +1281,7 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } +func (n *nodeRecorder) Advance() {} func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) { n.record(action{name: "ApplyConfChange", params: []interface{}{conf}}) } diff --git a/raft/doc.go b/raft/doc.go index 4032c45fd..8e3c6b759 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -51,8 +51,9 @@ The total state machine handling loop will look something like this: n.Tick() case rd := <-s.Node.Ready(): saveToStable(rd.State, rd.Entries) - process(rd.CommittedEntries) send(rd.Messages) + process(rd.CommittedEntries) + s.Node.Advance() case <-s.done: return } diff --git a/raft/log.go b/raft/log.go index 2f302c48f..13a763db7 100644 --- a/raft/log.go +++ b/raft/log.go @@ -18,6 +18,7 @@ package raft import ( "fmt" + "log" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -132,6 +133,23 @@ func (l *raftLog) resetNextEnts() { } } +func (l *raftLog) appliedTo(i uint64) { + if i == 0 { + return + } + if l.committed < i || i < l.applied { + log.Panicf("applied[%d] is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) + } + l.applied = i +} + +func (l *raftLog) stableTo(i uint64) { + if i == 0 { + return + } + l.unstable = i + 1 +} + func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset } func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) } diff --git a/raft/node.go b/raft/node.go index 85fce43ed..95d0b14a5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -116,7 +116,11 @@ type Node interface { // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state + // Users of the Node must call Advance after applying the state returned by Ready Ready() <-chan Ready + // Advance notifies the Node that the application has applied and saved progress up to the last Ready. + // It prepares the node to return the next available Ready. + Advance() // ApplyConfChange applies config change to the local node. // TODO: reject existing node when add node // TODO: reject non-existant node when remove node @@ -181,6 +185,7 @@ type node struct { compactc chan compact confc chan pb.ConfChange readyc chan Ready + advancec chan struct{} tickc chan struct{} done chan struct{} } @@ -192,6 +197,7 @@ func newNode() node { compactc: make(chan compact), confc: make(chan pb.ConfChange), readyc: make(chan Ready), + advancec: make(chan struct{}), tickc: make(chan struct{}), done: make(chan struct{}), } @@ -204,6 +210,9 @@ func (n *node) Stop() { func (n *node) run(r *raft) { var propc chan pb.Message var readyc chan Ready + var advancec chan struct{} + var prevLastUnstablei uint64 + var rd Ready lead := None prevSoftSt := r.softState() @@ -211,26 +220,30 @@ func (n *node) run(r *raft) { prevSnapi := r.raftLog.snapshot.Index for { - rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi) - if rd.containsUpdates() { - readyc = n.readyc - } else { + if advancec != nil { readyc = nil - } - - if rd.SoftState != nil && lead != rd.SoftState.Lead { - if r.hasLeader() { - if lead == None { - log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term) - } else { - log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term) - } - propc = n.propc + } else { + rd = newReady(r, prevSoftSt, prevHardSt, prevSnapi) + if rd.containsUpdates() { + readyc = n.readyc } else { - log.Printf("raft: lost leader %x at term %d", lead, r.Term) - propc = nil + readyc = nil + } + + if rd.SoftState != nil && lead != rd.SoftState.Lead { + if r.hasLeader() { + if lead == None { + log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term) + } else { + log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term) + } + propc = n.propc + } else { + log.Printf("raft: lost leader %x at term %d", lead, r.Term) + propc = nil + } + lead = rd.SoftState.Lead } - lead = rd.SoftState.Lead } select { @@ -263,19 +276,28 @@ func (n *node) run(r *raft) { if rd.SoftState != nil { prevSoftSt = rd.SoftState } + if len(rd.Entries) > 0 { + prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index + } if !IsEmptyHardState(rd.HardState) { prevHardSt = rd.HardState } if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Index + if prevSnapi > prevLastUnstablei { + prevLastUnstablei = prevSnapi + } } - // TODO(yichengq): we assume that all committed config - // entries will be applied to make things easy for now. - // TODO(yichengq): it may have race because applied is set - // before entries are applied. - r.raftLog.resetNextEnts() - r.raftLog.resetUnstable() r.msgs = nil + advancec = n.advancec + case <-advancec: + if prevHardSt.Commit != 0 { + r.raftLog.appliedTo(prevHardSt.Commit) + } + if prevLastUnstablei != 0 { + r.raftLog.stableTo(prevLastUnstablei) + } + advancec = nil case <-n.done: return } @@ -338,6 +360,13 @@ func (n *node) Ready() <-chan Ready { return n.readyc } +func (n *node) Advance() { + select { + case n.advancec <- struct{}{}: + case <-n.done: + } +} + func (n *node) ApplyConfChange(cc pb.ConfChange) { select { case n.confc <- cc: diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index 96270be2b..a48b7529a 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -10,12 +10,16 @@ func BenchmarkOneNode(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n := StartNode(1, []Peer{{ID: 1}}, 0, 0) + n := newNode() + r := newRaft(1, []uint64{1}, 10, 1) + go n.run(r) + defer n.Stop() n.Campaign(ctx) for i := 0; i < b.N; i++ { <-n.Ready() + n.Advance() n.Propose(ctx, []byte("foo")) } rd := <-n.Ready() diff --git a/raft/node_test.go b/raft/node_test.go index f2cf637bc..e25513f67 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -195,11 +195,15 @@ func TestNode(t *testing.T) { n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) + } else { + n.Advance() } n.Propose(ctx, []byte("foo")) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) { t.Errorf("#%d: g = %+v,\n w %+v", 2, g, wants[1]) + } else { + n.Advance() } select { @@ -226,6 +230,8 @@ func TestNodeRestart(t *testing.T) { n := RestartNode(1, 10, 1, nil, st, entries) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) + } else { + n.Advance() } select { @@ -256,6 +262,7 @@ func TestNodeCompact(t *testing.T) { testutil.ForceGosched() select { case <-n.Ready(): + n.Advance() default: t.Fatalf("unexpected proposal failure: unable to commit entry") } @@ -267,6 +274,7 @@ func TestNodeCompact(t *testing.T) { if !reflect.DeepEqual(rd.Snapshot, w) { t.Errorf("snap = %+v, want %+v", rd.Snapshot, w) } + n.Advance() default: t.Fatalf("unexpected compact failure: unable to create a snapshot") } @@ -285,6 +293,28 @@ func TestNodeCompact(t *testing.T) { } } +func TestNodeAdvance(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + n := StartNode(1, []Peer{{ID: 1}}, 10, 1) + n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}) + n.Campaign(ctx) + <-n.Ready() + n.Propose(ctx, []byte("foo")) + select { + case rd := <-n.Ready(): + t.Fatalf("unexpected Ready before Advance: %+v", rd) + default: + } + n.Advance() + select { + case <-n.Ready(): + default: + t.Errorf("expect Ready after Advance, but there is no Ready available") + } +} + func TestSoftStateEqual(t *testing.T) { tests := []struct { st *SoftState