Merge pull request #14204 from lavacat/release-3.4-balancer-tests

clientv3/balance: fixed flaky balancer tests
Benjamin Wang 2022-07-12 06:14:35 +08:00 committed by GitHub
commit d3dfc9b796


@@ -19,7 +19,6 @@ import (
 	"fmt"
 	"strings"
 	"testing"
-	"time"
 
 	"go.etcd.io/etcd/clientv3/balancer/picker"
 	"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
@@ -92,24 +91,25 @@ func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
 				return picked, err
 			}
 
-			prev, switches := "", 0
+			_, picked, err := warmupConnections(reqFunc, tc.serverCount)
+			if err != nil {
+				t.Fatalf("Unexpected failure %v", err)
+			}
+
+			// verify that we round robin
+			prev, switches := picked, 0
 			for i := 0; i < tc.reqN; i++ {
-				picked, err := reqFunc(context.Background())
+				picked, err = reqFunc(context.Background())
 				if err != nil {
 					t.Fatalf("#%d: unexpected failure %v", i, err)
 				}
-				if prev == "" {
-					prev = picked
-					continue
-				}
 				if prev != picked {
 					switches++
 				}
 				prev = picked
 			}
-			if tc.serverCount > 1 && switches < tc.reqN-3 { // -3 for initial resolutions
-				// TODO: FIX ME
-				t.Skipf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
+			if tc.serverCount > 1 && switches != tc.reqN {
+				t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
 			}
 		})
 	}
@@ -162,25 +162,18 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
 	// stop first server, loads should be redistributed
 	// stopped server should never be picked
 	ms.StopAt(0)
-	available := make(map[string]struct{})
-	for i := 1; i < serverCount; i++ {
-		available[eps[i]] = struct{}{}
+	available, picked, err := warmupConnections(reqFunc, serverCount-1)
+	if err != nil {
+		t.Fatalf("Unexpected failure %v", err)
 	}
 
 	reqN := 10
-	prev, switches := "", 0
+	prev, switches := picked, 0
 	for i := 0; i < reqN; i++ {
-		picked, err := reqFunc(context.Background())
+		picked, err = reqFunc(context.Background())
 		if err != nil && strings.Contains(err.Error(), "transport is closing") {
 			continue
 		}
-		if prev == "" { // first failover
-			if eps[0] == picked {
-				t.Fatalf("expected failover from %q, picked %q", eps[0], picked)
-			}
-			prev = picked
-			continue
-		}
 		if _, ok := available[picked]; !ok {
 			t.Fatalf("picked unavailable address %q (available %v)", picked, available)
 		}
@@ -189,18 +182,19 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
 		}
 		prev = picked
 	}
 
-	if switches < reqN-3 { // -3 for initial resolutions + failover
-		// TODO: FIX ME!
-		t.Skipf("expected balanced loads for %d requests, got switches %d", reqN, switches)
+	if switches != reqN {
+		t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
 	}
 
 	// now failed server comes back
 	ms.StartAt(0)
 
-	// enough time for reconnecting to recovered server
-	time.Sleep(time.Second)
+	available, picked, err = warmupConnections(reqFunc, serverCount)
+	if err != nil {
+		t.Fatalf("Unexpected failure %v", err)
+	}
 
-	prev, switches = "", 0
+	prev, switches = picked, 0
 	recoveredAddr, recovered := eps[0], 0
 	available[recoveredAddr] = struct{}{}
@@ -209,10 +203,6 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
 		if err != nil {
 			t.Fatalf("#%d: unexpected failure %v", i, err)
 		}
-		if prev == "" {
-			prev = picked
-			continue
-		}
 		if _, ok := available[picked]; !ok {
 			t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
 		}
@@ -224,10 +214,10 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
 		}
 		prev = picked
 	}
-	if switches < reqN-3 { // -3 for initial resolutions
+	if switches != 2*reqN {
 		t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
 	}
-	if recovered < reqN/serverCount {
+	if recovered != 2*reqN/serverCount {
 		t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered)
 	}
 }
@@ -242,11 +232,10 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 	}
 	defer ms.Stop()
 	var eps []string
-	available := make(map[string]struct{})
 	for _, svr := range ms.Servers {
 		eps = append(eps, svr.ResolverAddress().Addr)
-		available[svr.Address] = struct{}{}
 	}
 	rsv, err := endpoint.NewResolverGroup("requestfail")
 	if err != nil {
 		t.Fatal(err)
@@ -277,6 +266,11 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 		return picked, err
 	}
 
+	available, picked, err := warmupConnections(reqFunc, serverCount)
+	if err != nil {
+		t.Fatalf("Unexpected failure %v", err)
+	}
+
 	reqN := 20
 	prev, switches := "", 0
 	for i := 0; i < reqN; i++ {
@@ -285,17 +279,13 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 		if i%2 == 0 {
 			cancel()
 		}
-		picked, err := reqFunc(ctx)
+		picked, err = reqFunc(ctx)
 		if i%2 == 0 {
-			if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled || picked != "" {
+			if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled {
 				t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err)
 			}
 			continue
 		}
-		if prev == "" && picked != "" {
-			prev = picked
-			continue
-		}
 		if _, ok := available[picked]; !ok {
 			t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
 		}
@@ -304,7 +294,25 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 		}
 		prev = picked
 	}
-	if switches < reqN/2-3 { // -3 for initial resolutions + failover
+	if switches != reqN/2 {
 		t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
 	}
 }
+
+type reqFuncT = func(ctx context.Context) (picked string, err error)
+
+func warmupConnections(reqFunc reqFuncT, serverCount int) (map[string]struct{}, string, error) {
+	var picked string
+	var err error
+	available := make(map[string]struct{})
+	// cycle through all peers to indirectly verify that balancer subconn list is fully loaded
+	// otherwise we can't reliably count switches between 'picked' peers in the test assert phase
+	for len(available) < serverCount {
+		picked, err = reqFunc(context.Background())
+		if err != nil {
+			return available, picked, err
+		}
+		available[picked] = struct{}{}
+	}
+	return available, picked, err
+}
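
The warm-up step is what lets the assertions become exact: once every endpoint has been picked at least once, a balanced round-robin picker must change endpoints on every subsequent request, so the tests can require switches == reqN instead of the old switches < reqN-3 slack and the t.Skipf escape hatch. Below is a minimal, self-contained sketch of that pattern using a stand-in round-robin picker rather than etcd's test harness; fakeReqFunc, warmup, and the endpoint names are illustrative only.

package main

import (
	"context"
	"fmt"
)

// fakeReqFunc round-robins over the given endpoints, standing in for the
// tests' reqFunc (which issues a real request and reports the serving peer).
func fakeReqFunc(eps []string) func(ctx context.Context) (string, error) {
	i := -1
	return func(ctx context.Context) (string, error) {
		i++
		return eps[i%len(eps)], nil
	}
}

// warmup mirrors warmupConnections: keep issuing requests until every
// endpoint has been picked at least once, returning the last pick.
func warmup(reqFunc func(ctx context.Context) (string, error), serverCount int) (map[string]struct{}, string, error) {
	available := make(map[string]struct{})
	var picked string
	var err error
	for len(available) < serverCount {
		picked, err = reqFunc(context.Background())
		if err != nil {
			return available, picked, err
		}
		available[picked] = struct{}{}
	}
	return available, picked, err
}

func main() {
	eps := []string{"ep0", "ep1", "ep2"}
	reqFunc := fakeReqFunc(eps)

	_, picked, err := warmup(reqFunc, len(eps))
	if err != nil {
		panic(err)
	}

	// After warm-up, a round-robin picker switches endpoints on every request,
	// so the switch count equals the request count exactly.
	reqN := 9
	prev, switches := picked, 0
	for i := 0; i < reqN; i++ {
		picked, _ = reqFunc(context.Background())
		if prev != picked {
			switches++
		}
		prev = picked
	}
	fmt.Printf("switches=%d reqN=%d\n", switches, reqN) // prints switches=9 reqN=9
}

In the real tests, reqFunc drives the balancer under test and reports which server answered; warming up before counting removes the non-determinism of the initial resolutions that the earlier lenient thresholds tried to paper over.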