From 0cb304ec61019c5ff2f6260245ef30fb6dc9711a Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 9 Feb 2016 20:51:36 -0800 Subject: [PATCH] contrib/raftexample: fix restart path The ConfChange fix crashes WAL replay because it assumed the node always exists. Additionally, restart on a single node cluster would deadlock because it had the wrong raft HardState. To fix, replay path now goes through the regular raft loop. Fixes #4474 --- contrib/raftexample/raft.go | 38 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 736e9886b..1ac56824a 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -41,10 +41,11 @@ type raftNode struct { commitC chan *string // entries committed to log (k,v) errorC chan error // errors from raft session - id int // client ID for raft session - peers []string // raft peer URLs - join bool // node is joining an existing cluster - waldir string // path to WAL directory + id int // client ID for raft session + peers []string // raft peer URLs + join bool // node is joining an existing cluster + waldir string // path to WAL directory + lastIndex uint64 // index of log at start // raft backing for the commit/error channel node raft.Node @@ -90,8 +91,8 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { switch ents[i].Type { case raftpb.EntryNormal: if len(ents[i].Data) == 0 { - // ignore conf changes and empty messages - continue + // ignore empty messages + break } s := string(ents[i].Data) select { @@ -103,7 +104,6 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { case raftpb.EntryConfChange: var cc raftpb.ConfChange cc.Unmarshal(ents[i].Data) - rc.node.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode: @@ -118,6 +118,15 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { rc.transport.RemovePeer(types.ID(cc.NodeID)) } } + + // special nil commit to signal replay has finished + if ents[i].Index == rc.lastIndex { + select { + case rc.commitC <- nil: + case <-rc.stopc: + return false + } + } } return true } @@ -144,19 +153,22 @@ func (rc *raftNode) openWAL() *wal.WAL { return w } -// replayWAL replays WAL entries into the raft instance and the commit -// channel and returns an appendable WAL. +// replayWAL replays WAL entries into the raft instance. func (rc *raftNode) replayWAL() *wal.WAL { w := rc.openWAL() - _, _, ents, err := w.ReadAll() + _, st, ents, err := w.ReadAll() if err != nil { log.Fatalf("raftexample: failed to read WAL (%v)", err) } // append to storage so raft starts at the right place in log rc.raftStorage.Append(ents) - rc.publishEntries(ents) - // send nil value so client knows commit channel is current - rc.commitC <- nil + // send nil once lastIndex is published so client knows commit channel is current + if len(ents) > 0 { + rc.lastIndex = ents[len(ents)-1].Index + } else { + rc.commitC <- nil + } + rc.raftStorage.SetHardState(st) return w }