From 5587e0d73fecfabc98293f1f566c6f190923c90c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 6 Oct 2014 10:12:31 +0800 Subject: [PATCH] raft: compact takes index and nodes parameters Before this commit, compact always compact log at current appliedindex of raft. This prevents us from doing non-blocking snapshot since we have to make snapshot and compact atomically. To prepare for non-blocking snapshot, this commit make compact supports index and nodes parameters. After completing snapshot, the applier should call compact with the snapshot index and the nodes at snapshot index to do a compaction at snapsohot index. --- etcdserver/server.go | 32 +++++++++++++++++--------------- etcdserver/server_test.go | 6 +++--- raft/node.go | 25 +++++++++++++++++-------- raft/node_test.go | 5 +++-- raft/raft.go | 10 +++++++--- raft/raft_test.go | 2 +- 6 files changed, 48 insertions(+), 32 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 36236d34a..f9995519e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -229,6 +229,7 @@ func (s *EtcdServer) run() { var syncC <-chan time.Time // snapi indicates the index of the last submitted snapshot request var snapi, appliedi int64 + var nodes []int64 for { select { case <-s.ticker: @@ -265,6 +266,19 @@ func (s *EtcdServer) run() { appliedi = e.Index } + if rd.SoftState != nil { + nodes = rd.SoftState.Nodes + if rd.RaftState == raft.StateLeader { + syncC = s.syncTicker + } else { + syncC = nil + } + if rd.SoftState.ShouldStop { + s.Stop() + return + } + } + if rd.Snapshot.Index > snapi { snapi = rd.Snapshot.Index } @@ -278,21 +292,9 @@ func (s *EtcdServer) run() { } if appliedi-snapi > s.snapCount { - s.snapshot() + s.snapshot(appliedi, nodes) snapi = appliedi } - - if rd.SoftState != nil { - if rd.RaftState == raft.StateLeader { - syncC = s.syncTicker - } else { - syncC = nil - } - if rd.SoftState.ShouldStop { - s.Stop() - return - } - } case <-syncC: s.sync(defaultSyncTimeout) case <-s.done: @@ -517,14 +519,14 @@ func (s *EtcdServer) apply(r pb.Request) Response { } // TODO: non-blocking snapshot -func (s *EtcdServer) snapshot() { +func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) { d, err := s.store.Save() // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? if err != nil { panic("TODO: this is bad, what do we do about it?") } - s.node.Compact(d) + s.node.Compact(snapi, snapnodes, d) s.storage.Cut() } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 22dae63fe..8ac6b99bc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -678,7 +678,7 @@ func TestSnapshot(t *testing.T) { node: n, } - s.snapshot() + s.snapshot(0, []int64{1}) gaction := st.Action() if len(gaction) != 1 { t.Fatalf("len(action) = %d, want 1", len(gaction)) @@ -1129,7 +1129,7 @@ func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {} func (n *readyNode) Stop() {} -func (n *readyNode) Compact(d []byte) {} +func (n *readyNode) Compact(index int64, nodes []int64, d []byte) {} type nodeRecorder struct { recorder @@ -1161,7 +1161,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) { func (n *nodeRecorder) Stop() { n.record(action{name: "Stop"}) } -func (n *nodeRecorder) Compact(d []byte) { +func (n *nodeRecorder) Compact(index int64, nodes []int64, d []byte) { n.record(action{name: "Compact"}) } diff --git a/raft/node.go b/raft/node.go index 28c3e797c..8e166a608 100644 --- a/raft/node.go +++ b/raft/node.go @@ -3,6 +3,7 @@ package raft import ( "errors" "log" + "reflect" pb "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" @@ -18,11 +19,13 @@ var ( type SoftState struct { Lead int64 RaftState StateType + Nodes []int64 ShouldStop bool } func (a *SoftState) equal(b *SoftState) bool { - return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop + nodeeq := reflect.DeepEqual(a.Nodes, b.Nodes) + return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop && nodeeq } // Ready encapsulates the entries and messages that are ready to read, @@ -56,6 +59,12 @@ type Ready struct { Messages []pb.Message } +type compact struct { + index int64 + nodes []int64 + data []byte +} + func isHardStateEqual(a, b pb.HardState) bool { return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit } @@ -96,7 +105,7 @@ type Node interface { // Stop performs any necessary termination of the Node Stop() // Compact - Compact(d []byte) + Compact(index int64, nodes []int64, d []byte) } // StartNode returns a new Node given a unique raft id, a list of raft peers, and @@ -141,7 +150,7 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb. type node struct { propc chan pb.Message recvc chan pb.Message - compactc chan []byte + compactc chan compact confc chan pb.ConfChange readyc chan Ready tickc chan struct{} @@ -152,7 +161,7 @@ func newNode() node { return node{ propc: make(chan pb.Message), recvc: make(chan pb.Message), - compactc: make(chan []byte), + compactc: make(chan compact), confc: make(chan pb.ConfChange), readyc: make(chan Ready), tickc: make(chan struct{}), @@ -200,8 +209,8 @@ func (n *node) run(r *raft) { r.Step(m) case m := <-n.recvc: r.Step(m) // raft never returns an error - case d := <-n.compactc: - r.compact(d) + case c := <-n.compactc: + r.compact(c.index, c.nodes, c.data) case cc := <-n.confc: switch cc.Type { case pb.ConfChangeAddNode: @@ -299,9 +308,9 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) { } } -func (n *node) Compact(d []byte) { +func (n *node) Compact(index int64, nodes []int64, d []byte) { select { - case n.compactc <- d: + case n.compactc <- compact{index, nodes, d}: case <-n.done: } } diff --git a/raft/node_test.go b/raft/node_test.go index 0822c1e91..dd2ee787d 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -156,7 +156,7 @@ func TestNode(t *testing.T) { } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + SoftState: &SoftState{Lead: 1, Nodes: []int64{1}, RaftState: StateLeader}, HardState: raftpb.HardState{Term: 1, Commit: 2}, Entries: []raftpb.Entry{ {}, @@ -244,7 +244,7 @@ func TestCompact(t *testing.T) { t.Fatalf("unexpected proposal failure: unable to commit entry") } - n.Compact(w.Data) + n.Compact(w.Index, w.Nodes, w.Data) pkg.ForceGosched() select { case rd := <-n.Ready(): @@ -278,6 +278,7 @@ func TestSoftStateEqual(t *testing.T) { {&SoftState{Lead: 1}, false}, {&SoftState{RaftState: StateLeader}, false}, {&SoftState{ShouldStop: true}, false}, + {&SoftState{Nodes: []int64{1, 2}}, false}, } for i, tt := range tests { if g := tt.st.equal(&SoftState{}); g != tt.we { diff --git a/raft/raft.go b/raft/raft.go index 47a07031e..56e7f1d93 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -153,7 +153,7 @@ func (r *raft) hasLeader() bool { return r.lead != None } func (r *raft) shouldStop() bool { return r.removed[r.id] } func (r *raft) softState() *SoftState { - return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()} + return &SoftState{Lead: r.lead, Nodes: r.nodes(), RaftState: r.state, ShouldStop: r.shouldStop()} } func (r *raft) String() string { @@ -515,9 +515,13 @@ func stepFollower(r *raft, m pb.Message) { } } -func (r *raft) compact(d []byte) { - r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), r.nodes()) +func (r *raft) compact(index int64, nodes []int64, d []byte) error { + if index > r.raftLog.applied { + return fmt.Errorf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied) + } + r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), nodes) r.raftLog.compact(r.raftLog.applied) + return nil } // restore recovers the statemachine from a snapshot. It restores the log and the diff --git a/raft/raft_test.go b/raft/raft_test.go index 7a517545a..edbcd47e8 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -897,7 +897,7 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead) - lead.compact(nil) + lead.compact(lead.raftLog.applied, lead.nodes(), nil) nt.recover() // trigger a snapshot