etcd/raft/tracker/progress.go
Tobias Schottdorf ac6b604bb8 raft/rafttest: introduce datadriven testing
It has often been tedious to test the interactions between multi-member
Raft groups, especially when many steps were required to reach a certain
scenario. Often, this boilerplate was as boring as it is hard to write
and hard to maintain, making it attractive to resort to shortcuts
whenever possible, which in turn tended to undercut how meaningful and
maintainable the tests ended up being - that is, if the tests were even
written, which sometimes they weren't.

This change introduces a datadriven framework specifically for testing
deterministically the interaction between multiple members of a raft group
with the goal of reducing the friction for writing these tests to near
zero.

In the near term, this will be used to add thorough testing for joint
consensus (which is already available today, but wildly undertested),
but just converting an existing test into this framework has shown that
the concise representation and built-in inspection of log messages
highlights unexpected behavior much more readily than the previous unit
tests did (the test in question is `snapshot_succeed_via_app_resp`; the
reader is invited to compare the old and new version of it).

The main building block is `InteractionEnv`, which holds on to the state
of the whole system and exposes various relevant methods for
manipulating it, including but not limited to adding nodes, delivering
and dropping messages, and proposing configuration changes. All of this
is extensible so that in the future I hope to use it to explore the
phenomena discussed in

https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263

which requires injecting appropriate "crash points" in the Ready
handling loop. Discussions of the "what if X happened in state Y"
can quickly be made concrete by "scripting up an interaction test".

Additionally, this framework is intentionally not kept internal to the
raft package.. Though this is in its infancy, a goal is that it should
be possible for a suite of interaction tests to allow applications to
validate that their Storage implementation behaves accordingly, simply
by running a raft-provided interaction suite against their Storage.
2019-08-12 08:10:29 -07:00

260 lines
8.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
import (
"fmt"
"sort"
"strings"
)
// Progress represents a followers progress in the view of the leader. Leader
// maintains progresses of all followers, and sends entries to the follower
// based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
type Progress struct {
Match, Next uint64
// State defines how the leader should interact with the follower.
//
// When in StateProbe, leader sends at most one replication message
// per heartbeat interval. It also probes actual progress of the follower.
//
// When in StateReplicate, leader optimistically increases next
// to the latest entry sent after sending replication message. This is
// an optimized state for fast replicating log entries to the follower.
//
// When in StateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State StateType
// PendingSnapshot is used in StateSnapshot.
// If there is a pending snapshot, the pendingSnapshot will be set to the
// index of the snapshot. If pendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
PendingSnapshot uint64
// RecentActive is true if the progress is recently active. Receiving any messages
// from the corresponding follower indicates the progress is active.
// RecentActive can be reset to false after an election timeout.
//
// TODO(tbg): the leader should always have this set to true.
RecentActive bool
// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
// true, raft should pause sending replication message to this peer until
// ProbeSent is reset. See ProbeAcked() and IsPaused().
ProbeSent bool
// Inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
// The max number of entries per message is defined in raft config as MaxSizePerMsg.
// Thus inflight effectively limits both the number of inflight messages
// and the bandwidth each Progress can use.
// When inflights is Full, no more message should be sent.
// When a leader sends out a message, the index of the last
// entry should be added to inflights. The index MUST be added
// into inflights in order.
// When a leader receives a reply, the previous inflights should
// be freed by calling inflights.FreeLE with the index of the last
// received entry.
Inflights *Inflights
// IsLearner is true if this progress is tracked for a learner.
IsLearner bool
}
// ResetState moves the Progress into the specified State, resetting ProbeSent,
// PendingSnapshot, and Inflights.
func (pr *Progress) ResetState(state StateType) {
pr.ProbeSent = false
pr.PendingSnapshot = 0
pr.State = state
pr.Inflights.reset()
}
func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}
func min(a, b uint64) uint64 {
if a > b {
return b
}
return a
}
// ProbeAcked is called when this peer has accepted an append. It resets
// ProbeSent to signal that additional append messages should be sent without
// further delay.
func (pr *Progress) ProbeAcked() {
pr.ProbeSent = false
}
// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
// optionally and if larger, the index of the pending snapshot.
func (pr *Progress) BecomeProbe() {
// If the original state is StateSnapshot, progress knows that
// the pending snapshot has been sent to this peer successfully, then
// probes from pendingSnapshot + 1.
if pr.State == StateSnapshot {
pendingSnapshot := pr.PendingSnapshot
pr.ResetState(StateProbe)
pr.Next = max(pr.Match+1, pendingSnapshot+1)
} else {
pr.ResetState(StateProbe)
pr.Next = pr.Match + 1
}
}
// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
func (pr *Progress) BecomeReplicate() {
pr.ResetState(StateReplicate)
pr.Next = pr.Match + 1
}
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
pr.ResetState(StateSnapshot)
pr.PendingSnapshot = snapshoti
}
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
// index acked by it. The method returns false if the given n index comes from
// an outdated message. Otherwise it updates the progress and returns true.
func (pr *Progress) MaybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.ProbeAcked()
}
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
// OptimisticUpdate signals that appends all the way up to and including index n
// are in-flight. As a result, Next is increased to n+1.
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index the follower rejected to append to its log, and its
// last index.
//
// Rejections can happen spuriously as messages are sent out of order or
// duplicated. In such cases, the rejection pertains to an index that the
// Progress already knows were previously acknowledged, and false is returned
// without changing the Progress.
//
// If the rejection is genuine, Next is lowered sensibly, and the Progress is
// cleared for sending log entries.
func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
if pr.State == StateReplicate {
// The rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
return false
}
// Directly decrease next to match + 1.
//
// TODO(tbg): why not use last if it's larger?
pr.Next = pr.Match + 1
return true
}
// The rejection must be stale if "rejected" does not match next - 1. This
// is because non-replicating followers are probed one entry at a time.
if pr.Next-1 != rejected {
return false
}
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.ProbeSent = false
return true
}
// IsPaused returns whether sending log entries to this node has been throttled.
// This is done when a node has rejected recent MsgApps, is currently waiting
// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
// operation, this is false. A throttled node will be contacted less frequently
// until it has reached a state in which it's able to accept a steady stream of
// log entries again.
func (pr *Progress) IsPaused() bool {
switch pr.State {
case StateProbe:
return pr.ProbeSent
case StateReplicate:
return pr.Inflights.Full()
case StateSnapshot:
return true
default:
panic("unexpected state")
}
}
func (pr *Progress) String() string {
var buf strings.Builder
fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
if pr.IsLearner {
fmt.Fprint(&buf, " learner")
}
if pr.IsPaused() {
fmt.Fprint(&buf, " paused")
}
if pr.PendingSnapshot > 0 {
fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
}
if !pr.RecentActive {
fmt.Fprintf(&buf, " inactive")
}
if n := pr.Inflights.Count(); n > 0 {
fmt.Fprintf(&buf, " inflight=%d", n)
if pr.Inflights.Full() {
fmt.Fprint(&buf, "[full]")
}
}
return buf.String()
}
// ProgressMap is a map of *Progress.
type ProgressMap map[uint64]*Progress
// String prints the ProgressMap in sorted key order, one Progress per line.
func (m ProgressMap) String() string {
ids := make([]uint64, 0, len(m))
for k := range m {
ids = append(ids, k)
}
sort.Slice(ids, func(i, j int) bool {
return ids[i] < ids[j]
})
var buf strings.Builder
for _, id := range ids {
fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
}
return buf.String()
}