hack(server): notify removed peers when they try to become candidates

A peer might be removed during a network partiton. When it comes back it
will not have received any of the log entries that would have notified
it of its removal and go onto propose a vote. This will disrupt the
cluster and the cluster should give the machine feedback that it is no
longer a member.

The term of a denied vote is MaxUint64. The notification of the removal
is a raft event. These two modification are quick heck.

In reaction to this notification the machine should shutdown. In this
case the shutdown just moves it towards becoming a standby server.
This commit is contained in:
Xiang Li 2014-05-20 10:11:04 -07:00
parent cbcf55dabb
commit 189fece683
3 changed files with 23 additions and 1 deletions

View File

@ -122,6 +122,8 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) {
raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
raftServer.AddEventListener(raft.RemovedEventType, s.removedEvent)
s.raftServer = raftServer
s.removedInLog = false
@ -644,6 +646,14 @@ func (s *PeerServer) PeerStats() []byte {
return nil
}
// removedEvent handles the case where a machine has been removed from the
// cluster and is notified when it tries to become a candidate.
func (s *PeerServer) removedEvent(event raft.Event) {
// HACK(philips): we need to find a better notification for this.
log.Infof("removed during cluster re-configuration")
s.asyncRemove()
}
// raftEventLogger converts events from the Raft server into log messages.
func (s *PeerServer) raftEventLogger(event raft.Event) {
value := event.Value()

View File

@ -4,9 +4,10 @@ const (
StateChangeEventType = "stateChange"
LeaderChangeEventType = "leaderChange"
TermChangeEventType = "termChange"
CommitEventType = "commit"
CommitEventType = "commit"
AddPeerEventType = "addPeer"
RemovePeerEventType = "removePeer"
RemovedEventType = "removed"
HeartbeatIntervalEventType = "heartbeatInterval"
ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"

View File

@ -6,6 +6,7 @@ import (
"fmt"
"hash/crc32"
"io/ioutil"
"math"
"os"
"path"
"sort"
@ -1040,6 +1041,11 @@ func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
return true
}
if resp.Term == math.MaxUint64 {
s.debugln("got a removal notification, stopping")
s.DispatchEvent(newEvent(RemovedEventType, nil, nil))
}
if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.updateCurrentTerm(resp.Term, "")
@ -1064,6 +1070,11 @@ func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
// Deny the vote quest if the candidate is not in the current cluster
if _, ok := s.peers[req.CandidateName]; !ok {
s.debugln("server.rv.deny.vote: unknown peer ", req.CandidateName)
return newRequestVoteResponse(math.MaxUint64, false), false
}
// If the request is coming from an old term then reject it.
if req.Term < s.Term() {