mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raftexample: load snapshot when opening WAL
Fix https://github.com/coreos/etcd/issues/7056. Previously we don't load snapshot when replaying WAL.
This commit is contained in:
parent
24601ca24b
commit
0af1679b61
@ -77,6 +77,7 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
|
||||
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var dataKv kv
|
||||
|
@ -94,7 +94,6 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
|
||||
waldir: fmt.Sprintf("raftexample-%d", id),
|
||||
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
|
||||
getSnapshot: getSnapshot,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
snapCount: defaultSnapCount,
|
||||
stopc: make(chan struct{}),
|
||||
httpstopc: make(chan struct{}),
|
||||
@ -185,8 +184,16 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
|
||||
snapshot, err := rc.snapshotter.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
log.Fatalf("raftexample: error loading snapshot (%v)", err)
|
||||
}
|
||||
return snapshot
|
||||
}
|
||||
|
||||
// openWAL returns a WAL ready for reading.
|
||||
func (rc *raftNode) openWAL() *wal.WAL {
|
||||
func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
|
||||
if !wal.Exist(rc.waldir) {
|
||||
if err := os.Mkdir(rc.waldir, 0750); err != nil {
|
||||
log.Fatalf("raftexample: cannot create dir for wal (%v)", err)
|
||||
@ -199,7 +206,12 @@ func (rc *raftNode) openWAL() *wal.WAL {
|
||||
w.Close()
|
||||
}
|
||||
|
||||
w, err := wal.Open(rc.waldir, walpb.Snapshot{})
|
||||
walsnap := walpb.Snapshot{}
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
log.Printf("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index)
|
||||
w, err := wal.Open(rc.waldir, walsnap)
|
||||
if err != nil {
|
||||
log.Fatalf("raftexample: error loading wal (%v)", err)
|
||||
}
|
||||
@ -209,11 +221,19 @@ func (rc *raftNode) openWAL() *wal.WAL {
|
||||
|
||||
// replayWAL replays WAL entries into the raft instance.
|
||||
func (rc *raftNode) replayWAL() *wal.WAL {
|
||||
w := rc.openWAL()
|
||||
log.Printf("replaying WAL of member %d", rc.id)
|
||||
snapshot := rc.loadSnapshot()
|
||||
w := rc.openWAL(snapshot)
|
||||
_, st, ents, err := w.ReadAll()
|
||||
if err != nil {
|
||||
log.Fatalf("raftexample: failed to read WAL (%v)", err)
|
||||
}
|
||||
rc.raftStorage = raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
rc.raftStorage.ApplySnapshot(*snapshot)
|
||||
}
|
||||
rc.raftStorage.SetHardState(st)
|
||||
|
||||
// append to storage so raft starts at the right place in log
|
||||
rc.raftStorage.Append(ents)
|
||||
// send nil once lastIndex is published so client knows commit channel is current
|
||||
@ -222,7 +242,6 @@ func (rc *raftNode) replayWAL() *wal.WAL {
|
||||
} else {
|
||||
rc.commitC <- nil
|
||||
}
|
||||
rc.raftStorage.SetHardState(st)
|
||||
return w
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user