diff --git a/etcd/participant.go b/etcd/participant.go index 261194963..a55d6f84c 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -152,22 +152,24 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura log.Printf("id=%x participant.snapload err=%s\n", p.id, err) return nil, err } + var logIndex int64 if s != nil { - p.node.Restore(*s) - if err := p.Recovery(s.Data); err != nil { - panic(err) - } - log.Printf("id=%x recovered index=%d\n", p.id, s.Index) + logIndex = s.Index } - - n, err := wal.Read(walDir, 0) + n, err := wal.Read(walDir, logIndex) if err != nil { return nil, err } p.id = n.Id - p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection) + p.node.Node = raft.Recover(s, n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection) p.apply(p.node.Next()) - log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walDir, n.State, len(n.Ents)) + log.Printf("id=%x participant.load path=%s snap=%+v state=\"%+v\" len(ents)=%d", p.id, p.cfg.DataDir, s, n.State, len(n.Ents)) + if s != nil { + if err := p.Recovery(s.Data); err != nil { + panic(err) + } + log.Printf("id=%x participant.store.recovered index=%d\n", p.id, s.Index) + } if w, err = wal.Open(walDir); err != nil { return nil, err } diff --git a/raft/log.go b/raft/log.go index e0f2888e9..3f5743a09 100644 --- a/raft/log.go +++ b/raft/log.go @@ -171,7 +171,6 @@ func (l *raftLog) restore(s Snapshot) { l.applied = s.Index l.offset = s.Index l.snapshot = s - l.unstableSnapshot = s } func (l *raftLog) at(i int64) *Entry { diff --git a/raft/node.go b/raft/node.go index 064b8d5b1..4fc6112ba 100644 --- a/raft/node.go +++ b/raft/node.go @@ -52,10 +52,15 @@ func New(id int64, heartbeat, election tick) *Node { return n } -func Recover(id int64, ents []Entry, state State, heartbeat, election tick) *Node { +func Recover(s *Snapshot, id int64, ents []Entry, state State, heartbeat, election tick) *Node { n := New(id, heartbeat, election) + if s != nil { + n.sm.restore(*s) + } n.sm.loadEnts(ents) - n.sm.loadState(state) + if !state.IsEmpty() { + n.sm.loadState(state) + } return n } @@ -231,10 +236,6 @@ func (n *Node) UpdateConf(t int64, c *Config) { n.propose(t, data) } -func (n *Node) Restore(s Snapshot) bool { - return n.sm.restore(s) -} - // UnstableEnts retuens all the entries that need to be persistent. // The first return value is offset, and the second one is unstable entries. func (n *Node) UnstableEnts() []Entry { diff --git a/raft/node_test.go b/raft/node_test.go index 193f810ef..80b914e53 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -192,7 +192,7 @@ func TestRecover(t *testing.T) { ents := []Entry{{Term: 1}, {Term: 2}, {Term: 3}} state := State{Term: 500, Vote: 1, Commit: 3} - n := Recover(0, ents, state, defaultHeartbeat, defaultElection) + n := Recover(nil, 0, ents, state, defaultHeartbeat, defaultElection) if g := n.Next(); !reflect.DeepEqual(g, ents) { t.Errorf("ents = %+v, want %+v", g, ents) } diff --git a/raft/raft.go b/raft/raft.go index 6df7d2f34..85cd8e748 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -388,6 +388,7 @@ func (sm *stateMachine) handleAppendEntries(m Message) { func (sm *stateMachine) handleSnapshot(m Message) { if sm.restore(m.Snapshot) { + sm.raftLog.unstableSnapshot = m.Snapshot sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()}) } else { sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.committed}) @@ -574,10 +575,7 @@ func (sm *stateMachine) setState(vote, term, commit int64) { } func (sm *stateMachine) loadEnts(ents []Entry) { - if !sm.raftLog.isEmpty() { - panic("cannot load entries when log is not empty") - } - sm.raftLog.append(0, ents...) + sm.raftLog.append(sm.raftLog.lastIndex(), ents...) sm.raftLog.unstable = sm.raftLog.lastIndex() + 1 }