Merge pull request #8728 from gyuho/eee

clientv3: remove balancer interface
This commit is contained in:
Gyu-Ho Lee 2017-10-20 16:43:32 -07:00 committed by GitHub
commit 785a5a11ed
3 changed files with 14 additions and 38 deletions

View File

@ -36,28 +36,6 @@ const (
notifyNext
)
type balancer interface {
grpc.Balancer
ConnectNotify() <-chan struct{}
endpoint(hostPort string) string
endpoints() []string
// pinned returns the current pinned endpoint.
pinned() string
// hostPortError handles error from server-side.
hostPortError(hostPort string, err error)
// up is Up but includes whether the balancer will use the connection.
up(addr grpc.Address) (func(error), bool)
// updateAddrs changes the balancer's endpoints.
updateAddrs(endpoints ...string)
// ready returns a channel that closes when the balancer first connects.
ready() <-chan struct{}
// next forces the balancer to switch endpoints.
next()
}
// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
@ -152,8 +130,6 @@ func (b *simpleBalancer) pinned() string {
return b.pinAddr
}
func (b *simpleBalancer) hostPortError(hostPort string, err error) { return }
func getHostPort2ep(eps []string) map[string]string {
hm := make(map[string]string, len(eps))
for i := range eps {

View File

@ -55,7 +55,7 @@ type Client struct {
cfg Config
creds *credentials.TransportCredentials
balancer balancer
balancer *healthBalancer
mu sync.Mutex
ctx context.Context

View File

@ -33,7 +33,7 @@ type healthCheckFunc func(ep string) (bool, error)
// healthBalancer wraps a balancer so that it uses health checking
// to choose its endpoints.
type healthBalancer struct {
balancer
*simpleBalancer
// healthCheck checks an endpoint's health.
healthCheck healthCheckFunc
@ -59,15 +59,15 @@ type healthBalancer struct {
wg sync.WaitGroup
}
func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
func newHealthBalancer(b *simpleBalancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
hb := &healthBalancer{
balancer: b,
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
hostPort2ep: getHostPort2ep(b.endpoints()),
unhealthy: make(map[string]time.Time),
stopc: make(chan struct{}),
simpleBalancer: b,
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
hostPort2ep: getHostPort2ep(b.endpoints()),
unhealthy: make(map[string]time.Time),
stopc: make(chan struct{}),
}
if timeout < minHealthRetryDuration {
timeout = minHealthRetryDuration
@ -107,13 +107,13 @@ func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
if !hb.mayPin(addr) {
return func(err error) {}, false
}
return hb.balancer.up(addr)
return hb.simpleBalancer.up(addr)
}
func (hb *healthBalancer) Close() error {
hb.stopOnce.Do(func() { close(hb.stopc) })
hb.wg.Wait()
return hb.balancer.Close()
return hb.simpleBalancer.Close()
}
func (hb *healthBalancer) updateAddrs(eps ...string) {
@ -122,7 +122,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) {
hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep
hb.unhealthy = make(map[string]time.Time)
hb.mu.Unlock()
hb.balancer.updateAddrs(eps...)
hb.simpleBalancer.updateAddrs(eps...)
}
func (hb *healthBalancer) endpoint(host string) string {
@ -162,7 +162,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
for _, addr := range hb.liveAddrs() {
eps = append(eps, hb.endpoint(addr.Addr))
}
hb.balancer.updateAddrs(eps...)
hb.simpleBalancer.updateAddrs(eps...)
case <-hb.stopc:
return
}