diff --git a/etcdserver/server.go b/etcdserver/server.go index 36236d34a..f9995519e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -229,6 +229,7 @@ func (s *EtcdServer) run() { var syncC <-chan time.Time // snapi indicates the index of the last submitted snapshot request var snapi, appliedi int64 + var nodes []int64 for { select { case <-s.ticker: @@ -265,6 +266,19 @@ func (s *EtcdServer) run() { appliedi = e.Index } + if rd.SoftState != nil { + nodes = rd.SoftState.Nodes + if rd.RaftState == raft.StateLeader { + syncC = s.syncTicker + } else { + syncC = nil + } + if rd.SoftState.ShouldStop { + s.Stop() + return + } + } + if rd.Snapshot.Index > snapi { snapi = rd.Snapshot.Index } @@ -278,21 +292,9 @@ func (s *EtcdServer) run() { } if appliedi-snapi > s.snapCount { - s.snapshot() + s.snapshot(appliedi, nodes) snapi = appliedi } - - if rd.SoftState != nil { - if rd.RaftState == raft.StateLeader { - syncC = s.syncTicker - } else { - syncC = nil - } - if rd.SoftState.ShouldStop { - s.Stop() - return - } - } case <-syncC: s.sync(defaultSyncTimeout) case <-s.done: @@ -517,14 +519,14 @@ func (s *EtcdServer) apply(r pb.Request) Response { } // TODO: non-blocking snapshot -func (s *EtcdServer) snapshot() { +func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) { d, err := s.store.Save() // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? if err != nil { panic("TODO: this is bad, what do we do about it?") } - s.node.Compact(d) + s.node.Compact(snapi, snapnodes, d) s.storage.Cut() } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 22dae63fe..8ac6b99bc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -678,7 +678,7 @@ func TestSnapshot(t *testing.T) { node: n, } - s.snapshot() + s.snapshot(0, []int64{1}) gaction := st.Action() if len(gaction) != 1 { t.Fatalf("len(action) = %d, want 1", len(gaction)) @@ -1129,7 +1129,7 @@ func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {} func (n *readyNode) Stop() {} -func (n *readyNode) Compact(d []byte) {} +func (n *readyNode) Compact(index int64, nodes []int64, d []byte) {} type nodeRecorder struct { recorder @@ -1161,7 +1161,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) { func (n *nodeRecorder) Stop() { n.record(action{name: "Stop"}) } -func (n *nodeRecorder) Compact(d []byte) { +func (n *nodeRecorder) Compact(index int64, nodes []int64, d []byte) { n.record(action{name: "Compact"}) } diff --git a/raft/node.go b/raft/node.go index 28c3e797c..8e166a608 100644 --- a/raft/node.go +++ b/raft/node.go @@ -3,6 +3,7 @@ package raft import ( "errors" "log" + "reflect" pb "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" @@ -18,11 +19,13 @@ var ( type SoftState struct { Lead int64 RaftState StateType + Nodes []int64 ShouldStop bool } func (a *SoftState) equal(b *SoftState) bool { - return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop + nodeeq := reflect.DeepEqual(a.Nodes, b.Nodes) + return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop && nodeeq } // Ready encapsulates the entries and messages that are ready to read, @@ -56,6 +59,12 @@ type Ready struct { Messages []pb.Message } +type compact struct { + index int64 + nodes []int64 + data []byte +} + func isHardStateEqual(a, b pb.HardState) bool { return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit } @@ -96,7 +105,7 @@ type Node interface { // Stop performs any necessary termination of the Node Stop() // Compact - Compact(d []byte) + Compact(index int64, nodes []int64, d []byte) } // StartNode returns a new Node given a unique raft id, a list of raft peers, and @@ -141,7 +150,7 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb. type node struct { propc chan pb.Message recvc chan pb.Message - compactc chan []byte + compactc chan compact confc chan pb.ConfChange readyc chan Ready tickc chan struct{} @@ -152,7 +161,7 @@ func newNode() node { return node{ propc: make(chan pb.Message), recvc: make(chan pb.Message), - compactc: make(chan []byte), + compactc: make(chan compact), confc: make(chan pb.ConfChange), readyc: make(chan Ready), tickc: make(chan struct{}), @@ -200,8 +209,8 @@ func (n *node) run(r *raft) { r.Step(m) case m := <-n.recvc: r.Step(m) // raft never returns an error - case d := <-n.compactc: - r.compact(d) + case c := <-n.compactc: + r.compact(c.index, c.nodes, c.data) case cc := <-n.confc: switch cc.Type { case pb.ConfChangeAddNode: @@ -299,9 +308,9 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) { } } -func (n *node) Compact(d []byte) { +func (n *node) Compact(index int64, nodes []int64, d []byte) { select { - case n.compactc <- d: + case n.compactc <- compact{index, nodes, d}: case <-n.done: } } diff --git a/raft/node_test.go b/raft/node_test.go index 0822c1e91..dd2ee787d 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -156,7 +156,7 @@ func TestNode(t *testing.T) { } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + SoftState: &SoftState{Lead: 1, Nodes: []int64{1}, RaftState: StateLeader}, HardState: raftpb.HardState{Term: 1, Commit: 2}, Entries: []raftpb.Entry{ {}, @@ -244,7 +244,7 @@ func TestCompact(t *testing.T) { t.Fatalf("unexpected proposal failure: unable to commit entry") } - n.Compact(w.Data) + n.Compact(w.Index, w.Nodes, w.Data) pkg.ForceGosched() select { case rd := <-n.Ready(): @@ -278,6 +278,7 @@ func TestSoftStateEqual(t *testing.T) { {&SoftState{Lead: 1}, false}, {&SoftState{RaftState: StateLeader}, false}, {&SoftState{ShouldStop: true}, false}, + {&SoftState{Nodes: []int64{1, 2}}, false}, } for i, tt := range tests { if g := tt.st.equal(&SoftState{}); g != tt.we { diff --git a/raft/raft.go b/raft/raft.go index 47a07031e..56e7f1d93 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -153,7 +153,7 @@ func (r *raft) hasLeader() bool { return r.lead != None } func (r *raft) shouldStop() bool { return r.removed[r.id] } func (r *raft) softState() *SoftState { - return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()} + return &SoftState{Lead: r.lead, Nodes: r.nodes(), RaftState: r.state, ShouldStop: r.shouldStop()} } func (r *raft) String() string { @@ -515,9 +515,13 @@ func stepFollower(r *raft, m pb.Message) { } } -func (r *raft) compact(d []byte) { - r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), r.nodes()) +func (r *raft) compact(index int64, nodes []int64, d []byte) error { + if index > r.raftLog.applied { + return fmt.Errorf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied) + } + r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), nodes) r.raftLog.compact(r.raftLog.applied) + return nil } // restore recovers the statemachine from a snapshot. It restores the log and the diff --git a/raft/raft_test.go b/raft/raft_test.go index 7a517545a..edbcd47e8 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -897,7 +897,7 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead) - lead.compact(nil) + lead.compact(lead.raftLog.applied, lead.nodes(), nil) nt.recover() // trigger a snapshot