diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 4bd5d1001..7cb6aa029 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -50,6 +50,10 @@ type raftNode struct { snapdir string // path to snapshot directory lastIndex uint64 // index of log at start + confState raftpb.ConfState + snapshotIndex uint64 + appliedIndex uint64 + // raft backing for the commit/error channel node raft.Node raftStorage *raft.MemoryStorage @@ -106,6 +110,20 @@ func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error { return rc.wal.ReleaseLockTo(snap.Metadata.Index) } +func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) { + if len(ents) == 0 { + return + } + firstIdx := ents[0].Index + if firstIdx > rc.appliedIndex+1 { + log.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1", firstIdx, rc.appliedIndex) + } + if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) { + nents = ents[rc.appliedIndex-firstIdx+1:] + } + return +} + // publishEntries writes committed log entries to commit channel and returns // whether all entries could be published. func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { @@ -141,6 +159,9 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { } } + // after commit, update appliedIndex + rc.appliedIndex = ents[i].Index + // special nil commit to signal replay has finished if ents[i].Index == rc.lastIndex { select { @@ -273,6 +294,14 @@ func (rc *raftNode) stopHTTP() { } func (rc *raftNode) serveChannels() { + snap, err := rc.raftStorage.Snapshot() + if err != nil { + panic(err) + } + rc.confState = snap.Metadata.ConfState + rc.snapshotIndex = snap.Metadata.Index + rc.appliedIndex = snap.Metadata.Index + defer rc.wal.Close() ticker := time.NewTicker(100 * time.Millisecond) @@ -321,10 +350,12 @@ func (rc *raftNode) serveChannels() { } rc.raftStorage.Append(rd.Entries) rc.transport.Send(rd.Messages) - if ok := rc.publishEntries(rd.CommittedEntries); !ok { + // TODO: apply snapshot + if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { rc.stop() return } + // TODO: trigger snapshot rc.node.Advance() case err := <-rc.transport.ErrorC: