mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

This commit introduces machinery to safely apply joint consensus configuration changes to Raft. The main contribution is the new package, `confchange`, which offers the primitives `Simple`, `EnterJoint`, and `LeaveJoint`. The first two take a list of configuration changes. `Simple` only declares success if these configuration changes (applied atomically) change the set of voters by at most one (i.e. it's fine to add or remove any number of learners, but change only one voter). `EnterJoint` makes the configuration joint and then applies the changes to it, in preparation for the caller returning later and transitioning out of the joint config into the final desired configuration via `LeaveJoint()`. This commit streamlines the conversion between voters and learners, which is now generally allowed whenever the above conditions are upheld (i.e. it's not possible to demote a voter and add a new voter in the context of a Simple configuration change, but it is possible via EnterJoint). Previously, we had the artificial restriction that a voter could not be demoted to a learner, but had to be removed first. Even though demoting a voter is generally less useful than promoting a learner (the latter is used to catch up future voters), demotions could see use in improved handling of temporary node unavailability, where it is desired to remove voting power from a down node, but to preserve its data should it return. An additional change made in this commit is to prevent the use of empty commit quorums, which was previously possible but for no good reason. Closes #10884. The work left to do in a future PR is to actually expose joint configurations to the applications using Raft. This will entail mostly API design and the addition of suitable testing, which to be carried out ergonomically is likely to motivate a larger refactor. Touches #7625.
1532 lines
53 KiB
Go
1532 lines
53 KiB
Go
// Copyright 2015 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package raft
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.etcd.io/etcd/raft/confchange"
|
|
"go.etcd.io/etcd/raft/quorum"
|
|
pb "go.etcd.io/etcd/raft/raftpb"
|
|
"go.etcd.io/etcd/raft/tracker"
|
|
)
|
|
|
|
const (
	// None is the placeholder node ID that stands for "no node"; it is used,
	// for instance, when there is no leader.
	None uint64 = 0

	// noLimit is the largest uint64 value, used where a size limit should
	// never be reached.
	noLimit = math.MaxUint64
)
|
|
|
|
// Possible values for StateType.
|
|
const (
|
|
StateFollower StateType = iota
|
|
StateCandidate
|
|
StateLeader
|
|
StatePreCandidate
|
|
numStates
|
|
)
|
|
|
|
// ReadOnlyOption selects how read-only requests are served.
type ReadOnlyOption int

const (
	// ReadOnlySafe makes read-only requests linearizable by communicating
	// with the quorum. It is the default and the recommended option.
	ReadOnlySafe ReadOnlyOption = iota

	// ReadOnlyLeaseBased makes read-only requests linearizable by relying on
	// the leader lease. Clock drift can affect it: if the drift is unbounded
	// (the clock may move backward or pause without any bound), the leader
	// might keep the lease longer than it should, and ReadIndex is not safe
	// in that case.
	ReadOnlyLeaseBased
)
|
|
|
|
// Possible values for CampaignType
|
|
const (
|
|
// campaignPreElection represents the first phase of a normal election when
|
|
// Config.PreVote is true.
|
|
campaignPreElection CampaignType = "CampaignPreElection"
|
|
// campaignElection represents a normal (time-based) election (the second phase
|
|
// of the election when Config.PreVote is true).
|
|
campaignElection CampaignType = "CampaignElection"
|
|
// campaignTransfer represents the type of leader transfer
|
|
campaignTransfer CampaignType = "CampaignTransfer"
|
|
)
|
|
|
|
// ErrProposalDropped is reported when a proposal is ignored, so that the
// proposer learns about it and can fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")
|
|
|
|
// lockedRand is a small wrapper around rand.Rand that adds a mutex so one
// generator can be shared among multiple raft groups. Only the methods
// needed by the code are exposed (e.g. Intn).
type lockedRand struct {
	mu   sync.Mutex
	rand *rand.Rand
}

// Intn returns the result of calling Intn(n) on the wrapped generator while
// holding the mutex.
//
// The mutex is released with defer: rand.Intn panics when n <= 0, and
// without the deferred unlock such a panic would leave the mutex held and
// block every later caller forever.
func (r *lockedRand) Intn(n int) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rand.Intn(n)
}

// globalRand is the package-level generator, seeded from the wall clock when
// the package is initialized.
var globalRand = &lockedRand{
	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
|
|
|
|
// CampaignType names the kind of campaign being run. It is a string rather
// than a uint64 because strings are simpler to compare and to fill into raft
// entries.
type CampaignType string
|
|
|
|
// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the printable name of each state, indexed by the state's
// numeric value.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
	"StatePreCandidate",
}

