Move snapshot application out of raft: raftLog now only tracks the incoming
snapshot as unstableSnapshot, and etcdserver applies it to raftStorage after
the snapshot has been saved.

Review notes:
* The ApplySnapshot call in etcdserver checks its error instead of dropping
  it, mirroring the error check this patch removes from raftLog.restore.
  This assumes s.raftStorage.ApplySnapshot returns an error like the storage
  call it replaces; confirm against the raftStorage type.
* The "index" blob hashes below predate these review edits.

diff --git a/etcdserver/server.go b/etcdserver/server.go
index d7d8e3fab..a36ae47ee 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -394,6 +394,11 @@ func (s *EtcdServer) run() {
 				if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
 					log.Fatalf("etcdserver: create snapshot error: %v", err)
 				}
+				// raft no longer applies snapshots to storage itself, so the
+				// received snapshot is applied to raftStorage here.
+				if err := s.raftStorage.ApplySnapshot(rd.Snapshot); err != nil {
+					log.Fatalf("etcdserver: apply snapshot error: %v", err)
+				}
 				snapi = rd.Snapshot.Metadata.Index
 			}
 
diff --git a/raft/log.go b/raft/log.go
index ccd6122d0..e6f1ff989 100644
--- a/raft/log.go
+++ b/raft/log.go
@@ -26,6 +26,9 @@ import (
 type raftLog struct {
 	// storage contains all stable entries since the last snapshot.
 	storage Storage
+
+	// the incoming unstable snapshot, if any.
+	unstableSnapshot *pb.Snapshot
 	// unstableEnts contains all entries that have not yet been written
 	// to storage.
 	unstableEnts []pb.Entry
@@ -149,7 +152,19 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	return nil
 }
 
+// snapshot returns the pending unstable snapshot if one exists; otherwise
+// it falls back to the snapshot held by storage.
+func (l *raftLog) snapshot() (pb.Snapshot, error) {
+	if l.unstableSnapshot != nil {
+		return *l.unstableSnapshot, nil
+	}
+	return l.storage.Snapshot()
+}
+
 func (l *raftLog) firstIndex() uint64 {
+	if l.unstableSnapshot != nil {
+		return l.unstableSnapshot.Metadata.Index + 1
+	}
 	index, err := l.storage.FirstIndex()
 	if err != nil {
 		panic(err) // TODO(bdarnell)
@@ -199,6 +214,12 @@ func (l *raftLog) term(i uint64) uint64 {
 	case i > l.lastIndex():
 		return 0
 	case i < l.unstable:
+		if snap := l.unstableSnapshot; snap != nil {
+			if i == snap.Metadata.Index {
+				return snap.Metadata.Term
+			}
+			return 0
+		}
 		t, err := l.storage.Term(i)
 		switch err {
 		case nil:
@@ -245,15 +266,10 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 }
 
 func (l *raftLog) restore(s pb.Snapshot) {
-	// TODO: rethink restore logic.
-	// This breaks the rule that raft never modifies storage.
-	err := l.storage.ApplySnapshot(s)
-	if err != nil {
-		panic(err) // TODO(bdarnell)
-	}
 	l.committed = s.Metadata.Index
 	l.unstable = l.committed + 1
 	l.unstableEnts = nil
+	l.unstableSnapshot = &s
 }
 
 // slice returns a slice of log entries from lo through hi-1, inclusive.
diff --git a/raft/node.go b/raft/node.go
index f22477ae5..827f9a9b4 100644
--- a/raft/node.go
+++ b/raft/node.go
@@ -306,8 +306,8 @@ func (n *node) run(r *raft) {
 				r.raftLog.stableTo(prevLastUnstablei)
 				havePrevLastUnstablei = false
 			}
-			if r.snapshot != nil && r.snapshot.Metadata.Index == prevSnapi {
-				r.snapshot = nil
+			if r.raftLog.unstableSnapshot != nil && r.raftLog.unstableSnapshot.Metadata.Index == prevSnapi {
+				r.raftLog.unstableSnapshot = nil
 			}
 			advancec = nil
 		case <-n.stop:
@@ -405,8 +405,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 		rd.HardState = r.HardState
 	}
-	if r.snapshot != nil {
-		rd.Snapshot = *r.snapshot
+	if r.raftLog.unstableSnapshot != nil {
+		rd.Snapshot = *r.raftLog.unstableSnapshot
 	}
 	return rd
 }
diff --git a/raft/raft.go b/raft/raft.go
index f64e14b60..92275561b 100644
--- a/raft/raft.go
+++ b/raft/raft.go
@@ -116,9 +116,6 @@ type raft struct {
 
 	msgs []pb.Message
 
-	// the incoming snapshot, if any.
-	snapshot *pb.Snapshot
-
 	// the leader id
 	lead uint64
 
@@ -222,7 +219,7 @@ func (r *raft) sendAppend(to uint64) {
 	m.To = to
 	if r.needSnapshot(pr.next) {
 		m.Type = pb.MsgSnap
-		snapshot, err := r.raftLog.storage.Snapshot()
+		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
 			panic(err) // TODO(bdarnell)
 		}
@@ -438,7 +435,6 @@ func (r *raft) handleHeartbeat(m pb.Message) {
 
 func (r *raft) handleSnapshot(m pb.Message) {
 	if r.restore(m.Snapshot) {
-		r.snapshot = &m.Snapshot
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
 	} else {
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})