From efd7800e0fd5b3e5342479a04a90fca8cb017954 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 11 Sep 2017 21:05:53 -0700 Subject: [PATCH] clientv3: try next endpoint point on unavailable error --- clientv3/balancer.go | 52 +++++++++++++++++++++++++++++++++++--------- clientv3/retry.go | 12 ++++++++-- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index ea9308b74..75c5cd438 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -29,6 +29,13 @@ import ( // This error is returned only when opts.BlockingWait is true. var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available") +type notifyMsg int + +const ( + notifyReset notifyMsg = iota + notifyNext +) + type balancer interface { grpc.Balancer ConnectNotify() <-chan struct{} @@ -43,6 +50,8 @@ type balancer interface { updateAddrs(endpoints ...string) // ready returns a channel that closes when the balancer first connects. ready() <-chan struct{} + // next forces the balancer to switch endpoints. + next() } // simpleBalancer does the bare minimum to expose multiple eps @@ -77,7 +86,7 @@ type simpleBalancer struct { donec chan struct{} // updateAddrsC notifies updateNotifyLoop to update addrs. - updateAddrsC chan struct{} + updateAddrsC chan notifyMsg // grpc issues TLS cert checks using the string passed into dial so // that string must be the host. To recover the full scheme://host URL, @@ -92,7 +101,7 @@ type simpleBalancer struct { } func newSimpleBalancer(eps []string) *simpleBalancer { - notifyCh := make(chan []grpc.Address, 1) + notifyCh := make(chan []grpc.Address) addrs := eps2addrs(eps) sb := &simpleBalancer{ addrs: addrs, @@ -103,7 +112,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer { stopc: make(chan struct{}), downc: make(chan struct{}), donec: make(chan struct{}), - updateAddrsC: make(chan struct{}, 1), + updateAddrsC: make(chan notifyMsg), host2ep: getHost2ep(eps), } close(sb.downc) @@ -171,12 +180,27 @@ func (b *simpleBalancer) updateAddrs(eps ...string) { if update { select { - case b.updateAddrsC <- struct{}{}: + case b.updateAddrsC <- notifyReset: case <-b.stopc: } } } +func (b *simpleBalancer) next() { + b.mu.RLock() + downc := b.downc + b.mu.RUnlock() + select { + case b.updateAddrsC <- notifyNext: + case <-b.stopc: + } + // wait until disconnect so new RPCs are not issued on old connection + select { + case <-downc: + case <-b.stopc: + } +} + func hasAddr(addrs []grpc.Address, targetAddr string) bool { for _, addr := range addrs { if targetAddr == addr.Addr { @@ -213,11 +237,11 @@ func (b *simpleBalancer) updateNotifyLoop() { default: } case downc == nil: - b.notifyAddrs() + b.notifyAddrs(notifyReset) select { case <-upc: - case <-b.updateAddrsC: - b.notifyAddrs() + case msg := <-b.updateAddrsC: + b.notifyAddrs(msg) case <-b.stopc: return } @@ -231,16 +255,24 @@ func (b *simpleBalancer) updateNotifyLoop() { } select { case <-downc: - case <-b.updateAddrsC: + b.notifyAddrs(notifyReset) + case msg := <-b.updateAddrsC: + b.notifyAddrs(msg) case <-b.stopc: return } - b.notifyAddrs() } } } -func (b *simpleBalancer) notifyAddrs() { +func (b *simpleBalancer) notifyAddrs(msg notifyMsg) { + if msg == notifyNext { + select { + case b.notifyCh <- []grpc.Address{}: + case <-b.stopc: + return + } + } b.mu.RLock() addrs := b.addrs b.mu.RUnlock() diff --git a/clientv3/retry.go b/clientv3/retry.go index 272b62b92..aab2c9235 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -51,11 +51,19 @@ func isWriteStopError(err error) bool { func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { return func(rpcCtx context.Context, f rpcFunc) error { for { - if err := f(rpcCtx); err == nil || isStop(err) { + err := f(rpcCtx) + if err == nil { + return nil + } + notify := c.balancer.ConnectNotify() + if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { + c.balancer.next() + } + if isStop(err) { return err } select { - case <-c.balancer.ConnectNotify(): + case <-notify: case <-rpcCtx.Done(): return rpcCtx.Err() case <-c.ctx.Done():