raft: filter out messages from unknow sender.

If we cannot find the `m.from` from current peers in the raft and it is a response
message, we should filter it out or raft panics. We are not targetting to avoid
malicious peers.

It has to be done in the raft node layer syncchronously. Although we can check
it at the application layer asynchronously, but after the checking and before
the message going into raft, the raft state machine might make progress and
unfortunately remove the `m.from` peer.
This commit is contained in:
Xiang Li 2014-12-05 10:45:05 -08:00
parent 15aed05071
commit 6409a8bf0d
2 changed files with 11 additions and 8 deletions

View File

@ -253,7 +253,10 @@ func (n *node) run(r *raft) {
m.From = r.id m.From = r.id
r.Step(m) r.Step(m)
case m := <-n.recvc: case m := <-n.recvc:
r.Step(m) // raft never returns an error // filter out response message from unknow From.
if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) {
r.Step(m) // raft never returns an error
}
case cc := <-n.confc: case cc := <-n.confc:
if cc.NodeID == None { if cc.NodeID == None {
r.resetPendingConf() r.resetPendingConf()
@ -322,9 +325,7 @@ func (n *node) Tick() {
} }
} }
func (n *node) Campaign(ctx context.Context) error { func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
return n.step(ctx, pb.Message{Type: pb.MsgHup})
}
func (n *node) Propose(ctx context.Context, data []byte) error { func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
@ -332,7 +333,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
func (n *node) Step(ctx context.Context, m pb.Message) error { func (n *node) Step(ctx context.Context, m pb.Message) error {
// ignore unexpected local messages receiving over network // ignore unexpected local messages receiving over network
if m.Type == pb.MsgHup || m.Type == pb.MsgBeat { if IsLocalMsg(m) {
// TODO: return an error? // TODO: return an error?
return nil return nil
} }
@ -365,9 +366,7 @@ func (n *node) step(ctx context.Context, m pb.Message) error {
} }
} }
func (n *node) Ready() <-chan Ready { func (n *node) Ready() <-chan Ready { return n.readyc }
return n.readyc
}
func (n *node) Advance() { func (n *node) Advance() {
select { select {

View File

@ -44,6 +44,10 @@ func max(a, b uint64) uint64 {
return b return b
} }
func IsLocalMsg(m pb.Message) bool { return m.Type == pb.MsgHup || m.Type == pb.MsgBeat }
func IsResponseMsg(m pb.Message) bool { return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp }
// DescribeMessage returns a concise human-readable description of a // DescribeMessage returns a concise human-readable description of a
// Message for debugging. // Message for debugging.
func DescribeMessage(m pb.Message) string { func DescribeMessage(m pb.Message) string {