From f87a6f3c1f0ff613c0253441be06f2be05ec746a Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Mon, 25 Aug 2014 12:10:10 -0700 Subject: [PATCH] raft: sift proposals from other message types in Step --- raft/node.go | 29 +++++++++++++++-------------- raft/raft.go | 4 ---- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/raft/node.go b/raft/node.go index d81021fac..2f24d4e61 100644 --- a/raft/node.go +++ b/raft/node.go @@ -19,7 +19,7 @@ func (sr stateResp) containsUpdates(prev stateResp) bool { type Node struct { ctx context.Context - propc chan []byte + propc chan Message recvc chan Message statec chan stateResp tickc chan struct{} @@ -28,7 +28,7 @@ type Node struct { func Start(ctx context.Context, id int64, peers []int64) *Node { n := &Node{ ctx: ctx, - propc: make(chan []byte), + propc: make(chan Message), recvc: make(chan Message), statec: make(chan stateResp), tickc: make(chan struct{}), @@ -67,8 +67,9 @@ func (n *Node) run(r *raft) { } select { - case p := <-propc: - r.propose(p) + case m := <-propc: + m.From = r.id + r.Step(m) case m := <-n.recvc: r.Step(m) // raft never returns an error case <-n.tickc: @@ -94,21 +95,21 @@ func (n *Node) Tick() error { // Propose proposes data be appended to the log. func (n *Node) Propose(ctx context.Context, data []byte) error { - select { - case n.propc <- data: - return nil - case <-ctx.Done(): - return ctx.Err() - case <-n.ctx.Done(): - return n.ctx.Err() - } + return n.Step(ctx, Message{Type: msgProp, Entries: []Entry{{Data: data}}}) } // Step advances the state machine using m. -func (n *Node) Step(m Message) error { +func (n *Node) Step(ctx context.Context, m Message) error { + ch := n.recvc + if m.Type == msgProp { + ch = n.propc + } + select { - case n.recvc <- m: + case ch <- m: return nil + case <-ctx.Done(): + return ctx.Err() case <-n.ctx.Done(): return n.ctx.Err() } diff --git a/raft/raft.go b/raft/raft.go index 170ca6cda..7c2748116 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -152,10 +152,6 @@ func newRaft(id int64, peers []int64) *raft { func (r *raft) hasLeader() bool { return r.lead != none } -func (r *raft) propose(data []byte) { - r.Step(Message{From: r.id, Type: msgProp, Entries: []Entry{{Data: data}}}) -} - func (r *raft) String() string { s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term) switch r.state {