diff --git a/raft/node.go b/raft/node.go index 364f40219..83f788a44 100644 --- a/raft/node.go +++ b/raft/node.go @@ -253,7 +253,10 @@ func (n *node) run(r *raft) { m.From = r.id r.Step(m) case m := <-n.recvc: - r.Step(m) // raft never returns an error + // filter out response message from unknow From. + if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) { + r.Step(m) // raft never returns an error + } case cc := <-n.confc: if cc.NodeID == None { r.resetPendingConf() @@ -322,9 +325,7 @@ func (n *node) Tick() { } } -func (n *node) Campaign(ctx context.Context) error { - return n.step(ctx, pb.Message{Type: pb.MsgHup}) -} +func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } func (n *node) Propose(ctx context.Context, data []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) @@ -332,7 +333,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error { func (n *node) Step(ctx context.Context, m pb.Message) error { // ignore unexpected local messages receiving over network - if m.Type == pb.MsgHup || m.Type == pb.MsgBeat { + if IsLocalMsg(m) { // TODO: return an error? return nil } @@ -365,9 +366,7 @@ func (n *node) step(ctx context.Context, m pb.Message) error { } } -func (n *node) Ready() <-chan Ready { - return n.readyc -} +func (n *node) Ready() <-chan Ready { return n.readyc } func (n *node) Advance() { select { diff --git a/raft/util.go b/raft/util.go index 95af25f3a..5dd585774 100644 --- a/raft/util.go +++ b/raft/util.go @@ -44,6 +44,10 @@ func max(a, b uint64) uint64 { return b } +func IsLocalMsg(m pb.Message) bool { return m.Type == pb.MsgHup || m.Type == pb.MsgBeat } + +func IsResponseMsg(m pb.Message) bool { return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp } + // DescribeMessage returns a concise human-readable description of a // Message for debugging. func DescribeMessage(m pb.Message) string {