From ea34f8dbc62a98f381fbbf4885e3046230902811 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 2 Feb 2021 15:39:03 -0600 Subject: [PATCH] Prepare `balancer` interfaces for `>=google.golang.org/grpc@1.30.0` upgrade. --- client/v3/balancer/balancer.go | 37 ++++++++++++++++++- client/v3/balancer/picker/err.go | 14 +++++++ .../v3/balancer/picker/roundrobin_balanced.go | 18 ++++++++- client/v3/go.mod | 1 + 4 files changed, 68 insertions(+), 2 deletions(-) diff --git a/client/v3/balancer/balancer.go b/client/v3/balancer/balancer.go index 3e63d6091..a7bedc459 100644 --- a/client/v3/balancer/balancer.go +++ b/client/v3/balancer/balancer.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/client/v3/balancer/connectivity" "go.etcd.io/etcd/client/v3/balancer/picker" + "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc/balancer" grpcconnectivity "google.golang.org/grpc/connectivity" @@ -31,6 +32,12 @@ import ( _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver ) +// NOTE: Ensure +// - `baseBalancer` satisfies `balancer.V2Balancer`. +var ( + _ balancer.V2Balancer = (*baseBalancer)(nil) +) + // Config defines balancer configurations. type Config struct { // Policy configures balancer policy. @@ -138,12 +145,29 @@ type baseBalancer struct { picker picker.Picker } +// UpdateClientConnState implements "grpc/balancer.V2Balancer" interface. +func (bb *baseBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + return bb.handleResolvedWithError(ccs.ResolverState.Addresses, nil) +} + +// ResolverError implements "grpc/balancer.V2Balancer" interface. +func (bb *baseBalancer) ResolverError(err error) { + bb.HandleResolvedAddrs(nil, err) +} + // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. // gRPC sends initial or updated resolved addresses from "Build". func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + _ = bb.handleResolvedWithError(addrs, err) +} + +// handleResolvedWithError is an implementation shared both by `HandleResolvedAddrs()`, +// which is part of the `Balancer` interface as well as `UpdateClientConnState()`, +// which is part of the `V2Balancer` interface. +func (bb *baseBalancer) handleResolvedWithError(addrs []resolver.Address, err error) error { if err != nil { bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) - return + return err } bb.lg.Info("resolved", zap.String("picker", bb.picker.String()), @@ -155,12 +179,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) defer bb.mu.Unlock() resolved := make(map[resolver.Address]struct{}) + warnedErrors := []error{} for _, addr := range addrs { resolved[addr] = struct{}{} if _, ok := bb.addrToSc[addr]; !ok { sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) if err != nil { bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) + warnedErrors = append(warnedErrors, err) continue } bb.lg.Info("created subconn", zap.String("address", addr.Addr)) @@ -191,6 +217,15 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) // (DO NOT) delete(bb.scToSt, sc) } } + + // TODO: Consider just returning `ErrBadResolverState` if `warnedErrors` is + // not empty. + return multierr.Combine(warnedErrors...) +} + +// UpdateSubConnState implements "grpc/balancer.V2Balancer" interface. +func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { + bb.HandleSubConnStateChange(sc, s.ConnectivityState) } // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. diff --git a/client/v3/balancer/picker/err.go b/client/v3/balancer/picker/err.go index f4b941d65..2330e2c8f 100644 --- a/client/v3/balancer/picker/err.go +++ b/client/v3/balancer/picker/err.go @@ -20,6 +20,12 @@ import ( "google.golang.org/grpc/balancer" ) +// NOTE: Ensure +// - `errPickerV2` satisfies `balancer.V2Picker`. +var ( + _ balancer.V2Picker = (*errPickerV2)(nil) +) + // NewErr returns a picker that always returns err on "Pick". func NewErr(err error) Picker { return &errPicker{p: Error, err: err} @@ -37,3 +43,11 @@ func (ep *errPicker) String() string { func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { return nil, nil, ep.err } + +type errPickerV2 struct { + errPicker +} + +func (ep2 *errPickerV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{}, ep2.errPicker.err +} diff --git a/client/v3/balancer/picker/roundrobin_balanced.go b/client/v3/balancer/picker/roundrobin_balanced.go index e3971ecc4..5b1a17545 100644 --- a/client/v3/balancer/picker/roundrobin_balanced.go +++ b/client/v3/balancer/picker/roundrobin_balanced.go @@ -24,6 +24,12 @@ import ( "google.golang.org/grpc/resolver" ) +// NOTE: Ensure +// - `rrBalancedV2` satisfies `balancer.V2Picker`. +var ( + _ balancer.V2Picker = (*rrBalancedV2)(nil) +) + // newRoundrobinBalanced returns a new roundrobin balanced picker. func newRoundrobinBalanced(cfg Config) Picker { scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress)) @@ -52,7 +58,7 @@ type rrBalanced struct { func (rb *rrBalanced) String() string { return rb.p.String() } // Pick is called for every client request. -func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (rb *rrBalanced) Pick(_ context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { rb.mu.RLock() n := len(rb.scs) rb.mu.RUnlock() @@ -93,3 +99,13 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance } return sc, doneFunc, nil } + +type rrBalancedV2 struct { + rrBalanced +} + +func (rb2 *rrBalancedV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) { + sc, doneFunc, err := rb2.rrBalanced.Pick(context.TODO(), opts) + pr := balancer.PickResult{SubConn: sc, Done: doneFunc} + return pr, err +} diff --git a/client/v3/go.mod b/client/v3/go.mod index 928c335d7..1882f40a8 100644 --- a/client/v3/go.mod +++ b/client/v3/go.mod @@ -9,6 +9,7 @@ require ( github.com/prometheus/client_golang v1.5.1 go.etcd.io/etcd/api/v3 v3.5.0-pre go.etcd.io/etcd/pkg/v3 v3.5.0-pre + go.uber.org/multierr v1.5.0 go.uber.org/zap v1.16.0 google.golang.org/grpc v1.29.1 sigs.k8s.io/yaml v1.2.0