From 8e56e2f5ce4c43e81877b16239c48f8ff3a0dc14 Mon Sep 17 00:00:00 2001 From: WangXiaoxiao <1141195807@qq.com> Date: Sun, 4 Dec 2022 22:42:44 +0800 Subject: [PATCH] add mix version for snapshot e2e case Signed-off-by: WangXiaoxiao <1141195807@qq.com> --- tests/e2e/etcd_mix_versions_test.go | 126 ++++++++++++++++++++++++---- tests/framework/e2e/cluster.go | 4 +- 2 files changed, 112 insertions(+), 18 deletions(-) diff --git a/tests/e2e/etcd_mix_versions_test.go b/tests/e2e/etcd_mix_versions_test.go index ae11db6b1..653aa8f70 100644 --- a/tests/e2e/etcd_mix_versions_test.go +++ b/tests/e2e/etcd_mix_versions_test.go @@ -21,15 +21,15 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" ) -// TestMixVersionsSendSnapshot tests the mix version send snapshots -// TODO(ahrtr): add network partition scenario to trigger snapshots. -func TestMixVersionsSendSnapshot(t *testing.T) { +// TestMixVersionsSnapshotByAddingMember tests the mix version send snapshots by adding member +func TestMixVersionsSnapshotByAddingMember(t *testing.T) { cases := []struct { name string clusterVersion e2e.ClusterVersion @@ -54,12 +54,12 @@ func TestMixVersionsSendSnapshot(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - mixVersionsSnapshotTest(t, tc.clusterVersion, tc.newInstanceVersion) + mixVersionsSnapshotTestByAddingMember(t, tc.clusterVersion, tc.newInstanceVersion) }) } } -func mixVersionsSnapshotTest(t *testing.T, clusterVersion, newInstanceVersion e2e.ClusterVersion) { +func mixVersionsSnapshotTestByAddingMember(t *testing.T, clusterVersion, newInstanceVersion e2e.ClusterVersion) { e2e.BeforeTest(t) if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) { @@ -72,13 +72,10 @@ func mixVersionsSnapshotTest(t *testing.T, clusterVersion, newInstanceVersion e2 e2e.WithSnapshotCount(10), 
e2e.WithVersion(clusterVersion), ) - if err != nil { - t.Fatalf("failed to start etcd cluster: %v", err) - } + require.NoError(t, err, "failed to start etcd cluster: %v", err) defer func() { - if err := epc.Close(); err != nil { - t.Fatalf("failed to close etcd cluster: %v", err) - } + err := epc.Close() + require.NoError(t, err, "failed to close etcd cluster: %v", err) }() // Write more than SnapshotCount entries to trigger at least a snapshot. @@ -86,18 +83,17 @@ func mixVersionsSnapshotTest(t *testing.T, clusterVersion, newInstanceVersion e2 for i := 0; i < 20; i++ { key := fmt.Sprintf("key-%d", i) value := fmt.Sprintf("value-%d", i) - if err := epc.Client().Put(context.TODO(), key, value, config.PutOptions{}); err != nil { - t.Fatalf("failed to put %q, error: %v", key, err) - } + err := epc.Client().Put(context.TODO(), key, value, config.PutOptions{}) + require.NoError(t, err, "failed to put %q, error: %v", key, err) } // start a new etcd instance, which will receive a snapshot from the leader. 
newCfg := *epc.Cfg newCfg.Version = newInstanceVersion + newCfg.SnapshotCatchUpEntries = 10 t.Log("Starting a new etcd instance") - if err := epc.StartNewProc(context.TODO(), &newCfg, t); err != nil { - t.Fatalf("failed to start the new etcd instance: %v", err) - } + err = epc.StartNewProc(context.TODO(), &newCfg, t) + require.NoError(t, err, "failed to start the new etcd instance: %v", err) defer epc.CloseProc(context.TODO(), nil) // verify all nodes have exact same revision and hash @@ -123,3 +119,99 @@ func mixVersionsSnapshotTest(t *testing.T, clusterVersion, newInstanceVersion e2 return true }, 10*time.Second, 500*time.Millisecond) } + +func TestMixVersionsSnapshotByMockingPartition(t *testing.T) { + cases := []struct { + name string + clusterVersion e2e.ClusterVersion + mockPartitionNodeIndex int + }{ + { + name: "etcd instance with last version receives snapshot from the leader with current version", + clusterVersion: e2e.MinorityLastVersion, + mockPartitionNodeIndex: 2, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + mixVersionsSnapshotTestByMockPartition(t, tc.clusterVersion, tc.mockPartitionNodeIndex) + }) + } +} + +func mixVersionsSnapshotTestByMockPartition(t *testing.T, clusterVersion e2e.ClusterVersion, mockPartitionNodeIndex int) { + e2e.BeforeTest(t) + + if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) { + t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease) + } + + // Create an etcd cluster with 3 members of MinorityLastVersion + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, + e2e.WithClusterSize(3), + e2e.WithSnapshotCount(10), + e2e.WithVersion(clusterVersion), + e2e.WithSnapshotCatchUpEntries(10), + ) + require.NoError(t, err, "failed to start etcd cluster: %v", err) + defer func() { + err := epc.Close() + require.NoError(t, err, "failed to close etcd cluster: %v", err) + }() + toPartitionedMember := epc.Procs[mockPartitionNodeIndex] + + // Stop the member to mock a network partition; it is restarted after the writes + err = 
toPartitionedMember.Stop() + require.NoError(t, err) + + // Write more than SnapshotCount entries to trigger at least one snapshot. + t.Log("Writing 20 keys to the cluster") + for i := 0; i < 20; i++ { + key := fmt.Sprintf("key-%d", i) + value := fmt.Sprintf("value-%d", i) + err := epc.Client().Put(context.TODO(), key, value, config.PutOptions{}) + require.NoError(t, err, "failed to put %q, error: %v", key, err) + } + + t.Log("Verify logs to check leader has saved snapshot") + leaderEPC := epc.Procs[epc.WaitLeader(t)] + e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot") + + // Restart the partitioned member + err = toPartitionedMember.Restart(context.TODO()) + require.NoError(t, err) + + // Verify all nodes have the exact same revision and hash + t.Log("Verify all nodes have exact same revision and hash") + assert.Eventually(t, func() bool { + hashKvs, err := epc.Client().HashKV(context.TODO(), 0) + if err != nil { + t.Logf("failed to get HashKV: %v", err) + return false + } + if len(hashKvs) != 3 { + t.Logf("expected 3 hashkv responses, but got: %d", len(hashKvs)) + return false + } + + if hashKvs[0].Header.Revision != hashKvs[1].Header.Revision { + t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision) + return false + } + if hashKvs[1].Header.Revision != hashKvs[2].Header.Revision { + t.Logf("Got different revisions, [%d, %d]", hashKvs[1].Header.Revision, hashKvs[2].Header.Revision) + return false + } + + assert.Equal(t, hashKvs[0].Hash, hashKvs[1].Hash) + assert.Equal(t, hashKvs[1].Hash, hashKvs[2].Hash) + + return true + }, 10*time.Second, 500*time.Millisecond) + + // Assert process logs to check that a snapshot was sent + t.Log("Verify logs to check snapshot be sent from leader to follower") + leaderEPC = epc.Procs[epc.WaitLeader(t)] + e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot") +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 442921e60..e8a560ca8 100644 --- 
a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -582,7 +582,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) } if cfg.SnapshotCatchUpEntries > 0 { - args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries)) + if cfg.Version == CurrentVersion || (cfg.Version == MinorityLastVersion && i <= cfg.ClusterSize/2) || (cfg.Version == QuorumLastVersion && i > cfg.ClusterSize/2) { + args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries)) + } } envVars := map[string]string{} for key, value := range cfg.EnvVars {