Merge pull request #12658 from dhermes/pr-12636-balancer

Prepare `balancer` interfaces for `>=google.golang.org/grpc@1.30.0` upgrade.
This commit is contained in:
Piotr Tabor 2021-02-02 23:03:56 +01:00 committed by GitHub
commit 6881ea828e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/client/v3/balancer/connectivity" "go.etcd.io/etcd/client/v3/balancer/connectivity"
"go.etcd.io/etcd/client/v3/balancer/picker" "go.etcd.io/etcd/client/v3/balancer/picker"
"go.uber.org/multierr"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
grpcconnectivity "google.golang.org/grpc/connectivity" grpcconnectivity "google.golang.org/grpc/connectivity"
@ -31,6 +32,12 @@ import (
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
) )
// NOTE: Ensure
// - `baseBalancer` satisfies `balancer.V2Balancer`.
var (
_ balancer.V2Balancer = (*baseBalancer)(nil)
)
// Config defines balancer configurations. // Config defines balancer configurations.
type Config struct { type Config struct {
// Policy configures balancer policy. // Policy configures balancer policy.
@ -138,12 +145,29 @@ type baseBalancer struct {
picker picker.Picker picker picker.Picker
} }
// UpdateClientConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
return bb.handleResolvedWithError(ccs.ResolverState.Addresses, nil)
}
// ResolverError implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) ResolverError(err error) {
bb.HandleResolvedAddrs(nil, err)
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build". // gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
_ = bb.handleResolvedWithError(addrs, err)
}
// handleResolvedWithError is an implementation shared both by `HandleResolvedAddrs()`,
// which is part of the `Balancer` interface as well as `UpdateClientConnState()`,
// which is part of the `V2Balancer` interface.
func (bb *baseBalancer) handleResolvedWithError(addrs []resolver.Address, err error) error {
if err != nil { if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return return err
} }
bb.lg.Info("resolved", bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()), zap.String("picker", bb.picker.String()),
@ -155,12 +179,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
defer bb.mu.Unlock() defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{}) resolved := make(map[resolver.Address]struct{})
warnedErrors := []error{}
for _, addr := range addrs { for _, addr := range addrs {
resolved[addr] = struct{}{} resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok { if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil { if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
warnedErrors = append(warnedErrors, err)
continue continue
} }
bb.lg.Info("created subconn", zap.String("address", addr.Addr)) bb.lg.Info("created subconn", zap.String("address", addr.Addr))
@ -191,6 +217,15 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
// (DO NOT) delete(bb.scToSt, sc) // (DO NOT) delete(bb.scToSt, sc)
} }
} }
// TODO: Consider just returning `ErrBadResolverState` if `warnedErrors` is
// not empty.
return multierr.Combine(warnedErrors...)
}
// UpdateSubConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
bb.HandleSubConnStateChange(sc, s.ConnectivityState)
} }
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.

View File

@ -20,6 +20,12 @@ import (
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
) )
// NOTE: Ensure
// - `errPickerV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*errPickerV2)(nil)
)
// NewErr returns a picker that always returns err on "Pick". // NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker { func NewErr(err error) Picker {
return &errPicker{p: Error, err: err} return &errPicker{p: Error, err: err}
@ -37,3 +43,11 @@ func (ep *errPicker) String() string {
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err return nil, nil, ep.err
} }
type errPickerV2 struct {
errPicker
}
func (ep2 *errPickerV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, ep2.errPicker.err
}

View File

@ -24,6 +24,12 @@ import (
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
// NOTE: Ensure
// - `rrBalancedV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*rrBalancedV2)(nil)
)
// newRoundrobinBalanced returns a new roundrobin balanced picker. // newRoundrobinBalanced returns a new roundrobin balanced picker.
func newRoundrobinBalanced(cfg Config) Picker { func newRoundrobinBalanced(cfg Config) Picker {
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress)) scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
@ -52,7 +58,7 @@ type rrBalanced struct {
func (rb *rrBalanced) String() string { return rb.p.String() } func (rb *rrBalanced) String() string { return rb.p.String() }
// Pick is called for every client request. // Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { func (rb *rrBalanced) Pick(_ context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock() rb.mu.RLock()
n := len(rb.scs) n := len(rb.scs)
rb.mu.RUnlock() rb.mu.RUnlock()
@ -93,3 +99,13 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance
} }
return sc, doneFunc, nil return sc, doneFunc, nil
} }
type rrBalancedV2 struct {
rrBalanced
}
func (rb2 *rrBalancedV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
sc, doneFunc, err := rb2.rrBalanced.Pick(context.TODO(), opts)
pr := balancer.PickResult{SubConn: sc, Done: doneFunc}
return pr, err
}

View File

@ -9,6 +9,7 @@ require (
github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_golang v1.5.1
go.etcd.io/etcd/api/v3 v3.5.0-pre go.etcd.io/etcd/api/v3 v3.5.0-pre
go.etcd.io/etcd/pkg/v3 v3.5.0-pre go.etcd.io/etcd/pkg/v3 v3.5.0-pre
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.16.0 go.uber.org/zap v1.16.0
google.golang.org/grpc v1.29.1 google.golang.org/grpc v1.29.1
sigs.k8s.io/yaml v1.2.0 sigs.k8s.io/yaml v1.2.0