Merge pull request #1871 from xiang90/fix_node

raft: filter out messages from unknown sender.
This commit is contained in:
Xiang Li 2014-12-05 11:50:34 -08:00
commit c03da80330
2 changed files with 11 additions and 8 deletions

View File

@ -253,7 +253,10 @@ func (n *node) run(r *raft) {
m.From = r.id
r.Step(m)
case m := <-n.recvc:
r.Step(m) // raft never returns an error
// filter out response message from unknow From.
if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) {
r.Step(m) // raft never returns an error
}
case cc := <-n.confc:
if cc.NodeID == None {
r.resetPendingConf()
@ -322,9 +325,7 @@ func (n *node) Tick() {
}
}
func (n *node) Campaign(ctx context.Context) error {
return n.step(ctx, pb.Message{Type: pb.MsgHup})
}
func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
@ -332,7 +333,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
func (n *node) Step(ctx context.Context, m pb.Message) error {
// ignore unexpected local messages receiving over network
if m.Type == pb.MsgHup || m.Type == pb.MsgBeat {
if IsLocalMsg(m) {
// TODO: return an error?
return nil
}
@ -365,9 +366,7 @@ func (n *node) step(ctx context.Context, m pb.Message) error {
}
}
func (n *node) Ready() <-chan Ready {
return n.readyc
}
func (n *node) Ready() <-chan Ready { return n.readyc }
func (n *node) Advance() {
select {

View File

@ -44,6 +44,10 @@ func max(a, b uint64) uint64 {
return b
}
func IsLocalMsg(m pb.Message) bool { return m.Type == pb.MsgHup || m.Type == pb.MsgBeat }
func IsResponseMsg(m pb.Message) bool { return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp }
// DescribeMessage returns a concise human-readable description of a
// Message for debugging.
func DescribeMessage(m pb.Message) string {