From 0cb304ec61019c5ff2f6260245ef30fb6dc9711a Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 9 Feb 2016 20:51:36 -0800 Subject: [PATCH 1/2] contrib/raftexample: fix restart path The ConfChange fix crashes WAL replay because it assumed the node always exists. Additionally, restart on a single node cluster would deadlock because it had the wrong raft HardState. To fix, replay path now goes through the regular raft loop. Fixes #4474 --- contrib/raftexample/raft.go | 38 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 736e9886b..1ac56824a 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -41,10 +41,11 @@ type raftNode struct { commitC chan *string // entries committed to log (k,v) errorC chan error // errors from raft session - id int // client ID for raft session - peers []string // raft peer URLs - join bool // node is joining an existing cluster - waldir string // path to WAL directory + id int // client ID for raft session + peers []string // raft peer URLs + join bool // node is joining an existing cluster + waldir string // path to WAL directory + lastIndex uint64 // index of log at start // raft backing for the commit/error channel node raft.Node @@ -90,8 +91,8 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { switch ents[i].Type { case raftpb.EntryNormal: if len(ents[i].Data) == 0 { - // ignore conf changes and empty messages - continue + // ignore empty messages + break } s := string(ents[i].Data) select { @@ -103,7 +104,6 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { case raftpb.EntryConfChange: var cc raftpb.ConfChange cc.Unmarshal(ents[i].Data) - rc.node.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode: @@ -118,6 +118,15 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { rc.transport.RemovePeer(types.ID(cc.NodeID)) } } + + // special nil commit to signal replay has finished + if ents[i].Index == rc.lastIndex { + select { + case rc.commitC <- nil: + case <-rc.stopc: + return false + } + } } return true } @@ -144,19 +153,22 @@ func (rc *raftNode) openWAL() *wal.WAL { return w } -// replayWAL replays WAL entries into the raft instance and the commit -// channel and returns an appendable WAL. +// replayWAL replays WAL entries into the raft instance. func (rc *raftNode) replayWAL() *wal.WAL { w := rc.openWAL() - _, _, ents, err := w.ReadAll() + _, st, ents, err := w.ReadAll() if err != nil { log.Fatalf("raftexample: failed to read WAL (%v)", err) } // append to storage so raft starts at the right place in log rc.raftStorage.Append(ents) - rc.publishEntries(ents) - // send nil value so client knows commit channel is current - rc.commitC <- nil + // send nil once lastIndex is published so client knows commit channel is current + if len(ents) > 0 { + rc.lastIndex = ents[len(ents)-1].Index + } else { + rc.commitC <- nil + } + rc.raftStorage.SetHardState(st) return w } From 02b24c58fd1d325c127af8e696886bebbd2c6412 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 10 Feb 2016 10:21:34 -0800 Subject: [PATCH 2/2] contrib/raftexample: fix tests os.Exit() on raft stop breaks out of the test fixture; instead, monitor the error channel and exit on close --- contrib/raftexample/httpapi.go | 9 +++++++++ contrib/raftexample/raft.go | 2 -- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/contrib/raftexample/httpapi.go b/contrib/raftexample/httpapi.go index 9c1af96bb..dee2a4666 100644 --- a/contrib/raftexample/httpapi.go +++ b/contrib/raftexample/httpapi.go @@ -18,6 +18,7 @@ import ( "io/ioutil" "log" "net/http" + "os" "strconv" "github.com/coreos/etcd/raft/raftpb" @@ -104,6 +105,14 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { func serveHttpKVAPI(port int, proposeC chan<- string, confChangeC chan<- raftpb.ConfChange, commitC <-chan *string, errorC <-chan error) { + // exit when raft goes down + go func() { + if err, ok := <-errorC; ok { + log.Fatal(err) + } + os.Exit(0) + }() + srv := http.Server{ Addr: ":" + strconv.Itoa(port), Handler: &httpKVAPI{ diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 1ac56824a..493e31ec6 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -236,8 +236,6 @@ func (rc *raftNode) stop() { close(rc.commitC) close(rc.errorC) rc.node.Stop() - - os.Exit(0) } func (rc *raftNode) stopHTTP() {