etcd/raft/progress.go
2015-03-19 10:05:04 -07:00

152 lines
4.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import "fmt"
const (
ProgressStateProbe ProgressStateType = iota
ProgressStateReplicate
ProgressStateSnapshot
)
type ProgressStateType uint64
var prstmap = [...]string{
"ProgressStateProbe",
"ProgressStateReplicate",
"ProgressStateSnapshot",
}
func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
// Progress represents a followers progress in the view of the leader. Leader maintains
// progresses of all followers, and sends entries to the follower based on its progress.
type Progress struct {
Match, Next uint64
// When in ProgressStateProbe, leader sends at most one replication message
// per heartbeat interval. It also probes actual progress of the follower.
//
// When in ProgressStateReplicate, leader optimistically increases next
// to the latest entry sent after sending replication message. This is
// an optimized state for fast replicating log entries to the follower.
//
// When in ProgressStateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State ProgressStateType
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer.
Paused bool
// PendingSnapshot is used in ProgressStateSnapshot.
// If there is a pending snapshot, the pendingSnapshot will be set to the
// index of the snapshot. If pendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
PendingSnapshot uint64
}
func (pr *Progress) resetState(state ProgressStateType) {
pr.Paused = false
pr.PendingSnapshot = 0
pr.State = state
}
func (pr *Progress) becomeProbe() {
// If the original state is ProgressStateSnapshot, progress knows that
// the pending snapshot has been sent to this peer successfully, then
// probes from pendingSnapshot + 1.
if pr.State == ProgressStateSnapshot {
pendingSnapshot := pr.PendingSnapshot
pr.resetState(ProgressStateProbe)
pr.Next = max(pr.Match+1, pendingSnapshot+1)
} else {
pr.resetState(ProgressStateProbe)
pr.Next = pr.Match + 1
}
}
func (pr *Progress) becomeReplicate() {
pr.resetState(ProgressStateReplicate)
pr.Next = pr.Match + 1
}
func (pr *Progress) becomeSnapshot(snapshoti uint64) {
pr.resetState(ProgressStateSnapshot)
pr.PendingSnapshot = snapshoti
}
// maybeUpdate returns false if the given n index comes from an outdated message.
// Otherwise it updates the progress and returns true.
func (pr *Progress) maybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.resume()
}
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
if pr.State == ProgressStateReplicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
return false
}
// directly decrease next to match + 1
pr.Next = pr.Match + 1
return true
}
// the rejection must be stale if "rejected" does not match next - 1
if pr.Next-1 != rejected {
return false
}
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.resume()
return true
}
func (pr *Progress) pause() { pr.Paused = true }
func (pr *Progress) resume() { pr.Paused = false }
// isPaused returns whether progress stops sending message.
func (pr *Progress) isPaused() bool {
return pr.State == ProgressStateProbe && pr.Paused || pr.State == ProgressStateSnapshot
}
func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
// maybeSnapshotAbort unsets pendingSnapshot if Match is equal or higher than
// the pendingSnapshot
func (pr *Progress) maybeSnapshotAbort() bool {
return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
}
func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.isPaused(), pr.PendingSnapshot)
}