raftexample: Save snapshot file before writing to wal

This commit is contained in:
Shintaro Murakami 2021-02-16 12:15:36 +09:00
parent 719f6ce06f
commit 1302e1edb2

View File

@ -68,6 +68,8 @@ type raftNode struct {
stopc chan struct{} // signals proposal channel closed
httpstopc chan struct{} // signals http server to shutdown
httpdonec chan struct{} // signals http server shutdown complete
logger *zap.Logger
}
var defaultSnapshotCount uint64 = 10000
@ -99,6 +101,8 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
logger: zap.NewExample(),
snapshotterReady: make(chan *snap.Snapshotter, 1),
// rest of structure populated after WAL replay
}
@ -107,17 +111,17 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
}
func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
// must save the snapshot index to the WAL before saving the
// snapshot to maintain the invariant that we only Open the
// wal at previously-saved snapshot indexes.
walSnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}
if err := rc.wal.SaveSnapshot(walSnap); err != nil {
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents
// a WAL snapshot entry from having no corresponding snapshot file.
if err := rc.snapshotter.SaveSnap(snap); err != nil {
return err
}
if err := rc.snapshotter.SaveSnap(snap); err != nil {
if err := rc.wal.SaveSnapshot(walSnap); err != nil {
return err
}
return rc.wal.ReleaseLockTo(snap.Metadata.Index)
@ -179,11 +183,18 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
}
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
snapshot, err := rc.snapshotter.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Fatalf("raftexample: error loading snapshot (%v)", err)
if wal.Exist(rc.waldir) {
walSnaps, err := wal.ValidSnapshotEntries(rc.logger, rc.waldir)
if err != nil {
log.Fatalf("raftexample: error listing snapshots (%v)", err)
}
snapshot, err := rc.snapshotter.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
log.Fatalf("raftexample: error loading snapshot (%v)", err)
}
return snapshot
}
return snapshot
return &raftpb.Snapshot{}
}
// openWAL returns a WAL ready for reading.
@ -281,7 +292,7 @@ func (rc *raftNode) startRaft() {
}
rc.transport = &rafthttp.Transport{
Logger: zap.NewExample(),
Logger: rc.logger,
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,