From bd7405a52eecd864307298191d925c8faa1c958a Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sun, 30 Oct 2022 14:42:11 +0800 Subject: [PATCH] test: added e2e test case for issue 14571: etcd doesn't load auth info when recovering from a snapshot Signed-off-by: Benjamin Wang --- tests/e2e/ctl_v3_auth_test.go | 197 ++++++++++++++++++++++++++++++++++ tests/e2e/ctl_v3_test.go | 8 ++ 2 files changed, 205 insertions(+) diff --git a/tests/e2e/ctl_v3_auth_test.go b/tests/e2e/ctl_v3_auth_test.go index 695483fe2..9d937c381 100644 --- a/tests/e2e/ctl_v3_auth_test.go +++ b/tests/e2e/ctl_v3_auth_test.go @@ -17,11 +17,15 @@ package e2e import ( "context" "fmt" + "net/url" "os" "syscall" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/client/v3" ) @@ -72,6 +76,10 @@ func TestCtlV3AuthJWTExpire(t *testing.T) { testCtl(t, authTestJWTExpi func TestCtlV3AuthRevisionConsistency(t *testing.T) { testCtl(t, authTestRevisionConsistency) } func TestCtlV3AuthTestCacheReload(t *testing.T) { testCtl(t, authTestCacheReload) } +func TestCtlV3AuthRecoverFromSnapshot(t *testing.T) { + testCtl(t, authTestRecoverSnapshot, withCfg(*newConfigNoTLS()), withQuorum(), withSnapshotCount(5)) +} + func authEnableTest(cx ctlCtx) { if err := authEnable(cx); err != nil { cx.t.Fatal(err) @@ -1289,3 +1297,192 @@ func authTestCacheReload(cx ctlCtx) { cx.t.Fatal(err) } } + +// Verify that etcd works after recovering from a snapshot. +// Refer to https://github.com/etcd-io/etcd/issues/14571. +func authTestRecoverSnapshot(cx ctlCtx) { + roles := []authRole{ + { + role: "role0", + permission: clientv3.PermissionType(clientv3.PermReadWrite), + key: "foo", + }, + } + + users := []authUser{ + { + user: "root", + pass: "rootPass", + role: "root", + }, + { + user: "user0", + pass: "user0Pass", + role: "role0", + }, + } + + cx.t.Log("setup and enable auth") + setupAuth(cx, roles, users) + + // create a client with root user + cx.t.Log("create a client with root user") + cliRoot, err := clientv3.New(clientv3.Config{Endpoints: cx.epc.EndpointsV3(), Username: "root", Password: "rootPass", DialTimeout: 3 * time.Second}) + if err != nil { + cx.t.Fatal(err) + } + defer cliRoot.Close() + + // write more than SnapshotCount keys, so that at least one snapshot is created + cx.t.Log("Write enough key/value to trigger a snapshot") + for i := 0; i <= 6; i++ { + if _, err := cliRoot.Put(context.TODO(), fmt.Sprintf("key_%d", i), fmt.Sprintf("value_%d", i)); err != nil { + cx.t.Fatalf("failed to Put (%v)", err) + } + } + + // add a new member into the cluster + // Refer to https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/tests/e2e/cluster_test.go#L238 + var ( + idx = 3 + name = fmt.Sprintf("test-%d", idx) + port = cx.cfg.basePort + 5*idx + curlHost = fmt.Sprintf("localhost:%d", port) + nodeClientURL = url.URL{Scheme: cx.cfg.clientScheme(), Host: curlHost} + nodePeerURL = url.URL{Scheme: cx.cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} + initialCluster = cx.epc.procs[0].Config().initialCluster + "," + fmt.Sprintf("%s=%s", name, nodePeerURL.String()) + ) + cx.t.Logf("Adding a new member: %s", nodePeerURL.String()) + // Must wait at least 5 seconds, otherwise it will always get an + // "etcdserver: unhealthy cluster" response, please refer to link below, + // https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/server/etcdserver/server.go#L1611 + assert.Eventually(cx.t, func() bool { + if _, err := cliRoot.MemberAdd(context.TODO(), []string{nodePeerURL.String()}); err != nil { + cx.t.Logf("Failed to add member, peelURL: %s, error: %v", nodePeerURL.String(), err) + return false + } + return true + }, 8*time.Second, 2*time.Second) + + cx.t.Logf("Starting the new member: %s", nodePeerURL.String()) + newProc, err := runEtcdNode(name, cx.t.TempDir(), nodeClientURL.String(), nodePeerURL.String(), "existing", initialCluster) + require.NoError(cx.t, err) + defer newProc.Stop() + + // create a client with user "user0", and connects to the new member + cx.t.Log("create a client with user 'user0'") + cliUser, err := clientv3.New(clientv3.Config{Endpoints: []string{nodeClientURL.String()}, Username: "user0", Password: "user0Pass", DialTimeout: 3 * time.Second}) + if err != nil { + cx.t.Fatal(err) + } + defer cliUser.Close() + + // write data using the cliUser, expect no error + cx.t.Log("Write a key/value using user 'user0'") + _, err = cliUser.Put(context.TODO(), "foo", "bar") + require.NoError(cx.t, err) + + //verify all nodes have the same revision and hash + var endpoints []string + for _, proc := range cx.epc.procs { + endpoints = append(endpoints, proc.Config().acurl) + } + endpoints = append(endpoints, nodeClientURL.String()) + cx.t.Log("Verify all members have the same revision and hash") + assert.Eventually(cx.t, func() bool { + hashKvs, err := hashKVs(endpoints, cliRoot) + if err != nil { + cx.t.Logf("failed to get HashKV: %v", err) + return false + } + + if len(hashKvs) != 4 { + cx.t.Logf("expected 4 hashkv responses, but got: %d", len(hashKvs)) + return false + } + + if !(hashKvs[0].Header.Revision == hashKvs[1].Header.Revision && + hashKvs[0].Header.Revision == hashKvs[2].Header.Revision && + hashKvs[0].Header.Revision == hashKvs[3].Header.Revision) { + cx.t.Logf("Got different revisions, [%d, %d, %d, %d]", + hashKvs[0].Header.Revision, + hashKvs[1].Header.Revision, + hashKvs[2].Header.Revision, + hashKvs[3].Header.Revision) + return false + } + + assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[1].Hash) + assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[2].Hash) + assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[3].Hash) + + return true + }, 5*time.Second, 100*time.Millisecond) +} + +type authRole struct { + role string + permission clientv3.PermissionType + key string + keyEnd string +} + +type authUser struct { + user string + pass string + role string +} + +func setupAuth(cx ctlCtx, roles []authRole, users []authUser) { + endpoint := cx.epc.procs[0].EndpointsV3()[0] + + // create a client + c, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, DialTimeout: 3 * time.Second}) + if err != nil { + cx.t.Fatal(err) + } + defer c.Close() + + // create roles + for _, r := range roles { + // add role + if _, err = c.RoleAdd(context.TODO(), r.role); err != nil { + cx.t.Fatal(err) + } + + // grant permission to role + if _, err = c.RoleGrantPermission(context.TODO(), r.role, r.key, r.keyEnd, r.permission); err != nil { + cx.t.Fatal(err) + } + } + + // create users + for _, u := range users { + // add user + if _, err = c.UserAdd(context.TODO(), u.user, u.pass); err != nil { + cx.t.Fatal(err) + } + + // grant role to user + if _, err = c.UserGrantRole(context.TODO(), u.user, u.role); err != nil { + cx.t.Fatal(err) + } + } + + // enable auth + if _, err = c.AuthEnable(context.TODO()); err != nil { + cx.t.Fatal(err) + } +} + +func hashKVs(endpoints []string, cli *clientv3.Client) ([]*clientv3.HashKVResponse, error) { + var retHashKVs []*clientv3.HashKVResponse + for _, ep := range endpoints { + resp, err := cli.HashKV(context.TODO(), ep, 0) + if err != nil { + return nil, err + } + retHashKVs = append(retHashKVs, resp) + } + return retHashKVs, nil +} diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 8fc4954be..4f65caa4c 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -221,6 +221,14 @@ func withMaxConcurrentStreams(streams uint32) ctlOption { } } +// This function must be called after the `withCfg`, otherwise its value +// may be overwritten by `withCfg`. +func withSnapshotCount(snapshotCount int) ctlOption { + return func(cx *ctlCtx) { + cx.cfg.snapshotCount = snapshotCount + } +} + func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testCtlWithOffline(t, testFunc, nil, opts...) }