diff --git a/server/etcdserver/api/membership/storev2.go b/server/etcdserver/api/membership/storev2.go index 9ed94dee9..d428cb66e 100644 --- a/server/etcdserver/api/membership/storev2.go +++ b/server/etcdserver/api/membership/storev2.go @@ -17,9 +17,10 @@ package membership import ( "encoding/json" "fmt" - "go.etcd.io/etcd/client/pkg/v3/types" "path" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "github.com/coreos/go-semver/semver" diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index aab849b5b..33db80370 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -209,7 +209,7 @@ func (c *Cluster) fillClusterForMembers() error { return nil } -func NewClusterFromConfig(t testutil.TB, cfg *ClusterConfig) *Cluster { +func newClusterFromConfig(t testutil.TB, cfg *ClusterConfig) *Cluster { testutil.SkipTestIfShortMode(t, "Cannot start etcd Cluster in --short tests") c := &Cluster{Cfg: cfg} @@ -225,18 +225,6 @@ func NewClusterFromConfig(t testutil.TB, cfg *ClusterConfig) *Cluster { return c } -// NewCluster returns an unlaunched Cluster of the given size which has been -// set to use static bootstrap. -func NewCluster(t testutil.TB, size int) *Cluster { - t.Helper() - return NewClusterFromConfig(t, &ClusterConfig{Size: size}) -} - -// NewClusterByConfig returns an unlaunched Cluster defined by a Cluster configuration -func NewClusterByConfig(t testutil.TB, cfg *ClusterConfig) *Cluster { - return NewClusterFromConfig(t, cfg) -} - func (c *Cluster) Launch(t testutil.TB) { errc := make(chan error) for _, m := range c.Members { @@ -1416,7 +1404,7 @@ func NewClusterV3(t testutil.TB, cfg *ClusterConfig) *ClusterV3 { cfg.UseGRPC = true clus := &ClusterV3{ - Cluster: NewClusterByConfig(t, cfg), + Cluster: newClusterFromConfig(t, cfg), } clus.Launch(t) diff --git a/tests/integration/client/client_test.go b/tests/integration/client/client_test.go index aa0271e95..132118b9a 100644 --- a/tests/integration/client/client_test.go +++ b/tests/integration/client/client_test.go @@ -89,8 +89,7 @@ func TestV2NoRetryNoLeader(t *testing.T) { // TestV2RetryRefuse tests destructive api calls will retry if a connection is refused. func TestV2RetryRefuse(t *testing.T) { integration2.BeforeTest(t) - cl := integration2.NewCluster(t, 1) - cl.Launch(t) + cl := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1}) defer cl.Terminate(t) // test connection refused; expect no error failover cli := integration2.MustNewHTTPClient(t, []string{integration2.URLScheme + "://refuseconn:123", cl.URL(0)}, nil) diff --git a/tests/integration/cluster_test.go b/tests/integration/cluster_test.go index f4aafdf0c..2b3fd3179 100644 --- a/tests/integration/cluster_test.go +++ b/tests/integration/cluster_test.go @@ -26,6 +26,7 @@ import ( "time" "go.etcd.io/etcd/client/v2" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/framework/integration" ) @@ -45,16 +46,14 @@ func TestClusterOf3(t *testing.T) { testCluster(t, 3) } func testCluster(t *testing.T, size int) { integration.BeforeTest(t) - c := integration.NewCluster(t, size) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size}) defer c.Terminate(t) clusterMustProgress(t, c.Members) } func TestTLSClusterOf3(t *testing.T) { integration.BeforeTest(t) - c := integration.NewClusterByConfig(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfo}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfo}) defer c.Terminate(t) clusterMustProgress(t, c.Members) } @@ -63,8 +62,7 @@ func TestTLSClusterOf3(t *testing.T) { // authorities that don't issue dual-usage certificates. func TestTLSClusterOf3WithSpecificUsage(t *testing.T) { integration.BeforeTest(t) - c := integration.NewClusterByConfig(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfoWithSpecificUsage}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfoWithSpecificUsage}) defer c.Terminate(t) clusterMustProgress(t, c.Members) } @@ -74,8 +72,7 @@ func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3 func testClusterUsingDiscovery(t *testing.T, size int) { integration.BeforeTest(t) - dc := integration.NewCluster(t, 1) - dc.Launch(t) + dc := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseIP: true}) defer dc.Terminate(t) // init discovery token space dcc := integration.MustNewHTTPClient(t, dc.URLs(), nil) @@ -86,19 +83,14 @@ func testClusterUsingDiscovery(t *testing.T, size int) { } cancel() - c := integration.NewClusterByConfig( - t, - &integration.ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"}, - ) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"}) defer c.Terminate(t) clusterMustProgress(t, c.Members) } func TestTLSClusterOf3UsingDiscovery(t *testing.T) { integration.BeforeTest(t) - dc := integration.NewCluster(t, 1) - dc.Launch(t) + dc := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseIP: true}) defer dc.Terminate(t) // init discovery token space dcc := integration.MustNewHTTPClient(t, dc.URLs(), nil) @@ -109,13 +101,11 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { } cancel() - c := integration.NewClusterByConfig(t, - &integration.ClusterConfig{ - Size: 3, - PeerTLS: &integration.TestTLSInfo, - DiscoveryURL: dc.URL(0) + "/v2/keys"}, + c := integration.NewClusterV3(t, &integration.ClusterConfig{ + Size: 3, + PeerTLS: &integration.TestTLSInfo, + DiscoveryURL: dc.URL(0) + "/v2/keys"}, ) - c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) } @@ -125,8 +115,7 @@ func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) } func testDoubleClusterSize(t *testing.T, size int) { integration.BeforeTest(t) - c := integration.NewCluster(t, size) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size}) defer c.Terminate(t) for i := 0; i < size; i++ { @@ -137,8 +126,7 @@ func testDoubleClusterSize(t *testing.T, size int) { func TestDoubleTLSClusterSizeOf3(t *testing.T) { integration.BeforeTest(t) - c := integration.NewClusterByConfig(t, &integration.ClusterConfig{Size: 3, PeerTLS: &integration.TestTLSInfo}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, PeerTLS: &integration.TestTLSInfo}) defer c.Terminate(t) for i := 0; i < 3; i++ { @@ -152,8 +140,7 @@ func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) } func testDecreaseClusterSize(t *testing.T, size int) { integration.BeforeTest(t) - c := integration.NewCluster(t, size) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size}) defer c.Terminate(t) // TODO: remove the last but one member @@ -174,8 +161,8 @@ func testDecreaseClusterSize(t *testing.T, size int) { } func TestForceNewCluster(t *testing.T) { - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) - c.Launch(t) + integration.BeforeTest(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true, UseGRPC: true}) cc := integration.MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout) @@ -216,8 +203,7 @@ func TestForceNewCluster(t *testing.T) { func TestAddMemberAfterClusterFullRotation(t *testing.T) { integration.BeforeTest(t) - c := integration.NewCluster(t, 3) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer c.Terminate(t) // remove all the previous three members and add in three new members. @@ -238,8 +224,7 @@ func TestAddMemberAfterClusterFullRotation(t *testing.T) { // Ensure we can remove a member then add a new one back immediately. func TestIssue2681(t *testing.T) { integration.BeforeTest(t) - c := integration.NewCluster(t, 5) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5}) defer c.Terminate(t) c.MustRemoveMember(t, uint64(c.Members[4].Server.ID())) @@ -258,13 +243,7 @@ func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) } func testIssue2746(t *testing.T, members int) { integration.BeforeTest(t) - c := integration.NewCluster(t, members) - - for _, m := range c.Members { - m.SnapshotCount = 10 - } - - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: members, SnapshotCount: 10}) defer c.Terminate(t) // force a snapshot @@ -284,8 +263,7 @@ func testIssue2746(t *testing.T, members int) { func TestIssue2904(t *testing.T) { integration.BeforeTest(t) // start 1-member Cluster to ensure member 0 is the leader of the Cluster. - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer c.Terminate(t) c.AddMember(t) @@ -321,8 +299,7 @@ func TestIssue2904(t *testing.T) { func TestIssue3699(t *testing.T) { // start a Cluster of 3 nodes a, b, c integration.BeforeTest(t) - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer c.Terminate(t) // make node a unavailable @@ -373,8 +350,7 @@ func TestIssue3699(t *testing.T) { // TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members. func TestRejectUnhealthyAdd(t *testing.T) { integration.BeforeTest(t) - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true, StrictReconfigCheck: true}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true, StrictReconfigCheck: true}) defer c.Terminate(t) // make Cluster unhealthy and wait for downed peer @@ -414,8 +390,7 @@ func TestRejectUnhealthyAdd(t *testing.T) { // if quorum will be lost. func TestRejectUnhealthyRemove(t *testing.T) { integration.BeforeTest(t) - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 5, UseBridge: true, StrictReconfigCheck: true}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, UseBridge: true, StrictReconfigCheck: true}) defer c.Terminate(t) // make cluster unhealthy and wait for downed peer; (3 up, 2 down) @@ -460,8 +435,7 @@ func TestRestartRemoved(t *testing.T) { integration.BeforeTest(t) // 1. start single-member Cluster - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 1, UseBridge: true, StrictReconfigCheck: true}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, StrictReconfigCheck: true, UseBridge: true}) defer c.Terminate(t) // 2. add a new member @@ -469,27 +443,27 @@ func TestRestartRemoved(t *testing.T) { c.AddMember(t) c.WaitLeader(t) - oldm := c.Members[0] - oldm.KeepDataDirTerminate = true + firstMember := c.Members[0] + firstMember.KeepDataDirTerminate = true // 3. remove first member, shut down without deleting data - if err := c.RemoveMember(t, uint64(c.Members[0].Server.ID())); err != nil { + if err := c.RemoveMember(t, uint64(firstMember.Server.ID())); err != nil { t.Fatalf("expected to remove member, got error %v", err) } c.WaitLeader(t) // 4. restart first member with 'initial-cluster-state=new' // wrong config, expects exit within ReqTimeout - oldm.ServerConfig.NewCluster = false - if err := oldm.Restart(t); err != nil { + firstMember.ServerConfig.NewCluster = false + if err := firstMember.Restart(t); err != nil { t.Fatalf("unexpected ForceRestart error: %v", err) } defer func() { - oldm.Close() - os.RemoveAll(oldm.ServerConfig.DataDir) + firstMember.Close() + os.RemoveAll(firstMember.ServerConfig.DataDir) }() select { - case <-oldm.Server.StopNotify(): + case <-firstMember.Server.StopNotify(): case <-time.After(time.Minute): t.Fatalf("removed member didn't exit within %v", time.Minute) } @@ -498,35 +472,39 @@ func TestRestartRemoved(t *testing.T) { // clusterMustProgress ensures that cluster can make progress. It creates // a random key first, and check the new key could be got from all client urls // of the cluster. -func clusterMustProgress(t *testing.T, membs []*integration.Member) { - cc := integration.MustNewHTTPClient(t, []string{membs[0].URL()}, nil) - kapi := client.NewKeysAPI(cc) +func clusterMustProgress(t *testing.T, members []*integration.Member) { key := fmt.Sprintf("foo%d", rand.Int()) var ( err error - resp *client.Response + resp *clientv3.PutResponse ) // retry in case of leader loss induced by slow CI for i := 0; i < 3; i++ { ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout) - resp, err = kapi.Create(ctx, "/"+key, "bar") + resp, err = members[0].Client.Put(ctx, key, "bar") cancel() if err == nil { break } - t.Logf("failed to create key on %q (%v)", membs[0].URL(), err) + t.Logf("failed to create key on #0 (%v)", err) } if err != nil { - t.Fatalf("create on %s error: %v", membs[0].URL(), err) + t.Fatalf("create on #0 error: %v", err) } - for i, m := range membs { - u := m.URL() - mcc := integration.MustNewHTTPClient(t, []string{u}, nil) - mkapi := client.NewKeysAPI(mcc) + for i, m := range members { mctx, mcancel := context.WithTimeout(context.Background(), integration.RequestTimeout) - if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { - t.Fatalf("#%d: watch on %s error: %v", i, u, err) + watch := m.Client.Watcher.Watch(mctx, key, clientv3.WithRev(resp.Header.Revision-1)) + for resp := range watch { + if len(resp.Events) != 0 { + break + } + if resp.Err() != nil { + t.Fatalf("#%d: watch error: %q", i, resp.Err()) + } + if resp.Canceled { + t.Fatalf("#%d: watch: cancelled", i) + } } mcancel() } diff --git a/tests/integration/member_test.go b/tests/integration/member_test.go index fe67c0ce3..2fb834ff0 100644 --- a/tests/integration/member_test.go +++ b/tests/integration/member_test.go @@ -29,8 +29,7 @@ import ( func TestPauseMember(t *testing.T) { integration.BeforeTest(t) - c := integration.NewCluster(t, 5) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5}) defer c.Terminate(t) for i := 0; i < 5; i++ { @@ -47,8 +46,7 @@ func TestPauseMember(t *testing.T) { func TestRestartMember(t *testing.T) { integration.BeforeTest(t) - c := integration.NewClusterFromConfig(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) - c.Launch(t) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer c.Terminate(t) for i := 0; i < 3; i++ { @@ -69,14 +67,13 @@ func TestRestartMember(t *testing.T) { func TestLaunchDuplicateMemberShouldFail(t *testing.T) { integration.BeforeTest(t) size := 3 - c := integration.NewCluster(t, size) + c := integration.NewClusterV3(t, &integration.ClusterConfig{Size: size}) m := c.Members[0].Clone(t) var err error m.DataDir, err = os.MkdirTemp(t.TempDir(), "etcd") if err != nil { t.Fatal(err) } - c.Launch(t) defer c.Terminate(t) if err := m.Launch(); err == nil { diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index ebb148259..4f5b6a34c 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -1552,8 +1552,7 @@ func TestV3RangeRequest(t *testing.T) { func newClusterV3NoClients(t *testing.T, cfg *integration.ClusterConfig) *integration.ClusterV3 { cfg.UseGRPC = true - clus := &integration.ClusterV3{Cluster: integration.NewClusterByConfig(t, cfg)} - clus.Launch(t) + clus := integration.NewClusterV3(t, cfg) return clus } @@ -1561,8 +1560,7 @@ func newClusterV3NoClients(t *testing.T, cfg *integration.ClusterConfig) *integr func TestTLSGRPCRejectInsecureClient(t *testing.T) { integration.BeforeTest(t) - cfg := integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo} - clus := newClusterV3NoClients(t, &cfg) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo}) defer clus.Terminate(t) // nil out TLS field so client will use an insecure connection @@ -1596,8 +1594,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) { func TestTLSGRPCRejectSecureClient(t *testing.T) { integration.BeforeTest(t) - cfg := integration.ClusterConfig{Size: 3} - clus := newClusterV3NoClients(t, &cfg) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) clus.Members[0].ClientTLSInfo = &integration.TestTLSInfo @@ -1616,8 +1613,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) { func TestTLSGRPCAcceptSecureAll(t *testing.T) { integration.BeforeTest(t) - cfg := integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo} - clus := newClusterV3NoClients(t, &cfg) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, ClientTLS: &integration.TestTLSInfo}) defer clus.Terminate(t) client, err := integration.NewClientV3(clus.Members[0]) @@ -1834,8 +1830,7 @@ func testTLSReload( func TestGRPCRequireLeader(t *testing.T) { integration.BeforeTest(t) - cfg := integration.ClusterConfig{Size: 3} - clus := newClusterV3NoClients(t, &cfg) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) clus.Members[1].Stop(t) @@ -1861,8 +1856,7 @@ func TestGRPCRequireLeader(t *testing.T) { func TestGRPCStreamRequireLeader(t *testing.T) { integration.BeforeTest(t) - cfg := integration.ClusterConfig{Size: 3, UseBridge: true} - clus := newClusterV3NoClients(t, &cfg) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) client, err := integration.NewClientV3(clus.Members[0])