Deflaking: Make WaitLeader (and WaitMembersForLeader) wait aggressively (up to 30s) for a leader to be established.

Nearly none of the tests checked the returned value; they just assumed WaitLeader had succeeded.

```
    maintenance_test.go:277: Waiting for leader...
    logger.go:130: 2022-04-03T08:01:09.914+0200	INFO	m0	cluster version differs from storage version.	{"member": "m0", "cluster-version": "3.6.0", "storage-version": "3.5.0"}
    logger.go:130: 2022-04-03T08:01:09.915+0200	WARN	m0	leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk	{"member": "m0", "to": "2acc3d3b521981", "heartbeat-interval": "10ms", "expected-duration": "20ms", "exceeded-duration": "103.756219ms"}
    logger.go:130: 2022-04-03T08:01:09.916+0200	INFO	m0	updated storage version	{"member": "m0", "new-storage-version": "3.6.0"}
    ...
    logger.go:130: 2022-04-03T08:01:09.926+0200	INFO	grpc	[[roundrobin] roundrobinPicker: Build called with info: {map[0xc002630ac0:{{unix:localhost:m0 localhost <nil> 0 <nil>}} 0xc002630af0:{{unix:localhost:m1 localhost <nil> 0 <nil>}} 0xc002630b20:{{unix:localhost:m2 localhost <nil> 0 <nil>}}]}]
    logger.go:130: 2022-04-03T08:01:09.926+0200	WARN	m0	apply request took too long	{"member": "m0", "took": "114.661766ms", "expected-duration": "100ms", "prefix": "", "request": "header:<ID:12658633312866157316 > cluster_version_set:<ver:\"3.6.0\" > ", "response": ""}
    logger.go:130: 2022-04-03T08:01:09.927+0200	INFO	m0	cluster version is updated	{"member": "m0", "cluster-version": "3.6"}
    logger.go:130: 2022-04-03T08:01:09.955+0200	INFO	m2.raft	9f96af25a04e2ec3 [logterm: 2, index: 8, vote: 9903a56eaf96afac] ignored MsgVote from 2acc3d3b521981 [logterm: 2, index: 8] at term 2: lease is not expired (remaining ticks: 10)	{"member": "m2"}
    logger.go:130: 2022-04-03T08:01:09.955+0200	INFO	m0.raft	9903a56eaf96afac [logterm: 2, index: 8, vote: 9903a56eaf96afac] ignored MsgVote from 2acc3d3b521981 [logterm: 2, index: 8] at term 2: lease is not expired (remaining ticks: 5)	{"member": "m0"}
    logger.go:130: 2022-04-03T08:01:09.955+0200	INFO	m0.raft	9903a56eaf96afac [term: 2] received a MsgAppResp message with higher term from 2acc3d3b521981 [term: 3]	{"member": "m0"}
    logger.go:130: 2022-04-03T08:01:09.955+0200	INFO	m0.raft	9903a56eaf96afac became follower at term 3	{"member": "m0"}
    logger.go:130: 2022-04-03T08:01:09.955+0200	INFO	m0.raft	raft.node: 9903a56eaf96afac lost leader 9903a56eaf96afac at term 3	{"member": "m0"}
    maintenance_test.go:279: Leader established.
```
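The fix replaces a single unchecked attempt with a retry loop bounded by a 30-second context. A minimal, self-contained sketch of that shape, where `waitUntil` and `check` are illustrative names rather than etcd's API:

```
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil re-runs check until it reports a non-negative index or the
// context expires, mirroring the bounded retry this commit adds around
// the leader probe. Both names here are illustrative.
func waitUntil(ctx context.Context, check func() int) (int, error) {
	for {
		if idx := check(); idx >= 0 {
			return idx, nil
		}
		select {
		case <-ctx.Done():
			return -1, ctx.Err() // a definite failure instead of a silent -1
		case <-time.After(10 * time.Millisecond): // back off briefly before retrying
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	attempts := 0
	idx, err := waitUntil(ctx, func() int {
		attempts++
		if attempts < 3 {
			return -1 // simulate elections that have not settled yet
		}
		return 0 // leader found at member index 0
	})
	fmt.Println(idx, err) // 0 <nil>
}
```

The key property is that failure becomes loud: the caller gets `ctx.Err()` (or, in the test helper below, a `t.Fatalf`) instead of an ignored -1.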

Piotr Tabor 2022-04-03 09:17:10 +02:00
parent 2fab3f3ae5
commit d57f8dba62
3 changed files with 43 additions and 13 deletions

cluster.go

```
@@ -398,12 +398,43 @@ func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []*pb.Member) {
     }
 }
 
-// WaitLeader returns index of the member in c.Members that is leader (or -1).
-func (c *Cluster) WaitLeader(t testutil.TB) int { return c.WaitMembersForLeader(t, c.Members) }
+// WaitLeader returns the index of the member in c.Members that is leader,
+// or fails the test if no leader is established within 30s.
+func (c *Cluster) WaitLeader(t testutil.TB) int {
+    return c.WaitMembersForLeader(t, c.Members)
+}
 
 // WaitMembersForLeader waits until given members agree on the same leader,
-// and returns its 'index' in the 'membs' list (or -1).
+// and returns its 'index' in the 'membs' list; it fails the test on timeout.
 func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
+    t.Logf("WaitMembersForLeader")
+    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+    defer cancel()
+    l := 0
+    for l = c.waitMembersForLeader(t, ctx, membs); l < 0; l = c.waitMembersForLeader(t, ctx, membs) {
+        if ctx.Err() != nil {
+            t.Fatalf("WaitLeader FAILED: %v", ctx.Err())
+        }
+    }
+    t.Logf("WaitMembersForLeader succeeded. Cluster leader index: %v", l)
+
+    // TODO: Consider a second-pass check, as sometimes leadership is lost
+    // soon after election:
+    //
+    // Multiple attempts may be needed, because sometimes just after a
+    // successful WaitLeader there is a race and leadership is quickly lost:
+    //  - MsgAppResp message with higher term from 2acc3d3b521981 [term: 3] {"member": "m0"}
+    //  - 9903a56eaf96afac became follower at term 3 {"member": "m0"}
+    //  - 9903a56eaf96afac lost leader 9903a56eaf96afac at term 3 {"member": "m0"}
+    return l
+}
+
+// waitMembersForLeader waits until the given members agree on the same leader,
+// and returns its 'index' in the 'membs' list (or -1 if there is none).
+func (c *Cluster) waitMembersForLeader(t testutil.TB, ctx context.Context, membs []*Member) int {
+    t.Logf("WaitMembersForLeader...")
     possibleLead := make(map[uint64]bool)
     var lead uint64
     for _, m := range membs {
@@ -415,7 +446,7 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
     }
     // ensure leader is up via linearizable get
     for {
-        ctx, cancel := context.WithTimeout(context.Background(), 10*TickDuration+time.Second)
+        ctx, cancel := context.WithTimeout(ctx, 10*TickDuration+time.Second)
         _, err := cc.Get(ctx, "0")
         cancel()
         if err == nil || strings.Contains(err.Error(), "Key not found") {
@@ -442,10 +473,12 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
     for i, m := range membs {
         if uint64(m.Server.ID()) == lead {
+            t.Logf("WaitMembersForLeader found leader. Member: %v lead: %x", i, lead)
             return i
         }
     }
+    t.Logf("WaitMembersForLeader FAILED (-1)")
     return -1
 }
```
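The TODO above is motivated by the captured logs: a member can win an election and lose leadership milliseconds later. One possible shape for the second-pass check, sketched with an illustrative `leaderIndex` callback standing in for `waitMembersForLeader`; this is not code from the commit:

```
package main

import (
	"fmt"
	"time"
)

// stableLeader accepts a leader only if repeated probes, separated by a
// settle delay, keep returning the same index. leaderIndex returns the
// current leader's index or -1; both names are illustrative. A real
// implementation would also bound this loop with a context deadline.
func stableLeader(leaderIndex func() int, settle time.Duration, passes int) int {
	for {
		first := leaderIndex()
		if first < 0 {
			continue // no leader yet; probe again
		}
		stable := true
		for i := 0; i < passes; i++ {
			time.Sleep(settle)
			if leaderIndex() != first {
				stable = false // leadership moved right after election
				break
			}
		}
		if stable {
			return first
		}
	}
}

func main() {
	calls := 0
	// Simulated cluster: member 1 wins the first election, immediately
	// loses leadership to member 0, which then stays leader.
	idx := stableLeader(func() int {
		calls++
		if calls == 1 {
			return 1
		}
		return 0
	}, time.Millisecond, 2)
	fmt.Println("stable leader index:", idx) // 0
}
```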

maintenance_test.go

```
@@ -274,18 +274,22 @@ func TestMaintenanceStatus(t *testing.T) {
     clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
     defer clus.Terminate(t)
 
+    t.Logf("Waiting for leader...")
     clus.WaitLeader(t)
+    t.Logf("Leader established.")
 
     eps := make([]string, 3)
     for i := 0; i < 3; i++ {
         eps[i] = clus.Members[i].GRPCURL()
     }
 
+    t.Logf("Creating client...")
     cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})
     if err != nil {
         t.Fatal(err)
     }
     defer cli.Close()
+    t.Logf("Creating client [DONE]")
 
     prevID, leaderFound := uint64(0), false
     for i := 0; i < 3; i++ {
@@ -293,6 +297,7 @@ func TestMaintenanceStatus(t *testing.T) {
         if err != nil {
             t.Fatal(err)
         }
+        t.Logf("Response from %v: %v", i, resp)
         if prevID == 0 {
             prevID, leaderFound = resp.Header.MemberId, resp.Header.MemberId == resp.Leader
             continue
```
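The loop truncated above verifies that every member's Status response names the same leader and that exactly one member reports itself as that leader. A self-contained reconstruction of that invariant, where `statusResp` is a stand-in for the two fields the test reads (`resp.Header.MemberId` and `resp.Leader`), not the real clientv3 type:

```
package main

import "fmt"

// statusResp mimics the fields the test reads from each Status response.
type statusResp struct {
	MemberID uint64
	Leader   uint64
}

// oneLeader reports whether all members agree on the leader ID and exactly
// one member identifies itself as that leader.
func oneLeader(resps []statusResp) bool {
	if len(resps) == 0 {
		return false
	}
	lead := resps[0].Leader
	self := 0
	for _, r := range resps {
		if r.Leader != lead {
			return false // members disagree on who leads
		}
		if r.MemberID == lead {
			self++
		}
	}
	return self == 1
}

func main() {
	fmt.Println(oneLeader([]statusResp{
		{MemberID: 1, Leader: 2},
		{MemberID: 2, Leader: 2},
		{MemberID: 3, Leader: 2},
	})) // true
}
```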

watch_test.go

```
@@ -415,15 +415,7 @@ func TestWatchResumeCompacted(t *testing.T) {
     }
     clus.Members[0].Stop(t)
 
-    ticker := time.After(time.Second * 10)
-    for clus.WaitLeader(t) <= 0 {
-        select {
-        case <-ticker:
-            t.Fatalf("failed to wait for new leader")
-        default:
-            time.Sleep(10 * time.Millisecond)
-        }
-    }
+    clus.WaitLeader(t)
 
     // put some data and compact away
     numPuts := 5
```
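Besides removing duplication, this deletion fixes a subtle bug: `clus.WaitLeader(t) <= 0` treated the valid leader index 0 as a failure, so the loop could keep spinning under a healthy leader. A tiny demonstration of that comparison, with a hypothetical `waitLeader` stub following the old contract:

```
package main

import "fmt"

// waitLeader simulates the old contract: it returns the leader's index in
// the member list, or -1 on failure. This stub is hypothetical; member 0
// being leader (index 0) is a valid success case.
func waitLeader() int { return 0 }

func main() {
	// The removed loop's condition: `<= 0` conflates the valid index 0
	// with the -1 failure value.
	if waitLeader() <= 0 {
		fmt.Println("looped: member 0 is leader, yet treated as failure")
	}
	// The correct check under the old contract would have been:
	if waitLeader() < 0 {
		fmt.Println("no leader") // not printed
	}
}
```

Under the new contract, WaitLeader fails the test itself on timeout, so callers need no check at all.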