raftexample: add index fields to filter entries

This commit is contained in:
Gyu-Ho Lee 2016-09-21 03:53:48 -07:00
parent 666d555450
commit 4b83f40618

View File

@ -50,6 +50,10 @@ type raftNode struct {
snapdir string // path to snapshot directory
lastIndex uint64 // index of log at start
confState raftpb.ConfState
snapshotIndex uint64
appliedIndex uint64
// raft backing for the commit/error channel
node raft.Node
raftStorage *raft.MemoryStorage
@ -106,6 +110,20 @@ func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
return rc.wal.ReleaseLockTo(snap.Metadata.Index)
}
func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
if len(ents) == 0 {
return
}
firstIdx := ents[0].Index
if firstIdx > rc.appliedIndex+1 {
log.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1", firstIdx, rc.appliedIndex)
}
if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) {
nents = ents[rc.appliedIndex-firstIdx+1:]
}
return
}
// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
@ -141,6 +159,9 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
}
}
// after commit, update appliedIndex
rc.appliedIndex = ents[i].Index
// special nil commit to signal replay has finished
if ents[i].Index == rc.lastIndex {
select {
@ -273,6 +294,14 @@ func (rc *raftNode) stopHTTP() {
}
func (rc *raftNode) serveChannels() {
snap, err := rc.raftStorage.Snapshot()
if err != nil {
panic(err)
}
rc.confState = snap.Metadata.ConfState
rc.snapshotIndex = snap.Metadata.Index
rc.appliedIndex = snap.Metadata.Index
defer rc.wal.Close()
ticker := time.NewTicker(100 * time.Millisecond)
@ -321,10 +350,12 @@ func (rc *raftNode) serveChannels() {
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rd.CommittedEntries); !ok {
// TODO: apply snapshot
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
// TODO: trigger snapshot
rc.node.Advance()
case err := <-rc.transport.ErrorC: