From 43e5f892f6f0a14e9a53b64cf77baa93f42a7a90 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 28 Apr 2017 16:57:22 -0700 Subject: [PATCH] clientv3: don't race on upc/downc/switch endpoints in balancer If the balancer update notification loop starts with a downed connection and endpoints are switched while the old connection is up, the balancer can potentially wait forever for an up connection without refreshing the connections to reflect the current endpoints. Instead, fetch upc/downc together, only caring about a single transition either from down->up or up->down for each iteration Simple way to reproduce failures: add time.Sleep(time.Second) to the beginning of the update notification loop. --- clientv3/balancer.go | 90 +++++++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 39 deletions(-) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index b7767221a..6ae047e98 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -77,7 +77,6 @@ func newSimpleBalancer(eps []string) *simpleBalancer { for i := range eps { addrs[i].Addr = getHost(eps[i]) } - notifyCh <- addrs sb := &simpleBalancer{ addrs: addrs, notifyCh: notifyCh, @@ -89,6 +88,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer { updateAddrsC: make(chan struct{}, 1), host2ep: getHost2ep(eps), } + close(sb.downc) go sb.updateNotifyLoop() return sb } @@ -170,38 +170,51 @@ func (b *simpleBalancer) updateNotifyLoop() { for { b.mu.RLock() - upc := b.upc + upc, downc, addr := b.upc, b.downc, b.pinAddr b.mu.RUnlock() - var downc chan struct{} + // downc or upc should be closed + select { + case <-downc: + downc = nil + default: + } select { case <-upc: - var addr string - b.mu.RLock() - addr = b.pinAddr - // Up() sets pinAddr and downc as a pair under b.mu - downc = b.downc - b.mu.RUnlock() - if addr == "" { - break - } - // close opened connections that are not pinAddr - // this ensures only one connection is open per client + upc = nil + default: + } + switch { + case downc == nil && upc == nil: + // stale select { - case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-b.stopc: + return + default: + } + case downc == nil: + b.notifyAddrs() + select { + case <-upc: + case <-b.updateAddrsC: + b.notifyAddrs() + case <-b.stopc: + return + } + case upc == nil: + select { + // close connections that are not the pinned address + case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-downc: + case <-b.stopc: + return + } + select { + case <-downc: + case <-b.updateAddrsC: case <-b.stopc: return } - case <-b.updateAddrsC: b.notifyAddrs() - continue - } - select { - case <-downc: - b.notifyAddrs() - case <-b.updateAddrsC: - b.notifyAddrs() - case <-b.stopc: - return } } } @@ -231,23 +244,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { if !hasAddr(b.addrs, addr.Addr) { return func(err error) {} } - - if b.pinAddr == "" { - // notify waiting Get()s and pin first connected address - close(b.upc) - b.downc = make(chan struct{}) - b.pinAddr = addr.Addr - // notify client that a connection is up - b.readyOnce.Do(func() { close(b.readyc) }) + if b.pinAddr != "" { + return func(err error) {} } - + // notify waiting Get()s and pin first connected address + close(b.upc) + b.downc = make(chan struct{}) + b.pinAddr = addr.Addr + // notify client that a connection is up + b.readyOnce.Do(func() { close(b.readyc) }) return func(err error) { b.mu.Lock() - if b.pinAddr == addr.Addr { - b.upc = make(chan struct{}) - close(b.downc) - b.pinAddr = "" - } + b.upc = make(chan struct{}) + close(b.downc) + b.pinAddr = "" b.mu.Unlock() } } @@ -280,6 +290,8 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) b.mu.RUnlock() select { case <-ch: + case <-b.donec: + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing case <-ctx.Done(): return grpc.Address{Addr: ""}, nil, ctx.Err() }