From 391d662f77fb8b17042d15e50ca7713bcc1ee8be Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 22 Sep 2021 15:37:23 +0200 Subject: [PATCH 01/10] tests: Remove bridge dependency on unix --- tests/integration/bridge.go | 28 ++++++++++------------------ tests/integration/cluster.go | 26 +++++++++++++++++++++----- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/tests/integration/bridge.go b/tests/integration/bridge.go index 1d2be109e..22040b882 100644 --- a/tests/integration/bridge.go +++ b/tests/integration/bridge.go @@ -15,20 +15,20 @@ package integration import ( - "fmt" "io" "io/ioutil" "net" "sync" - - "go.etcd.io/etcd/client/pkg/v3/transport" ) -// bridge creates a unix socket bridge to another unix socket, making it possible +type Dialer interface { + Dial() (net.Conn, error) +} + +// bridge proxies connections between listener and dialer, making it possible // to disconnect grpc network connections without closing the logical grpc connection. type bridge struct { - inaddr string - outaddr string + dialer Dialer l net.Listener conns map[*bridgeConn]struct{} @@ -40,30 +40,22 @@ type bridge struct { mu sync.Mutex } -func newBridge(addr string) (*bridge, error) { +func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) { b := &bridge{ // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number - inaddr: addr + "0", - outaddr: addr, + dialer: dialer, + l: listener, conns: make(map[*bridgeConn]struct{}), stopc: make(chan struct{}), pausec: make(chan struct{}), blackholec: make(chan struct{}), } close(b.pausec) - - l, err := transport.NewUnixListener(b.inaddr) - if err != nil { - return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err) - } - b.l = l b.wg.Add(1) go b.serveListen() return b, nil } -func (b *bridge) URL() string { return "unix://" + b.inaddr } - func (b *bridge) Close() { b.l.Close() b.mu.Lock() @@ -127,7 +119,7 @@ func (b *bridge) serveListen() { case <-pausec: } - outc, oerr := net.Dial("unix", b.outaddr) + outc, oerr := b.dialer.Dial() if oerr != nil { inc.Close() return diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index cbf8adacf..f5a391c5f 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -736,20 +736,36 @@ func (m *member) listenGRPC() error { if m.useIP { // for IP-only TLS certs m.grpcAddr = "127.0.0.1:" + m.Name } - l, err := transport.NewUnixListener(m.grpcAddr) + grpcListener, err := transport.NewUnixListener(m.grpcAddr) if err != nil { return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) } - m.grpcBridge, err = newBridge(m.grpcAddr) + bridgeAddr := m.grpcAddr + "0" + bridgeListener, err := transport.NewUnixListener(bridgeAddr) if err != nil { - l.Close() + grpcListener.Close() + return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcAddr, err) + } + m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcAddr}, bridgeListener) + if err != nil { + bridgeListener.Close() + grpcListener.Close() return err } - m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr - m.grpcListener = l + m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr + m.grpcListener = grpcListener return nil } +type dialer struct { + network string + addr string +} + +func (d dialer) Dial() (net.Conn, error) { + return net.Dial(d.network, d.addr) +} + func (m *member) ElectionTimeout() time.Duration { return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond } From f2dd5d80a19dc5772b9c5ab84ca8b3d1dc75f67a Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 22 Sep 2021 15:48:13 +0200 Subject: [PATCH 02/10] tests: Rename grpcAddr to grpcURL to imply that it includes schema --- .../clientv3/connectivity/black_hole_test.go | 4 +- .../clientv3/connectivity/dial_test.go | 12 ++--- .../connectivity/network_partition_test.go | 10 ++--- .../connectivity/server_shutdown_test.go | 10 ++--- tests/integration/clientv3/kv_test.go | 6 +-- .../integration/clientv3/maintenance_test.go | 4 +- tests/integration/clientv3/metrics_test.go | 2 +- .../integration/clientv3/ordering_kv_test.go | 16 +++---- .../clientv3/ordering_util_test.go | 24 +++++----- tests/integration/cluster.go | 44 +++++++++---------- .../proxy/grpcproxy/cluster_test.go | 2 +- tests/integration/proxy/grpcproxy/kv_test.go | 2 +- .../proxy/grpcproxy/register_test.go | 2 +- tests/integration/v3_grpc_test.go | 4 +- tests/integration/v3_tls_test.go | 2 +- 15 files changed, 72 insertions(+), 72 deletions(-) diff --git a/tests/integration/clientv3/connectivity/black_hole_test.go b/tests/integration/clientv3/connectivity/black_hole_test.go index ff56bbd09..a4415322f 100644 --- a/tests/integration/clientv3/connectivity/black_hole_test.go +++ b/tests/integration/clientv3/connectivity/black_hole_test.go @@ -41,7 +41,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()} ccfg := clientv3.Config{ Endpoints: []string{eps[0]}, @@ -173,7 +173,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()} ccfg := clientv3.Config{ Endpoints: []string{eps[0]}, diff --git a/tests/integration/clientv3/connectivity/dial_test.go b/tests/integration/clientv3/connectivity/dial_test.go index f02ea61aa..52dcca69e 100644 --- a/tests/integration/clientv3/connectivity/dial_test.go +++ b/tests/integration/clientv3/connectivity/dial_test.go @@ -57,7 +57,7 @@ func TestDialTLSExpired(t *testing.T) { } // expect remote errors "tls: bad certificate" _, err = integration.NewClient(t, clientv3.Config{ - Endpoints: []string{clus.Members[0].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL()}, DialTimeout: 3 * time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, TLS: tls, @@ -75,7 +75,7 @@ func TestDialTLSNoConfig(t *testing.T) { defer clus.Terminate(t) // expect "signed by unknown authority" c, err := integration.NewClient(t, clientv3.Config{ - Endpoints: []string{clus.Members[0].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL()}, DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) @@ -108,7 +108,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) { // get endpoint list eps := make([]string, 3) for i := range eps { - eps[i] = clus.Members[i].GRPCAddr() + eps[i] = clus.Members[i].GRPCURL() } toKill := rand.Intn(len(eps)) @@ -149,7 +149,7 @@ func TestSwitchSetEndpoints(t *testing.T) { defer clus.Terminate(t) // get non partitioned members endpoints - eps := []string{clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} cli := clus.Client(0) clus.Members[0].InjectPartition(t, clus.Members[1:]...) @@ -170,7 +170,7 @@ func TestRejectOldCluster(t *testing.T) { defer clus.Terminate(t) cfg := clientv3.Config{ - Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}, DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, RejectOldCluster: true, @@ -212,7 +212,7 @@ func TestSetEndpointAndPut(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) defer clus.Terminate(t) - clus.Client(1).SetEndpoints(clus.Members[0].GRPCAddr()) + clus.Client(1).SetEndpoints(clus.Members[0].GRPCURL()) _, err := clus.Client(1).Put(context.TODO(), "foo", "bar") if err != nil && !strings.Contains(err.Error(), "closing") { t.Fatal(err) diff --git a/tests/integration/clientv3/connectivity/network_partition_test.go b/tests/integration/clientv3/connectivity/network_partition_test.go index 3db643e42..c2650ebcd 100644 --- a/tests/integration/clientv3/connectivity/network_partition_test.go +++ b/tests/integration/clientv3/connectivity/network_partition_test.go @@ -111,7 +111,7 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} // expect pin eps[0] ccfg := clientv3.Config{ @@ -166,7 +166,7 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T SkipCreatingClient: true, }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} lead := clus.WaitLeader(t) @@ -222,7 +222,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) { }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} target := clus.WaitLeader(t) if !isolateLeader { @@ -283,7 +283,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) { defer clus.Terminate(t) leaderIndex := clus.WaitLeader(t) // get a follower endpoint - eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()} + eps := []string{clus.Members[(leaderIndex+1)%3].GRPCURL()} ccfg := clientv3.Config{ Endpoints: eps, DialTimeout: 10 * time.Second, @@ -301,7 +301,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) { // add other endpoints for later endpoint switch cli.SetEndpoints(eps...) time.Sleep(time.Second * 2) - conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr()) + conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCURL()) if err != nil { t.Fatal(err) } diff --git a/tests/integration/clientv3/connectivity/server_shutdown_test.go b/tests/integration/clientv3/connectivity/server_shutdown_test.go index 8ab90cbc5..e7660852b 100644 --- a/tests/integration/clientv3/connectivity/server_shutdown_test.go +++ b/tests/integration/clientv3/connectivity/server_shutdown_test.go @@ -38,7 +38,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) { }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} lead := clus.WaitLeader(t) @@ -150,7 +150,7 @@ func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Clie }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} // pin eps[0] cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}}) @@ -208,7 +208,7 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl }) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} // pin eps[0] cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}}) @@ -285,9 +285,9 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl clus := integration.NewClusterV3(t, cfg) defer clus.Terminate(t) - eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()} if linearizable { - eps = append(eps, clus.Members[2].GRPCAddr()) + eps = append(eps, clus.Members[2].GRPCURL()) } lead := clus.WaitLeader(t) diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index fcef1a871..ac694aa21 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -1027,7 +1027,7 @@ func TestKVForLearner(t *testing.T) { // 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members // 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config, // because the implementation of integration test has diverged from embed/etcd.go. - learnerEp := clus.Members[3].GRPCAddr() + learnerEp := clus.Members[3].GRPCURL() cfg := clientv3.Config{ Endpoints: []string{learnerEp}, DialTimeout: 5 * time.Second, @@ -1100,7 +1100,7 @@ func TestBalancerSupportLearner(t *testing.T) { } // clus.Members[3] is the newly added learner member, which was appended to clus.Members - learnerEp := clus.Members[3].GRPCAddr() + learnerEp := clus.Members[3].GRPCURL() cfg := clientv3.Config{ Endpoints: []string{learnerEp}, DialTimeout: 5 * time.Second, @@ -1120,7 +1120,7 @@ func TestBalancerSupportLearner(t *testing.T) { } t.Logf("Expected: Read from learner error: %v", err) - eps := []string{learnerEp, clus.Members[0].GRPCAddr()} + eps := []string{learnerEp, clus.Members[0].GRPCURL()} cli.SetEndpoints(eps...) if _, err := cli.Get(context.Background(), "foo"); err != nil { t.Errorf("expect no error (balancer should retry when request to learner fails), got error: %v", err) diff --git a/tests/integration/clientv3/maintenance_test.go b/tests/integration/clientv3/maintenance_test.go index 4bd137d8f..965599583 100644 --- a/tests/integration/clientv3/maintenance_test.go +++ b/tests/integration/clientv3/maintenance_test.go @@ -56,7 +56,7 @@ func TestMaintenanceHashKV(t *testing.T) { if _, err := cli.Get(context.TODO(), "foo"); err != nil { t.Fatal(err) } - hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCAddr(), 0) + hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCURL(), 0) if err != nil { t.Fatal(err) } @@ -279,7 +279,7 @@ func TestMaintenanceStatus(t *testing.T) { eps := make([]string, 3) for i := 0; i < 3; i++ { - eps[i] = clus.Members[i].GRPCAddr() + eps[i] = clus.Members[i].GRPCURL() } cli, err := integration.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}}) diff --git a/tests/integration/clientv3/metrics_test.go b/tests/integration/clientv3/metrics_test.go index 494923d3c..4e2202cee 100644 --- a/tests/integration/clientv3/metrics_test.go +++ b/tests/integration/clientv3/metrics_test.go @@ -75,7 +75,7 @@ func TestV3ClientMetrics(t *testing.T) { defer clus.Terminate(t) cfg := clientv3.Config{ - Endpoints: []string{clus.Members[0].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL()}, DialOptions: []grpc.DialOption{ grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), diff --git a/tests/integration/clientv3/ordering_kv_test.go b/tests/integration/clientv3/ordering_kv_test.go index b1f4f54ef..fdce92495 100644 --- a/tests/integration/clientv3/ordering_kv_test.go +++ b/tests/integration/clientv3/ordering_kv_test.go @@ -35,9 +35,9 @@ func TestDetectKvOrderViolation(t *testing.T) { cfg := clientv3.Config{ Endpoints: []string{ - clus.Members[0].GRPCAddr(), - clus.Members[1].GRPCAddr(), - clus.Members[2].GRPCAddr(), + clus.Members[0].GRPCURL(), + clus.Members[1].GRPCURL(), + clus.Members[2].GRPCURL(), }, } cli, err := integration.NewClient(t, cfg) @@ -82,7 +82,7 @@ func TestDetectKvOrderViolation(t *testing.T) { clus.Members[1].Stop(t) assert.NoError(t, clus.Members[2].Restart(t)) // force OrderingKv to query the third member - cli.SetEndpoints(clus.Members[2].GRPCAddr()) + cli.SetEndpoints(clus.Members[2].GRPCURL()) time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed t.Logf("Quering m2 after restart") @@ -102,9 +102,9 @@ func TestDetectTxnOrderViolation(t *testing.T) { cfg := clientv3.Config{ Endpoints: []string{ - clus.Members[0].GRPCAddr(), - clus.Members[1].GRPCAddr(), - clus.Members[2].GRPCAddr(), + clus.Members[0].GRPCURL(), + clus.Members[1].GRPCURL(), + clus.Members[2].GRPCURL(), }, } cli, err := integration.NewClient(t, cfg) @@ -151,7 +151,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { clus.Members[1].Stop(t) assert.NoError(t, clus.Members[2].Restart(t)) // force OrderingKv to query the third member - cli.SetEndpoints(clus.Members[2].GRPCAddr()) + cli.SetEndpoints(clus.Members[2].GRPCURL()) time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) if err != errOrderViolation { diff --git a/tests/integration/clientv3/ordering_util_test.go b/tests/integration/clientv3/ordering_util_test.go index db3fddd99..85c61f407 100644 --- a/tests/integration/clientv3/ordering_util_test.go +++ b/tests/integration/clientv3/ordering_util_test.go @@ -29,11 +29,11 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) eps := []string{ - clus.Members[0].GRPCAddr(), - clus.Members[1].GRPCAddr(), - clus.Members[2].GRPCAddr(), + clus.Members[0].GRPCURL(), + clus.Members[1].GRPCURL(), + clus.Members[2].GRPCURL(), } - cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}} + cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}} cli, err := integration.NewClient(t, cfg) if err != nil { t.Fatal(err) @@ -71,7 +71,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) { } t.Logf("Reconfigure client to speak only to the 'partitioned' member") - cli.SetEndpoints(clus.Members[2].GRPCAddr()) + cli.SetEndpoints(clus.Members[2].GRPCURL()) _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) if err != ordering.ErrNoGreaterRev { t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error") @@ -84,11 +84,11 @@ func TestUnresolvableOrderViolation(t *testing.T) { defer clus.Terminate(t) cfg := clientv3.Config{ Endpoints: []string{ - clus.Members[0].GRPCAddr(), - clus.Members[1].GRPCAddr(), - clus.Members[2].GRPCAddr(), - clus.Members[3].GRPCAddr(), - clus.Members[4].GRPCAddr(), + clus.Members[0].GRPCURL(), + clus.Members[1].GRPCURL(), + clus.Members[2].GRPCURL(), + clus.Members[3].GRPCURL(), + clus.Members[4].GRPCURL(), }, } cli, err := integration.NewClient(t, cfg) @@ -99,7 +99,7 @@ func TestUnresolvableOrderViolation(t *testing.T) { eps := cli.Endpoints() ctx := context.TODO() - cli.SetEndpoints(clus.Members[0].GRPCAddr()) + cli.SetEndpoints(clus.Members[0].GRPCURL()) time.Sleep(1 * time.Second) _, err = cli.Put(ctx, "foo", "bar") if err != nil { @@ -139,7 +139,7 @@ func TestUnresolvableOrderViolation(t *testing.T) { t.Fatal(err) } clus.Members[3].WaitStarted(t) - cli.SetEndpoints(clus.Members[3].GRPCAddr()) + cli.SetEndpoints(clus.Members[3].GRPCURL()) _, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable()) if err != ordering.ErrNoGreaterRev { diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index f5a391c5f..c854030a6 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -249,7 +249,7 @@ func (c *cluster) Launch(t testutil.TB) { c.waitMembersMatch(t, c.HTTPMembers()) c.waitVersion() for _, m := range c.Members { - t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCAddr()) + t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCURL()) } } @@ -572,7 +572,7 @@ type member struct { grpcServerOpts []grpc.ServerOption grpcServer *grpc.Server grpcServerPeer *grpc.Server - grpcAddr string + grpcURL string grpcBridge *bridge // serverClient is a clientv3 that directly calls the etcdserver. @@ -587,7 +587,7 @@ type member struct { closed bool } -func (m *member) GRPCAddr() string { return m.grpcAddr } +func (m *member) GRPCURL() string { return m.grpcURL } type memberConfig struct { name string @@ -731,28 +731,28 @@ func memberLogger(t testutil.TB, name string) *zap.Logger { // listenGRPC starts a grpc server over a unix domain socket on the member func (m *member) listenGRPC() error { // prefix with localhost so cert has right domain - m.grpcAddr = "localhost:" + m.Name - m.Logger.Info("LISTEN GRPC", zap.String("m.grpcAddr", m.grpcAddr), zap.String("m.Name", m.Name)) + m.grpcURL = "localhost:" + m.Name + m.Logger.Info("LISTEN GRPC", zap.String("m.grpcURL", m.grpcURL), zap.String("m.Name", m.Name)) if m.useIP { // for IP-only TLS certs - m.grpcAddr = "127.0.0.1:" + m.Name + m.grpcURL = "127.0.0.1:" + m.Name } - grpcListener, err := transport.NewUnixListener(m.grpcAddr) + grpcListener, err := transport.NewUnixListener(m.grpcURL) if err != nil { - return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) + return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcURL, err) } - bridgeAddr := m.grpcAddr + "0" + bridgeAddr := m.grpcURL + "0" bridgeListener, err := transport.NewUnixListener(bridgeAddr) if err != nil { grpcListener.Close() - return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcAddr, err) + return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcURL, err) } - m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcAddr}, bridgeListener) + m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcURL}, bridgeListener) if err != nil { bridgeListener.Close() grpcListener.Close() return err } - m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr + m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr m.grpcListener = grpcListener return nil } @@ -780,12 +780,12 @@ func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() } // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) { - if m.grpcAddr == "" { + if m.grpcURL == "" { return nil, fmt.Errorf("member not configured for grpc") } cfg := clientv3.Config{ - Endpoints: []string{m.grpcAddr}, + Endpoints: []string{m.grpcURL}, DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, MaxCallSendMsgSize: m.clientMaxCallSendMsgSize, @@ -847,7 +847,7 @@ func (m *member) Launch() error { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) var err error if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil { @@ -1004,7 +1004,7 @@ func (m *member) Launch() error { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) return nil } @@ -1117,7 +1117,7 @@ func (m *member) Stop(_ testutil.TB) { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) m.Close() m.serverClosers = nil @@ -1126,7 +1126,7 @@ func (m *member) Stop(_ testutil.TB) { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) } @@ -1151,7 +1151,7 @@ func (m *member) Restart(t testutil.TB) error { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) newPeerListeners := make([]net.Listener, 0) for _, ln := range m.PeerListeners { @@ -1176,7 +1176,7 @@ func (m *member) Restart(t testutil.TB) error { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), zap.Error(err), ) return err @@ -1189,7 +1189,7 @@ func (m *member) Terminate(t testutil.TB) { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) m.Close() if !m.keepDataDirTerminate { @@ -1202,7 +1202,7 @@ func (m *member) Terminate(t testutil.TB) { zap.String("name", m.Name), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), - zap.String("grpc-address", m.grpcAddr), + zap.String("grpc-url", m.grpcURL), ) } diff --git a/tests/integration/proxy/grpcproxy/cluster_test.go b/tests/integration/proxy/grpcproxy/cluster_test.go index 5be35c232..162956444 100644 --- a/tests/integration/proxy/grpcproxy/cluster_test.go +++ b/tests/integration/proxy/grpcproxy/cluster_test.go @@ -36,7 +36,7 @@ func TestClusterProxyMemberList(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCAddr()}, t) + cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCURL()}, t) defer cts.close(t) cfg := clientv3.Config{ diff --git a/tests/integration/proxy/grpcproxy/kv_test.go b/tests/integration/proxy/grpcproxy/kv_test.go index 1ff106e4a..4f9ee8d25 100644 --- a/tests/integration/proxy/grpcproxy/kv_test.go +++ b/tests/integration/proxy/grpcproxy/kv_test.go @@ -34,7 +34,7 @@ func TestKVProxyRange(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - kvts := newKVProxyServer([]string{clus.Members[0].GRPCAddr()}, t) + kvts := newKVProxyServer([]string{clus.Members[0].GRPCURL()}, t) defer kvts.close() // create a client and try to get key from proxy. diff --git a/tests/integration/proxy/grpcproxy/register_test.go b/tests/integration/proxy/grpcproxy/register_test.go index 4fbe08e08..d57d01a87 100644 --- a/tests/integration/proxy/grpcproxy/register_test.go +++ b/tests/integration/proxy/grpcproxy/register_test.go @@ -31,7 +31,7 @@ func TestRegister(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) cli := clus.Client(0) - paddr := clus.Members[0].GRPCAddr() + paddr := clus.Members[0].GRPCURL() testPrefix := "test-name" wa := mustCreateWatcher(t, cli, testPrefix) diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index 298ee9428..e54ba26df 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -1784,7 +1784,7 @@ func testTLSReload( } cli, cerr := NewClient(t, clientv3.Config{ DialOptions: []grpc.DialOption{grpc.WithBlock()}, - Endpoints: []string{clus.Members[0].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL()}, DialTimeout: time.Second, TLS: cc, }) @@ -1818,7 +1818,7 @@ func testTLSReload( t.Fatal(terr) } cl, cerr := NewClient(t, clientv3.Config{ - Endpoints: []string{clus.Members[0].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL()}, DialTimeout: 5 * time.Second, TLS: tls, }) diff --git a/tests/integration/v3_tls_test.go b/tests/integration/v3_tls_test.go index 4d77bee13..2437df94e 100644 --- a/tests/integration/v3_tls_test.go +++ b/tests/integration/v3_tls_test.go @@ -55,7 +55,7 @@ func testTLSCipherSuites(t *testing.T, valid bool) { t.Fatal(err) } cli, cerr := NewClient(t, clientv3.Config{ - Endpoints: []string{clus.Members[0].GRPCAddr()}, + Endpoints: []string{clus.Members[0].GRPCURL()}, DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, TLS: cc, From 451eb5d711f1a7ba1f41f32db738ab6d3e301dd8 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 24 Sep 2021 13:55:34 +0200 Subject: [PATCH 03/10] tests: Make using bridge optional --- tests/integration/bridge.go | 8 +-- .../clientv3/connectivity/black_hole_test.go | 2 + .../connectivity/server_shutdown_test.go | 2 + tests/integration/clientv3/kv_test.go | 6 +- .../integration/clientv3/lease/lease_test.go | 16 ++--- .../clientv3/lease/leasing_test.go | 40 ++++++------ .../integration/clientv3/maintenance_test.go | 2 +- .../integration/clientv3/ordering_kv_test.go | 4 +- .../clientv3/ordering_util_test.go | 2 +- tests/integration/clientv3/txn_test.go | 4 +- tests/integration/clientv3/watch_test.go | 12 ++-- tests/integration/cluster.go | 65 +++++++++++++------ tests/integration/cluster_test.go | 14 ++-- tests/integration/member_test.go | 4 +- tests/integration/v3_alarm_test.go | 4 +- tests/integration/v3_grpc_inflight_test.go | 2 +- tests/integration/v3_grpc_test.go | 8 +-- tests/integration/v3_lease_test.go | 11 ++-- tests/integration/v3_watch_test.go | 2 +- 19 files changed, 119 insertions(+), 89 deletions(-) diff --git a/tests/integration/bridge.go b/tests/integration/bridge.go index 22040b882..67d6ae447 100644 --- a/tests/integration/bridge.go +++ b/tests/integration/bridge.go @@ -29,8 +29,8 @@ type Dialer interface { // to disconnect grpc network connections without closing the logical grpc connection. type bridge struct { dialer Dialer - l net.Listener - conns map[*bridgeConn]struct{} + l net.Listener + conns map[*bridgeConn]struct{} stopc chan struct{} pausec chan struct{} @@ -43,8 +43,8 @@ type bridge struct { func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) { b := &bridge{ // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number - dialer: dialer, - l: listener, + dialer: dialer, + l: listener, conns: make(map[*bridgeConn]struct{}), stopc: make(chan struct{}), pausec: make(chan struct{}), diff --git a/tests/integration/clientv3/connectivity/black_hole_test.go b/tests/integration/clientv3/connectivity/black_hole_test.go index a4415322f..c9a199558 100644 --- a/tests/integration/clientv3/connectivity/black_hole_test.go +++ b/tests/integration/clientv3/connectivity/black_hole_test.go @@ -38,6 +38,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{ Size: 2, GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings + UseBridge: true, }) defer clus.Terminate(t) @@ -170,6 +171,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien clus := integration.NewClusterV3(t, &integration.ClusterConfig{ Size: 2, SkipCreatingClient: true, + UseBridge: true, }) defer clus.Terminate(t) diff --git a/tests/integration/clientv3/connectivity/server_shutdown_test.go b/tests/integration/clientv3/connectivity/server_shutdown_test.go index e7660852b..5b888e6fe 100644 --- a/tests/integration/clientv3/connectivity/server_shutdown_test.go +++ b/tests/integration/clientv3/connectivity/server_shutdown_test.go @@ -35,6 +35,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{ Size: 3, SkipCreatingClient: true, + UseBridge: true, }) defer clus.Terminate(t) @@ -278,6 +279,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl cfg := &integration.ClusterConfig{ Size: 2, SkipCreatingClient: true, + UseBridge: true, } if linearizable { cfg.Size = 3 diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index ac694aa21..b45240225 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -712,7 +712,7 @@ func TestKVGetRetry(t *testing.T) { integration.BeforeTest(t) clusterSize := 3 - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize, UseBridge: true}) defer clus.Terminate(t) // because killing leader and following election @@ -765,7 +765,7 @@ func TestKVGetRetry(t *testing.T) { func TestKVPutFailGetRetry(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kv := clus.Client(0) @@ -876,7 +876,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // in the presence of network errors. func TestKVPutAtMostOnce(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil { diff --git a/tests/integration/clientv3/lease/lease_test.go b/tests/integration/clientv3/lease/lease_test.go index 326289949..6a6cf7dd3 100644 --- a/tests/integration/clientv3/lease/lease_test.go +++ b/tests/integration/clientv3/lease/lease_test.go @@ -190,7 +190,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) // TODO: change this line to get a cluster client @@ -416,7 +416,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -462,7 +462,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { func TestLeaseKeepAliveInitTimeout(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -495,7 +495,7 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) { func TestLeaseKeepAliveTTLTimeout(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -530,7 +530,7 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) { func TestLeaseTimeToLive(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) c := clus.RandClient() @@ -656,7 +656,7 @@ func TestLeaseLeases(t *testing.T) { func TestLeaseRenewLostQuorum(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -728,7 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) { // transient cluster failure. func TestV3LeaseFailureOverlap(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) numReqs := 5 @@ -782,7 +782,7 @@ func TestV3LeaseFailureOverlap(t *testing.T) { func TestLeaseWithRequireLeader(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) c := clus.Client(0) diff --git a/tests/integration/clientv3/lease/leasing_test.go b/tests/integration/clientv3/lease/leasing_test.go index 54236be97..aea6b2234 100644 --- a/tests/integration/clientv3/lease/leasing_test.go +++ b/tests/integration/clientv3/lease/leasing_test.go @@ -195,7 +195,7 @@ func TestLeasingPutInvalidateExisting(t *testing.T) { // TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased. func TestLeasingGetNoLeaseTTL(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -224,7 +224,7 @@ func TestLeasingGetNoLeaseTTL(t *testing.T) { // when the etcd cluster is partitioned. func TestLeasingGetSerializable(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -326,7 +326,7 @@ func TestLeasingRevGet(t *testing.T) { // TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server. func TestLeasingGetWithOpts(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -418,7 +418,7 @@ func TestLeasingConcurrentPut(t *testing.T) { func TestLeasingDisconnectedGet(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -550,7 +550,7 @@ func TestLeasingOverwriteResponse(t *testing.T) { func TestLeasingOwnerPutResponse(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -617,7 +617,7 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) { func TestLeasingTxnOwnerGet(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) client := clus.Client(0) @@ -773,7 +773,7 @@ func TestLeasingTxnOwnerDelete(t *testing.T) { func TestLeasingTxnOwnerIf(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -867,7 +867,7 @@ func TestLeasingTxnOwnerIf(t *testing.T) { func TestLeasingTxnCancel(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1085,7 +1085,7 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) { func TestLeasingOwnerPutError(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1106,7 +1106,7 @@ func TestLeasingOwnerPutError(t *testing.T) { func TestLeasingOwnerDeleteError(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1127,7 +1127,7 @@ func TestLeasingOwnerDeleteError(t *testing.T) { func TestLeasingNonOwnerPutError(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1201,7 +1201,7 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) { func TestLeasingDeleteRangeBounds(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/") @@ -1376,7 +1376,7 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) { // disconnected when trying to submit revoke txn. func TestLeasingReconnectOwnerRevoke(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") @@ -1437,7 +1437,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) { // disconnected and the watch is compacted. func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") @@ -1490,7 +1490,7 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { // not cause inconsistency between the server and the client. func TestLeasingReconnectOwnerConsistency(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1650,7 +1650,7 @@ func TestLeasingTxnAtomicCache(t *testing.T) { // TestLeasingReconnectTxn checks that Txn is resilient to disconnects. func TestLeasingReconnectTxn(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1686,7 +1686,7 @@ func TestLeasingReconnectTxn(t *testing.T) { // not cause inconsistency between the server and the client. func TestLeasingReconnectNonOwnerGet(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1814,7 +1814,7 @@ func TestLeasingDo(t *testing.T) { func TestLeasingTxnOwnerPutBranch(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1908,7 +1908,7 @@ func randCmps(pfx string, dat []*clientv3.PutResponse) (cmps []clientv3.Cmp, the func TestLeasingSessionExpire(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) @@ -1984,7 +1984,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) { for i := range tests { t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) diff --git a/tests/integration/clientv3/maintenance_test.go b/tests/integration/clientv3/maintenance_test.go index 965599583..e48a4a4fa 100644 --- a/tests/integration/clientv3/maintenance_test.go +++ b/tests/integration/clientv3/maintenance_test.go @@ -192,7 +192,7 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) { func testMaintenanceSnapshotErrorInflight(t *testing.T, snapshot func(context.Context, *clientv3.Client) (io.ReadCloser, error)) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) // take about 1-second to read snapshot diff --git a/tests/integration/clientv3/ordering_kv_test.go b/tests/integration/clientv3/ordering_kv_test.go index fdce92495..b6b3ce71f 100644 --- a/tests/integration/clientv3/ordering_kv_test.go +++ b/tests/integration/clientv3/ordering_kv_test.go @@ -30,7 +30,7 @@ func TestDetectKvOrderViolation(t *testing.T) { var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ @@ -97,7 +97,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ diff --git a/tests/integration/clientv3/ordering_util_test.go b/tests/integration/clientv3/ordering_util_test.go index 85c61f407..a4b65ec38 100644 --- a/tests/integration/clientv3/ordering_util_test.go +++ b/tests/integration/clientv3/ordering_util_test.go @@ -80,7 +80,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) { func TestUnresolvableOrderViolation(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ Endpoints: []string{ diff --git a/tests/integration/clientv3/txn_test.go b/tests/integration/clientv3/txn_test.go index ffe93e096..679b9868f 100644 --- a/tests/integration/clientv3/txn_test.go +++ b/tests/integration/clientv3/txn_test.go @@ -53,7 +53,7 @@ func TestTxnError(t *testing.T) { func TestTxnWriteFail(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kv := clus.Client(0) @@ -103,7 +103,7 @@ func TestTxnReadRetry(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kv := clus.Client(0) diff --git a/tests/integration/clientv3/watch_test.go b/tests/integration/clientv3/watch_test.go index 2fea3c9ba..b5a0dd08f 100644 --- a/tests/integration/clientv3/watch_test.go +++ b/tests/integration/clientv3/watch_test.go @@ -47,7 +47,7 @@ type watchctx struct { func runWatchTest(t *testing.T, f watcherTest) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) wclientMember := rand.Intn(3) @@ -348,7 +348,7 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { func TestWatchResumeInitRev(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -404,7 +404,7 @@ func TestWatchResumeInitRev(t *testing.T) { func TestWatchResumeCompacted(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) // create a waiting watcher at rev 1 @@ -955,7 +955,7 @@ func TestWatchWithCreatedNotification(t *testing.T) { func TestWatchWithCreatedNotificationDropConn(t *testing.T) { integration.BeforeTest(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer cluster.Terminate(t) client := cluster.RandClient() @@ -1063,7 +1063,7 @@ func TestWatchOverlapDropConnContextCancel(t *testing.T) { func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) n := 100 @@ -1154,7 +1154,7 @@ func TestWatchCancelAndCloseClient(t *testing.T) { // then closes the watcher interface to ensure correct clean up. func TestWatchStressResumeClose(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index c854030a6..1261bfbe4 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -153,6 +153,9 @@ type ClusterConfig struct { // UseIP is true to use only IP for gRPC requests. UseIP bool + // UseBridge adds bridge between client and grpc server. Should be used in tests that + // want to manipulate connection or require connection not breaking despite server stop/restart. + UseBridge bool EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration @@ -313,6 +316,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member { clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, useIP: c.cfg.UseIP, + useBridge: c.cfg.UseBridge, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, @@ -582,6 +586,7 @@ type member struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + useBridge bool isLearner bool closed bool @@ -605,6 +610,7 @@ type memberConfig struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + useBridge bool enableLeaseCheckpoint bool leaseCheckpointInterval time.Duration WatchProgressNotifyInterval time.Duration @@ -698,6 +704,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize m.useIP = mcfg.useIP + m.useBridge = mcfg.useBridge m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval @@ -731,35 +738,53 @@ func memberLogger(t testutil.TB, name string) *zap.Logger { // listenGRPC starts a grpc server over a unix domain socket on the member func (m *member) listenGRPC() error { // prefix with localhost so cert has right domain - m.grpcURL = "localhost:" + m.Name - m.Logger.Info("LISTEN GRPC", zap.String("m.grpcURL", m.grpcURL), zap.String("m.Name", m.Name)) - if m.useIP { // for IP-only TLS certs - m.grpcURL = "127.0.0.1:" + m.Name - } - grpcListener, err := transport.NewUnixListener(m.grpcURL) + grpcAddr := m.grpcAddr() + m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name)) + grpcListener, err := transport.NewUnixListener(grpcAddr) if err != nil { - return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcURL, err) + return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err) } - bridgeAddr := m.grpcURL + "0" - bridgeListener, err := transport.NewUnixListener(bridgeAddr) - if err != nil { - grpcListener.Close() - return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcURL, err) + m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + grpcAddr + if m.useBridge { + _, err = m.addBridge() + if err != nil { + grpcListener.Close() + return err + } } - m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcURL}, bridgeListener) - if err != nil { - bridgeListener.Close() - grpcListener.Close() - return err - } - m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr m.grpcListener = grpcListener return nil } +func (m *member) addBridge() (*bridge, error) { + grpcAddr := m.grpcAddr() + bridgeAddr := grpcAddr + "0" + m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name)) + bridgeListener, err := transport.NewUnixListener(bridgeAddr) + if err != nil { + return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", grpcAddr, err) + } + m.grpcBridge, err = newBridge(dialer{network: "unix", addr: grpcAddr}, bridgeListener) + if err != nil { + bridgeListener.Close() + return nil, err + } + m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr + return m.grpcBridge, nil +} + +func (m *member) grpcAddr() string { + // prefix with localhost so cert has right domain + addr := "localhost:" + m.Name + if m.useIP { // for IP-only TLS certs + addr = "127.0.0.1:" + m.Name + } + return addr +} + type dialer struct { network string - addr string + addr string } func (d dialer) Dial() (net.Conn, error) { diff --git a/tests/integration/cluster_test.go b/tests/integration/cluster_test.go index e25d77f21..2fb5a18d9 100644 --- a/tests/integration/cluster_test.go +++ b/tests/integration/cluster_test.go @@ -173,7 +173,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { } func TestForceNewCluster(t *testing.T) { - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) c.Launch(t) cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) @@ -283,7 +283,7 @@ func testIssue2746(t *testing.T, members int) { func TestIssue2904(t *testing.T) { BeforeTest(t) // start 1-member cluster to ensure member 0 is the leader of the cluster. - c := NewCluster(t, 1) + c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) c.Launch(t) defer c.Terminate(t) @@ -319,7 +319,7 @@ func TestIssue2904(t *testing.T) { func TestIssue3699(t *testing.T) { // start a cluster of 3 nodes a, b, c BeforeTest(t) - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) c.Launch(t) defer c.Terminate(t) @@ -371,7 +371,7 @@ func TestIssue3699(t *testing.T) { // TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members. func TestRejectUnhealthyAdd(t *testing.T) { BeforeTest(t) - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) for _, m := range c.Members { m.ServerConfig.StrictReconfigCheck = true } @@ -415,7 +415,7 @@ func TestRejectUnhealthyAdd(t *testing.T) { // if quorum will be lost. func TestRejectUnhealthyRemove(t *testing.T) { BeforeTest(t) - c := NewCluster(t, 5) + c := newCluster(t, &ClusterConfig{Size: 5, UseBridge: true}) for _, m := range c.Members { m.ServerConfig.StrictReconfigCheck = true } @@ -464,7 +464,7 @@ func TestRestartRemoved(t *testing.T) { BeforeTest(t) // 1. start single-member cluster - c := NewCluster(t, 1) + c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) for _, m := range c.Members { m.ServerConfig.StrictReconfigCheck = true } @@ -540,7 +540,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { func TestSpeedyTerminate(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) // Stop/Restart so requests will time out on lost leaders for i := 0; i < 3; i++ { clus.Members[i].Stop(t) diff --git a/tests/integration/member_test.go b/tests/integration/member_test.go index 5493924c9..99788b757 100644 --- a/tests/integration/member_test.go +++ b/tests/integration/member_test.go @@ -46,7 +46,7 @@ func TestPauseMember(t *testing.T) { func TestRestartMember(t *testing.T) { BeforeTest(t) - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) c.Launch(t) defer c.Terminate(t) @@ -88,7 +88,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) { func TestSnapshotAndRestartMember(t *testing.T) { BeforeTest(t) - m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"}) + m := mustNewMember(t, memberConfig{name: "snapAndRestartTest", useBridge: true}) m.SnapshotCount = 100 m.Launch() defer m.Terminate(t) diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index 55f0366cb..dc2191253 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -35,7 +35,7 @@ func TestV3StorageQuotaApply(t *testing.T) { BeforeTest(t) quotasize := int64(16 * os.Getpagesize()) - clus := NewClusterV3(t, &ClusterConfig{Size: 2}) + clus := NewClusterV3(t, &ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) kvc0 := toGRPC(clus.Client(0)).KV kvc1 := toGRPC(clus.Client(1)).KV @@ -147,7 +147,7 @@ func TestV3AlarmDeactivate(t *testing.T) { func TestV3CorruptAlarm(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) var wg sync.WaitGroup diff --git a/tests/integration/v3_grpc_inflight_test.go b/tests/integration/v3_grpc_inflight_test.go index 9f5085112..7432fb46a 100644 --- a/tests/integration/v3_grpc_inflight_test.go +++ b/tests/integration/v3_grpc_inflight_test.go @@ -61,7 +61,7 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { // See https://github.com/etcd-io/etcd/issues/7322 for more detail. func TestV3KVInflightRangeRequests(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.RandClient() diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index e54ba26df..cbedafbe3 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -88,7 +88,7 @@ func TestV3PutOverwrite(t *testing.T) { // TestPutRestart checks if a put after an unrelated member restart succeeds func TestV3PutRestart(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kvIdx := rand.Intn(3) @@ -1210,7 +1210,7 @@ func TestV3Hash(t *testing.T) { // TestV3HashRestart ensures that hash stays the same after restart. func TestV3HashRestart(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.RandClient() @@ -1243,7 +1243,7 @@ func TestV3StorageQuotaAPI(t *testing.T) { BeforeTest(t) quotasize := int64(16 * os.Getpagesize()) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) // Set a quota on one node clus.Members[0].QuotaBackendBytes = quotasize @@ -1858,7 +1858,7 @@ func TestGRPCRequireLeader(t *testing.T) { func TestGRPCStreamRequireLeader(t *testing.T) { BeforeTest(t) - cfg := ClusterConfig{Size: 3} + cfg := ClusterConfig{Size: 3, UseBridge: true} clus := newClusterV3NoClients(t, &cfg) defer clus.Terminate(t) diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 08b0ca7bb..1727da65c 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -36,7 +36,7 @@ import ( func TestV3LeasePromote(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) // create lease @@ -237,6 +237,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { Size: 3, EnableLeaseCheckpoint: true, LeaseCheckpointInterval: leaseInterval, + UseBridge: true, }) defer clus.Terminate(t) @@ -649,7 +650,7 @@ const fiveMinTTL int64 = 300 func TestV3LeaseRecoverAndRevoke(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV @@ -700,7 +701,7 @@ func TestV3LeaseRecoverAndRevoke(t *testing.T) { func TestV3LeaseRevokeAndRecover(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV @@ -752,7 +753,7 @@ func TestV3LeaseRevokeAndRecover(t *testing.T) { func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV @@ -808,7 +809,7 @@ func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index b2a31cc2f..fb2f510a6 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1034,7 +1034,7 @@ func TestWatchWithProgressNotify(t *testing.T) { // TestV3WatcMultiOpenhClose opens many watchers concurrently on multiple streams. func TestV3WatchClose(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) c := clus.Client(0) From 536475818101fe69f9dd5f4506c8d935c58d0548 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 24 Sep 2021 14:06:20 +0200 Subject: [PATCH 04/10] tests: Cleanup member interface by exposing Bridge directly --- tests/integration/bridge.go | 6 +++--- .../clientv3/connectivity/black_hole_test.go | 8 ++++---- tests/integration/clientv3/kv_test.go | 4 ++-- .../integration/clientv3/lease/leasing_test.go | 12 ++++++------ tests/integration/clientv3/watch_test.go | 18 +++++++++--------- tests/integration/cluster.go | 13 +++++++------ tests/integration/v3_watch_test.go | 2 +- 7 files changed, 32 insertions(+), 31 deletions(-) diff --git a/tests/integration/bridge.go b/tests/integration/bridge.go index 67d6ae447..746168fc7 100644 --- a/tests/integration/bridge.go +++ b/tests/integration/bridge.go @@ -68,7 +68,7 @@ func (b *bridge) Close() { b.wg.Wait() } -func (b *bridge) Reset() { +func (b *bridge) DropConnections() { b.mu.Lock() defer b.mu.Unlock() for bc := range b.conns { @@ -77,13 +77,13 @@ func (b *bridge) Reset() { b.conns = make(map[*bridgeConn]struct{}) } -func (b *bridge) Pause() { +func (b *bridge) PauseConnections() { b.mu.Lock() b.pausec = make(chan struct{}) b.mu.Unlock() } -func (b *bridge) Unpause() { +func (b *bridge) UnpauseConnections() { b.mu.Lock() select { case <-b.pausec: diff --git a/tests/integration/clientv3/connectivity/black_hole_test.go b/tests/integration/clientv3/connectivity/black_hole_test.go index c9a199558..4445c69f6 100644 --- a/tests/integration/clientv3/connectivity/black_hole_test.go +++ b/tests/integration/clientv3/connectivity/black_hole_test.go @@ -77,7 +77,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { // give enough time for balancer resolution time.Sleep(5 * time.Second) - clus.Members[0].Blackhole() + clus.Members[0].Bridge().Blackhole() if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil { t.Fatal(err) @@ -88,12 +88,12 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { t.Error("took too long to receive watch events") } - clus.Members[0].Unblackhole() + clus.Members[0].Bridge().Unblackhole() // waiting for moving eps[0] out of unhealthy, so that it can be re-pined. time.Sleep(ccfg.DialTimeout) - clus.Members[1].Blackhole() + clus.Members[1].Bridge().Blackhole() // make sure client[0] can connect to eps[0] after remove the blackhole. if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil { @@ -196,7 +196,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien cli.SetEndpoints(eps...) // blackhole eps[0] - clus.Members[0].Blackhole() + clus.Members[0].Bridge().Blackhole() // With round robin balancer, client will make a request to a healthy endpoint // within a few requests. diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index b45240225..8dd98466d 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -884,12 +884,12 @@ func TestKVPutAtMostOnce(t *testing.T) { } for i := 0; i < 10; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() donec := make(chan struct{}) go func() { defer close(donec) for i := 0; i < 10; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(5 * time.Millisecond) } }() diff --git a/tests/integration/clientv3/lease/leasing_test.go b/tests/integration/clientv3/lease/leasing_test.go index aea6b2234..3e935d8e3 100644 --- a/tests/integration/clientv3/lease/leasing_test.go +++ b/tests/integration/clientv3/lease/leasing_test.go @@ -1510,11 +1510,11 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) { for i := 0; i < 10; i++ { v := fmt.Sprintf("%d", i) donec := make(chan struct{}) - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() go func() { defer close(donec) for i := 0; i < 20; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(time.Millisecond) } }() @@ -1664,9 +1664,9 @@ func TestLeasingReconnectTxn(t *testing.T) { donec := make(chan struct{}) go func() { defer close(donec) - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() for i := 0; i < 10; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(time.Millisecond) } time.Sleep(10 * time.Millisecond) @@ -1704,11 +1704,11 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) { n := 0 for i := 0; i < 10; i++ { donec := make(chan struct{}) - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() go func() { defer close(donec) for j := 0; j < 10; j++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(time.Millisecond) } }() diff --git a/tests/integration/clientv3/watch_test.go b/tests/integration/clientv3/watch_test.go index b5a0dd08f..7a992ecf9 100644 --- a/tests/integration/clientv3/watch_test.go +++ b/tests/integration/clientv3/watch_test.go @@ -188,7 +188,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) { defer close(donec) // take down watcher connection for { - wctx.clus.Members[wctx.wclientMember].DropConnections() + wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections() select { case <-timer: // spinning on close may live lock reconnection @@ -230,7 +230,7 @@ func testWatchReconnInit(t *testing.T, wctx *watchctx) { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { t.Fatalf("expected non-nil channel") } - wctx.clus.Members[wctx.wclientMember].DropConnections() + wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections() // watcher should recover putAndWatch(t, wctx, "a", "a") } @@ -247,7 +247,7 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) { } putAndWatch(t, wctx, "a", "a") // take down watcher connection - wctx.clus.Members[wctx.wclientMember].DropConnections() + wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections() // watcher should recover putAndWatch(t, wctx, "a", "b") } @@ -368,8 +368,8 @@ func TestWatchResumeInitRev(t *testing.T) { t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok) } // pause wch - clus.Members[0].DropConnections() - clus.Members[0].PauseConnections() + clus.Members[0].Bridge().DropConnections() + clus.Members[0].Bridge().PauseConnections() select { case resp, ok := <-wch: @@ -378,7 +378,7 @@ func TestWatchResumeInitRev(t *testing.T) { } // resume wch - clus.Members[0].UnpauseConnections() + clus.Members[0].Bridge().UnpauseConnections() select { case resp, ok := <-wch: @@ -968,7 +968,7 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) { t.Fatalf("expected created event, got %v", resp) } - cluster.Members[0].DropConnections() + cluster.Members[0].Bridge().DropConnections() // check watch channel doesn't post another watch response. select { @@ -1056,7 +1056,7 @@ func TestWatchOverlapContextCancel(t *testing.T) { func TestWatchOverlapDropConnContextCancel(t *testing.T) { f := func(clus *integration.ClusterV3) { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() } testWatchOverlapContextCancel(t, f) } @@ -1164,7 +1164,7 @@ func TestWatchStressResumeClose(t *testing.T) { for i := range wchs { wchs[i] = cli.Watch(ctx, "abc") } - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() cancel() if err := cli.Close(); err != nil { t.Fatal(err) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 1261bfbe4..86f3e4593 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -773,6 +773,13 @@ func (m *member) addBridge() (*bridge, error) { return m.grpcBridge, nil } +func (m *member) Bridge() *bridge { + if !m.useBridge { + m.Logger.Panic("Bridge not available. Please configure using bridge before creating cluster.") + } + return m.grpcBridge +} + func (m *member) grpcAddr() string { // prefix with localhost so cert has right domain addr := "localhost:" + m.Name @@ -797,12 +804,6 @@ func (m *member) ElectionTimeout() time.Duration { func (m *member) ID() types.ID { return m.s.ID() } -func (m *member) DropConnections() { m.grpcBridge.Reset() } -func (m *member) PauseConnections() { m.grpcBridge.Pause() } -func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } -func (m *member) Blackhole() { m.grpcBridge.Blackhole() } -func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() } - // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) { if m.grpcURL == "" { diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index fb2f510a6..323d0d72c 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1062,7 +1062,7 @@ func TestV3WatchClose(t *testing.T) { }() } - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() wg.Wait() } From 77cc91e0b2ff5d35f5b407f118d875fb88318ae0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 24 Sep 2021 14:32:56 +0200 Subject: [PATCH 05/10] test: Use unique number for grpc port --- tests/integration/cluster.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 86f3e4593..deb82b021 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -73,6 +73,7 @@ const ( basePort = 21000 URLScheme = "unix" URLSchemeTLS = "unixs" + baseGRPCPort = 30000 ) var ( @@ -121,6 +122,10 @@ var ( defaultTokenJWT = fmt.Sprintf("jwt,pub-key=%s,priv-key=%s,sign-method=RS256,ttl=1s", MustAbsPath("../fixtures/server.crt"), MustAbsPath("../fixtures/server.key.insecure")) + + // uniqueNumber is used to generate unique port numbers + // Should only be accessed via atomic package methods. + uniqueNumber int32 ) type ClusterConfig struct { @@ -211,7 +216,7 @@ func newCluster(t testutil.TB, cfg *ClusterConfig) *cluster { c := &cluster{cfg: cfg} ms := make([]*member, cfg.Size) for i := 0; i < cfg.Size; i++ { - ms[i] = c.mustNewMember(t) + ms[i] = c.mustNewMember(t, int32(i)) } c.Members = ms if err := c.fillClusterForMembers(); err != nil { @@ -298,10 +303,11 @@ func (c *cluster) HTTPMembers() []client.Member { return ms } -func (c *cluster) mustNewMember(t testutil.TB) *member { +func (c *cluster) mustNewMember(t testutil.TB, number int32) *member { m := mustNewMember(t, memberConfig{ name: c.generateMemberName(), + memberNumber: number, authToken: c.cfg.AuthToken, peerTLS: c.cfg.PeerTLS, clientTLS: c.cfg.ClientTLS, @@ -332,7 +338,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member { // addMember return PeerURLs of the added member. func (c *cluster) addMember(t testutil.TB) types.URLs { - m := c.mustNewMember(t) + m := c.mustNewMember(t,0) scheme := schemeFromTLSInfo(c.cfg.PeerTLS) @@ -561,6 +567,8 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener { type member struct { config.ServerConfig + uniqNumber int32 + memberNumber int32 PeerListeners, ClientListeners []net.Listener grpcListener net.Listener // PeerTLSInfo enables peer TLS when set @@ -596,6 +604,7 @@ func (m *member) GRPCURL() string { return m.grpcURL } type memberConfig struct { name string + memberNumber int32 peerTLS *transport.TLSInfo clientTLS *transport.TLSInfo authToken string @@ -620,7 +629,10 @@ type memberConfig struct { // set, it will use https scheme to communicate between peers. func mustNewMember(t testutil.TB, mcfg memberConfig) *member { var err error - m := &member{} + m := &member{ + uniqNumber: atomic.AddInt32(&uniqueNumber, 1), + memberNumber: mcfg.memberNumber, + } peerScheme := schemeFromTLSInfo(mcfg.peerTLS) clientScheme := schemeFromTLSInfo(mcfg.clientTLS) @@ -782,11 +794,11 @@ func (m *member) Bridge() *bridge { func (m *member) grpcAddr() string { // prefix with localhost so cert has right domain - addr := "localhost:" + m.Name + host := "localhost" if m.useIP { // for IP-only TLS certs - addr = "127.0.0.1:" + m.Name + host = "127.0.0.1" } - return addr + return fmt.Sprintf("%s:%d", host, baseGRPCPort + m.uniqNumber * 10 + m.memberNumber) } type dialer struct { @@ -1462,7 +1474,7 @@ func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) { // AddAndLaunchLearnerMember creates a leaner member, adds it to cluster // via v3 MemberAdd API, and then launches the new member. func (c *ClusterV3) AddAndLaunchLearnerMember(t testutil.TB) { - m := c.mustNewMember(t) + m := c.mustNewMember(t, 0) m.isLearner = true scheme := schemeFromTLSInfo(c.cfg.PeerTLS) @@ -1563,7 +1575,7 @@ func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], // MustNewMember creates a new member instance based on the response of V3 Member Add API. func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member { - m := c.mustNewMember(t) + m := c.mustNewMember(t,0) m.isLearner = resp.Member.IsLearner m.NewCluster = false From c3cc22c60d1abad5f052f62180b2fe4aa4daacbb Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 27 Sep 2021 13:01:22 +0200 Subject: [PATCH 06/10] tests: Allow configuring integration tests to use TCP --- tests/integration/cluster.go | 76 +++++++++++++++++++++++-------- tests/integration/v3_grpc_test.go | 3 ++ 2 files changed, 59 insertions(+), 20 deletions(-) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index deb82b021..025a40580 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -161,6 +161,8 @@ type ClusterConfig struct { // UseBridge adds bridge between client and grpc server. Should be used in tests that // want to manipulate connection or require connection not breaking despite server stop/restart. UseBridge bool + // UseTCP configures server listen on tcp socket. If disabled unix socket is used. + UseTCP bool EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration @@ -216,7 +218,7 @@ func newCluster(t testutil.TB, cfg *ClusterConfig) *cluster { c := &cluster{cfg: cfg} ms := make([]*member, cfg.Size) for i := 0; i < cfg.Size; i++ { - ms[i] = c.mustNewMember(t, int32(i)) + ms[i] = c.mustNewMember(t, int64(i)) } c.Members = ms if err := c.fillClusterForMembers(); err != nil { @@ -303,11 +305,11 @@ func (c *cluster) HTTPMembers() []client.Member { return ms } -func (c *cluster) mustNewMember(t testutil.TB, number int32) *member { +func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member { m := mustNewMember(t, memberConfig{ name: c.generateMemberName(), - memberNumber: number, + memberNumber: memberNumber, authToken: c.cfg.AuthToken, peerTLS: c.cfg.PeerTLS, clientTLS: c.cfg.ClientTLS, @@ -323,6 +325,7 @@ func (c *cluster) mustNewMember(t testutil.TB, number int32) *member { clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, useIP: c.cfg.UseIP, useBridge: c.cfg.UseBridge, + useTCP: c.cfg.UseTCP, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, @@ -338,7 +341,7 @@ func (c *cluster) mustNewMember(t testutil.TB, number int32) *member { // addMember return PeerURLs of the added member. func (c *cluster) addMember(t testutil.TB) types.URLs { - m := c.mustNewMember(t,0) + m := c.mustNewMember(t, 0) scheme := schemeFromTLSInfo(c.cfg.PeerTLS) @@ -567,8 +570,8 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener { type member struct { config.ServerConfig - uniqNumber int32 - memberNumber int32 + UniqNumber int64 + MemberNumber int64 PeerListeners, ClientListeners []net.Listener grpcListener net.Listener // PeerTLSInfo enables peer TLS when set @@ -595,6 +598,7 @@ type member struct { clientMaxCallRecvMsgSize int useIP bool useBridge bool + useTCP bool isLearner bool closed bool @@ -604,7 +608,8 @@ func (m *member) GRPCURL() string { return m.grpcURL } type memberConfig struct { name string - memberNumber int32 + uniqNumber int64 + memberNumber int64 peerTLS *transport.TLSInfo clientTLS *transport.TLSInfo authToken string @@ -620,6 +625,7 @@ type memberConfig struct { clientMaxCallRecvMsgSize int useIP bool useBridge bool + useTCP bool enableLeaseCheckpoint bool leaseCheckpointInterval time.Duration WatchProgressNotifyInterval time.Duration @@ -630,8 +636,8 @@ type memberConfig struct { func mustNewMember(t testutil.TB, mcfg memberConfig) *member { var err error m := &member{ - uniqNumber: atomic.AddInt32(&uniqueNumber, 1), - memberNumber: mcfg.memberNumber, + MemberNumber: mcfg.memberNumber, + UniqNumber: atomic.AddInt64(&localListenCount, 1), } peerScheme := schemeFromTLSInfo(mcfg.peerTLS) @@ -717,6 +723,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize m.useIP = mcfg.useIP m.useBridge = mcfg.useBridge + m.useTCP = mcfg.useTCP m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval @@ -750,13 +757,14 @@ func memberLogger(t testutil.TB, name string) *zap.Logger { // listenGRPC starts a grpc server over a unix domain socket on the member func (m *member) listenGRPC() error { // prefix with localhost so cert has right domain - grpcAddr := m.grpcAddr() + network, host, port := m.grpcAddr() + grpcAddr := host + ":" + port m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name)) - grpcListener, err := transport.NewUnixListener(grpcAddr) + grpcListener, err := net.Listen(network, grpcAddr) if err != nil { return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err) } - m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + grpcAddr + m.grpcURL = fmt.Sprintf("%s://%s", m.clientScheme(), grpcAddr) if m.useBridge { _, err = m.addBridge() if err != nil { @@ -768,20 +776,36 @@ func (m *member) listenGRPC() error { return nil } +func (m *member) clientScheme() string { + switch { + case m.useTCP && m.ClientTLSInfo != nil: + return "https" + case m.useTCP && m.ClientTLSInfo == nil: + return "http" + case !m.useTCP && m.ClientTLSInfo != nil: + return "unixs" + case !m.useTCP && m.ClientTLSInfo == nil: + return "unix" + } + m.Logger.Panic("Failed to determine client schema") + return "" +} + func (m *member) addBridge() (*bridge, error) { - grpcAddr := m.grpcAddr() + network, host, port := m.grpcAddr() + grpcAddr := host + ":" + port bridgeAddr := grpcAddr + "0" m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name)) bridgeListener, err := transport.NewUnixListener(bridgeAddr) if err != nil { - return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", grpcAddr, err) + return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", bridgeAddr, err) } - m.grpcBridge, err = newBridge(dialer{network: "unix", addr: grpcAddr}, bridgeListener) + m.grpcBridge, err = newBridge(dialer{network: network, addr: grpcAddr}, bridgeListener) if err != nil { bridgeListener.Close() return nil, err } - m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr + m.grpcURL = m.clientScheme() + "://" + bridgeAddr return m.grpcBridge, nil } @@ -792,13 +816,25 @@ func (m *member) Bridge() *bridge { return m.grpcBridge } -func (m *member) grpcAddr() string { +func (m *member) grpcAddr() (network, host, port string) { // prefix with localhost so cert has right domain - host := "localhost" + host = "localhost" if m.useIP { // for IP-only TLS certs host = "127.0.0.1" } - return fmt.Sprintf("%s:%d", host, baseGRPCPort + m.uniqNumber * 10 + m.memberNumber) + network = "unix" + if m.useTCP { + network = "tcp" + } + port = m.Name + if m.useTCP { + port = fmt.Sprintf("%d", GrpcPortNumber(m.UniqNumber, m.MemberNumber)) + } + return network, host, port +} + +func GrpcPortNumber(uniqNumber, memberNumber int64) int64 { + return baseGRPCPort + uniqNumber*10 + memberNumber } type dialer struct { @@ -1575,7 +1611,7 @@ func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], // MustNewMember creates a new member instance based on the response of V3 Member Add API. func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member { - m := c.mustNewMember(t,0) + m := c.mustNewMember(t, 0) m.isLearner = resp.Member.IsLearner m.NewCluster = false diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index cbedafbe3..ca9e5c8ad 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -22,6 +22,7 @@ import ( "math/rand" "os" "reflect" + "strings" "testing" "time" @@ -1601,8 +1602,10 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) { clus.Members[0].ClientTLSInfo = &testTLSInfo clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()} + clus.Members[0].grpcURL = strings.Replace(clus.Members[0].grpcURL, "http://", "https://", 1) client, err := NewClientV3(clus.Members[0]) if client != nil || err == nil { + client.Close() t.Fatalf("expected no client") } else if err != context.DeadlineExceeded { t.Fatalf("unexpected error (%v)", err) From 90932324b140b84651d22d25ee01805bf2e32337 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 17 Sep 2021 12:10:20 +0200 Subject: [PATCH 07/10] client: Add grpc authority header integration tests --- pkg/grpc_testing/recorder.go | 69 ++++++++ server/embed/etcd.go | 2 +- server/embed/serve.go | 4 +- server/etcdserver/api/v3rpc/grpc.go | 6 +- server/etcdserver/api/v3rpc/interceptor.go | 2 +- tests/integration/cluster.go | 42 ++++- tests/integration/grpc_test.go | 182 +++++++++++++++++++++ 7 files changed, 296 insertions(+), 11 deletions(-) create mode 100644 pkg/grpc_testing/recorder.go create mode 100644 tests/integration/grpc_test.go diff --git a/pkg/grpc_testing/recorder.go b/pkg/grpc_testing/recorder.go new file mode 100644 index 000000000..d6b6d2aac --- /dev/null +++ b/pkg/grpc_testing/recorder.go @@ -0,0 +1,69 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc_testing + +import ( + "context" + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type GrpcRecorder struct { + mux sync.RWMutex + requests []RequestInfo +} + +type RequestInfo struct { + FullMethod string + Authority string +} + +func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + ri.record(toRequestInfo(ctx, info)) + resp, err := handler(ctx, req) + return resp, err + } +} + +func (ri *GrpcRecorder) RecordedRequests() []RequestInfo { + ri.mux.RLock() + defer ri.mux.RUnlock() + reqs := make([]RequestInfo, len(ri.requests)) + copy(reqs, ri.requests) + return reqs +} + +func toRequestInfo(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo { + req := RequestInfo{ + FullMethod: info.FullMethod, + } + md, ok := metadata.FromIncomingContext(ctx) + if ok { + as := md.Get(":authority") + if len(as) != 0 { + req.Authority = as[0] + } + } + return req +} + +func (ri *GrpcRecorder) record(r RequestInfo) { + ri.mux.Lock() + defer ri.mux.Unlock() + ri.requests = append(ri.requests, r) +} diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 2e20566de..feb846ea1 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -539,7 +539,7 @@ func (e *Etcd) servePeers() (err error) { for _, p := range e.Peers { u := p.Listener.Addr().String() - gs := v3rpc.Server(e.Server, peerTLScfg) + gs := v3rpc.Server(e.Server, peerTLScfg, nil) m := cmux.New(p.Listener) go gs.Serve(m.Match(cmux.HTTP2())) srv := &http.Server{ diff --git a/server/embed/serve.go b/server/embed/serve.go index 17b55384e..c3e786321 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -110,7 +110,7 @@ func (sctx *serveCtx) serve( }() if sctx.insecure { - gs = v3rpc.Server(s, nil, gopts...) + gs = v3rpc.Server(s, nil, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -148,7 +148,7 @@ func (sctx *serveCtx) serve( if tlsErr != nil { return tlsErr } - gs = v3rpc.Server(s, tlscfg, gopts...) + gs = v3rpc.Server(s, tlscfg, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index 26c52b385..ea3dd7570 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -36,19 +36,21 @@ const ( maxSendBytes = math.MaxInt32 ) -func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server { +func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server { var opts []grpc.ServerOption opts = append(opts, grpc.CustomCodec(&codec{})) if tls != nil { bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls}) opts = append(opts, grpc.Creds(bundle.TransportCredentials())) } - chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, } + if interceptor != nil { + chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) + } chainStreamInterceptors := []grpc.StreamServerInterceptor{ newStreamInterceptor(s), diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 0d4d5c329..0d41ef527 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -76,7 +76,7 @@ func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerIntercepto startTime := time.Now() resp, err := handler(ctx, req) lg := s.Logger() - if lg != nil { // acquire stats if debug level is enabled or request is expensive + if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp) } return resp, err diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 025a40580..528bcb902 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -39,6 +39,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/v2" "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/grpc_testing" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/embed" @@ -602,6 +603,8 @@ type member struct { isLearner bool closed bool + + grpcServerRecorder *grpc_testing.GrpcRecorder } func (m *member) GRPCURL() string { return m.grpcURL } @@ -734,7 +737,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.WarningUnaryRequestDuration = embed.DefaultWarningUnaryRequestDuration m.V2Deprecation = config.V2_DEPR_DEFAULT - + m.grpcServerRecorder = &grpc_testing.GrpcRecorder{} m.Logger = memberLogger(t, mcfg.name) t.Cleanup(func() { // if we didn't cleanup the logger, the consecutive test @@ -947,8 +950,8 @@ func (m *member) Launch() error { return err } } - m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) - m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg) + m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerRecorder.UnaryInterceptor(), m.grpcServerOpts...) + m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg, m.grpcServerRecorder.UnaryInterceptor()) m.serverClient = v3client.New(m.s) lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) @@ -1083,6 +1086,10 @@ func (m *member) Launch() error { return nil } +func (m *member) RecordedRequests() []grpc_testing.RequestInfo { + return m.grpcServerRecorder.RecordedRequests() +} + func (m *member) WaitOK(t testutil.TB) { m.WaitStarted(t) for m.s.Leader() == 0 { @@ -1372,8 +1379,9 @@ func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] type ClusterV3 struct { *cluster - mu sync.Mutex - clients []*clientv3.Client + mu sync.Mutex + clients []*clientv3.Client + clusterClient *clientv3.Client } // NewClusterV3 returns a launched cluster with a grpc client connection @@ -1419,6 +1427,11 @@ func (c *ClusterV3) Terminate(t testutil.TB) { t.Error(err) } } + if c.clusterClient != nil { + if err := c.clusterClient.Close(); err != nil { + t.Error(err) + } + } c.mu.Unlock() c.cluster.Terminate(t) } @@ -1431,6 +1444,25 @@ func (c *ClusterV3) Client(i int) *clientv3.Client { return c.clients[i] } +func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) { + if c.clusterClient == nil { + endpoints := []string{} + for _, m := range c.Members { + endpoints = append(endpoints, m.grpcURL) + } + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + c.clusterClient, err = newClientV3(cfg) + if err != nil { + return nil, err + } + } + return c.clusterClient, nil +} + // NewClientV3 creates a new grpc client connection to the member func (c *ClusterV3) NewClientV3(memberIndex int) (*clientv3.Client, error) { return NewClientV3(c.Members[memberIndex]) diff --git a/tests/integration/grpc_test.go b/tests/integration/grpc_test.go new file mode 100644 index 000000000..49cbd1df5 --- /dev/null +++ b/tests/integration/grpc_test.go @@ -0,0 +1,182 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + tls "crypto/tls" + "fmt" + "strings" + "testing" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" +) + +func TestAuthority(t *testing.T) { + tcs := []struct { + name string + useTCP bool + useTLS bool + // Pattern used to generate endpoints for client. Fields filled + // %d - will be filled with member grpc port + // %s - will be filled with member name + clientURLPattern string + + // Pattern used to validate authority received by server. Fields filled: + // %s - list of endpoints concatenated with ";" + expectAuthorityPattern string + }{ + { + name: "unix:path", + clientURLPattern: "unix:localhost:%s", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "unix://absolute_path", + clientURLPattern: "unix://localhost:%s", + expectAuthorityPattern: "#initially=[%s]", + }, + // "unixs" is not standard schema supported by etcd + { + name: "unixs:absolute_path", + useTLS: true, + clientURLPattern: "unixs:localhost:%s", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "unixs://absolute_path", + useTLS: true, + clientURLPattern: "unixs://localhost:%s", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "http://domain[:port]", + useTCP: true, + clientURLPattern: "http://localhost:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "https://domain[:port]", + useTLS: true, + useTCP: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "http://address[:port]", + useTCP: true, + clientURLPattern: "http://127.0.0.1:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "https://address[:port]", + useTCP: true, + useTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + } + for _, tc := range tcs { + for _, clusterSize := range []int{1, 3} { + t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) { + BeforeTest(t) + cfg := ClusterConfig{ + Size: clusterSize, + UseTCP: tc.useTCP, + UseIP: tc.useTCP, + } + cfg, tlsConfig := setupTLS(t, tc.useTLS, cfg) + clus := NewClusterV3(t, &cfg) + defer clus.Terminate(t) + endpoints := templateEndpoints(t, tc.clientURLPattern, clus) + + kv := setupClient(t, tc.clientURLPattern, clus, tlsConfig) + defer kv.Close() + + _, err := kv.Put(context.TODO(), "foo", "bar") + if err != nil { + t.Fatal(err) + } + + assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, strings.Join(endpoints, ";")), clus) + }) + } + } +} + +func setupTLS(t *testing.T, useTLS bool, cfg ClusterConfig) (ClusterConfig, *tls.Config) { + t.Helper() + if useTLS { + cfg.ClientTLS = &testTLSInfo + tlsConfig, err := testTLSInfo.ClientConfig() + if err != nil { + t.Fatal(err) + } + return cfg, tlsConfig + } + return cfg, nil +} + +func setupClient(t *testing.T, endpointPattern string, clus *ClusterV3, tlsConfig *tls.Config) *clientv3.Client { + t.Helper() + endpoints := templateEndpoints(t, endpointPattern, clus) + kv, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + TLS: tlsConfig, + }) + if err != nil { + t.Fatal(err) + } + return kv +} + +func templateEndpoints(t *testing.T, pattern string, clus *ClusterV3) []string { + t.Helper() + endpoints := []string{} + for _, m := range clus.Members { + ent := pattern + if strings.Contains(ent, "%d") { + ent = fmt.Sprintf(ent, GrpcPortNumber(m.UniqNumber, m.MemberNumber)) + } + if strings.Contains(ent, "%s") { + ent = fmt.Sprintf(ent, m.Name) + } + if strings.Contains(ent, "%") { + t.Fatalf("Failed to template pattern, %% symbol left %q", ent) + } + endpoints = append(endpoints, ent) + } + return endpoints +} + +func assertAuthority(t *testing.T, expectedAuthority string, clus *ClusterV3) { + t.Helper() + requestsFound := 0 + for _, m := range clus.Members { + for _, r := range m.RecordedRequests() { + requestsFound++ + if r.Authority != expectedAuthority { + t.Errorf("Got unexpected authority header, member: %q, request: %q, got authority: %q, expected %q", m.Name, r.FullMethod, r.Authority, expectedAuthority) + } + } + } + if requestsFound == 0 { + t.Errorf("Expected at least one request") + } +} From ec419f8613df9d333c661d1b979527f1f57c7838 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 27 Sep 2021 18:09:17 +0200 Subject: [PATCH 08/10] tests: Add grpc authority e2e tests --- tests/e2e/cluster_proxy_test.go | 4 + tests/e2e/cluster_test.go | 2 + tests/e2e/ctl_v3_grpc_test.go | 213 ++++++++++++++++++++++++++++++++ tests/e2e/etcd_process.go | 15 ++- 4 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 tests/e2e/ctl_v3_grpc_test.go diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index f11db67ac..fd7924835 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -115,6 +115,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { return p.etcdProc.WithStopSignal(sig) } +func (p *proxyEtcdProcess) Logs() logsExpect { + return p.etcdProc.Logs() +} + type proxyProc struct { lg *zap.Logger execPath string diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 4b3993d5c..eb39b3afe 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -144,6 +144,7 @@ type etcdProcessClusterConfig struct { execPath string dataDirPath string keepDataDir bool + envVars map[string]string clusterSize int @@ -318,6 +319,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* lg: lg, execPath: cfg.execPath, args: args, + envVars: cfg.envVars, tlsArgs: cfg.tlsArgs(), dataDirPath: dataDirPath, keepDataDir: cfg.keepDataDir, diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go new file mode 100644 index 000000000..b54b773ec --- /dev/null +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -0,0 +1,213 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy +// +build !cluster_proxy + +package e2e + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/client/pkg/v3/testutil" +) + +func TestAuthority(t *testing.T) { + tcs := []struct { + name string + useTLS bool + useInsecureTLS bool + // Pattern used to generate endpoints for client. Fields filled + // %d - will be filled with member grpc port + clientURLPattern string + + // Pattern used to validate authority received by server. Fields filled: + // %s - list of endpoints concatenated with ";" + expectAuthorityPattern string + }{ + { + name: "http://domain[:port]", + clientURLPattern: "http://localhost:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "http://address[:port]", + clientURLPattern: "http://127.0.0.1:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "https://domain[:port] insecure", + useTLS: true, + useInsecureTLS: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "https://address[:port] insecure", + useTLS: true, + useInsecureTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "https://domain[:port]", + useTLS: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + { + name: "https://address[:port]", + useTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "#initially=[%s]", + }, + } + for _, tc := range tcs { + for _, clusterSize := range []int{1, 3} { + t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) { + BeforeTest(t) + + cfg := newConfigNoTLS() + cfg.clusterSize = clusterSize + if tc.useTLS { + cfg.clientTLS = clientTLS + } + cfg.isClientAutoTLS = tc.useInsecureTLS + // Enable debug mode to get logs with http2 headers (including authority) + cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"} + + epc, err := newEtcdProcessCluster(t, cfg) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + defer epc.Close() + endpoints := templateEndpoints(t, tc.clientURLPattern, epc) + + client := clusterEtcdctlV3(cfg, endpoints) + err = client.Put("foo", "bar") + if err != nil { + t.Fatal(err) + } + + executeWithTimeout(t, 5*time.Second, func() { + assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, strings.Join(endpoints, ";")), epc) + }) + }) + + } + } +} + +func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string { + t.Helper() + endpoints := []string{} + for i := 0; i < clus.cfg.clusterSize; i++ { + ent := pattern + if strings.Contains(ent, "%d") { + ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5) + } + if strings.Contains(ent, "%") { + t.Fatalf("Failed to template pattern, %% symbol left %q", ent) + } + endpoints = append(endpoints, ent) + } + return endpoints +} + +func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) { + logs := []logsExpect{} + for _, proc := range clus.procs { + logs = append(logs, proc.Logs()) + } + line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...) + line = strings.TrimSuffix(line, "\n") + line = strings.TrimSuffix(line, "\r") + expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority) + assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine)) +} + +func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string { + t.Helper() + match := make(chan string, len(logs)) + for i := range logs { + go func(l logsExpect) { + line, _ := l.Expect(expectLine) + match <- line + }(logs[i]) + } + return <-match +} + +func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) { + donec := make(chan struct{}) + go func() { + defer close(donec) + f() + }() + + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + } +} + +type etcdctlV3 struct { + cfg *etcdProcessClusterConfig + endpoints []string +} + +func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 { + return &etcdctlV3{ + cfg: cfg, + endpoints: endpoints, + } +} + +func (ctl *etcdctlV3) Put(key, value string) error { + return ctl.runCmd("put", key, value) +} + +func (ctl *etcdctlV3) runCmd(args ...string) error { + cmdArgs := []string{ctlBinPath + "3"} + for k, v := range ctl.flags() { + cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) + } + cmdArgs = append(cmdArgs, args...) + return spawnWithExpect(cmdArgs, "OK") +} + +func (ctl *etcdctlV3) flags() map[string]string { + fmap := make(map[string]string) + if ctl.cfg.clientTLS == clientTLS { + if ctl.cfg.isClientAutoTLS { + fmap["insecure-transport"] = "false" + fmap["insecure-skip-tls-verify"] = "true" + } else if ctl.cfg.isClientCRL { + fmap["cacert"] = caPath + fmap["cert"] = revokedCertPath + fmap["key"] = revokedPrivateKeyPath + } else { + fmap["cacert"] = caPath + fmap["cert"] = certPath + fmap["key"] = privateKeyPath + } + } + fmap["endpoints"] = strings.Join(ctl.endpoints, ",") + return fmap +} diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index c61001cec..6fbb595e0 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -43,6 +43,11 @@ type etcdProcess interface { Close() error WithStopSignal(sig os.Signal) os.Signal Config() *etcdServerProcessConfig + Logs() logsExpect +} + +type logsExpect interface { + Expect(string) (string, error) } type etcdServerProcess struct { @@ -56,6 +61,7 @@ type etcdServerProcessConfig struct { execPath string args []string tlsArgs []string + envVars map[string]string dataDirPath string keepDataDir bool @@ -92,7 +98,7 @@ func (ep *etcdServerProcess) Start() error { panic("already started") } ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.name)) - proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...), nil) + proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars) if err != nil { return err } @@ -163,3 +169,10 @@ func (ep *etcdServerProcess) waitReady() error { } func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } + +func (ep *etcdServerProcess) Logs() logsExpect { + if ep.proc == nil { + ep.cfg.lg.Panic("Please grap logs before process is stopped") + } + return ep.proc +} From c929a917b6aa5a731a9bcdf8558678a4f6fcb227 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 17 Sep 2021 12:20:59 +0200 Subject: [PATCH 09/10] client: Use first endpoint as http2 authority header --- client/v3/client.go | 18 ++++++++++++++--- tests/e2e/ctl_v3_grpc_test.go | 16 +++++++-------- tests/integration/grpc_test.go | 37 ++++++++++++++++++++++++---------- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/client/v3/client.go b/client/v3/client.go index 530b0399f..c39f00421 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -297,9 +297,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc. dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout) defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? } - - initialEndpoints := strings.Join(c.Endpoints(), ";") - target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints) + target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0])) conn, err := grpc.DialContext(dctx, target, opts...) if err != nil { return nil, err @@ -307,6 +305,20 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc. return conn, nil } +func authority(endpoint string) string { + spl := strings.SplitN(endpoint, "://", 2) + if len(spl) < 2 { + if strings.HasPrefix(endpoint, "unix:") { + return endpoint[len("unix:"):] + } + if strings.HasPrefix(endpoint, "unixs:") { + return endpoint[len("unixs:"):] + } + return endpoint + } + return spl[1] +} + func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials { r := endpoint.RequiresCredentials(ep) switch r { diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go index b54b773ec..b0f824552 100644 --- a/tests/e2e/ctl_v3_grpc_test.go +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -37,44 +37,44 @@ func TestAuthority(t *testing.T) { clientURLPattern string // Pattern used to validate authority received by server. Fields filled: - // %s - list of endpoints concatenated with ";" + // %d - will be filled with first member grpc port expectAuthorityPattern string }{ { name: "http://domain[:port]", clientURLPattern: "http://localhost:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%d", }, { name: "http://address[:port]", clientURLPattern: "http://127.0.0.1:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "127.0.0.1:%d", }, { name: "https://domain[:port] insecure", useTLS: true, useInsecureTLS: true, clientURLPattern: "https://localhost:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%d", }, { name: "https://address[:port] insecure", useTLS: true, useInsecureTLS: true, clientURLPattern: "https://127.0.0.1:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "127.0.0.1:%d", }, { name: "https://domain[:port]", useTLS: true, clientURLPattern: "https://localhost:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%d", }, { name: "https://address[:port]", useTLS: true, clientURLPattern: "https://127.0.0.1:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "127.0.0.1:%d", }, } for _, tc := range tcs { @@ -105,7 +105,7 @@ func TestAuthority(t *testing.T) { } executeWithTimeout(t, 5*time.Second, func() { - assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, strings.Join(endpoints, ";")), epc) + assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc) }) }) diff --git a/tests/integration/grpc_test.go b/tests/integration/grpc_test.go index 49cbd1df5..eb71191a3 100644 --- a/tests/integration/grpc_test.go +++ b/tests/integration/grpc_test.go @@ -37,57 +37,58 @@ func TestAuthority(t *testing.T) { clientURLPattern string // Pattern used to validate authority received by server. Fields filled: - // %s - list of endpoints concatenated with ";" + // %d - will be filled with first member grpc port + // %s - will be filled with first member name expectAuthorityPattern string }{ { name: "unix:path", clientURLPattern: "unix:localhost:%s", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%s", }, { name: "unix://absolute_path", clientURLPattern: "unix://localhost:%s", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%s", }, // "unixs" is not standard schema supported by etcd { name: "unixs:absolute_path", useTLS: true, clientURLPattern: "unixs:localhost:%s", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%s", }, { name: "unixs://absolute_path", useTLS: true, clientURLPattern: "unixs://localhost:%s", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%s", }, { name: "http://domain[:port]", useTCP: true, clientURLPattern: "http://localhost:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%d", }, { name: "https://domain[:port]", useTLS: true, useTCP: true, clientURLPattern: "https://localhost:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "localhost:%d", }, { name: "http://address[:port]", useTCP: true, clientURLPattern: "http://127.0.0.1:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "127.0.0.1:%d", }, { name: "https://address[:port]", useTCP: true, useTLS: true, clientURLPattern: "https://127.0.0.1:%d", - expectAuthorityPattern: "#initially=[%s]", + expectAuthorityPattern: "127.0.0.1:%d", }, } for _, tc := range tcs { @@ -102,7 +103,6 @@ func TestAuthority(t *testing.T) { cfg, tlsConfig := setupTLS(t, tc.useTLS, cfg) clus := NewClusterV3(t, &cfg) defer clus.Terminate(t) - endpoints := templateEndpoints(t, tc.clientURLPattern, clus) kv := setupClient(t, tc.clientURLPattern, clus, tlsConfig) defer kv.Close() @@ -112,7 +112,7 @@ func TestAuthority(t *testing.T) { t.Fatal(err) } - assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, strings.Join(endpoints, ";")), clus) + assertAuthority(t, templateAuthority(t, tc.expectAuthorityPattern, clus.Members[0]), clus) }) } } @@ -165,6 +165,21 @@ func templateEndpoints(t *testing.T, pattern string, clus *ClusterV3) []string { return endpoints } +func templateAuthority(t *testing.T, pattern string, m *member) string { + t.Helper() + authority := pattern + if strings.Contains(authority, "%d") { + authority = fmt.Sprintf(authority, GrpcPortNumber(m.UniqNumber, m.MemberNumber)) + } + if strings.Contains(authority, "%s") { + authority = fmt.Sprintf(authority, m.Name) + } + if strings.Contains(authority, "%") { + t.Fatalf("Failed to template pattern, %% symbol left %q", authority) + } + return authority +} + func assertAuthority(t *testing.T, expectedAuthority string, clus *ClusterV3) { t.Helper() requestsFound := 0 From d52d7fc21e15bf8d3465fae2f555242ef63ec819 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 29 Sep 2021 13:07:43 +0200 Subject: [PATCH 10/10] Update changelog with information about authority header fix --- CHANGELOG-3.5.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index 974c1bde6..e2a52bff0 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -16,6 +16,13 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.5.1) and - Fix [self-signed-cert-validity parameter cannot be specified in the config file](https://github.com/etcd-io/etcd/pull/13237). +### etcd client + +- [Fix etcd client sends invalid :authority header](https://github.com/etcd-io/etcd/issues/13192) + +### package clientv3 + +- Endpoints self identify now as `etcd-endpoints://{id}/{authority}` where authority is based on first endpoint passed, for example `etcd-endpoints://0xc0009d8540/localhost:2079`