etcd/raft/raft.go
Xiang Li b4022899eb raft: fix panic in send app
sendApp accesses the storage several times. Perviously, we
assume that the storage will not be modified during the read
opeartions. The assumption is not true since the storage can
be compacted between the read operations. If a compaction
causes a read entries error, we should not painc. Instead, we
can simply retry the sendApp logic until succeed.
2015-06-15 14:23:33 -07:00

777 lines
22 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"errors"
"fmt"
"math"
"math/rand"
"sort"
"strings"
pb "github.com/coreos/etcd/raft/raftpb"
)
// None is a placeholder node ID used when there is no leader.
const None uint64 = 0
const noLimit = math.MaxUint64
var errNoLeader = errors.New("no leader")
// Possible values for StateType.
const (
StateFollower StateType = iota
StateCandidate
StateLeader
)
// StateType represents the role of a node in a cluster.
type StateType uint64
var stmap = [...]string{
"StateFollower",
"StateCandidate",
"StateLeader",
}
func (st StateType) String() string {
return stmap[uint64(st)]
}
// Config contains the parameters to start a raft.
type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64
// peers contains the IDs of all nodes (including self) in
// the raft cluster. It should only be set when starting a new
// raft cluster.
// Restarting raft from previous configuration will panic if
// peers is set.
// peer is private and only used for testing right now.
peers []uint64
// ElectionTick is the election timeout. If a follower does not
// receive any message from the leader of current term during
// ElectionTick, it will become candidate and start an election.
// ElectionTick must be greater than HeartbeatTick. We suggest
// to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary
// leader switching.
ElectionTick int
// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
// message to maintain the leadership every heartbeat interval.
HeartbeatTick int
// Storage is the storage for raft. raft generates entires and
// states to be stored in storage. raft reads the persisted entires
// and states out of Storage when it needs. raft reads out the previous
// state and configuration out of storage when restarting.
Storage Storage
// Applied is the last applied index. It should only be set when restarting
// raft. raft will not return entries to the application smaller or equal to Applied.
// If Applied is unset when restarting, raft might return previous applied entries.
// This is a very application dependent configuration.
Applied uint64
// MaxSizePerMsg limits the max size of each append message. Smaller value lowers
// the raft recovery cost(initial probing and message lost during normal operation).
// On the other side, it might affect the throughput during normal replication.
// Note: math.MaxUint64 for unlimited, 0 for at most one entry per message.
MaxSizePerMsg uint64
// MaxInflightMsgs limits the max number of in-flight append messages during optimistic
// replication phase. The application transportation layer usually has its own sending
// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
// TODO (xiangli): feedback to application to limit the proposal rate?
MaxInflightMsgs int
}
func (c *Config) validate() error {
if c.ID == None {
return errors.New("cannot use none as id")
}
if c.HeartbeatTick <= 0 {
return errors.New("heartbeat tick must be greater than 0")
}
if c.ElectionTick <= c.HeartbeatTick {
return errors.New("election tick must be greater than heartbeat tick")
}
if c.Storage == nil {
return errors.New("storage cannot be nil")
}
if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
return nil
}
type raft struct {
pb.HardState
id uint64
// the log
raftLog *raftLog
maxInflight int
maxMsgSize uint64
prs map[uint64]*Progress
state StateType
votes map[uint64]bool
msgs []pb.Message
// the leader id
lead uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
elapsed int // number of ticks since the last msg
heartbeatTimeout int
electionTimeout int
rand *rand.Rand
tick func()
step stepFunc
}
func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLog(c.Storage)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
}
peers := c.peers
if len(cs.Nodes) > 0 {
if len(peers) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
}
peers = cs.Nodes
}
r := &raft{
id: c.ID,
lead: None,
raftLog: raftlog,
// 4MB for now and hard code it
// TODO(xiang): add a config arguement into newRaft after we add
// the max inflight message field.
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
prs: make(map[uint64]*Progress),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
}
r.rand = rand.New(rand.NewSource(int64(c.ID)))
for _, p := range peers {
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
}
if !isHardStateEqual(hs, emptyState) {
r.loadState(hs)
}
if c.Applied > 0 {
raftlog.appliedTo(c.Applied)
}
r.becomeFollower(r.Term, None)
nodesStrs := make([]string, 0)
for _, n := range r.nodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
raftLogger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}
func (r *raft) hasLeader() bool { return r.lead != None }
func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
func (r *raft) q() int { return len(r.prs)/2 + 1 }
func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs))
for k := range r.prs {
nodes = append(nodes, k)
}
sort.Sort(uint64Slice(nodes))
return nodes
}
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
// do not attach term to MsgProp
// proposals are a way to forward to the leader and
// should be treated as local message.
if m.Type != pb.MsgProp {
m.Term = r.Term
}
r.msgs = append(r.msgs, m)
}
// sendAppend sends RRPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
if pr.isPaused() {
return
}
m := pb.Message{}
m.To = to
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
pr.becomeSnapshot(sindex)
raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
default:
raftLogger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
}
// sendHeartbeat sends an empty MsgApp
func (r *raft) sendHeartbeat(to uint64) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.prs[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
}
r.send(m)
}
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
for i := range r.prs {
if i == r.id {
continue
}
r.sendAppend(i)
}
}
// bcastHeartbeat sends RRPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
for i := range r.prs {
if i == r.id {
continue
}
r.sendHeartbeat(i)
r.prs[i].resume()
}
}
func (r *raft) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive
mis := make(uint64Slice, 0, len(r.prs))
for i := range r.prs {
mis = append(mis, r.prs[i].Match)
}
sort.Sort(sort.Reverse(mis))
mci := mis[r.q()-1]
return r.raftLog.maybeCommit(mci, r.Term)
}
func (r *raft) reset(term uint64) {
if r.Term != term {
r.Term = term
r.Vote = None
}
r.lead = None
r.elapsed = 0
r.votes = make(map[uint64]bool)
for i := range r.prs {
r.prs[i] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
if i == r.id {
r.prs[i].Match = r.raftLog.lastIndex()
}
}
r.pendingConf = false
}
func (r *raft) appendEntry(es ...pb.Entry) {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
r.raftLog.append(es...)
r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
r.maybeCommit()
}
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
if !r.promotable() {
r.elapsed = 0
return
}
r.elapsed++
if r.isElectionTimeout() {
r.elapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
r.elapsed++
if r.elapsed >= r.heartbeatTimeout {
r.elapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
raftLogger.Infof("%x became follower at term %d", r.id, r.Term)
}
func (r *raft) becomeCandidate() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
raftLogger.Infof("%x became candidate at term %d", r.id, r.Term)
}
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
if err != nil {
raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}
for _, e := range ents {
if e.Type != pb.EntryConfChange {
continue
}
if r.pendingConf {
panic("unexpected double uncommitted config entry")
}
r.pendingConf = true
}
r.appendEntry(pb.Entry{Data: nil})
raftLogger.Infof("%x became leader at term %d", r.id, r.Term)
}
func (r *raft) campaign() {
r.becomeCandidate()
if r.q() == r.poll(r.id, true) {
r.becomeLeader()
return
}
for i := range r.prs {
if i == r.id {
continue
}
raftLogger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
}
}
func (r *raft) poll(id uint64, v bool) (granted int) {
if v {
raftLogger.Infof("%x received vote from %x at term %d", r.id, id, r.Term)
} else {
raftLogger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term)
}
if _, ok := r.votes[id]; !ok {
r.votes[id] = v
}
for _, vv := range r.votes {
if vv {
granted++
}
}
return granted
}
func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgHup {
raftLogger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign()
r.Commit = r.raftLog.committed
return nil
}
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
lead := m.From
if m.Type == pb.MsgVote {
lead = None
}
raftLogger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
r.becomeFollower(m.Term, lead)
case m.Term < r.Term:
// ignore
raftLogger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
return nil
}
r.step(r, m)
r.Commit = r.raftLog.committed
return nil
}
type stepFunc func(r *raft, m pb.Message)
func stepLeader(r *raft, m pb.Message) {
pr := r.prs[m.From]
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
case pb.MsgProp:
if len(m.Entries) == 0 {
raftLogger.Panicf("%x stepped empty MsgProp", r.id)
}
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
}
r.pendingConf = true
}
}
r.appendEntry(m.Entries...)
r.bcastAppend()
case pb.MsgAppResp:
if m.Reject {
raftLogger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
raftLogger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.isPaused()
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
}
}
case pb.MsgHeartbeatResp:
// free one slot for the full inflights window to allow progress.
if pr.State == ProgressStateReplicate && pr.ins.full() {
pr.ins.freeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
case pb.MsgVote:
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
return
}
if !m.Reject {
pr.becomeProbe()
raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFailure()
pr.becomeProbe()
raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause()
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
raftLogger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
}
}
func stepCandidate(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(r.Term, m.From)
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From)
r.handleSnapshot(m)
case pb.MsgVote:
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgVoteResp:
gr := r.poll(m.From, !m.Reject)
raftLogger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
switch r.q() {
case gr:
r.becomeLeader()
r.bcastAppend()
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
}
}
func stepFollower(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
}
m.To = r.lead
r.send(m)
case pb.MsgApp:
r.elapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.elapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
case pb.MsgSnap:
r.elapsed = 0
r.handleSnapshot(m)
case pb.MsgVote:
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.elapsed = 0
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.Vote = m.From
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
} else {
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}
}
}
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.Commit {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.Commit})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
}
func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
raftLogger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
raftLogger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}
// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.commitTo(s.Metadata.Index)
return false
}
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.restore(s)
r.prs = make(map[uint64]*Progress)
for _, n := range s.Metadata.ConfState.Nodes {
match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
if n == r.id {
match = next - 1
} else {
match = 0
}
r.setProgress(n, match, next)
raftLogger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
}
return true
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
_, ok := r.prs[r.id]
return ok
}
func (r *raft) addNode(id uint64) {
if _, ok := r.prs[id]; ok {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
r.pendingConf = false
}
func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false
}
func (r *raft) resetPendingConf() { r.pendingConf = false }
func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
}
func (r *raft) delProgress(id uint64) {
delete(r.prs, id)
}
func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
raftLogger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
}
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
r.Commit = state.Commit
}
// isElectionTimeout returns true if r.elapsed is greater than the
// randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
// Otherwise, it returns false.
func (r *raft) isElectionTimeout() bool {
d := r.elapsed - r.electionTimeout
if d < 0 {
return false
}
return d > r.rand.Int()%r.electionTimeout
}