diff --git a/clientv3/balancer.go b/clientv3/balancer.go index eb28b4fef..a54e78622 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -36,28 +36,6 @@ const ( notifyNext ) -type balancer interface { - grpc.Balancer - ConnectNotify() <-chan struct{} - - endpoint(hostPort string) string - endpoints() []string - // pinned returns the current pinned endpoint. - pinned() string - // hostPortError handles error from server-side. - hostPortError(hostPort string, err error) - - // up is Up but includes whether the balancer will use the connection. - up(addr grpc.Address) (func(error), bool) - - // updateAddrs changes the balancer's endpoints. - updateAddrs(endpoints ...string) - // ready returns a channel that closes when the balancer first connects. - ready() <-chan struct{} - // next forces the balancer to switch endpoints. - next() -} - // simpleBalancer does the bare minimum to expose multiple eps // to the grpc reconnection code path type simpleBalancer struct { @@ -152,8 +130,6 @@ func (b *simpleBalancer) pinned() string { return b.pinAddr } -func (b *simpleBalancer) hostPortError(hostPort string, err error) { return } - func getHostPort2ep(eps []string) map[string]string { hm := make(map[string]string, len(eps)) for i := range eps { diff --git a/clientv3/client.go b/clientv3/client.go index 317990f7e..bff7d7cc6 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -55,7 +55,7 @@ type Client struct { cfg Config creds *credentials.TransportCredentials - balancer balancer + balancer *healthBalancer mu sync.Mutex ctx context.Context diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index 48e0d5958..d1846b151 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -33,7 +33,7 @@ type healthCheckFunc func(ep string) (bool, error) // healthBalancer wraps a balancer so that it uses health checking // to choose its endpoints. type healthBalancer struct { - balancer + *simpleBalancer // healthCheck checks an endpoint's health. healthCheck healthCheckFunc @@ -59,15 +59,15 @@ type healthBalancer struct { wg sync.WaitGroup } -func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer { +func newHealthBalancer(b *simpleBalancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer { hb := &healthBalancer{ - balancer: b, - healthCheck: hc, - eps: b.endpoints(), - addrs: eps2addrs(b.endpoints()), - hostPort2ep: getHostPort2ep(b.endpoints()), - unhealthy: make(map[string]time.Time), - stopc: make(chan struct{}), + simpleBalancer: b, + healthCheck: hc, + eps: b.endpoints(), + addrs: eps2addrs(b.endpoints()), + hostPort2ep: getHostPort2ep(b.endpoints()), + unhealthy: make(map[string]time.Time), + stopc: make(chan struct{}), } if timeout < minHealthRetryDuration { timeout = minHealthRetryDuration @@ -107,13 +107,13 @@ func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) { if !hb.mayPin(addr) { return func(err error) {}, false } - return hb.balancer.up(addr) + return hb.simpleBalancer.up(addr) } func (hb *healthBalancer) Close() error { hb.stopOnce.Do(func() { close(hb.stopc) }) hb.wg.Wait() - return hb.balancer.Close() + return hb.simpleBalancer.Close() } func (hb *healthBalancer) updateAddrs(eps ...string) { @@ -122,7 +122,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) { hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep hb.unhealthy = make(map[string]time.Time) hb.mu.Unlock() - hb.balancer.updateAddrs(eps...) + hb.simpleBalancer.updateAddrs(eps...) } func (hb *healthBalancer) endpoint(host string) string { @@ -162,7 +162,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) { for _, addr := range hb.liveAddrs() { eps = append(eps, hb.endpoint(addr.Addr)) } - hb.balancer.updateAddrs(eps...) + hb.simpleBalancer.updateAddrs(eps...) case <-hb.stopc: return }