integration: add pause/unpause to client bridge

Resetting connections sometimes isn't enough; need to stop/resume
accepting connections for some tests while keeping the member up.
This commit is contained in:
Anthony Romano 2017-04-21 12:40:40 -07:00
parent 91039bef7c
commit fe1ce3a2f0
2 changed files with 45 additions and 8 deletions

View File

@ -31,8 +31,9 @@ type bridge struct {
l net.Listener l net.Listener
conns map[*bridgeConn]struct{} conns map[*bridgeConn]struct{}
stopc chan struct{} stopc chan struct{}
wg sync.WaitGroup pausec chan struct{}
wg sync.WaitGroup
mu sync.Mutex mu sync.Mutex
} }
@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) {
inaddr: addr + "0", inaddr: addr + "0",
outaddr: addr, outaddr: addr,
conns: make(map[*bridgeConn]struct{}), conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}, 1), stopc: make(chan struct{}),
pausec: make(chan struct{}),
} }
close(b.pausec)
l, err := transport.NewUnixListener(b.inaddr) l, err := transport.NewUnixListener(b.inaddr)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err) return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr }
func (b *bridge) Close() { func (b *bridge) Close() {
b.l.Close() b.l.Close()
b.mu.Lock()
select { select {
case b.stopc <- struct{}{}: case <-b.stopc:
default: default:
close(b.stopc)
} }
b.mu.Unlock()
b.wg.Wait() b.wg.Wait()
} }
@ -75,6 +82,22 @@ func (b *bridge) Reset() {
b.conns = make(map[*bridgeConn]struct{}) b.conns = make(map[*bridgeConn]struct{})
} }
func (b *bridge) Pause() {
b.mu.Lock()
b.pausec = make(chan struct{})
b.mu.Unlock()
}
func (b *bridge) Unpause() {
b.mu.Lock()
select {
case <-b.pausec:
default:
close(b.pausec)
}
b.mu.Unlock()
}
func (b *bridge) serveListen() { func (b *bridge) serveListen() {
defer func() { defer func() {
b.l.Close() b.l.Close()
@ -91,13 +114,22 @@ func (b *bridge) serveListen() {
if ierr != nil { if ierr != nil {
return return
} }
b.mu.Lock()
pausec := b.pausec
b.mu.Unlock()
select {
case <-b.stopc:
return
case <-pausec:
}
outc, oerr := net.Dial("unix", b.outaddr) outc, oerr := net.Dial("unix", b.outaddr)
if oerr != nil { if oerr != nil {
inc.Close() inc.Close()
return return
} }
bc := &bridgeConn{inc, outc} bc := &bridgeConn{inc, outc, make(chan struct{})}
b.wg.Add(1) b.wg.Add(1)
b.mu.Lock() b.mu.Lock()
b.conns[bc] = struct{}{} b.conns[bc] = struct{}{}
@ -108,6 +140,7 @@ func (b *bridge) serveListen() {
func (b *bridge) serveConn(bc *bridgeConn) { func (b *bridge) serveConn(bc *bridgeConn) {
defer func() { defer func() {
close(bc.donec)
bc.Close() bc.Close()
b.mu.Lock() b.mu.Lock()
delete(b.conns, bc) delete(b.conns, bc)
@ -129,11 +162,13 @@ func (b *bridge) serveConn(bc *bridgeConn) {
} }
type bridgeConn struct { type bridgeConn struct {
in net.Conn in net.Conn
out net.Conn out net.Conn
donec chan struct{}
} }
func (bc *bridgeConn) Close() { func (bc *bridgeConn) Close() {
bc.in.Close() bc.in.Close()
bc.out.Close() bc.out.Close()
<-bc.donec
} }

View File

@ -556,7 +556,9 @@ func (m *member) electionTimeout() time.Duration {
return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond
} }
func (m *member) DropConnections() { m.grpcBridge.Reset() } func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
// NewClientV3 creates a new grpc client connection to the member // NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) { func NewClientV3(m *member) (*clientv3.Client, error) {