// String returns the name of the state. A value with no entry in stmap is
// rendered as "StateType(<n>)" rather than panicking on an out-of-range
// index, so that logging an unexpected state cannot crash the caller.
func (st StateType) String() string {
	if st < StateType(len(stmap)) {
		return stmap[st]
	}
	return fmt.Sprintf("StateType(%d)", uint64(st))
}
|
|
|
|
// Config contains the parameters to start a raft.
|
|
type Config struct {
|
|
// ID is the identity of the local raft. ID cannot be 0.
|
|
ID uint64
|
|
|
|
// peers contains the IDs of all nodes (including self) in the raft cluster. It
|
|
// should only be set when starting a new raft cluster. Restarting raft from
|
|
// previous configuration will panic if peers is set. peer is private and only
|
|
// used for testing right now.
|
|
peers []uint64
|
|
|
|
// learners contains the IDs of all learner nodes (including self if the
|
|
// local node is a learner) in the raft cluster. learners only receives
|
|
// entries from the leader node. It does not vote or promote itself.
|
|
learners []uint64
|
|
|
|
// ElectionTick is the number of Node.Tick invocations that must pass between
|
|
// elections. That is, if a follower does not receive any message from the
|
|
// leader of current term before ElectionTick has elapsed, it will become
|
|
// candidate and start an election. ElectionTick must be greater than
|
|
// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
|
|
// unnecessary leader switching.
|
|
ElectionTick int
|
|
// HeartbeatTick is the number of Node.Tick invocations that must pass between
|
|
// heartbeats. That is, a leader sends heartbeat messages to maintain its
|
|
// leadership every HeartbeatTick ticks.
|
|
HeartbeatTick int
|
|
|
|
// Storage is the storage for raft. raft generates entries and states to be
|
|
// stored in storage. raft reads the persisted entries and states out of
|
|
// Storage when it needs. raft reads out the previous state and configuration
|
|
// out of storage when restarting.
|
|
Storage Storage
|
|
// Applied is the last applied index. It should only be set when restarting
|
|
// raft. raft will not return entries to the application smaller or equal to
|
|
// Applied. If Applied is unset when restarting, raft might return previous
|
|
// applied entries. This is a very application dependent configuration.
|
|
Applied uint64
|
|
|
|
// MaxSizePerMsg limits the max byte size of each append message. Smaller
|
|
// value lowers the raft recovery cost(initial probing and message lost
|
|
// during normal operation). On the other side, it might affect the
|
|
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
|
|
// 0 for at most one entry per message.
|
|
MaxSizePerMsg uint64
|
|
// MaxCommittedSizePerReady limits the size of the committed entries which
|
|
// can be applied.
|
|
MaxCommittedSizePerReady uint64
|
|
// MaxUncommittedEntriesSize limits the aggregate byte size of the
|
|
// uncommitted entries that may be appended to a leader's log. Once this
|
|
// limit is exceeded, proposals will begin to return ErrProposalDropped
|
|
// errors. Note: 0 for no limit.
|
|
MaxUncommittedEntriesSize uint64
|
|
// MaxInflightMsgs limits the max number of in-flight append messages during
|
|
// optimistic replication phase. The application transportation layer usually
|
|
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
|
|
// overflowing that sending buffer. TODO (xiangli): feedback to application to
|
|
// limit the proposal rate?
|
|
MaxInflightMsgs int
|
|
|
|
// CheckQuorum specifies if the leader should check quorum activity. Leader
|
|
// steps down when quorum is not active for an electionTimeout.
|
|
CheckQuorum bool
|
|
|
|
// PreVote enables the Pre-Vote algorithm described in raft thesis section
|
|
// 9.6. This prevents disruption when a node that has been partitioned away
|
|
// rejoins the cluster.
|
|
PreVote bool
|
|
|
|
// ReadOnlyOption specifies how the read only request is processed.
|
|
//
|
|
// ReadOnlySafe guarantees the linearizability of the read only request by
|
|
// communicating with the quorum. It is the default and suggested option.
|
|
//
|
|
// ReadOnlyLeaseBased ensures linearizability of the read only request by
|
|
// relying on the leader lease. It can be affected by clock drift.
|
|
// If the clock drift is unbounded, leader might keep the lease longer than it
|
|
// should (clock can move backward/pause without any bound). ReadIndex is not safe
|
|
// in that case.
|
|
// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
|
|
ReadOnlyOption ReadOnlyOption
|
|
|
|
// Logger is the logger used for raft log. For multinode which can host
|
|
// multiple raft group, each raft group can have its own logger
|
|
Logger Logger
|
|
|
|
// DisableProposalForwarding set to true means that followers will drop
|
|
// proposals, rather than forwarding them to the leader. One use case for
|
|
// this feature would be in a situation where the Raft leader is used to
|
|
// compute the data of a proposal, for example, adding a timestamp from a
|
|
// hybrid logical clock to data in a monotonically increasing way. Forwarding
|
|
// should be disabled to prevent a follower with an inaccurate hybrid
|
|
// logical clock from assigning the timestamp and then forwarding the data
|
|
// to the leader.
|
|
DisableProposalForwarding bool
|
|
}
|
|
|
|
func (c *Config) validate() error {
|
|
if c.ID == None {
|
|
return errors.New("cannot use none as id")
|
|
}
|
|
|
|
if c.HeartbeatTick <= 0 {
|
|
return errors.New("heartbeat tick must be greater than 0")
|
|
}
|
|
|
|
if c.ElectionTick <= c.HeartbeatTick {
|
|
return errors.New("election tick must be greater than heartbeat tick")
|
|
}
|
|
|
|
if c.Storage == nil {
|
|
return errors.New("storage cannot be nil")
|
|
}
|
|
|
|
if c.MaxUncommittedEntriesSize == 0 {
|
|
c.MaxUncommittedEntriesSize = noLimit
|
|
}
|
|
|
|
// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
|
|
// previously the same parameter.
|
|
if c.MaxCommittedSizePerReady == 0 {
|
|
c.MaxCommittedSizePerReady = c.MaxSizePerMsg
|
|
}
|
|
|
|
if c.MaxInflightMsgs <= 0 {
|
|
return errors.New("max inflight messages must be greater than 0")
|
|
}
|
|
|
|
if c.Logger == nil {
|
|
c.Logger = raftLogger
|
|
}
|
|
|
|
if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
|
|
return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type raft struct {
|
|
id uint64
|
|
|
|
Term uint64
|
|
Vote uint64
|
|
|
|
readStates []ReadState
|
|
|
|
// the log
|
|
raftLog *raftLog
|
|
|
|
maxMsgSize uint64
|
|
maxUncommittedSize uint64
|
|
prs tracker.ProgressTracker
|
|
|
|
state StateType
|
|
|
|
// isLearner is true if the local raft node is a learner.
|
|
isLearner bool
|
|
|
|
msgs []pb.Message
|
|
|
|
// the leader id
|
|
lead uint64
|
|
// leadTransferee is id of the leader transfer target when its value is not zero.
|
|
// Follow the procedure defined in raft thesis 3.10.
|
|
leadTransferee uint64
|
|
// Only one conf change may be pending (in the log, but not yet
|
|
// applied) at a time. This is enforced via pendingConfIndex, which
|
|
// is set to a value >= the log index of the latest pending
|
|
// configuration change (if any). Config changes are only allowed to
|
|
// be proposed if the leader's applied index is greater than this
|
|
// value.
|
|
pendingConfIndex uint64
|
|
// an estimate of the size of the uncommitted tail of the Raft log. Used to
|
|
// prevent unbounded log growth. Only maintained by the leader. Reset on
|
|
// term changes.
|
|
uncommittedSize uint64
|
|
|
|
readOnly *readOnly
|
|
|
|
// number of ticks since it reached last electionTimeout when it is leader
|
|
// or candidate.
|
|
// number of ticks since it reached last electionTimeout or received a
|
|
// valid message from current leader when it is a follower.
|
|
electionElapsed int
|
|
|
|
// number of ticks since it reached last heartbeatTimeout.
|
|
// only leader keeps heartbeatElapsed.
|
|
heartbeatElapsed int
|
|
|
|
checkQuorum bool
|
|
preVote bool
|
|
|
|
heartbeatTimeout int
|
|
electionTimeout int
|
|
// randomizedElectionTimeout is a random number between
|
|
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
|
|
// when raft changes its state to follower or candidate.
|
|
randomizedElectionTimeout int
|
|
disableProposalForwarding bool
|
|
|
|
tick func()
|
|
step stepFunc
|
|
|
|
logger Logger
|
|
}
|
|
|
|
func newRaft(c *Config) *raft {
|
|
if err := c.validate(); err != nil {
|
|
panic(err.Error())
|
|
}
|
|
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
|
|
hs, cs, err := c.Storage.InitialState()
|
|
if err != nil {
|
|
panic(err) // TODO(bdarnell)
|
|
}
|
|
peers := c.peers
|
|
learners := c.learners
|
|
if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
|
|
if len(peers) > 0 || len(learners) > 0 {
|
|
// TODO(bdarnell): the peers argument is always nil except in
|
|
// tests; the argument should be removed and these tests should be
|
|
// updated to specify their nodes through a snapshot.
|
|
panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
|
|
}
|
|
peers = cs.Nodes
|
|
learners = cs.Learners
|
|
}
|
|
r := &raft{
|
|
id: c.ID,
|
|
lead: None,
|
|
isLearner: false,
|
|
raftLog: raftlog,
|
|
maxMsgSize: c.MaxSizePerMsg,
|
|
maxUncommittedSize: c.MaxUncommittedEntriesSize,
|
|
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
|
|
electionTimeout: c.ElectionTick,
|
|
heartbeatTimeout: c.HeartbeatTick,
|
|
logger: c.Logger,
|
|
checkQuorum: c.CheckQuorum,
|
|
preVote: c.PreVote,
|
|
readOnly: newReadOnly(c.ReadOnlyOption),
|
|
disableProposalForwarding: c.DisableProposalForwarding,
|
|
}
|
|
for _, p := range peers {
|
|
// Add node to active config.
|
|
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
|
|
}
|
|
for _, p := range learners {
|
|
// Add learner to active config.
|
|
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
|
|
}
|
|
|
|
if !isHardStateEqual(hs, emptyState) {
|
|
r.loadState(hs)
|
|
}
|
|
if c.Applied > 0 {
|
|
raftlog.appliedTo(c.Applied)
|
|
}
|
|
r.becomeFollower(r.Term, None)
|
|
|
|
var nodesStrs []string
|
|
for _, n := range r.prs.VoterNodes() {
|
|
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
|
}
|
|
|
|
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
|
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
|
return r
|
|
}
|
|
|
|
// hasLeader reports whether a leader is currently known, i.e. whether r.lead
// differs from None.
func (r *raft) hasLeader() bool {
	return r.lead != None
}
|
|
|
|
// softState returns a newly allocated SoftState carrying the current leader
// ID and the current node state.
func (r *raft) softState() *SoftState {
	return &SoftState{
		Lead:      r.lead,
		RaftState: r.state,
	}
}
|
|
|
|
func (r *raft) hardState() pb.HardState {
|
|
return pb.HardState{
|
|
Term: r.Term,
|
|
Vote: r.Vote,
|
|
Commit: r.raftLog.committed,
|
|
}
|
|
}
|
|
|
|
// send persists state to stable storage and then sends to its mailbox.
|
|
func (r *raft) send(m pb.Message) {
|
|
m.From = r.id
|
|
if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
|
|
if m.Term == 0 {
|
|
// All {pre-,}campaign messages need to have the term set when
|
|
// sending.
|
|
// - MsgVote: m.Term is the term the node is campaigning for,
|
|
// non-zero as we increment the term when campaigning.
|
|
// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
|
|
// granted, non-zero for the same reason MsgVote is
|
|
// - MsgPreVote: m.Term is the term the node will campaign,
|
|
// non-zero as we use m.Term to indicate the next term we'll be
|
|
// campaigning for
|
|
// - MsgPreVoteResp: m.Term is the term received in the original
|
|
// MsgPreVote if the pre-vote was granted, non-zero for the
|
|
// same reasons MsgPreVote is
|
|
panic(fmt.Sprintf("term should be set when sending %s", m.Type))
|
|
}
|
|
} else {
|
|
if m.Term != 0 {
|
|
panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
|
|
}
|
|
// do not attach term to MsgProp, MsgReadIndex
|
|
// proposals are a way to forward to the leader and
|
|
// should be treated as local message.
|
|
// MsgReadIndex is also forwarded to leader.
|
|
if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
|
|
m.Term = r.Term
|
|
}
|
|
}
|
|
r.msgs = append(r.msgs, m)
|
|
}
|
|
|
|
// sendAppend sends an append RPC with new entries (if any) and the
|
|
// current commit index to the given peer.
|
|
func (r *raft) sendAppend(to uint64) {
|
|
r.maybeSendAppend(to, true)
|
|
}
|
|
|
|
// maybeSendAppend sends an append RPC with new entries to the given peer,
|
|
// if necessary. Returns true if a message was sent. The sendIfEmpty
|
|
// argument controls whether messages with no entries will be sent
|
|
// ("empty" messages are useful to convey updated Commit indexes, but
|
|
// are undesirable when we're sending multiple messages in a batch).
|
|
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
|
pr := r.prs.Progress[to]
|
|
if pr.IsPaused() {
|
|
return false
|
|
}
|
|
m := pb.Message{}
|
|
m.To = to
|
|
|
|
term, errt := r.raftLog.term(pr.Next - 1)
|
|
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
|
|
if len(ents) == 0 && !sendIfEmpty {
|
|
return false
|
|
}
|
|
|
|
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
|
|
if !pr.RecentActive {
|
|
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
|
|
return false
|
|
}
|
|
|
|
m.Type = pb.MsgSnap
|
|
snapshot, err := r.raftLog.snapshot()
|
|
if err != nil {
|
|
if err == ErrSnapshotTemporarilyUnavailable {
|
|
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
|
|
return false
|
|
}
|
|
panic(err) // TODO(bdarnell)
|
|
}
|
|
if IsEmptySnap(snapshot) {
|
|
panic("need non-empty snapshot")
|
|
}
|
|
m.Snapshot = snapshot
|
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
|
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
|
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
|
|
pr.BecomeSnapshot(sindex)
|
|
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
|
} else {
|
|
m.Type = pb.MsgApp
|
|
m.Index = pr.Next - 1
|
|
m.LogTerm = term
|
|
m.Entries = ents
|
|
m.Commit = r.raftLog.committed
|
|
if n := len(m.Entries); n != 0 {
|
|
switch pr.State {
|
|
// optimistically increase the next when in StateReplicate
|
|
case tracker.StateReplicate:
|
|
last := m.Entries[n-1].Index
|
|
pr.OptimisticUpdate(last)
|
|
pr.Inflights.Add(last)
|
|
case tracker.StateProbe:
|
|
pr.ProbeSent = true
|
|
default:
|
|
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
|
|
}
|
|
}
|
|
}
|
|
r.send(m)
|
|
return true
|
|
}
|
|
|
|
// sendHeartbeat sends a heartbeat RPC to the given peer.
|
|
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
|
|
// Attach the commit as min(to.matched, r.committed).
|
|
// When the leader sends out heartbeat message,
|
|
// the receiver(follower) might not be matched with the leader
|
|
// or it might not have all the committed entries.
|
|
// The leader MUST NOT forward the follower's commit to
|
|
// an unmatched index.
|
|
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
|
|
m := pb.Message{
|
|
To: to,
|
|
Type: pb.MsgHeartbeat,
|
|
Commit: commit,
|
|
Context: ctx,
|
|
}
|
|
|
|
r.send(m)
|
|
}
|
|
|
|
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
|
|
// according to the progress recorded in r.prs.
|
|
func (r *raft) bcastAppend() {
|
|
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
|
|
if id == r.id {
|
|
return
|
|
}
|
|
|
|
r.sendAppend(id)
|
|
})
|
|
}
|
|
|
|
// bcastHeartbeat sends RPC, without entries to all the peers.
|
|
func (r *raft) bcastHeartbeat() {
|
|
lastCtx := r.readOnly.lastPendingRequestCtx()
|
|
if len(lastCtx) == 0 {
|
|
r.bcastHeartbeatWithCtx(nil)
|
|
} else {
|
|
r.bcastHeartbeatWithCtx([]byte(lastCtx))
|
|
}
|
|
}
|
|
|
|
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
|
|
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
|
|
if id == r.id {
|
|
return
|
|
}
|
|
r.sendHeartbeat(id, ctx)
|
|
})
|
|
}
|
|
|
|
// maybeCommit attempts to advance the commit index. Returns true if
|
|
// the commit index changed (in which case the caller should call
|
|
// r.bcastAppend).
|
|
func (r *raft) maybeCommit() bool {
|
|
mci := r.prs.Committed()
|
|
return r.raftLog.maybeCommit(mci, r.Term)
|
|
}
|
|
|
|
func (r *raft) reset(term uint64) {
|
|
if r.Term != term {
|
|
r.Term = term
|
|
r.Vote = None
|
|
}
|
|
r.lead = None
|
|
|
|
r.electionElapsed = 0
|
|
r.heartbeatElapsed = 0
|
|
r.resetRandomizedElectionTimeout()
|
|
|
|
r.abortLeaderTransfer()
|
|
|
|
r.prs.ResetVotes()
|
|
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
|
|
*pr = tracker.Progress{
|
|
Match: 0,
|
|
Next: r.raftLog.lastIndex() + 1,
|
|
Inflights: tracker.NewInflights(r.prs.MaxInflight),
|
|
IsLearner: pr.IsLearner,
|
|
}
|
|
if id == r.id {
|
|
pr.Match = r.raftLog.lastIndex()
|
|
}
|
|
})
|
|
|
|
r.pendingConfIndex = 0
|
|
r.uncommittedSize = 0
|
|
r.readOnly = newReadOnly(r.readOnly.option)
|
|
}
|
|
|
|
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
|
|
li := r.raftLog.lastIndex()
|
|
for i := range es {
|
|
es[i].Term = r.Term
|
|
es[i].Index = li + 1 + uint64(i)
|
|
}
|
|
// Track the size of this uncommitted proposal.
|
|
if !r.increaseUncommittedSize(es) {
|
|
r.logger.Debugf(
|
|
"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
|
|
r.id,
|
|
)
|
|
// Drop the proposal.
|
|
return false
|
|
}
|
|
// use latest "last" index after truncate/append
|
|
li = r.raftLog.append(es...)
|
|
r.prs.Progress[r.id].MaybeUpdate(li)
|
|
// Regardless of maybeCommit's return, our caller will call bcastAppend.
|
|
r.maybeCommit()
|
|
return true
|
|
}
|
|
|
|
// tickElection is run by followers and candidates after r.electionTimeout.
|
|
func (r *raft) tickElection() {
|
|
r.electionElapsed++
|
|
|
|
if r.promotable() && r.pastElectionTimeout() {
|
|
r.electionElapsed = 0
|
|
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
|
|
}
|
|
}
|
|
|
|
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
|
|
func (r *raft) tickHeartbeat() {
|
|
r.heartbeatElapsed++
|
|
r.electionElapsed++
|
|
|
|
if r.electionElapsed >= r.electionTimeout {
|
|
r.electionElapsed = 0
|
|
if r.checkQuorum {
|
|
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
|
|
}
|
|
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
|
|
if r.state == StateLeader && r.leadTransferee != None {
|
|
r.abortLeaderTransfer()
|
|
}
|
|
}
|
|
|
|
if r.state != StateLeader {
|
|
return
|
|
}
|
|
|
|
if r.heartbeatElapsed >= r.heartbeatTimeout {
|
|
r.heartbeatElapsed = 0
|
|
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
|
|
}
|
|
}
|
|
|
|
func (r *raft) becomeFollower(term uint64, lead uint64) {
|
|
r.step = stepFollower
|
|
r.reset(term)
|
|
r.tick = r.tickElection
|
|
r.lead = lead
|
|
r.state = StateFollower
|
|
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
|
|
}
|
|
|
|
func (r *raft) becomeCandidate() {
|
|
// TODO(xiangli) remove the panic when the raft implementation is stable
|
|
if r.state == StateLeader {
|
|
panic("invalid transition [leader -> candidate]")
|
|
}
|
|
r.step = stepCandidate
|
|
r.reset(r.Term + 1)
|
|
r.tick = r.tickElection
|
|
r.Vote = r.id
|
|
r.state = StateCandidate
|
|
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
|
|
}
|
|
|
|
func (r *raft) becomePreCandidate() {
|
|
// TODO(xiangli) remove the panic when the raft implementation is stable
|
|
if r.state == StateLeader {
|
|
panic("invalid transition [leader -> pre-candidate]")
|
|
}
|
|
// Becoming a pre-candidate changes our step functions and state,
|
|
// but doesn't change anything else. In particular it does not increase
|
|
// r.Term or change r.Vote.
|
|
r.step = stepCandidate
|
|
r.prs.ResetVotes()
|
|
r.tick = r.tickElection
|
|
r.lead = None
|
|
r.state = StatePreCandidate
|
|
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
|
|
}
|
|
|
|
func (r *raft) becomeLeader() {
|
|
// TODO(xiangli) remove the panic when the raft implementation is stable
|
|
if r.state == StateFollower {
|
|
panic("invalid transition [follower -> leader]")
|
|
}
|
|
r.step = stepLeader
|
|
r.reset(r.Term)
|
|
r.tick = r.tickHeartbeat
|
|
r.lead = r.id
|
|
r.state = StateLeader
|
|
// Followers enter replicate mode when they've been successfully probed
|
|
// (perhaps after having received a snapshot as a result). The leader is
|
|
// trivially in this state. Note that r.reset() has initialized this
|
|
// progress with the last index already.
|
|
r.prs.Progress[r.id].BecomeReplicate()
|
|
|
|
// Conservatively set the pendingConfIndex to the last index in the
|
|
// log. There may or may not be a pending config change, but it's
|
|
// safe to delay any future proposals until we commit all our
|
|
// pending log entries, and scanning the entire tail of the log
|
|
// could be expensive.
|
|
r.pendingConfIndex = r.raftLog.lastIndex()
|
|
|
|
emptyEnt := pb.Entry{Data: nil}
|
|
if !r.appendEntry(emptyEnt) {
|
|
// This won't happen because we just called reset() above.
|
|
r.logger.Panic("empty entry was dropped")
|
|
}
|
|
// As a special case, don't count the initial empty entry towards the
|
|
// uncommitted log quota. This is because we want to preserve the
|
|
// behavior of allowing one entry larger than quota if the current
|
|
// usage is zero.
|
|
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
|
|
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
|
|
}
|
|
|
|
// campaign transitions the raft instance to candidate state. This must only be
|
|
// called after verifying that this is a legitimate transition.
|
|
func (r *raft) campaign(t CampaignType) {
|
|
if !r.promotable() {
|
|
// This path should not be hit (callers are supposed to check), but
|
|
// better safe than sorry.
|
|
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
|
|
}
|
|
var term uint64
|
|
var voteMsg pb.MessageType
|
|
if t == campaignPreElection {
|
|
r.becomePreCandidate()
|
|
voteMsg = pb.MsgPreVote
|
|
// PreVote RPCs are sent for the next term before we've incremented r.Term.
|
|
term = r.Term + 1
|
|
} else {
|
|
r.becomeCandidate()
|
|
voteMsg = pb.MsgVote
|
|
term = r.Term
|
|
}
|
|
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
|
|
// We won the election after voting for ourselves (which must mean that
|
|
// this is a single-node cluster). Advance to the next state.
|
|
if t == campaignPreElection {
|
|
r.campaign(campaignElection)
|
|
} else {
|
|
r.becomeLeader()
|
|
}
|
|
return
|
|
}
|
|
for id := range r.prs.Voters.IDs() {
|
|
if id == r.id {
|
|
continue
|
|
}
|
|
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
|
|
|
|
var ctx []byte
|
|
if t == campaignTransfer {
|
|
ctx = []byte(t)
|
|
}
|
|
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
|
|
}
|
|
}
|
|
|
|
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
|
|
if v {
|
|
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
|
|
} else {
|
|
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
|
|
}
|
|
r.prs.RecordVote(id, v)
|
|
return r.prs.TallyVotes()
|
|
}
|
|
|
|
func (r *raft) Step(m pb.Message) error {
|
|
// Handle the message term, which may result in our stepping down to a follower.
|
|
switch {
|
|
case m.Term == 0:
|
|
// local message
|
|
case m.Term > r.Term:
|
|
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
|
|
force := bytes.Equal(m.Context, []byte(campaignTransfer))
|
|
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
|
|
if !force && inLease {
|
|
// If a server receives a RequestVote request within the minimum election timeout
|
|
// of hearing from a current leader, it does not update its term or grant its vote
|
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
|
|
return nil
|
|
}
|
|
}
|
|
switch {
|
|
case m.Type == pb.MsgPreVote:
|
|
// Never change our term in response to a PreVote
|
|
case m.Type == pb.MsgPreVoteResp && !m.Reject:
|
|
// We send pre-vote requests with a term in our future. If the
|
|
// pre-vote is granted, we will increment our term when we get a
|
|
// quorum. If it is not, the term comes from the node that
|
|
// rejected our vote so we should become a follower at the new
|
|
// term.
|
|
default:
|
|
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
|
|
r.id, r.Term, m.Type, m.From, m.Term)
|
|
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
|
|
r.becomeFollower(m.Term, m.From)
|
|
} else {
|
|
r.becomeFollower(m.Term, None)
|
|
}
|
|
}
|
|
|
|
case m.Term < r.Term:
|
|
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
|
|
// We have received messages from a leader at a lower term. It is possible
|
|
// that these messages were simply delayed in the network, but this could
|
|
// also mean that this node has advanced its term number during a network
|
|
// partition, and it is now unable to either win an election or to rejoin
|
|
// the majority on the old term. If checkQuorum is false, this will be
|
|
// handled by incrementing term numbers in response to MsgVote with a
|
|
// higher term, but if checkQuorum is true we may not advance the term on
|
|
// MsgVote and must generate other messages to advance the term. The net
|
|
// result of these two features is to minimize the disruption caused by
|
|
// nodes that have been removed from the cluster's configuration: a
|
|
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
|
|
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
|
|
// disruptive term increases, by notifying leader of this node's activeness.
|
|
// The above comments are also true for Pre-Vote.
|
|
//
|
|
// When follower gets isolated, it soon starts an election ending
|
|
// up with a higher term than leader, although it won't receive enough
|
|
// votes to win the election. When it regains connectivity, this response
|
|
// with "pb.MsgAppResp" of higher term would force leader to step down.
|
|
// However, this disruption is inevitable to free this stuck node with
|
|
// fresh election. This can be prevented with Pre-Vote phase.
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
|
|
} else if m.Type == pb.MsgPreVote {
|
|
// Before Pre-Vote enable, there may have candidate with higher term,
|
|
// but less log. After update to Pre-Vote, the cluster may deadlock if
|
|
// we drop messages with a lower term.
|
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
|
|
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
|
|
} else {
|
|
// ignore other cases
|
|
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
|
|
r.id, r.Term, m.Type, m.From, m.Term)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
switch m.Type {
|
|
case pb.MsgHup:
|
|
if r.state != StateLeader {
|
|
if !r.promotable() {
|
|
r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
|
|
return nil
|
|
}
|
|
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
|
|
if err != nil {
|
|
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
|
|
}
|
|
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
|
|
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
|
|
return nil
|
|
}
|
|
|
|
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
|
if r.preVote {
|
|
r.campaign(campaignPreElection)
|
|
} else {
|
|
r.campaign(campaignElection)
|
|
}
|
|
} else {
|
|
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
|
|
}
|
|
|
|
case pb.MsgVote, pb.MsgPreVote:
|
|
if r.isLearner {
|
|
// TODO: learner may need to vote, in case of node down when confchange.
|
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
|
|
return nil
|
|
}
|
|
// We can vote if this is a repeat of a vote we've already cast...
|
|
canVote := r.Vote == m.From ||
|
|
// ...we haven't voted and we don't think there's a leader yet in this term...
|
|
(r.Vote == None && r.lead == None) ||
|
|
// ...or this is a PreVote for a future term...
|
|
(m.Type == pb.MsgPreVote && m.Term > r.Term)
|
|
// ...and we believe the candidate is up to date.
|
|
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
|
|
// When responding to Msg{Pre,}Vote messages we include the term
|
|
// from the message, not the local term. To see why, consider the
|
|
// case where a single node was previously partitioned away and
|
|
// its local term is now out of date. If we include the local term
|
|
// (recall that for pre-votes we don't update the local term), the
|
|
// (pre-)campaigning node on the other end will proceed to ignore
|
|
// the message (it ignores all out of date messages).
|
|
// The term in the original message and current local term are the
|
|
// same in the case of regular votes, but different for pre-votes.
|
|
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
|
|
if m.Type == pb.MsgVote {
|
|
// Only record real votes.
|
|
r.electionElapsed = 0
|
|
r.Vote = m.From
|
|
}
|
|
} else {
|
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
|
|
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
|
|
}
|
|
|
|
default:
|
|
err := r.step(r, m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// stepFunc is the signature of the state-specific message handlers in this
// file; stepLeader, stepCandidate and stepFollower all match it. A handler
// receives the raft instance and the message to process and returns a non-nil
// error (for example ErrProposalDropped) when the message could not be
// accepted.
type stepFunc func(r *raft, m pb.Message) error
|
|
|
|
func stepLeader(r *raft, m pb.Message) error {
|
|
// These message types do not require any progress for m.From.
|
|
switch m.Type {
|
|
case pb.MsgBeat:
|
|
r.bcastHeartbeat()
|
|
return nil
|
|
case pb.MsgCheckQuorum:
|
|
// The leader should always see itself as active. As a precaution, handle
|
|
// the case in which the leader isn't in the configuration any more (for
|
|
// example if it just removed itself).
|
|
//
|
|
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
|
|
// leader steps down when removing itself. I might be missing something.
|
|
if pr := r.prs.Progress[r.id]; pr != nil {
|
|
pr.RecentActive = true
|
|
}
|
|
if !r.prs.QuorumActive() {
|
|
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
|
|
r.becomeFollower(r.Term, None)
|
|
}
|
|
// Mark everyone (but ourselves) as inactive in preparation for the next
|
|
// CheckQuorum.
|
|
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
|
|
if id != r.id {
|
|
pr.RecentActive = false
|
|
}
|
|
})
|
|
return nil
|
|
case pb.MsgProp:
|
|
if len(m.Entries) == 0 {
|
|
r.logger.Panicf("%x stepped empty MsgProp", r.id)
|
|
}
|
|
if r.prs.Progress[r.id] == nil {
|
|
// If we are not currently a member of the range (i.e. this node
|
|
// was removed from the configuration while serving as leader),
|
|
// drop any new proposals.
|
|
return ErrProposalDropped
|
|
}
|
|
if r.leadTransferee != None {
|
|
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
|
|
return ErrProposalDropped
|
|
}
|
|
|
|
for i := range m.Entries {
|
|
e := &m.Entries[i]
|
|
if e.Type == pb.EntryConfChange {
|
|
if r.pendingConfIndex > r.raftLog.applied {
|
|
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
|
|
e, r.pendingConfIndex, r.raftLog.applied)
|
|
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
|
|
} else {
|
|
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
|
|
}
|
|
}
|
|
}
|
|
|
|
if !r.appendEntry(m.Entries...) {
|
|
return ErrProposalDropped
|
|
}
|
|
r.bcastAppend()
|
|
return nil
|
|
case pb.MsgReadIndex:
|
|
// If more than the local vote is needed, go through a full broadcast,
|
|
// otherwise optimize.
|
|
if !r.prs.IsSingleton() {
|
|
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
|
|
// Reject read only request when this leader has not committed any log entry at its term.
|
|
return nil
|
|
}
|
|
|
|
// thinking: use an interally defined context instead of the user given context.
|
|
// We can express this in terms of the term and index instead of a user-supplied value.
|
|
// This would allow multiple reads to piggyback on the same message.
|
|
switch r.readOnly.option {
|
|
case ReadOnlySafe:
|
|
r.readOnly.addRequest(r.raftLog.committed, m)
|
|
// The local node automatically acks the request.
|
|
r.readOnly.recvAck(r.id, m.Entries[0].Data)
|
|
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
|
case ReadOnlyLeaseBased:
|
|
ri := r.raftLog.committed
|
|
if m.From == None || m.From == r.id { // from local member
|
|
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
|
|
} else {
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
|
|
}
|
|
}
|
|
} else { // only one voting member (the leader) in the cluster
|
|
if m.From == None || m.From == r.id { // from leader itself
|
|
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
|
|
} else { // from learner member
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// All other message types require a progress for m.From (pr).
|
|
pr := r.prs.Progress[m.From]
|
|
if pr == nil {
|
|
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
|
|
return nil
|
|
}
|
|
switch m.Type {
|
|
case pb.MsgAppResp:
|
|
pr.RecentActive = true
|
|
|
|
if m.Reject {
|
|
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
|
|
r.id, m.RejectHint, m.From, m.Index)
|
|
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
|
|
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
|
|
if pr.State == tracker.StateReplicate {
|
|
pr.BecomeProbe()
|
|
}
|
|
r.sendAppend(m.From)
|
|
}
|
|
} else {
|
|
oldPaused := pr.IsPaused()
|
|
if pr.MaybeUpdate(m.Index) {
|
|
switch {
|
|
case pr.State == tracker.StateProbe:
|
|
pr.BecomeReplicate()
|
|
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
|
|
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
|
// Transition back to replicating state via probing state
|
|
// (which takes the snapshot into account). If we didn't
|
|
// move to replicating state, that would only happen with
|
|
// the next round of appends (but there may not be a next
|
|
// round for a while, exposing an inconsistent RaftStatus).
|
|
pr.BecomeProbe()
|
|
pr.BecomeReplicate()
|
|
case pr.State == tracker.StateReplicate:
|
|
pr.Inflights.FreeLE(m.Index)
|
|
}
|
|
|
|
if r.maybeCommit() {
|
|
r.bcastAppend()
|
|
} else if oldPaused {
|
|
// If we were paused before, this node may be missing the
|
|
// latest commit index, so send it.
|
|
r.sendAppend(m.From)
|
|
}
|
|
// We've updated flow control information above, which may
|
|
// allow us to send multiple (size-limited) in-flight messages
|
|
// at once (such as when transitioning from probe to
|
|
// replicate, or when freeTo() covers multiple messages). If
|
|
// we have more entries to send, send as many messages as we
|
|
// can (without sending empty messages for the commit index)
|
|
for r.maybeSendAppend(m.From, false) {
|
|
}
|
|
// Transfer leadership is in progress.
|
|
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
|
|
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
|
|
r.sendTimeoutNow(m.From)
|
|
}
|
|
}
|
|
}
|
|
case pb.MsgHeartbeatResp:
|
|
pr.RecentActive = true
|
|
pr.ProbeSent = false
|
|
|
|
// free one slot for the full inflights window to allow progress.
|
|
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
|
|
pr.Inflights.FreeFirstOne()
|
|
}
|
|
if pr.Match < r.raftLog.lastIndex() {
|
|
r.sendAppend(m.From)
|
|
}
|
|
|
|
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
|
|
return nil
|
|
}
|
|
|
|
rss := r.readOnly.advance(m)
|
|
for _, rs := range rss {
|
|
req := rs.req
|
|
if req.From == None || req.From == r.id { // from local member
|
|
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
|
|
} else {
|
|
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
|
|
}
|
|
}
|
|
case pb.MsgSnapStatus:
|
|
if pr.State != tracker.StateSnapshot {
|
|
return nil
|
|
}
|
|
// TODO(tbg): this code is very similar to the snapshot handling in
|
|
// MsgAppResp above. In fact, the code there is more correct than the
|
|
// code here and should likely be updated to match (or even better, the
|
|
// logic pulled into a newly created Progress state machine handler).
|
|
if !m.Reject {
|
|
pr.BecomeProbe()
|
|
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
|
} else {
|
|
// NB: the order here matters or we'll be probing erroneously from
|
|
// the snapshot index, but the snapshot never applied.
|
|
pr.PendingSnapshot = 0
|
|
pr.BecomeProbe()
|
|
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
|
}
|
|
// If snapshot finish, wait for the msgAppResp from the remote node before sending
|
|
// out the next msgApp.
|
|
// If snapshot failure, wait for a heartbeat interval before next try
|
|
pr.ProbeSent = true
|
|
case pb.MsgUnreachable:
|
|
// During optimistic replication, if the remote becomes unreachable,
|
|
// there is huge probability that a MsgApp is lost.
|
|
if pr.State == tracker.StateReplicate {
|
|
pr.BecomeProbe()
|
|
}
|
|
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
|
|
case pb.MsgTransferLeader:
|
|
if pr.IsLearner {
|
|
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
|
|
return nil
|
|
}
|
|
leadTransferee := m.From
|
|
lastLeadTransferee := r.leadTransferee
|
|
if lastLeadTransferee != None {
|
|
if lastLeadTransferee == leadTransferee {
|
|
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
|
|
r.id, r.Term, leadTransferee, leadTransferee)
|
|
return nil
|
|
}
|
|
r.abortLeaderTransfer()
|
|
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
|
|
}
|
|
if leadTransferee == r.id {
|
|
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
|
|
return nil
|
|
}
|
|
// Transfer leadership to third party.
|
|
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
|
|
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
|
|
r.electionElapsed = 0
|
|
r.leadTransferee = leadTransferee
|
|
if pr.Match == r.raftLog.lastIndex() {
|
|
r.sendTimeoutNow(leadTransferee)
|
|
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
|
|
} else {
|
|
r.sendAppend(leadTransferee)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
|
|
// whether they respond to MsgVoteResp or MsgPreVoteResp.
|
|
func stepCandidate(r *raft, m pb.Message) error {
|
|
// Only handle vote responses corresponding to our candidacy (while in
|
|
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
|
|
// our pre-candidate state).
|
|
var myVoteRespType pb.MessageType
|
|
if r.state == StatePreCandidate {
|
|
myVoteRespType = pb.MsgPreVoteResp
|
|
} else {
|
|
myVoteRespType = pb.MsgVoteResp
|
|
}
|
|
switch m.Type {
|
|
case pb.MsgProp:
|
|
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
|
return ErrProposalDropped
|
|
case pb.MsgApp:
|
|
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
|
|
r.handleAppendEntries(m)
|
|
case pb.MsgHeartbeat:
|
|
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
|
|
r.handleHeartbeat(m)
|
|
case pb.MsgSnap:
|
|
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
|
|
r.handleSnapshot(m)
|
|
case myVoteRespType:
|
|
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
|
|
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
|
|
switch res {
|
|
case quorum.VoteWon:
|
|
if r.state == StatePreCandidate {
|
|
r.campaign(campaignElection)
|
|
} else {
|
|
r.becomeLeader()
|
|
r.bcastAppend()
|
|
}
|
|
case quorum.VoteLost:
|
|
// pb.MsgPreVoteResp contains future term of pre-candidate
|
|
// m.Term > r.Term; reuse r.Term
|
|
r.becomeFollower(r.Term, None)
|
|
}
|
|
case pb.MsgTimeoutNow:
|
|
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func stepFollower(r *raft, m pb.Message) error {
|
|
switch m.Type {
|
|
case pb.MsgProp:
|
|
if r.lead == None {
|
|
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
|
return ErrProposalDropped
|
|
} else if r.disableProposalForwarding {
|
|
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
|
|
return ErrProposalDropped
|
|
}
|
|
m.To = r.lead
|
|
r.send(m)
|
|
case pb.MsgApp:
|
|
r.electionElapsed = 0
|
|
r.lead = m.From
|
|
r.handleAppendEntries(m)
|
|
case pb.MsgHeartbeat:
|
|
r.electionElapsed = 0
|
|
r.lead = m.From
|
|
r.handleHeartbeat(m)
|
|
case pb.MsgSnap:
|
|
r.electionElapsed = 0
|
|
r.lead = m.From
|
|
r.handleSnapshot(m)
|
|
case pb.MsgTransferLeader:
|
|
if r.lead == None {
|
|
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
|
|
return nil
|
|
}
|
|
m.To = r.lead
|
|
r.send(m)
|
|
case pb.MsgTimeoutNow:
|
|
if r.promotable() {
|
|
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
|
|
// Leadership transfers never use pre-vote even if r.preVote is true; we
|
|
// know we are not recovering from a partition so there is no need for the
|
|
// extra round trip.
|
|
r.campaign(campaignTransfer)
|
|
} else {
|
|
r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
|
|
}
|
|
case pb.MsgReadIndex:
|
|
if r.lead == None {
|
|
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
|
|
return nil
|
|
}
|
|
m.To = r.lead
|
|
r.send(m)
|
|
case pb.MsgReadIndexResp:
|
|
if len(m.Entries) != 1 {
|
|
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
|
|
return nil
|
|
}
|
|
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *raft) handleAppendEntries(m pb.Message) {
|
|
if m.Index < r.raftLog.committed {
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
|
return
|
|
}
|
|
|
|
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
|
|
} else {
|
|
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
|
|
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
|
}
|
|
}
|
|
|
|
func (r *raft) handleHeartbeat(m pb.Message) {
|
|
r.raftLog.commitTo(m.Commit)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
|
|
}
|
|
|
|
func (r *raft) handleSnapshot(m pb.Message) {
|
|
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
|
if r.restore(m.Snapshot) {
|
|
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
|
r.id, r.raftLog.committed, sindex, sterm)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
|
} else {
|
|
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
|
r.id, r.raftLog.committed, sindex, sterm)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
|
}
|
|
}
|
|
|
|
// restore recovers the state machine from a snapshot. It restores the log and the
|
|
// configuration of state machine. If this method returns false, the snapshot was
|
|
// ignored, either because it was obsolete or because of an error.
|
|
func (r *raft) restore(s pb.Snapshot) bool {
|
|
if s.Metadata.Index <= r.raftLog.committed {
|
|
return false
|
|
}
|
|
if r.state != StateFollower {
|
|
// This is defense-in-depth: if the leader somehow ended up applying a
|
|
// snapshot, it could move into a new term without moving into a
|
|
// follower state. This should never fire, but if it did, we'd have
|
|
// prevented damage by returning early, so log only a loud warning.
|
|
//
|
|
// At the time of writing, the instance is guaranteed to be in follower
|
|
// state when this method is called.
|
|
r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
|
|
r.becomeFollower(r.Term+1, None)
|
|
return false
|
|
}
|
|
|
|
// More defense-in-depth: throw away snapshot if recipient is not in the
|
|
// config. This shouuldn't ever happen (at the time of writing) but lots of
|
|
// code here and there assumes that r.id is in the progress tracker.
|
|
found := false
|
|
cs := s.Metadata.ConfState
|
|
for _, set := range [][]uint64{
|
|
cs.Nodes,
|
|
cs.Learners,
|
|
} {
|
|
for _, id := range set {
|
|
if id == r.id {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
r.logger.Warningf(
|
|
"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
|
|
r.id, cs,
|
|
)
|
|
return false
|
|
}
|
|
|
|
// Now go ahead and actually restore.
|
|
|
|
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
|
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
r.raftLog.commitTo(s.Metadata.Index)
|
|
return false
|
|
}
|
|
|
|
r.raftLog.restore(s)
|
|
|
|
// Reset the configuration and add the (potentially updated) peers in anew.
|
|
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
|
|
for _, id := range s.Metadata.ConfState.Nodes {
|
|
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
|
|
}
|
|
for _, id := range s.Metadata.ConfState.Learners {
|
|
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
|
|
}
|
|
|
|
pr := r.prs.Progress[r.id]
|
|
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
|
|
|
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
|
|
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
return true
|
|
}
|
|
|
|
// promotable indicates whether state machine can be promoted to leader,
|
|
// which is true when its own id is in progress list.
|
|
func (r *raft) promotable() bool {
|
|
pr := r.prs.Progress[r.id]
|
|
return pr != nil && !pr.IsLearner
|
|
}
|
|
|
|
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
|
|
cfg, prs, err := confchange.Changer{
|
|
Tracker: r.prs,
|
|
LastIndex: r.raftLog.lastIndex(),
|
|
}.Simple(cc)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
r.prs.Config = cfg
|
|
r.prs.Progress = prs
|
|
|
|
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
|
|
// Now that the configuration is updated, handle any side effects.
|
|
|
|
cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
|
|
pr, ok := r.prs.Progress[r.id]
|
|
|
|
// Update whether the node itself is a learner, resetting to false when the
|
|
// node is removed.
|
|
r.isLearner = ok && pr.IsLearner
|
|
|
|
if (!ok || r.isLearner) && r.state == StateLeader {
|
|
// This node is leader and was removed or demoted. We prevent demotions
|
|
// at the time writing but hypothetically we handle them the same way as
|
|
// removing the leader: stepping down into the next Term.
|
|
//
|
|
// TODO(tbg): step down (for sanity) and ask follower with largest Match
|
|
// to TimeoutNow (to avoid interruption). This might still drop some
|
|
// proposals but it's better than nothing.
|
|
//
|
|
// TODO(tbg): test this branch. It is untested at the time of writing.
|
|
return cs
|
|
}
|
|
|
|
// The remaining steps only make sense if this node is the leader and there
|
|
// are other nodes.
|
|
if r.state != StateLeader || len(cs.Nodes) == 0 {
|
|
return cs
|
|
}
|
|
if r.maybeCommit() {
|
|
// The quorum size may have been reduced (but not to zero), so see if
|
|
// any pending entries can be committed.
|
|
r.bcastAppend()
|
|
}
|
|
// If the the leadTransferee was removed, abort the leadership transfer.
|
|
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
|
|
r.abortLeaderTransfer()
|
|
}
|
|
|
|
return cs
|
|
}
|
|
|
|
func (r *raft) loadState(state pb.HardState) {
|
|
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
|
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
|
}
|
|
r.raftLog.committed = state.Commit
|
|
r.Term = state.Term
|
|
r.Vote = state.Vote
|
|
}
|
|
|
|
// pastElectionTimeout returns true iff r.electionElapsed is greater
|
|
// than or equal to the randomized election timeout in
|
|
// [electiontimeout, 2 * electiontimeout - 1].
|
|
func (r *raft) pastElectionTimeout() bool {
|
|
return r.electionElapsed >= r.randomizedElectionTimeout
|
|
}
|
|
|
|
func (r *raft) resetRandomizedElectionTimeout() {
|
|
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
|
|
}
|
|
|
|
func (r *raft) sendTimeoutNow(to uint64) {
|
|
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
|
|
}
|
|
|
|
func (r *raft) abortLeaderTransfer() {
|
|
r.leadTransferee = None
|
|
}
|
|
|
|
// increaseUncommittedSize computes the size of the proposed entries and
|
|
// determines whether they would push leader over its maxUncommittedSize limit.
|
|
// If the new entries would exceed the limit, the method returns false. If not,
|
|
// the increase in uncommitted entry size is recorded and the method returns
|
|
// true.
|
|
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
|
|
var s uint64
|
|
for _, e := range ents {
|
|
s += uint64(PayloadSize(e))
|
|
}
|
|
|
|
if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
|
|
// If the uncommitted tail of the Raft log is empty, allow any size
|
|
// proposal. Otherwise, limit the size of the uncommitted tail of the
|
|
// log and drop any proposal that would push the size over the limit.
|
|
return false
|
|
}
|
|
r.uncommittedSize += s
|
|
return true
|
|
}
|
|
|
|
// reduceUncommittedSize accounts for the newly committed entries by decreasing
|
|
// the uncommitted entry size limit.
|
|
func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
|
|
if r.uncommittedSize == 0 {
|
|
// Fast-path for followers, who do not track or enforce the limit.
|
|
return
|
|
}
|
|
|
|
var s uint64
|
|
for _, e := range ents {
|
|
s += uint64(PayloadSize(e))
|
|
}
|
|
if s > r.uncommittedSize {
|
|
// uncommittedSize may underestimate the size of the uncommitted Raft
|
|
// log tail but will never overestimate it. Saturate at 0 instead of
|
|
// allowing overflow.
|
|
r.uncommittedSize = 0
|
|
} else {
|
|
r.uncommittedSize -= s
|
|
}
|
|
}
|
|
|
|
func numOfPendingConf(ents []pb.Entry) int {
|
|
n := 0
|
|
for i := range ents {
|
|
if ents[i].Type == pb.EntryConfChange {
|
|
n++
|
|
}
|
|
}
|
|
return n
|
|
}
|