contrib/raftexample: fix restart path

The ConfChange fix crashes WAL replay because it assumed the node
always exists. Additionally, restart on a single node cluster
would deadlock because it had the wrong raft HardState.

To fix, replay path now goes through the regular raft loop.

Fixes #4474
This commit is contained in:
Anthony Romano 2016-02-09 20:51:36 -08:00
parent 5e47ba3fad
commit 0cb304ec61

View File

@ -41,10 +41,11 @@ type raftNode struct {
commitC chan *string // entries committed to log (k,v)
errorC chan error // errors from raft session
id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
waldir string // path to WAL directory
id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
waldir string // path to WAL directory
lastIndex uint64 // index of log at start
// raft backing for the commit/error channel
node raft.Node
@ -90,8 +91,8 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore conf changes and empty messages
continue
// ignore empty messages
break
}
s := string(ents[i].Data)
select {
@ -103,7 +104,6 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data)
rc.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
@ -118,6 +118,15 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
rc.transport.RemovePeer(types.ID(cc.NodeID))
}
}
// special nil commit to signal replay has finished
if ents[i].Index == rc.lastIndex {
select {
case rc.commitC <- nil:
case <-rc.stopc:
return false
}
}
}
return true
}
@ -144,19 +153,22 @@ func (rc *raftNode) openWAL() *wal.WAL {
return w
}
// replayWAL replays WAL entries into the raft instance and the commit
// channel and returns an appendable WAL.
// replayWAL replays WAL entries into the raft instance.
func (rc *raftNode) replayWAL() *wal.WAL {
w := rc.openWAL()
_, _, ents, err := w.ReadAll()
_, st, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents)
rc.publishEntries(ents)
// send nil value so client knows commit channel is current
rc.commitC <- nil
// send nil once lastIndex is published so client knows commit channel is current
if len(ents) > 0 {
rc.lastIndex = ents[len(ents)-1].Index
} else {
rc.commitC <- nil
}
rc.raftStorage.SetHardState(st)
return w
}