diff --git a/e2e/ctl_v3_snapshot_test.go b/e2e/ctl_v3_snapshot_test.go index 6b2bb379b..9e28833e8 100644 --- a/e2e/ctl_v3_snapshot_test.go +++ b/e2e/ctl_v3_snapshot_test.go @@ -18,9 +18,15 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "os" + "path/filepath" "strings" "testing" + "time" + + "github.com/coreos/etcd/pkg/expect" + "github.com/coreos/etcd/pkg/testutil" ) func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) } @@ -119,3 +125,120 @@ func getSnapshotStatus(cx ctlCtx, fpath string) (snapshotStatus, error) { } return resp, nil } + +// TestIssue6361 ensures new member that starts with snapshot correctly +// syncs up with other members and serve correct data. +func TestIssue6361(t *testing.T) { + defer testutil.AfterTest(t) + mustEtcdctl(t) + os.Setenv("ETCDCTL_API", "3") + defer os.Unsetenv("ETCDCTL_API") + + epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{ + clusterSize: 1, + initialToken: "new", + keepDataDir: true, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + defer func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }() + + dialTimeout := 7 * time.Second + prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.grpcEndpoints(), ","), "--dial-timeout", dialTimeout.String()} + + // write some keys + kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}} + for i := range kvs { + if err = spawnWithExpect(append(prefixArgs, "put", kvs[i].key, kvs[i].val), "OK"); err != nil { + t.Fatal(err) + } + } + + fpath := filepath.Join(os.TempDir(), "test.snapshot") + defer os.RemoveAll(fpath) + + // etcdctl save snapshot + if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil { + t.Fatal(err) + } + + if err = epc.backends()[0].Stop(); err != nil { + t.Fatal(err) + } + + newDataDir := filepath.Join(os.TempDir(), "test.data") + defer os.RemoveAll(newDataDir) + + // etcdctl restore the snapshot + err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].cfg.name, "--initial-cluster", epc.procs[0].cfg.initialCluster, "--initial-cluster-token", epc.procs[0].cfg.initialToken, "--initial-advertise-peer-urls", epc.procs[0].cfg.purl.String(), "--data-dir", newDataDir}, "membership: added member") + if err != nil { + t.Fatal(err) + } + + // start the etcd member using the restored snapshot + epc.procs[0].cfg.dataDirPath = newDataDir + for i := range epc.procs[0].cfg.args { + if epc.procs[0].cfg.args[i] == "--data-dir" { + epc.procs[0].cfg.args[i+1] = newDataDir + } + } + if err = epc.backends()[0].Restart(); err != nil { + t.Fatal(err) + } + + // ensure the restored member has the correct data + for i := range kvs { + if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil { + t.Fatal(err) + } + } + + // add a new member into the cluster + clientURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+30) + peerURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+31) + err = spawnWithExpect(append(prefixArgs, "member", "add", "newmember", fmt.Sprintf("--peer-urls=%s", peerURL)), " added to cluster ") + if err != nil { + t.Fatal(err) + } + + var newDataDir2 string + newDataDir2, err = ioutil.TempDir("", "newdata2") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(newDataDir2) + + name2 := "infra2" + initialCluster2 := epc.procs[0].cfg.initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL) + + // start the new member + var nepc *expect.ExpectProcess + nepc, err = spawnCmd([]string{epc.procs[0].cfg.execPath, "--name", name2, + "--listen-client-urls", clientURL, "--advertise-client-urls", clientURL, + "--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL, + "--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2}) + if err != nil { + t.Fatal(err) + } + if _, err = nepc.Expect("enabled capabilities for version"); err != nil { + t.Fatal(err) + } + + prefixArgs = []string{ctlBinPath, "--endpoints", clientURL, "--dial-timeout", dialTimeout.String()} + + // ensure added member has data from incoming snapshot + for i := range kvs { + if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil { + t.Fatal(err) + } + } + + if err = nepc.Stop(); err != nil { + t.Fatal(err) + } +} diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go index 6de5105a5..0cc442a38 100644 --- a/e2e/etcd_test.go +++ b/e2e/etcd_test.go @@ -132,6 +132,8 @@ type etcdProcessConfig struct { dataDirPath string keepDataDir bool + name string + purl url.URL acurl string @@ -140,6 +142,9 @@ type etcdProcessConfig struct { acurltls string acurlHost string + initialToken string + initialCluster string + isProxy bool } @@ -292,14 +297,16 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { args = append(args, cfg.tlsArgs()...) etcdCfgs[i] = &etcdProcessConfig{ - execPath: cfg.execPath, - args: args, - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - purl: purl, - acurl: curl, - acurltls: curltls, - acurlHost: curlHost, + execPath: cfg.execPath, + args: args, + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + acurltls: curltls, + acurlHost: curlHost, + initialToken: cfg.initialToken, } } for i := 0; i < cfg.proxySize; i++ { @@ -323,6 +330,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { args: args, dataDirPath: dataDirPath, keepDataDir: cfg.keepDataDir, + name: name, acurl: curl.String(), acurlHost: curlHost, isProxy: true, @@ -331,6 +339,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} for i := range etcdCfgs { + etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) }