From 3be37f042ed2fc10f0fe7bd6686ee128d1485646 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 21 Apr 2017 12:40:40 -0700 Subject: [PATCH] integration: add pause/unpause to client bridge Resetting connections sometimes isn't enough; need to stop/resume accepting connections for some tests while keeping the member up. --- integration/bridge.go | 49 ++++++++++++++++++++++++++++++++++++------ integration/cluster.go | 4 +++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/integration/bridge.go b/integration/bridge.go index 6c1ec5449..09c65aa1f 100644 --- a/integration/bridge.go +++ b/integration/bridge.go @@ -31,8 +31,9 @@ type bridge struct { l net.Listener conns map[*bridgeConn]struct{} - stopc chan struct{} - wg sync.WaitGroup + stopc chan struct{} + pausec chan struct{} + wg sync.WaitGroup mu sync.Mutex } @@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) { inaddr: addr + "0", outaddr: addr, conns: make(map[*bridgeConn]struct{}), - stopc: make(chan struct{}, 1), + stopc: make(chan struct{}), + pausec: make(chan struct{}), } + close(b.pausec) + l, err := transport.NewUnixListener(b.inaddr) if err != nil { return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err) @@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr } func (b *bridge) Close() { b.l.Close() + b.mu.Lock() select { - case b.stopc <- struct{}{}: + case <-b.stopc: default: + close(b.stopc) } + b.mu.Unlock() b.wg.Wait() } @@ -75,6 +82,22 @@ func (b *bridge) Reset() { b.conns = make(map[*bridgeConn]struct{}) } +func (b *bridge) Pause() { + b.mu.Lock() + b.pausec = make(chan struct{}) + b.mu.Unlock() +} + +func (b *bridge) Unpause() { + b.mu.Lock() + select { + case <-b.pausec: + default: + close(b.pausec) + } + b.mu.Unlock() +} + func (b *bridge) serveListen() { defer func() { b.l.Close() @@ -91,13 +114,22 @@ func (b *bridge) serveListen() { if ierr != nil { return } + b.mu.Lock() + pausec := b.pausec + b.mu.Unlock() + select { + case <-b.stopc: + return + case <-pausec: + } + outc, oerr := net.Dial("unix", b.outaddr) if oerr != nil { inc.Close() return } - bc := &bridgeConn{inc, outc} + bc := &bridgeConn{inc, outc, make(chan struct{})} b.wg.Add(1) b.mu.Lock() b.conns[bc] = struct{}{} @@ -108,6 +140,7 @@ func (b *bridge) serveListen() { func (b *bridge) serveConn(bc *bridgeConn) { defer func() { + close(bc.donec) bc.Close() b.mu.Lock() delete(b.conns, bc) @@ -129,11 +162,13 @@ func (b *bridge) serveConn(bc *bridgeConn) { } type bridgeConn struct { - in net.Conn - out net.Conn + in net.Conn + out net.Conn + donec chan struct{} } func (bc *bridgeConn) Close() { bc.in.Close() bc.out.Close() + <-bc.donec } diff --git a/integration/cluster.go b/integration/cluster.go index 1b056f64b..4989e1f62 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -532,7 +532,9 @@ func (m *member) electionTimeout() time.Duration { return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond } -func (m *member) DropConnections() { m.grpcBridge.Reset() } +func (m *member) DropConnections() { m.grpcBridge.Reset() } +func (m *member) PauseConnections() { m.grpcBridge.Pause() } +func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) {