clientv3/balancer: initial commit

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-03-19 11:17:00 -07:00
parent 6e2bf40015
commit 7fe4a08fdc
11 changed files with 682 additions and 5 deletions

View File

@ -0,0 +1,275 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"fmt"
"sync"
"github.com/coreos/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// Balancer defines client balancer interface.
type Balancer interface {
// Builder is called at the beginning to initialize sub-connection states and picker.
balancer.Builder
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer
// Picker calls "Pick" for every client request.
picker.Picker
// SetEndpoints updates client's endpoints.
SetEndpoints(eps ...string)
}
type baseBalancer struct {
policy picker.Policy
name string
lg *zap.Logger
mu sync.RWMutex
eps []string
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]connectivity.State
currrentConn balancer.ClientConn
currentState connectivity.State
csEvltr *connectivityStateEvaluator
picker.Picker
}
// New returns a new balancer from specified picker policy.
func New(cfg Config) Balancer {
bb := &baseBalancer{
policy: cfg.Policy,
name: cfg.Policy.String(),
lg: cfg.Logger,
eps: cfg.Endpoints,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),
currrentConn: nil,
csEvltr: &connectivityStateEvaluator{},
// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if cfg.Name != "" {
bb.name = cfg.Name
}
if bb.lg == nil {
bb.lg = zap.NewNop()
}
balancer.Register(bb)
bb.lg.Info(
"registered balancer",
zap.String("policy", bb.policy.String()),
zap.String("name", bb.name),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (bb *baseBalancer) Name() string { return bb.name }
// Build implements "grpc/balancer.Builder" interface.
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addreses will be handled via "HandleResolvedAddrs".
func (bb *baseBalancer) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
// TODO: support multiple connections
bb.mu.Lock()
bb.currrentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.Error(err))
return
}
bb.lg.Info("resolved", zap.Strings("addresses", addrsToStrings(addrs)))
bb.mu.Lock()
defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{})
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currrentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = connectivity.Idle
sc.Connect()
}
}
for addr, sc := range bb.addrToSc {
if _, ok := resolved[addr]; !ok {
// was removed by resolver or failed to create subconn
bb.currrentConn.RemoveSubConn(sc)
delete(bb.addrToSc, addr)
bb.lg.Info(
"removed subconn",
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("subconn", scToString(sc)),
zap.String("state", s.String()),
)
return
}
bb.lg.Info(
"state changed",
zap.Bool("connected", s == connectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)
bb.scToSt[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
oldAggrState := bb.currentState
bb.currentState = bb.csEvltr.recordTransition(old, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (old == connectivity.Ready) ||
(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
bb.regeneratePicker()
}
bb.currrentConn.UpdateBalancerState(bb.currentState, bb.Picker)
return
}
func (bb *baseBalancer) regeneratePicker() {
if bb.currentState == connectivity.TransientFailure {
bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}
// only pass ready subconns to picker
scs := make([]balancer.SubConn, 0)
addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
scs = append(scs, sc)
addrToSc[addr] = sc
scToAddr[sc] = addr
}
}
switch bb.policy {
case picker.RoundrobinBalanced:
bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
}
bb.lg.Info(
"generated picker",
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(addrToSc)),
zap.Int("subconn-size", len(addrToSc)),
)
}
// SetEndpoints updates client's endpoints.
// TODO: implement this
func (bb *baseBalancer) SetEndpoints(eps ...string) {
addrs := epsToAddrs(eps...)
bb.mu.Lock()
bb.Picker.UpdateAddrs(addrs)
bb.mu.Unlock()
}
// Close implements "grpc/balancer.Balancer" interface.
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
// TODO
}

View File

@ -0,0 +1,39 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"github.com/coreos/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
)
// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Policy picker.Policy
// Name defines an additional name for balancer.
// Useful for balancer testing to avoid register conflicts.
// If empty, defaults to policy name.
Name string
// Logger configures balancer logging.
// If nil, logs are discarded.
Logger *zap.Logger
// Endpoints is a list of server endpoints.
Endpoints []string
}

View File

@ -0,0 +1,58 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import "google.golang.org/grpc/connectivity"
// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
type connectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// recordTransition records state change happening in every subConn and based on
// that it evaluates what aggregated state should be.
// It can only transition between Ready, Connecting and TransientFailure. Other states,
// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
// before any subConn is created ClientConn is in idle state. In the end when ClientConn
// closes it is in Shutdown state.
//
// recordTransition should only be called synchronously from the same goroutine.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}

View File

