diff --git a/clientv3/balancer/balancer_test.go b/clientv3/balancer/balancer_test.go
index 5ae0e660b..bf9aad93a 100644
--- a/clientv3/balancer/balancer_test.go
+++ b/clientv3/balancer/balancer_test.go
@@ -17,7 +17,9 @@ package balancer
 import (
 	"context"
 	"fmt"
+	"strings"
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/clientv3/balancer/picker"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -25,9 +27,11 @@ import (
 
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/peer"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
+	"google.golang.org/grpc/status"
 )
 
 // TestRoundRobinBalancedResolvableNoFailover ensures that
@@ -103,3 +107,199 @@ func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
 		})
 	}
 }
+
+// TestRoundRobinBalancedResolvableFailoverFromServerFail ensures that
+// requests are rebalanced across the remaining servers while one server
+// is down, and that the server is picked again once it comes back.
+func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
+	serverCount := 5
+	ms, err := mockserver.StartMockServers(serverCount)
+	if err != nil {
+		t.Fatalf("failed to start mock servers: %s", err)
+	}
+	defer ms.Stop()
+	var resolvedAddrs []resolver.Address
+	for _, svr := range ms.Servers {
+		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: svr.Address})
+	}
+
+	rsv, closeResolver := manual.GenerateAndRegisterManualResolver()
+	defer closeResolver()
+	cfg := Config{
+		Policy:    picker.RoundrobinBalanced,
+		Name:      genName(),
+		Logger:    zap.NewExample(),
+		Endpoints: []string{fmt.Sprintf("%s:///mock.server", rsv.Scheme())},
+	}
+	rrb := New(cfg)
+	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
+	if err != nil {
+		t.Fatalf("failed to dial mock server: %s", err)
+	}
+	defer conn.Close()
+	rsv.NewAddress(resolvedAddrs)
+	cli := pb.NewKVClient(conn)
+
+	// reqFunc issues one Range request and returns the peer address recorded
+	// for the call; picked stays empty when no peer address was set.
+	reqFunc := func(ctx context.Context) (picked string, err error) {
+		var p peer.Peer
+		_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
+		if p.Addr != nil {
+			picked = p.Addr.String()
+		}
+		return picked, err
+	}
+
+	// stop first server, loads should be redistributed
+	// stopped server should never be picked
+	ms.StopAt(0)
+	available := make(map[string]struct{})
+	for i := 1; i < serverCount; i++ {
+		available[resolvedAddrs[i].Addr] = struct{}{}
+	}
+
+	reqN := 10
+	prev, switches := "", 0
+	for i := 0; i < reqN; i++ {
+		picked, err := reqFunc(context.Background())
+		// tolerate "transport is closing", presumably from the stopped server's connection
+		if err != nil && strings.Contains(err.Error(), "transport is closing") {
+			continue
+		}
+		if prev == "" { // first failover
+			if resolvedAddrs[0].Addr == picked {
+				t.Fatalf("expected failover from %q, picked %q", resolvedAddrs[0].Addr, picked)
+			}
+			prev = picked
+			continue
+		}
+		if _, ok := available[picked]; !ok {
+			t.Fatalf("picked unavailable address %q (available %v)", picked, available)
+		}
+		if prev != picked {
+			switches++
+		}
+		prev = picked
+	}
+	if switches < reqN-3 { // -3 for initial resolutions + failover
+		t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
+	}
+
+	// now failed server comes back
+	ms.StartAt(0)
+
+	// enough time for reconnecting to recovered server
+	time.Sleep(time.Second)
+
+	prev, switches = "", 0
+	recoveredAddr, recovered := resolvedAddrs[0].Addr, 0
+	available[recoveredAddr] = struct{}{}
+
+	for i := 0; i < 2*reqN; i++ {
+		picked, err := reqFunc(context.Background())
+		if err != nil {
+			t.Fatalf("#%d: unexpected failure %v", i, err)
+		}
+		if prev == "" {
+			prev = picked
+			continue
+		}
+		if _, ok := available[picked]; !ok {
+			t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
+		}
+		if prev != picked {
+			switches++
+		}
+		if picked == recoveredAddr {
+			recovered++
+		}
+		prev = picked
+	}
+	if switches < reqN-3 { // -3 for initial resolutions
+		t.Fatalf("expected balanced loads for %d requests, got switches %d", 2*reqN, switches)
+	}
+	if recovered < reqN/serverCount {
+		t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered)
+	}
+}
+
+// TestRoundRobinBalancedResolvableFailoverFromRequestFail ensures that
+// requests keep being balanced across all servers while every other
+// request fails because its context is canceled before the call.
+func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
+	serverCount := 5
+	ms, err := mockserver.StartMockServers(serverCount)
+	if err != nil {
+		t.Fatalf("failed to start mock servers: %s", err)
+	}
+	defer ms.Stop()
+	var resolvedAddrs []resolver.Address
+	available := make(map[string]struct{})
+	for _, svr := range ms.Servers {
+		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: svr.Address})
+		available[svr.Address] = struct{}{}
+	}
+
+	rsv, closeResolver := manual.GenerateAndRegisterManualResolver()
+	defer closeResolver()
+	cfg := Config{
+		Policy:    picker.RoundrobinBalanced,
+		Name:      genName(),
+		Logger:    zap.NewExample(),
+		Endpoints: []string{fmt.Sprintf("%s:///mock.server", rsv.Scheme())},
+	}
+	rrb := New(cfg)
+	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
+	if err != nil {
+		t.Fatalf("failed to dial mock server: %s", err)
+	}
+	defer conn.Close()
+	rsv.NewAddress(resolvedAddrs)
+	cli := pb.NewKVClient(conn)
+
+	// reqFunc issues one Range request and returns the peer address recorded
+	// for the call; picked stays empty when no peer address was set.
+	reqFunc := func(ctx context.Context) (picked string, err error) {
+		var p peer.Peer
+		_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
+		if p.Addr != nil {
+			picked = p.Addr.String()
+		}
+		return picked, err
+	}
+
+	reqN := 20
+	prev, switches := "", 0
+	for i := 0; i < reqN; i++ {
+		ctx, cancel := context.WithCancel(context.Background())
+		if i%2 == 0 { // even requests are canceled up front and must fail
+			cancel()
+		}
+		picked, err := reqFunc(ctx)
+		// release the context now rather than with defer, which would keep
+		// every context alive until the test returns; repeated cancel is a no-op
+		cancel()
+		if i%2 == 0 {
+			// a non-status error is as unexpected as a wrong status code
+			if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled || picked != "" {
+				t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err)
+			}
+			continue
+		}
+		if prev == "" && picked != "" {
+			prev = picked
+			continue
+		}
+		if _, ok := available[picked]; !ok {
+			t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
+		}
+		if prev != picked {
+			switches++
+		}
+		prev = picked
+	}
+	if switches < reqN/2-3 { // -3 for initial resolutions + failover
+		t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
+	}
+}