From 8baaa06cce9e58548f60e6e9b21c9af3d42580bb Mon Sep 17 00:00:00 2001
From: fanmin shi
Date: Wed, 8 Mar 2017 13:39:11 -0800
Subject: [PATCH] clientv3: serialize updating notifych in balancer

FIXES #7283
---
 clientv3/balancer.go      | 75 ++++++++++++++++++++++++++++---
 clientv3/balancer_test.go | 94 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+), 6 deletions(-)

diff --git a/clientv3/balancer.go b/clientv3/balancer.go
index 87137d4d5..ab11e4ad6 100644
--- a/clientv3/balancer.go
+++ b/clientv3/balancer.go
@@ -47,6 +47,15 @@ type simpleBalancer struct {
 	// upc closes when upEps transitions from empty to non-zero or the balancer closes.
 	upc chan struct{}
 
+	// downc closes when grpc calls down() on pinAddr
+	downc chan struct{}
+
+	// stopc is closed to signal updateNotifyLoop should stop.
+	stopc chan struct{}
+
+	// donec closes when all goroutines are exited
+	donec chan struct{}
+
 	// grpc issues TLS cert checks using the string passed into dial so
 	// that string must be the host. To recover the full scheme://host URL,
 	// have a map from hosts to the original endpoint.
@@ -71,8 +80,12 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		notifyCh: notifyCh,
 		readyc:   make(chan struct{}),
 		upc:      make(chan struct{}),
+		stopc:    make(chan struct{}),
+		downc:    make(chan struct{}),
+		donec:    make(chan struct{}),
 		host2ep:  getHost2ep(eps),
 	}
+	go sb.updateNotifyLoop()
 	return sb
 }
 
@@ -131,6 +144,50 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 }
 
+func (b *simpleBalancer) updateNotifyLoop() {
+	defer close(b.donec)
+
+	for {
+		b.mu.RLock()
+		upc := b.upc
+		b.mu.RUnlock()
+		var downc chan struct{}
+		select {
+		case <-upc:
+			var addr string
+			b.mu.RLock()
+			addr = b.pinAddr
+			// Up() sets pinAddr and downc as a pair under b.mu
+			downc = b.downc
+			b.mu.RUnlock()
+			if addr == "" {
+				break
+			}
+			// close opened connections that are not pinAddr
+			// this ensures only one connection is open per client
+			select {
+			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
+			case <-b.stopc:
+				return
+			}
+		}
+		select {
+		case <-downc:
+			b.mu.RLock()
+			addrs := b.addrs
+			b.mu.RUnlock()
+			select {
+			case b.notifyCh <- addrs:
+			case <-b.stopc:
+				return
+			}
+		case <-b.stopc:
+			return
+		}
+
+	}
+}
+
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
@@ -145,20 +202,18 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	if b.pinAddr == "" {
 		// notify waiting Get()s and pin first connected address
 		close(b.upc)
+		b.downc = make(chan struct{})
 		b.pinAddr = addr.Addr
 		// notify client that a connection is up
 		b.readyOnce.Do(func() { close(b.readyc) })
-		// close opened connections that are not pinAddr
-		// this ensures only one connection is open per client
-		b.notifyCh <- []grpc.Address{addr}
 	}
 	return func(err error) {
 		b.mu.Lock()
 		if b.pinAddr == addr.Addr {
 			b.upc = make(chan struct{})
+			close(b.downc)
 			b.pinAddr = ""
-			b.notifyCh <- b.addrs
 		}
 		b.mu.Unlock()
 	}
@@ -214,14 +269,15 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
 
 func (b *simpleBalancer) Close() error {
 	b.mu.Lock()
-	defer b.mu.Unlock()
 	// In case gRPC calls close twice. TODO: remove the checking
 	// when we are sure that gRPC wont call close twice.
 	if b.closed {
+		b.mu.Unlock()
+		<-b.donec
 		return nil
 	}
 	b.closed = true
-	close(b.notifyCh)
+	close(b.stopc)
 	b.pinAddr = ""
 
 	// In the case of following scenario:
@@ -236,6 +292,13 @@ func (b *simpleBalancer) Close() error {
 		// terminate all waiting Get()s
 		close(b.upc)
 	}
+
+	b.mu.Unlock()
+
+	// wait for updateNotifyLoop to finish
+	<-b.donec
+	close(b.notifyCh)
+
 	return nil
 }
 
diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go
index fd3cfed0d..c6deff0d8 100644
--- a/clientv3/balancer_test.go
+++ b/clientv3/balancer_test.go
@@ -16,9 +16,14 @@ package clientv3
 
 import (
 	"errors"
+	"net"
+	"sync"
 	"testing"
 	"time"
 
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
@@ -124,3 +129,92 @@ func TestBalancerGetBlocking(t *testing.T) {
 		t.Errorf("Get() with no up endpoints should timeout, got %v", err)
 	}
 }
+
+// TestBalancerDoNotBlockOnClose ensures that the balancer and grpc do not deadlock
+// each other when connections are rapidly opened and closed; such a deadlock would
+// make balancer.Close() block forever. See https://github.com/coreos/etcd/issues/7283.
+func TestBalancerDoNotBlockOnClose(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	kcl := newKillConnListener(t, 3)
+	defer kcl.close()
+
+	for i := 0; i < 5; i++ {
+		sb := newSimpleBalancer(kcl.endpoints())
+		conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(sb))
+		if err != nil {
+			t.Fatal(err)
+		}
+		kvc := pb.NewKVClient(conn)
+		<-sb.readyc
+		for j := 0; j < 100; j++ {
+			go kvc.Range(context.TODO(), &pb.RangeRequest{}, grpc.FailFast(false))
+		}
+		// balancer.Close() might block
+		// if balancer and grpc deadlock each other.
+		closec := make(chan struct{})
+		go func() {
+			defer close(closec)
+			sb.Close()
+		}()
+		go conn.Close()
+		select {
+		case <-closec:
+		case <-time.After(3 * time.Second):
+			testutil.FatalStack(t, "balancer close timeout")
+		}
+	}
+}
+
+// killConnListener accepts incoming connections and kills them immediately.
+type killConnListener struct {
+	wg    sync.WaitGroup
+	eps   []string
+	stopc chan struct{}
+	t     *testing.T
+}
+
+func newKillConnListener(t *testing.T, size int) *killConnListener {
+	kcl := &killConnListener{stopc: make(chan struct{}), t: t}
+
+	for i := 0; i < size; i++ {
+		ln, err := net.Listen("tcp", ":0")
+		if err != nil {
+			t.Fatal(err)
+		}
+		kcl.eps = append(kcl.eps, ln.Addr().String())
+		kcl.wg.Add(1)
+		go kcl.listen(ln)
+	}
+	return kcl
+}
+
+func (kcl *killConnListener) endpoints() []string {
+	return kcl.eps
+}
+
+func (kcl *killConnListener) listen(l net.Listener) {
+	go func() {
+		defer kcl.wg.Done()
+		for {
+			conn, err := l.Accept()
+			select {
+			case <-kcl.stopc:
+				return
+			default:
+			}
+			if err != nil {
+				kcl.t.Fatal(err)
+			}
+			time.Sleep(1 * time.Millisecond)
+			conn.Close()
+		}
+	}()
+	<-kcl.stopc
+	l.Close()
+}
+
+func (kcl *killConnListener) close() {
+	close(kcl.stopc)
+	kcl.wg.Wait()
+}
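
The heart of this change: Up() and the down() closure it returns no longer send on notifyCh while holding b.mu. They only flip pinAddr together with the upc/downc channels, and the new updateNotifyLoop goroutine becomes the sole sender on notifyCh, with every send guarded by a select on stopc. Close() can therefore always stop the loop, wait on donec, and only then close notifyCh, so the balancer can no longer block forever against gRPC's notify reader.

Below is a minimal, self-contained sketch of this serialization pattern, using only the standard library. The notifier type and the names newNotifier, loop, Up, and Close are illustrative stand-ins, not the etcd clientv3 code, and it takes one deliberate simplification: the loop's first select watches stopc directly, where the patch instead relies on Close() closing upc.

package main

import (
	"fmt"
	"sync"
)

type notifier struct {
	mu      sync.RWMutex
	addrs   []string // every known endpoint
	pinAddr string   // currently pinned endpoint, "" when none
	// upc closes when an endpoint gets pinned; downc when the pin goes
	// down; stopc tells the loop to stop; donec closes once it has.
	upc, downc, stopc, donec chan struct{}
	notifyCh                 chan []string // read by the client (gRPC, in etcd)
}

func newNotifier(addrs []string) *notifier {
	n := &notifier{addrs: addrs, upc: make(chan struct{}), downc: make(chan struct{}),
		stopc: make(chan struct{}), donec: make(chan struct{}), notifyCh: make(chan []string)}
	go n.loop()
	return n
}

// loop is the sole sender on notifyCh; every send also selects on stopc.
func (n *notifier) loop() {
	defer close(n.donec)
	for {
		n.mu.RLock()
		upc := n.upc
		n.mu.RUnlock()
		var downc chan struct{}
		select {
		case <-upc: // an endpoint was pinned
			n.mu.RLock()
			addr := n.pinAddr
			downc = n.downc // set as a pair with pinAddr under n.mu
			n.mu.RUnlock()
			if addr != "" {
				select {
				case n.notifyCh <- []string{addr}:
				case <-n.stopc:
					return
				}
			}
		case <-n.stopc: // simplification: the patch closes upc in Close() instead
			return
		}
		select {
		case <-downc: // pin lost; offer every endpoint again
			n.mu.RLock()
			addrs := n.addrs
			n.mu.RUnlock()
			select {
			case n.notifyCh <- addrs:
			case <-n.stopc:
				return
			}
		case <-n.stopc:
			return
		}
	}
}

// Up pins addr and returns the down() callback; both only flip channels
// under n.mu and never send on notifyCh, so they cannot block the caller.
func (n *notifier) Up(addr string) func() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.pinAddr == "" {
		close(n.upc)
		n.downc = make(chan struct{})
		n.pinAddr = addr
	}
	return func() {
		n.mu.Lock()
		if n.pinAddr == addr {
			n.upc = make(chan struct{})
			close(n.downc)
			n.pinAddr = ""
		}
		n.mu.Unlock()
	}
}

func (n *notifier) Close() {
	close(n.stopc)
	<-n.donec         // wait out any in-flight notifyCh send
	close(n.notifyCh) // safe: the only sender has exited
}

func main() {
	n := newNotifier([]string{"a:2379", "b:2379"})
	down := n.Up("a:2379")
	fmt.Println("pinned:", <-n.notifyCh)
	down()
	n.Close() // returns even though nobody drains notifyCh anymore
}

The single-sender rule is what makes close(n.notifyCh) safe: once donec closes, no goroutine can still be sending, and a pending send can only ever be parked in a select that also watches stopc, which is exactly how the patch lets Close() win the race that issue #7283 describes.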