mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

sendApp accesses the storage several times. Previously, we assumed that the storage would not be modified during the read operations. That assumption does not hold, since the storage can be compacted between the read operations. If a compaction causes a read-entries error, we should not panic. Instead, we can simply retry the sendApp logic until it succeeds.
777 lines
22 KiB
Go
777 lines
22 KiB
Go
// Copyright 2015 CoreOS, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package raft
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"sort"
|
|
"strings"
|
|
|
|
pb "github.com/coreos/etcd/raft/raftpb"
|
|
)
|
|
|
|
const (
	// None is a placeholder node ID used when there is no leader.
	None uint64 = 0

	// noLimit is the largest uint64 value; it is passed as the size
	// argument when reading log entries without a size cap.
	noLimit = math.MaxUint64
)

// errNoLeader is the error value carrying the message "no leader".
var errNoLeader = errors.New("no leader")
|
|
|
|
// Possible values for StateType.
const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the printable name of every known StateType, indexed by the
// numeric value of the state.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
}

// String returns the name of the state. Values outside the known set are
// rendered as "StateType(N)" instead of panicking with an out-of-range
// index, so a corrupted or future value can still be logged safely.
func (st StateType) String() string {
	if uint64(st) >= uint64(len(stmap)) {
		return fmt.Sprintf("StateType(%d)", uint64(st))
	}
	return stmap[uint64(st)]
}
|
|
|
|
// Config contains the parameters to start a raft.
|
|
type Config struct {
|
|
// ID is the identity of the local raft. ID cannot be 0.
|
|
ID uint64
|
|
|
|
// peers contains the IDs of all nodes (including self) in
|
|
// the raft cluster. It should only be set when starting a new
|
|
// raft cluster.
|
|
// Restarting raft from previous configuration will panic if
|
|
// peers is set.
|
|
// peer is private and only used for testing right now.
|
|
peers []uint64
|
|
|
|
// ElectionTick is the election timeout. If a follower does not
|
|
// receive any message from the leader of current term during
|
|
// ElectionTick, it will become candidate and start an election.
|
|
// ElectionTick must be greater than HeartbeatTick. We suggest
|
|
// to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary
|
|
// leader switching.
|
|
ElectionTick int
|
|
// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
|
|
// message to maintain the leadership every heartbeat interval.
|
|
HeartbeatTick int
|
|
|
|
// Storage is the storage for raft. raft generates entires and
|
|
// states to be stored in storage. raft reads the persisted entires
|
|
// and states out of Storage when it needs. raft reads out the previous
|
|
// state and configuration out of storage when restarting.
|
|
Storage Storage
|
|
// Applied is the last applied index. It should only be set when restarting
|
|
// raft. raft will not return entries to the application smaller or equal to Applied.
|
|
// If Applied is unset when restarting, raft might return previous applied entries.
|
|
// This is a very application dependent configuration.
|
|
Applied uint64
|
|
|
|
// MaxSizePerMsg limits the max size of each append message. Smaller value lowers
|
|
// the raft recovery cost(initial probing and message lost during normal operation).
|
|
// On the other side, it might affect the throughput during normal replication.
|
|
// Note: math.MaxUint64 for unlimited, 0 for at most one entry per message.
|
|
MaxSizePerMsg uint64
|
|
// MaxInflightMsgs limits the max number of in-flight append messages during optimistic
|
|
// replication phase. The application transportation layer usually has its own sending
|
|
// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
|
|
// TODO (xiangli): feedback to application to limit the proposal rate?
|
|
MaxInflightMsgs int
|
|
}
|
|
|
|
func (c *Config) validate() error {
|
|
if c.ID == None {
|
|
return errors.New("cannot use none as id")
|
|
}
|
|
|
|
if c.HeartbeatTick <= 0 {
|
|
return errors.New("heartbeat tick must be greater than 0")
|
|
}
|
|
|
|
if c.ElectionTick <= c.HeartbeatTick {
|
|
return errors.New("election tick must be greater than heartbeat tick")
|
|
}
|
|
|
|
if c.Storage == nil {
|
|
return errors.New("storage cannot be nil")
|
|
}
|
|
|
|
if c.MaxInflightMsgs <= 0 {
|
|
return errors.New("max inflight messages must be greater than 0")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type raft struct {
|
|
pb.HardState
|
|
|
|
id uint64
|
|
|
|
// the log
|
|
raftLog *raftLog
|
|
|
|
maxInflight int
|
|
maxMsgSize uint64
|
|
prs map[uint64]*Progress
|
|
|
|
state StateType
|
|
|
|
votes map[uint64]bool
|
|
|
|
msgs []pb.Message
|
|
|
|
// the leader id
|
|
lead uint64
|
|
|
|
// New configuration is ignored if there exists unapplied configuration.
|
|
pendingConf bool
|
|
|
|
elapsed int // number of ticks since the last msg
|
|
heartbeatTimeout int
|
|
electionTimeout int
|
|
rand *rand.Rand
|
|
tick func()
|
|
step stepFunc
|
|
}
|
|
|
|
func newRaft(c *Config) *raft {
|
|
if err := c.validate(); err != nil {
|
|
panic(err.Error())
|
|
}
|
|
raftlog := newLog(c.Storage)
|
|
hs, cs, err := c.Storage.InitialState()
|
|
if err != nil {
|
|
panic(err) // TODO(bdarnell)
|
|
}
|
|
peers := c.peers
|
|
if len(cs.Nodes) > 0 {
|
|
if len(peers) > 0 {
|
|
// TODO(bdarnell): the peers argument is always nil except in
|
|
// tests; the argument should be removed and these tests should be
|
|
// updated to specify their nodes through a snapshot.
|
|
panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
|
|
}
|
|
peers = cs.Nodes
|
|
}
|
|
r := &raft{
|
|
id: c.ID,
|
|
lead: None,
|
|
raftLog: raftlog,
|
|
// 4MB for now and hard code it
|
|
// TODO(xiang): add a config arguement into newRaft after we add
|
|
// the max inflight message field.
|
|
maxMsgSize: c.MaxSizePerMsg,
|
|
maxInflight: c.MaxInflightMsgs,
|
|
prs: make(map[uint64]*Progress),
|
|
electionTimeout: c.ElectionTick,
|
|
heartbeatTimeout: c.HeartbeatTick,
|
|
}
|
|
r.rand = rand.New(rand.NewSource(int64(c.ID)))
|
|
for _, p := range peers {
|
|
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
|
|
}
|
|
if !isHardStateEqual(hs, emptyState) {
|
|
r.loadState(hs)
|
|
}
|
|
if c.Applied > 0 {
|
|
raftlog.appliedTo(c.Applied)
|
|
}
|
|
r.becomeFollower(r.Term, None)
|
|
|
|
nodesStrs := make([]string, 0)
|
|
for _, n := range r.nodes() {
|
|
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
|
}
|
|
|
|
raftLogger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
|
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
|
return r
|
|
}
|
|
|
|
// hasLeader reports whether this node currently knows a leader, i.e. lead
// is not None.
func (r *raft) hasLeader() bool {
	return r.lead != None
}
|
|
|
|
// softState returns a freshly allocated SoftState holding the current
// leader ID and the current role of this node.
func (r *raft) softState() *SoftState {
	return &SoftState{
		Lead:      r.lead,
		RaftState: r.state,
	}
}
|
|
|
|
// q returns the quorum size: a strict majority of the nodes tracked in prs.
func (r *raft) q() int {
	return len(r.prs)/2 + 1
}
|
|
|
|
func (r *raft) nodes() []uint64 {
|
|
nodes := make([]uint64, 0, len(r.prs))
|
|
for k := range r.prs {
|
|
nodes = append(nodes, k)
|
|
}
|
|
sort.Sort(uint64Slice(nodes))
|
|
return nodes
|
|
}
|
|
|
|
// send persists state to stable storage and then sends to its mailbox.
|
|
func (r *raft) send(m pb.Message) {
|
|
m.From = r.id
|
|
// do not attach term to MsgProp
|
|
// proposals are a way to forward to the leader and
|
|
// should be treated as local message.
|
|
if m.Type != pb.MsgProp {
|
|
m.Term = r.Term
|
|
}
|
|
r.msgs = append(r.msgs, m)
|
|
}
|
|
|
|
// sendAppend sends RRPC, with entries to the given peer.
|
|
func (r *raft) sendAppend(to uint64) {
|
|
pr := r.prs[to]
|
|
if pr.isPaused() {
|
|
return
|
|
}
|
|
m := pb.Message{}
|
|
m.To = to
|
|
|
|
term, errt := r.raftLog.term(pr.Next - 1)
|
|
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
|
|
|
|
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
|
|
m.Type = pb.MsgSnap
|
|
snapshot, err := r.raftLog.snapshot()
|
|
if err != nil {
|
|
panic(err) // TODO(bdarnell)
|
|
}
|
|
if IsEmptySnap(snapshot) {
|
|
panic("need non-empty snapshot")
|
|
}
|
|
m.Snapshot = snapshot
|
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
|
raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
|
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
|
pr.becomeSnapshot(sindex)
|
|
raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
|
} else {
|
|
m.Type = pb.MsgApp
|
|
m.Index = pr.Next - 1
|
|
m.LogTerm = term
|
|
m.Entries = ents
|
|
m.Commit = r.raftLog.committed
|
|
if n := len(m.Entries); n != 0 {
|
|
switch pr.State {
|
|
// optimistically increase the next when in ProgressStateReplicate
|
|
case ProgressStateReplicate:
|
|
last := m.Entries[n-1].Index
|
|
pr.optimisticUpdate(last)
|
|
pr.ins.add(last)
|
|
case ProgressStateProbe:
|
|
pr.pause()
|
|
default:
|
|
raftLogger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
|
|
}
|
|
}
|
|
}
|
|
r.send(m)
|
|
}
|
|
|
|
// sendHeartbeat sends an empty MsgApp
|
|
func (r *raft) sendHeartbeat(to uint64) {
|
|
// Attach the commit as min(to.matched, r.committed).
|
|
// When the leader sends out heartbeat message,
|
|
// the receiver(follower) might not be matched with the leader
|
|
// or it might not have all the committed entries.
|
|
// The leader MUST NOT forward the follower's commit to
|
|
// an unmatched index.
|
|
commit := min(r.prs[to].Match, r.raftLog.committed)
|
|
m := pb.Message{
|
|
To: to,
|
|
Type: pb.MsgHeartbeat,
|
|
Commit: commit,
|
|
}
|
|
r.send(m)
|
|
}
|
|
|
|
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date
|
|
// according to the progress recorded in r.prs.
|
|
func (r *raft) bcastAppend() {
|
|
for i := range r.prs {
|
|
if i == r.id {
|
|
continue
|
|
}
|
|
r.sendAppend(i)
|
|
}
|
|
}
|
|
|
|
// bcastHeartbeat sends RRPC, without entries to all the peers.
|
|
func (r *raft) bcastHeartbeat() {
|
|
for i := range r.prs {
|
|
if i == r.id {
|
|
continue
|
|
}
|
|
r.sendHeartbeat(i)
|
|
r.prs[i].resume()
|
|
}
|
|
}
|
|
|
|
func (r *raft) maybeCommit() bool {
|
|
// TODO(bmizerany): optimize.. Currently naive
|
|
mis := make(uint64Slice, 0, len(r.prs))
|
|
for i := range r.prs {
|
|
mis = append(mis, r.prs[i].Match)
|
|
}
|
|
sort.Sort(sort.Reverse(mis))
|
|
mci := mis[r.q()-1]
|
|
return r.raftLog.maybeCommit(mci, r.Term)
|
|
}
|
|
|
|
func (r *raft) reset(term uint64) {
|
|
if r.Term != term {
|
|
r.Term = term
|
|
r.Vote = None
|
|
}
|
|
r.lead = None
|
|
r.elapsed = 0
|
|
r.votes = make(map[uint64]bool)
|
|
for i := range r.prs {
|
|
r.prs[i] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
|
|
if i == r.id {
|
|
r.prs[i].Match = r.raftLog.lastIndex()
|
|
}
|
|
}
|
|
r.pendingConf = false
|
|
}
|
|
|
|
func (r *raft) appendEntry(es ...pb.Entry) {
|
|
li := r.raftLog.lastIndex()
|
|
for i := range es {
|
|
es[i].Term = r.Term
|
|
es[i].Index = li + 1 + uint64(i)
|
|
}
|
|
r.raftLog.append(es...)
|
|
r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
|
|
r.maybeCommit()
|
|
}
|
|
|
|
// tickElection is run by followers and candidates after r.electionTimeout.
|
|
func (r *raft) tickElection() {
|
|
if !r.promotable() {
|
|
r.elapsed = 0
|
|
return
|
|
}
|
|
r.elapsed++
|
|
if r.isElectionTimeout() {
|
|
r.elapsed = 0
|
|
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
|
|
}
|
|
}
|
|
|
|
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
|
|
func (r *raft) tickHeartbeat() {
|
|
r.elapsed++
|
|
if r.elapsed >= r.heartbeatTimeout {
|
|
r.elapsed = 0
|
|
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
|
|
}
|
|
}
|
|
|
|
func (r *raft) becomeFollower(term uint64, lead uint64) {
|
|
r.step = stepFollower
|
|
r.reset(term)
|
|
r.tick = r.tickElection
|
|
r.lead = lead
|
|
r.state = StateFollower
|
|
raftLogger.Infof("%x became follower at term %d", r.id, r.Term)
|
|
}
|
|
|
|
func (r *raft) becomeCandidate() {
|
|
// TODO(xiangli) remove the panic when the raft implementation is stable
|
|
if r.state == StateLeader {
|
|
panic("invalid transition [leader -> candidate]")
|
|
}
|
|
r.step = stepCandidate
|
|
r.reset(r.Term + 1)
|
|
r.tick = r.tickElection
|
|
r.Vote = r.id
|
|
r.state = StateCandidate
|
|
raftLogger.Infof("%x became candidate at term %d", r.id, r.Term)
|
|
}
|
|
|
|
func (r *raft) becomeLeader() {
|
|
// TODO(xiangli) remove the panic when the raft implementation is stable
|
|
if r.state == StateFollower {
|
|
panic("invalid transition [follower -> leader]")
|
|
}
|
|
r.step = stepLeader
|
|
r.reset(r.Term)
|
|
r.tick = r.tickHeartbeat
|
|
r.lead = r.id
|
|
r.state = StateLeader
|
|
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
|
|
if err != nil {
|
|
raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err)
|
|
}
|
|
|
|
for _, e := range ents {
|
|
if e.Type != pb.EntryConfChange {
|
|
continue
|
|
}
|
|
if r.pendingConf {
|
|
panic("unexpected double uncommitted config entry")
|
|
}
|
|
r.pendingConf = true
|
|
}
|
|
r.appendEntry(pb.Entry{Data: nil})
|
|
raftLogger.Infof("%x became leader at term %d", r.id, r.Term)
|
|
}
|
|
|
|
func (r *raft) campaign() {
|
|
r.becomeCandidate()
|
|
if r.q() == r.poll(r.id, true) {
|
|
r.becomeLeader()
|
|
return
|
|
}
|
|
for i := range r.prs {
|
|
if i == r.id {
|
|
continue
|
|
}
|
|
raftLogger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
|
|
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
|
|
}
|
|
}
|
|
|
|
func (r *raft) poll(id uint64, v bool) (granted int) {
|
|
if v {
|
|
raftLogger.Infof("%x received vote from %x at term %d", r.id, id, r.Term)
|
|
} else {
|
|
raftLogger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term)
|
|
}
|
|
if _, ok := r.votes[id]; !ok {
|
|
r.votes[id] = v
|
|
}
|
|
for _, vv := range r.votes {
|
|
if vv {
|
|
granted++
|
|
}
|
|
}
|
|
return granted
|
|
}
|
|
|
|
func (r *raft) Step(m pb.Message) error {
|
|
if m.Type == pb.MsgHup {
|
|
raftLogger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
|
r.campaign()
|
|
r.Commit = r.raftLog.committed
|
|
return nil
|
|
}
|
|
|
|
switch {
|
|
case m.Term == 0:
|
|
// local message
|
|
case m.Term > r.Term:
|
|
lead := m.From
|
|
if m.Type == pb.MsgVote {
|
|
lead = None
|
|
}
|
|
raftLogger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
|
|
r.id, r.Term, m.Type, m.From, m.Term)
|
|
r.becomeFollower(m.Term, lead)
|
|
case m.Term < r.Term:
|
|
// ignore
|
|
raftLogger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
|
|
r.id, r.Term, m.Type, m.From, m.Term)
|
|
return nil
|
|
}
|
|
r.step(r, m)
|
|
r.Commit = r.raftLog.committed
|
|
return nil
|
|
}
|
|
|
|
// stepFunc is the signature of the role-specific message handlers
// (stepLeader, stepCandidate, stepFollower) stored in raft.step and invoked
// by Step.
type stepFunc func(r *raft, m pb.Message)
|
|
|
|
func stepLeader(r *raft, m pb.Message) {
|
|
pr := r.prs[m.From]
|
|
|
|
switch m.Type {
|
|
case pb.MsgBeat:
|
|
r.bcastHeartbeat()
|
|
case pb.MsgProp:
|
|
if len(m.Entries) == 0 {
|
|
raftLogger.Panicf("%x stepped empty MsgProp", r.id)
|
|
}
|
|
for i, e := range m.Entries {
|
|
if e.Type == pb.EntryConfChange {
|
|
if r.pendingConf {
|
|
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
|
|
}
|
|
r.pendingConf = true
|
|
}
|
|
}
|
|
r.appendEntry(m.Entries...)
|
|
r.bcastAppend()
|
|
case pb.MsgAppResp:
|
|
if m.Reject {
|
|
raftLogger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
|
|
r.id, m.RejectHint, m.From, m.Index)
|
|
if pr.maybeDecrTo(m.Index, m.RejectHint) {
|
|
raftLogger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
|
|
if pr.State == ProgressStateReplicate {
|
|
pr.becomeProbe()
|
|
}
|
|
r.sendAppend(m.From)
|
|
}
|
|
} else {
|
|
oldPaused := pr.isPaused()
|
|
if pr.maybeUpdate(m.Index) {
|
|
switch {
|
|
case pr.State == ProgressStateProbe:
|
|
pr.becomeReplicate()
|
|
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
|
|
raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
|
pr.becomeProbe()
|
|
case pr.State == ProgressStateReplicate:
|
|
pr.ins.freeTo(m.Index)
|
|
}
|
|
|
|
if r.maybeCommit() {
|
|
r.bcastAppend()
|
|
} else if oldPaused {
|
|
// update() reset the wait state on this node. If we had delayed sending
|
|
// an update before, send it now.
|
|
r.sendAppend(m.From)
|
|
}
|
|
}
|
|
}
|
|
case pb.MsgHeartbeatResp:
|
|
// free one slot for the full inflights window to allow progress.
|
|
if pr.State == ProgressStateReplicate && pr.ins.full() {
|
|
pr.ins.freeFirstOne()
|
|
}
|
|
if pr.Match < r.raftLog.lastIndex() {
|
|
r.sendAppend(m.From)
|
|
}
|
|
case pb.MsgVote:
|
|
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
|
case pb.MsgSnapStatus:
|
|
if pr.State != ProgressStateSnapshot {
|
|
return
|
|
}
|
|
if !m.Reject {
|
|
pr.becomeProbe()
|
|
raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
|
} else {
|
|
pr.snapshotFailure()
|
|
pr.becomeProbe()
|
|
raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
|
}
|
|
// If snapshot finish, wait for the msgAppResp from the remote node before sending
|
|
// out the next msgApp.
|
|
// If snapshot failure, wait for a heartbeat interval before next try
|
|
pr.pause()
|
|
case pb.MsgUnreachable:
|
|
// During optimistic replication, if the remote becomes unreachable,
|
|
// there is huge probability that a MsgApp is lost.
|
|
if pr.State == ProgressStateReplicate {
|
|
pr.becomeProbe()
|
|
}
|
|
raftLogger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
|
|
}
|
|
}
|
|
|
|
func stepCandidate(r *raft, m pb.Message) {
|
|
switch m.Type {
|
|
case pb.MsgProp:
|
|
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
|
return
|
|
case pb.MsgApp:
|
|
r.becomeFollower(r.Term, m.From)
|
|
r.handleAppendEntries(m)
|
|
case pb.MsgHeartbeat:
|
|
r.becomeFollower(r.Term, m.From)
|
|
r.handleHeartbeat(m)
|
|
case pb.MsgSnap:
|
|
r.becomeFollower(m.Term, m.From)
|
|
r.handleSnapshot(m)
|
|
case pb.MsgVote:
|
|
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
|
case pb.MsgVoteResp:
|
|
gr := r.poll(m.From, !m.Reject)
|
|
raftLogger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
|
|
switch r.q() {
|
|
case gr:
|
|
r.becomeLeader()
|
|
r.bcastAppend()
|
|
case len(r.votes) - gr:
|
|
r.becomeFollower(r.Term, None)
|
|
}
|
|
}
|
|
}
|
|
|
|
func stepFollower(r *raft, m pb.Message) {
|
|
switch m.Type {
|
|
case pb.MsgProp:
|
|
if r.lead == None {
|
|
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
|
return
|
|
}
|
|
m.To = r.lead
|
|
r.send(m)
|
|
case pb.MsgApp:
|
|
r.elapsed = 0
|
|
r.lead = m.From
|
|
r.handleAppendEntries(m)
|
|
case pb.MsgHeartbeat:
|
|
r.elapsed = 0
|
|
r.lead = m.From
|
|
r.handleHeartbeat(m)
|
|
case pb.MsgSnap:
|
|
r.elapsed = 0
|
|
r.handleSnapshot(m)
|
|
case pb.MsgVote:
|
|
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
|
r.elapsed = 0
|
|
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
|
r.Vote = m.From
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
|
|
} else {
|
|
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *raft) handleAppendEntries(m pb.Message) {
|
|
if m.Index < r.Commit {
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.Commit})
|
|
return
|
|
}
|
|
|
|
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
|
|
} else {
|
|
raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
|
|
r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
|
}
|
|
}
|
|
|
|
func (r *raft) handleHeartbeat(m pb.Message) {
|
|
r.raftLog.commitTo(m.Commit)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
|
|
}
|
|
|
|
func (r *raft) handleSnapshot(m pb.Message) {
|
|
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
|
if r.restore(m.Snapshot) {
|
|
raftLogger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
|
r.id, r.Commit, sindex, sterm)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
|
} else {
|
|
raftLogger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
|
r.id, r.Commit, sindex, sterm)
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
|
}
|
|
}
|
|
|
|
// restore recovers the state machine from a snapshot. It restores the log and the
|
|
// configuration of state machine.
|
|
func (r *raft) restore(s pb.Snapshot) bool {
|
|
if s.Metadata.Index <= r.raftLog.committed {
|
|
return false
|
|
}
|
|
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
|
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
|
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
r.raftLog.commitTo(s.Metadata.Index)
|
|
return false
|
|
}
|
|
|
|
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
|
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
|
|
r.raftLog.restore(s)
|
|
r.prs = make(map[uint64]*Progress)
|
|
for _, n := range s.Metadata.ConfState.Nodes {
|
|
match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
|
|
if n == r.id {
|
|
match = next - 1
|
|
} else {
|
|
match = 0
|
|
}
|
|
r.setProgress(n, match, next)
|
|
raftLogger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
|
|
}
|
|
return true
|
|
}
|
|
|
|
// promotable indicates whether state machine can be promoted to leader,
|
|
// which is true when its own id is in progress list.
|
|
func (r *raft) promotable() bool {
|
|
_, ok := r.prs[r.id]
|
|
return ok
|
|
}
|
|
|
|
func (r *raft) addNode(id uint64) {
|
|
if _, ok := r.prs[id]; ok {
|
|
// Ignore any redundant addNode calls (which can happen because the
|
|
// initial bootstrapping entries are applied twice).
|
|
return
|
|
}
|
|
|
|
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
|
|
r.pendingConf = false
|
|
}
|
|
|
|
func (r *raft) removeNode(id uint64) {
|
|
r.delProgress(id)
|
|
r.pendingConf = false
|
|
}
|
|
|
|
// resetPendingConf clears the pending-configuration flag.
func (r *raft) resetPendingConf() {
	r.pendingConf = false
}
|
|
|
|
func (r *raft) setProgress(id, match, next uint64) {
|
|
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
|
|
}
|
|
|
|
func (r *raft) delProgress(id uint64) {
|
|
delete(r.prs, id)
|
|
}
|
|
|
|
func (r *raft) loadState(state pb.HardState) {
|
|
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
|
raftLogger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
|
}
|
|
r.raftLog.committed = state.Commit
|
|
r.Term = state.Term
|
|
r.Vote = state.Vote
|
|
r.Commit = state.Commit
|
|
}
|
|
|
|
// isElectionTimeout returns true if r.elapsed is greater than the
|
|
// randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
|
|
// Otherwise, it returns false.
|
|
func (r *raft) isElectionTimeout() bool {
|
|
d := r.elapsed - r.electionTimeout
|
|
if d < 0 {
|
|
return false
|
|
}
|
|
return d > r.rand.Int()%r.electionTimeout
|
|
}
|