integration: Fix flakes of TestV3WatchRestoreSnapshotUnsync

The flakes manifested as:
```
--- FAIL: TestV3WatchRestoreSnapshotUnsync (3.59s)
    v3_watch_restore_test.go:82: inflight snapshot sends expected 0 or 1, got ""
FAIL
coverage: 55.2% of statements
FAIL	go.etcd.io/etcd/v3/integration	3.646s
FAIL
```

The root cause is that all the SnapMsg processing happens on both ends
(leader, follower) asynchronously in goroutines, e.g. on the FIFO schedule
within EtcdServer.run, so when we observe through metrics, we don't
know whether it finished (or even got started).

Ideally we should have an EtcdServer.Drain() call that returns once the server
has processed all internal 'queues' and is idle.
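
Until such a Drain() exists, the fix settles for polling the metric until it reports a value, bounded by a timeout. A rough, self-contained sketch of that poll-until-non-empty pattern is below; the name waitNotEmpty and its signature are illustrative only, not etcd API — the real helper is the MustFetchNotEmptyMetric function added in the diff further down.

```
package testutil

import (
	"errors"
	"time"
)

// waitNotEmpty polls fetch until it returns a non-empty string or the timeout
// expires. It mirrors the shape of the MustFetchNotEmptyMetric helper added by
// this commit; the name and signature here are illustrative only.
func waitNotEmpty(fetch func() (string, error), interval, timeout time.Duration) (string, error) {
	deadline := time.After(timeout)
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-deadline:
			return "", errors.New("timed out waiting for a non-empty value")
		case <-tick.C:
			v, err := fetch()
			if err != nil {
				return "", err
			}
			if v != "" {
				return v, nil
			}
		}
	}
}
```
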
Piotr Tabor 2020-10-02 20:35:51 +02:00
parent 98b123f034
commit 97820f1c6e
2 changed files with 45 additions and 12 deletions

integration/cluster.go

@@ -429,9 +429,11 @@ func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
 	}
 }
 
+// WaitLeader returns index of the member in c.Members that is leader (or -1).
 func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
 
-// waitLeader waits until given members agree on the same leader.
+// waitLeader waits until given members agree on the same leader,
+// and returns its 'index' in the 'membs' list (or -1).
 func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
 	possibleLead := make(map[uint64]bool)
 	var lead uint64

integration/v3_watch_restore_test.go

@@ -23,6 +23,28 @@ import (
 	pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
 )
 
+// MustFetchNotEmptyMetric attempts to fetch given 'metric' from 'member',
+// waiting for not-empty value or 'timeout'.
+func MustFetchNotEmptyMetric(tb testing.TB, member *member, metric string, timeout <-chan time.Time) string {
+	metricValue := ""
+	tick := time.Tick(tickDuration)
+	for metricValue == "" {
+		tb.Logf("Waiting for metric: %v", metric)
+		select {
+		case <-timeout:
+			tb.Fatalf("Failed to fetch metric %v", metric)
+			return ""
+		case <-tick:
+			var err error
+			metricValue, err = member.Metric(metric)
+			if err != nil {
+				tb.Fatal(err)
+			}
+		}
+	}
+	return metricValue
+}
+
 // TestV3WatchRestoreSnapshotUnsync tests whether slow follower can restore
 // from leader snapshot, and still notify on watchers from an old revision
 // that were created in synced watcher group in the first place.
@@ -55,8 +77,11 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	}
 
 	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
-	clus.waitLeader(t, clus.Members[1:])
+	initialLead := clus.waitLeader(t, clus.Members[1:])
+	t.Logf("elected lead: %v", clus.Members[initialLead].s.ID())
+	t.Logf("sleeping for 2 seconds")
 	time.Sleep(2 * time.Second)
+	t.Logf("sleeping for 2 seconds DONE")
 
 	kvc := toGRPC(clus.Client(1)).KV
@@ -71,26 +96,32 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	// trigger snapshot send from leader to this slow follower
 	// which then calls watchable store Restore
 	clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
+	// We don't expect leadership change here, just recompute the leader's index
+	// within clus.Members list.
 	lead := clus.WaitLeader(t)
 
-	sends, err := clus.Members[lead].Metric("etcd_network_snapshot_send_inflights_total")
-	if err != nil {
-		t.Fatal(err)
-	}
-	if sends != "0" && sends != "1" {
+	// Sending is scheduled on fifo 'sched' within EtcdServer::run,
+	// so it can start delayed after recovery.
+	send := MustFetchNotEmptyMetric(t, clus.Members[lead],
+		"etcd_network_snapshot_send_inflights_total",
+		time.After(5*time.Second))
+	if send != "0" && send != "1" {
 		// 0 if already sent, 1 if sending
-		t.Fatalf("inflight snapshot sends expected 0 or 1, got %q", sends)
+		t.Fatalf("inflight snapshot snapshot_send_inflights_total expected 0 or 1, got %q", send)
 	}
-	receives, err := clus.Members[(lead+1)%3].Metric("etcd_network_snapshot_receive_inflights_total")
-	if err != nil {
-		t.Fatal(err)
-	}
+
+	receives := MustFetchNotEmptyMetric(t, clus.Members[(lead+1)%3],
+		"etcd_network_snapshot_receive_inflights_total",
+		time.After(5*time.Second))
 	if receives != "0" && receives != "1" {
 		// 0 if already received, 1 if receiving
 		t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives)
 	}
+	t.Logf("sleeping for 2 seconds")
 	time.Sleep(2 * time.Second)
+	t.Logf("sleeping for 2 seconds DONE")
 
 	// slow follower now applies leader snapshot
 	// should be able to notify on old-revision watchers in unsynced