@ -30,7 +30,7 @@ import (
var endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
func TestBalancerGetUnblocking(t *testing.T) {
func TestOldHealthBalancerGetUnblocking(t *testing.T) {
hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil })
defer hb.Close()
if addrs := <-hb.Notify(); len(addrs) != len(endpoints) {
@ -74,7 +74,7 @@ func TestBalancerGetUnblocking(t *testing.T) {
}
}
func TestBalancerGetBlocking(t *testing.T) {
func TestOldHealthBalancerGetBlocking(t *testing.T) {
hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil })
defer hb.Close()
if addrs := <-hb.Notify(); len(addrs) != len(endpoints) {
@ -131,9 +131,9 @@ func TestBalancerGetBlocking(t *testing.T) {
}
}
// TestHealthBalancerGraylist checks one endpoint is tried after the other
// TestOldHealthBalancerGraylist checks one endpoint is tried after the other
// due to gray listing.
func TestHealthBalancerGraylist(t *testing.T) {
func TestOldHealthBalancerGraylist(t *testing.T) {
var wg sync.WaitGroup
// Use 3 endpoints so gray list doesn't fallback to all connections
// after failing on 2 endpoints.
@ -192,7 +192,7 @@ func TestHealthBalancerGraylist(t *testing.T) {
// TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other
// due to rapid open/close conn. The deadlock causes balancer.Close() to block forever.
// See issue: https://github.com/coreos/etcd/issues/7283 for more detail.
func TestBalancerDoNotBlockOnClose(t *testing.T) {
func TestOldHealthBalancerDoNotBlockOnClose(t *testing.T) {
defer testutil.AfterTest(t)
kcl := newKillConnListener(t, 3)

View File

@ -0,0 +1,16 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package picker defines/implements client balancer picker policy.
package picker

View File

@ -0,0 +1,39 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
return &errPicker{err: err}
}
type errPicker struct {
err error
}
func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
func (p *errPicker) UpdateAddrs(addrs []resolver.Address) {
return
}

View File

@ -0,0 +1,31 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// Picker defines balancer Picker methods.
type Picker interface {
balancer.Picker
// UpdateAddrs updates current endpoints in picker.
// Used when endpoints are updated manually.
// TODO: handle resolver target change
// TODO: handle resolved addresses change
UpdateAddrs(addrs []resolver.Address)
}

View File

@ -0,0 +1,49 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import "fmt"
// Policy defines balancer picker policy.
type Policy uint8
const (
// TODO: custom picker is not supported yet.
// custom defines custom balancer picker.
custom Policy = iota
// RoundrobinBalanced balance loads over multiple endpoints
// and implements failover in roundrobin fashion.
RoundrobinBalanced Policy = iota
// TODO: only send loads to pinned address "RoundrobinFailover"
// just like how 3.3 client works
//
// TODO: priotize leader
// TODO: health-check
// TODO: weighted roundrobin
// TODO: power of two random choice
)
func (p Policy) String() string {
switch p {
case custom:
panic("'custom' picker policy is not supported yet")
case RoundrobinBalanced:
return "etcd-client-roundrobin-balanced"
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
}
}

View File

@ -0,0 +1,105 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// NewRoundrobinBalanced returns a new roundrobin balanced picker.
func NewRoundrobinBalanced(
lg *zap.Logger,
scs []balancer.SubConn,
addrToSc map[resolver.Address]balancer.SubConn,
scToAddr map[balancer.SubConn]resolver.Address,
) Picker {
return &rrBalanced{
lg: lg,
scs: scs,
addrToSc: addrToSc,
scToAddr: scToAddr,
}
}
type rrBalanced struct {
lg *zap.Logger
mu sync.RWMutex
next int
scs []balancer.SubConn
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
updateAddrs func(addrs []resolver.Address)
}
// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
if n == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
rb.mu.Lock()
cur := rb.next
sc := rb.scs[cur]
picked := rb.scToAddr[sc].Addr
rb.next = (rb.next + 1) % len(rb.scs)
rb.mu.Unlock()
rb.lg.Debug(
"picked",
zap.String("address", picked),
zap.Int("subconn-index", cur),
zap.Int("subconn-size", n),
)
doneFunc := func(info balancer.DoneInfo) {
// TODO: error handling?
fss := []zapcore.Field{
zap.Error(info.Err),
zap.String("address", picked),
zap.Bool("success", info.Err == nil),
zap.Bool("bytes-sent", info.BytesSent),
zap.Bool("bytes-received", info.BytesReceived),
}
if info.Err == nil {
rb.lg.Debug("balancer done", fss...)
} else {
rb.lg.Warn("balancer failed", fss...)
}
}
return sc, doneFunc, nil
}
// UpdateAddrs
// TODO: implement this
func (rb *rrBalanced) UpdateAddrs(addrs []resolver.Address) {
rb.mu.Lock()
// close all resolved sub-connections first
for _, sc := range rb.scs {
sc.UpdateAddresses([]resolver.Address{})
}
rb.mu.Unlock()
}

View File

@ -0,0 +1,45 @@
package balancer
import (
"fmt"
"net/url"
"sort"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
func scToString(sc balancer.SubConn) string {
return fmt.Sprintf("%p", sc)
}
func scsToStrings(scs map[resolver.Address]balancer.SubConn) (ss []string) {
ss = make([]string, 0, len(scs))
for a, sc := range scs {
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
}
sort.Strings(ss)
return ss
}
func addrsToStrings(addrs []resolver.Address) (ss []string) {
ss = make([]string, len(addrs))
for i := range addrs {
ss[i] = addrs[i].Addr
}
sort.Strings(ss)
return ss
}
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
u, err := url.Parse(ep)
if err != nil {
addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
continue
}
addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
}
return addrs
}

View File

@ -0,0 +1,20 @@
package balancer
import (
"reflect"
"testing"
"google.golang.org/grpc/resolver"
)
func Test_epsToAddrs(t *testing.T) {
eps := []string{"https://example.com:2379", "127.0.0.1:2379"}
exp := []resolver.Address{
{Addr: "example.com:2379", Type: resolver.Backend},
{Addr: "127.0.0.1:2379", Type: resolver.Backend},
}
rs := epsToAddrs(eps...)
if !reflect.DeepEqual(rs, exp) {
t.Fatalf("expected %v, got %v", exp, rs)
}
}