raft/rafttest: add test for replication pausing
This commit adds a data-driven test which simulates conditions under which the flow of Raft messages to a particular node is throttled while it is in StateReplicate. The test demonstrates that MsgApp messages with non-empty Entries may "leak" to a paused stream every time there is a successful heartbeat exchange.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
parent 4d15f5074c
commit 0a0f0ae719
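
For context on the behavior under test: while a follower is in StateReplicate, the leader is supposed to pause that follower's MsgApp stream once the per-follower in-flight window is full. The snippet below is a simplified, self-contained sketch of that condition, not the etcd code; the inflights type and pausedInReplicate helper are condensed stand-ins for the tracker's Inflights window and Progress state.

package main

import "fmt"

// inflights is a condensed stand-in for the leader-side window of MsgApp
// payloads sent to one follower but not yet acknowledged by MsgAppResp.
type inflights struct {
	count int // messages currently in flight
	size  int // configured limit (MaxInflightMsgs; the new test uses 3)
}

// full reports whether the window has no remaining capacity.
func (in *inflights) full() bool { return in.count >= in.size }

// pausedInReplicate reports whether the leader should stop sending new
// entries to a StateReplicate follower: once the window is full, further
// entry-carrying MsgApp traffic is supposed to wait for acknowledgements.
func pausedInReplicate(in *inflights) bool { return in.full() }

func main() {
	in := &inflights{count: 3, size: 3} // mirrors "inflight=3" in the test below
	fmt.Println(pausedInReplicate(in))  // true: this follower's stream is paused
}

The "leak" described above is an entry-carrying MsgApp that still reaches such a paused follower after a heartbeat exchange, which is what the new replicate_pause.txt file demonstrates.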
@@ -84,15 +84,13 @@ type Storage interface {
 	Append([]pb.Entry) error
 }
 
-// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults.
-// In particular, no limits are set.
-func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config {
-	return &raft.Config{
-		ID:              id,
-		Applied:         applied,
+// raftConfigStub sets up a raft.Config stub with reasonable testing defaults.
+// In particular, no limits are set. It is not a complete config: ID and Storage
+// must be set for each node using the stub as a template.
+func raftConfigStub() raft.Config {
+	return raft.Config{
 		ElectionTick:    3,
 		HeartbeatTick:   1,
-		Storage:         s,
 		MaxSizePerMsg:   math.MaxUint64,
 		MaxInflightMsgs: math.MaxInt32,
 	}
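
A note on the design choice above: returning a raft.Config value rather than a pointer lets every node fork its own copy of the stub and then fill in the per-node fields, as the AddNodes changes below do. A minimal sketch of that pattern inside the rafttest package (newNodeFromStub and its arguments are hypothetical; raftConfigStub and raft.NewRawNode come from this diff):

package rafttest

import "go.etcd.io/etcd/raft/v3"

// newNodeFromStub is a hypothetical helper illustrating the template pattern:
// the stub is copied by value, so per-node mutations never leak back into it.
func newNodeFromStub(id uint64, s raft.Storage) (*raft.RawNode, error) {
	cfg := raftConfigStub() // independent copy of the shared defaults
	cfg.ID, cfg.Storage = id, s
	return raft.NewRawNode(&cfg)
}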
@@ -28,6 +28,7 @@ import (
 func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error {
 	n := firstAsInt(t, d)
 	var snap pb.Snapshot
+	cfg := raftConfigStub()
 	for _, arg := range d.CmdArgs[1:] {
 		for i := range arg.Vals {
 			switch arg.Key {
@@ -39,14 +40,17 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
 				var id uint64
 				arg.Scan(t, i, &id)
 				snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id)
+			case "inflight":
+				arg.Scan(t, i, &cfg.MaxInflightMsgs)
 			case "index":
 				arg.Scan(t, i, &snap.Metadata.Index)
+				cfg.Applied = snap.Metadata.Index
 			case "content":
 				arg.Scan(t, i, &snap.Data)
 			}
 		}
 	}
-	return env.AddNodes(n, snap)
+	return env.AddNodes(n, cfg, snap)
 }
 
 type snapOverrideStorage struct {
@@ -63,9 +67,9 @@ func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) {
 
 var _ raft.Storage = snapOverrideStorage{}
 
-// AddNodes adds n new nodes initializes from the given snapshot (which may be
-// empty). They will be assigned consecutive IDs.
-func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
+// AddNodes adds n new nodes initialized from the given snapshot (which may be
+// empty), and using the cfg as template. They will be assigned consecutive IDs.
+func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) error {
 	bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{})
 	for i := 0; i < n; i++ {
 		id := uint64(1 + len(env.Nodes))
@@ -103,9 +107,10 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
 				return fmt.Errorf("failed to establish first index %d; got %d", exp, fi)
 			}
 		}
-		cfg := defaultRaftConfig(id, snap.Metadata.Index, s)
+		cfg := cfg // fork the config stub
+		cfg.ID, cfg.Storage = id, s
 		if env.Options.OnConfig != nil {
-			env.Options.OnConfig(cfg)
+			env.Options.OnConfig(&cfg)
 			if cfg.ID != id {
 				// This could be supported but then we need to do more work
 				// translating back and forth -- not worth it.
@@ -117,7 +122,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
 		}
 		cfg.Logger = env.Output
 
-		rn, err := raft.NewRawNode(cfg)
+		rn, err := raft.NewRawNode(&cfg)
 		if err != nil {
 			return err
 		}
@@ -127,7 +132,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
 			// TODO(tbg): allow a more general Storage, as long as it also allows
 			// us to apply snapshots, append entries, and update the HardState.
 			Storage: s,
-			Config:  cfg,
+			Config:  &cfg,
 			History: []pb.Snapshot{snap},
 		}
 		env.Nodes = append(env.Nodes, node)
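
Since OnConfig now receives the forked per-node config by pointer, callers of the harness can still tune settings such as MaxInflightMsgs programmatically. A minimal sketch, assuming the rafttest.InteractionOpts.OnConfig hook referenced above and a NewInteractionEnv constructor in the rafttest package:

package main

import (
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/rafttest"
)

func main() {
	// Build an interaction environment whose nodes all get a small in-flight
	// window, similar to the "inflight=3" flag that add-nodes now accepts.
	env := rafttest.NewInteractionEnv(&rafttest.InteractionOpts{
		OnConfig: func(cfg *raft.Config) {
			cfg.MaxInflightMsgs = 3 // applied to each node's forked config
		},
	})
	_ = env
}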
raft/testdata/replicate_pause.txt (new file)
@@ -0,0 +1,190 @@
# This test ensures that MsgApp stream to a follower is paused when the
# in-flight state exceeds the configured limits. This is a regression test for
# the issue fixed by https://github.com/etcd-io/etcd/pull/14633.

# Turn off output during the setup of the test.
log-level none
----
ok

# Start with 3 nodes, with a limited in-flight capacity.
add-nodes 3 voters=(1,2,3) index=10 inflight=3
----
ok

campaign 1
----
ok

stabilize
----
ok (quiet)

# Propose 3 entries.
propose 1 prop_1_12
----
ok

propose 1 prop_1_13
----
ok

propose 1 prop_1_14
----
ok

# Store entries and send proposals.
process-ready 1
----
ok (quiet)

# Re-enable log messages.
log-level debug
----
ok

# Expect that in-flight tracking to nodes 2 and 3 is saturated.
status 1
----
1: StateReplicate match=14 next=15
2: StateReplicate match=11 next=15 paused inflight=3[full]
3: StateReplicate match=11 next=15 paused inflight=3[full]

log-level none
----
ok

# Commit entries between nodes 1 and 2.
stabilize 1 2
----
ok (quiet)

log-level debug
----
ok

# Expect that the entries are committed and stored on nodes 1 and 2.
status 1
----
1: StateReplicate match=14 next=15
2: StateReplicate match=14 next=15
3: StateReplicate match=11 next=15 paused inflight=3[full]

# Drop append messages to node 3.
deliver-msgs drop=3
----
dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]


# Repeat committing 3 entries.
propose 1 prop_1_15
----
ok

propose 1 prop_1_16
----
ok

propose 1 prop_1_17
----
ok

# In-flight tracking to nodes 2 and 3 is saturated, but node 3 is behind.
status 1
----
1: StateReplicate match=14 next=15
2: StateReplicate match=14 next=18 paused inflight=3[full]
3: StateReplicate match=11 next=15 paused inflight=3[full]

log-level none
----
ok

# Commit entries between nodes 1 and 2 again.
stabilize 1 2
----
ok (quiet)

log-level debug
----
ok

# Expect that the entries are committed and stored only on nodes 1 and 2.
status 1
----
1: StateReplicate match=17 next=18
2: StateReplicate match=17 next=18
3: StateReplicate match=11 next=15 paused inflight=3[full]

# Make a heartbeat roundtrip.
tick-heartbeat 1
----
ok

stabilize 1
----
> 1 handling Ready
  Ready MustSync=false:
  Messages:
  1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
  1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11

stabilize 2 3
----
> 2 receiving messages
  1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
> 3 receiving messages
  1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 2 handling Ready
  Ready MustSync=false:
  Messages:
  2->1 MsgHeartbeatResp Term:1 Log:0/0
> 3 handling Ready
  Ready MustSync=false:
  Messages:
  3->1 MsgHeartbeatResp Term:1 Log:0/0

# After handling heartbeat responses, node 1 sends a new replication MsgApp to a
# throttled node 3 which hasn't yet replied to a single MsgApp.
# TODO(pavelkalinnikov): this should not happen, send empty MsgApp instead.
stabilize 1
----
> 1 receiving messages
  2->1 MsgHeartbeatResp Term:1 Log:0/0
  3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 handling Ready
  Ready MustSync=false:
  Messages:
  1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"]

# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it.
stabilize 3
----
> 3 receiving messages
  1->3 MsgApp Term:1 Log:1/14 Commit:17 Entries:[1/15 EntryNormal "prop_1_15", 1/16 EntryNormal "prop_1_16", 1/17 EntryNormal "prop_1_17"]
  DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1
> 3 handling Ready
  Ready MustSync=false:
  Messages:
  3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11)

log-level none
----
ok

stabilize
----
ok (quiet)

log-level debug
----
ok

# Eventually all nodes catch up on the committed state.
status 1
----
1: StateReplicate match=17 next=18
2: StateReplicate match=17 next=18
3: StateReplicate match=17 next=18
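
For completeness: files under raft/testdata are replayed by a datadriven interaction test in the raft package. The driver below is a sketch under that assumption; the test name and exact wiring are illustrative, not part of this commit.

package raft_test

import (
	"testing"

	"github.com/cockroachdb/datadriven"
	"go.etcd.io/etcd/raft/v3/rafttest"
)

// TestInteractionSketch walks raft/testdata and replays every command file,
// including replicate_pause.txt, through a fresh interaction environment.
func TestInteractionSketch(t *testing.T) {
	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
		env := rafttest.NewInteractionEnv(nil)
		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
			return env.Handle(t, *d)
		})
	})
}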