From 391d662f77fb8b17042d15e50ca7713bcc1ee8be Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 22 Sep 2021 15:37:23 +0200 Subject: [PATCH] tests: Remove bridge dependency on unix --- tests/integration/bridge.go | 28 ++++++++++------------------ tests/integration/cluster.go | 26 +++++++++++++++++++++----- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/tests/integration/bridge.go b/tests/integration/bridge.go index 1d2be109e..22040b882 100644 --- a/tests/integration/bridge.go +++ b/tests/integration/bridge.go @@ -15,20 +15,20 @@ package integration import ( - "fmt" "io" "io/ioutil" "net" "sync" - - "go.etcd.io/etcd/client/pkg/v3/transport" ) -// bridge creates a unix socket bridge to another unix socket, making it possible +type Dialer interface { + Dial() (net.Conn, error) +} + +// bridge proxies connections between listener and dialer, making it possible // to disconnect grpc network connections without closing the logical grpc connection. type bridge struct { - inaddr string - outaddr string + dialer Dialer l net.Listener conns map[*bridgeConn]struct{} @@ -40,30 +40,22 @@ type bridge struct { mu sync.Mutex } -func newBridge(addr string) (*bridge, error) { +func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) { b := &bridge{ // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number - inaddr: addr + "0", - outaddr: addr, + dialer: dialer, + l: listener, conns: make(map[*bridgeConn]struct{}), stopc: make(chan struct{}), pausec: make(chan struct{}), blackholec: make(chan struct{}), } close(b.pausec) - - l, err := transport.NewUnixListener(b.inaddr) - if err != nil { - return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err) - } - b.l = l b.wg.Add(1) go b.serveListen() return b, nil } -func (b *bridge) URL() string { return "unix://" + b.inaddr } - func (b *bridge) Close() { b.l.Close() b.mu.Lock() @@ -127,7 +119,7 @@ func (b *bridge) serveListen() { case <-pausec: } - outc, oerr := net.Dial("unix", b.outaddr) + outc, oerr := b.dialer.Dial() if oerr != nil { inc.Close() return diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index cbf8adacf..f5a391c5f 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -736,20 +736,36 @@ func (m *member) listenGRPC() error { if m.useIP { // for IP-only TLS certs m.grpcAddr = "127.0.0.1:" + m.Name } - l, err := transport.NewUnixListener(m.grpcAddr) + grpcListener, err := transport.NewUnixListener(m.grpcAddr) if err != nil { return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) } - m.grpcBridge, err = newBridge(m.grpcAddr) + bridgeAddr := m.grpcAddr + "0" + bridgeListener, err := transport.NewUnixListener(bridgeAddr) if err != nil { - l.Close() + grpcListener.Close() + return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcAddr, err) + } + m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcAddr}, bridgeListener) + if err != nil { + bridgeListener.Close() + grpcListener.Close() return err } - m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr - m.grpcListener = l + m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr + m.grpcListener = grpcListener return nil } +type dialer struct { + network string + addr string +} + +func (d dialer) Dial() (net.Conn, error) { + return net.Dial(d.network, d.addr) +} + func (m *member) ElectionTimeout() time.Duration { return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond }