diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 711979c8f..5c144b313 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -68,6 +68,8 @@ type raftNode struct { stopc chan struct{} // signals proposal channel closed httpstopc chan struct{} // signals http server to shutdown httpdonec chan struct{} // signals http server shutdown complete + + logger *zap.Logger } var defaultSnapshotCount uint64 = 10000 @@ -99,6 +101,8 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, httpstopc: make(chan struct{}), httpdonec: make(chan struct{}), + logger: zap.NewExample(), + snapshotterReady: make(chan *snap.Snapshotter, 1), // rest of structure populated after WAL replay } @@ -107,17 +111,17 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, } func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error { - // must save the snapshot index to the WAL before saving the - // snapshot to maintain the invariant that we only Open the - // wal at previously-saved snapshot indexes. walSnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, } - if err := rc.wal.SaveSnapshot(walSnap); err != nil { + // save the snapshot file before writing the snapshot to the wal. + // This makes it possible for the snapshot file to become orphaned, but prevents + // a WAL snapshot entry from having no corresponding snapshot file. + if err := rc.snapshotter.SaveSnap(snap); err != nil { return err } - if err := rc.snapshotter.SaveSnap(snap); err != nil { + if err := rc.wal.SaveSnapshot(walSnap); err != nil { return err } return rc.wal.ReleaseLockTo(snap.Metadata.Index) @@ -179,11 +183,18 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { } func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { - snapshot, err := rc.snapshotter.Load() - if err != nil && err != snap.ErrNoSnapshot { - log.Fatalf("raftexample: error loading snapshot (%v)", err) + if wal.Exist(rc.waldir) { + walSnaps, err := wal.ValidSnapshotEntries(rc.logger, rc.waldir) + if err != nil { + log.Fatalf("raftexample: error listing snapshots (%v)", err) + } + snapshot, err := rc.snapshotter.LoadNewestAvailable(walSnaps) + if err != nil && err != snap.ErrNoSnapshot { + log.Fatalf("raftexample: error loading snapshot (%v)", err) + } + return snapshot } - return snapshot + return &raftpb.Snapshot{} } // openWAL returns a WAL ready for reading. @@ -281,7 +292,7 @@ func (rc *raftNode) startRaft() { } rc.transport = &rafthttp.Transport{ - Logger: zap.NewExample(), + Logger: rc.logger, ID: types.ID(rc.id), ClusterID: 0x1000, Raft: rc,