From 741d7e9dcaee8e01cc7b949a9e771da5c34de1a2 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 25 Aug 2017 13:30:14 -0700 Subject: [PATCH] integration: add Blackhole to bridgeConn Signed-off-by: Gyu-Ho Lee --- integration/bridge.go | 67 +++++++++++++++++++++++++++++++++++------- integration/cluster.go | 2 ++ 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/integration/bridge.go b/integration/bridge.go index b9e67318e..59cebe1f0 100644 --- a/integration/bridge.go +++ b/integration/bridge.go @@ -17,6 +17,7 @@ package integration import ( "fmt" "io" + "io/ioutil" "net" "sync" @@ -31,9 +32,10 @@ type bridge struct { l net.Listener conns map[*bridgeConn]struct{} - stopc chan struct{} - pausec chan struct{} - wg sync.WaitGroup + stopc chan struct{} + pausec chan struct{} + blackholec chan struct{} + wg sync.WaitGroup mu sync.Mutex } @@ -41,11 +43,12 @@ type bridge struct { func newBridge(addr string) (*bridge, error) { b := &bridge{ // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number - inaddr: addr + "0", - outaddr: addr, - conns: make(map[*bridgeConn]struct{}), - stopc: make(chan struct{}), - pausec: make(chan struct{}), + inaddr: addr + "0", + outaddr: addr, + conns: make(map[*bridgeConn]struct{}), + stopc: make(chan struct{}), + pausec: make(chan struct{}), + blackholec: make(chan struct{}), } close(b.pausec) @@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) { var wg sync.WaitGroup wg.Add(2) go func() { - io.Copy(bc.out, bc.in) + b.ioCopy(bc, bc.out, bc.in) bc.close() wg.Done() }() go func() { - io.Copy(bc.in, bc.out) + b.ioCopy(bc, bc.in, bc.out) bc.close() wg.Done() }() @@ -179,3 +182,47 @@ func (bc *bridgeConn) close() { bc.in.Close() bc.out.Close() } + +func (b *bridge) Blackhole() { + b.mu.Lock() + close(b.blackholec) + b.mu.Unlock() +} + +func (b *bridge) Unblackhole() { + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) + b.blackholec = make(chan struct{}) + b.mu.Unlock() +} + +// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer +func (b *bridge) ioCopy(bc *bridgeConn, dst io.Writer, src io.Reader) (err error) { + buf := make([]byte, 32*1024) + for { + select { + case <-b.blackholec: + io.Copy(ioutil.Discard, src) + return nil + default: + } + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if ew != nil { + return ew + } + if nr != nw { + return io.ErrShortWrite + } + } + if er != nil { + err = er + break + } + } + return +} diff --git a/integration/cluster.go b/integration/cluster.go index ec9a9f493..a164480d0 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -588,6 +588,8 @@ func (m *member) ID() types.ID { return m.s.ID() } func (m *member) DropConnections() { m.grpcBridge.Reset() } func (m *member) PauseConnections() { m.grpcBridge.Pause() } func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } +func (m *member) Blackhole() { m.grpcBridge.Blackhole() } +func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() } // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) {