// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package e2e import ( "context" "fmt" "os" "strings" "testing" "time" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/pkg/v3/flags" "go.etcd.io/etcd/tests/v3/framework/e2e" ) func TestCtlV3Version(t *testing.T) { testCtl(t, versionTest) } func TestClusterVersion(t *testing.T) { e2e.BeforeTest(t) tests := []struct { name string rollingStart bool }{ { name: "When start servers at the same time", rollingStart: false, }, { name: "When start servers one by one", rollingStart: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e2e.BeforeTest(t) cfg := e2e.NewConfig( e2e.WithSnapshotCount(3), e2e.WithBasePeerScheme("unix"), // to avoid port conflict) e2e.WithRollingStart(tt.rollingStart), ) epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg)) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } defer func() { if errC := epc.Close(); errC != nil { t.Fatalf("error closing etcd processes (%v)", errC) } }() ctx := ctlCtx{ t: t, cfg: *cfg, epc: epc, } cv := version.Cluster(version.Version) clusterVersionTest(ctx, `"etcdcluster":"`+cv) }) } } func versionTest(cx ctlCtx) { if err := ctlV3Version(cx); err != nil { cx.t.Fatalf("versionTest ctlV3Version error (%v)", err) } } func clusterVersionTest(cx ctlCtx, expected string) { var err error for i := 0; i < 35; i++ { if err = e2e.CURLGet(cx.epc, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected}}); err != nil { cx.t.Logf("#%d: v3 is not ready yet (%v)", i, err) time.Sleep(200 * time.Millisecond) continue } break } if err != nil { cx.t.Fatalf("failed cluster version test expected %v got (%v)", expected, err) } } func ctlV3Version(cx ctlCtx) error { cmdArgs := append(cx.PrefixArgs(), "version") return e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, expect.ExpectedResponse{Value: version.Version}) } // TestCtlV3DialWithHTTPScheme ensures that client handles Endpoints with HTTPS scheme. func TestCtlV3DialWithHTTPScheme(t *testing.T) { testCtl(t, dialWithSchemeTest, withCfg(*e2e.NewConfigClientTLS())) } func dialWithSchemeTest(cx ctlCtx) { cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsGRPC()), "put", "foo", "bar") if err := e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, expect.ExpectedResponse{Value: "OK"}); err != nil { cx.t.Fatal(err) } } type ctlCtx struct { t *testing.T cfg e2e.EtcdProcessClusterConfig corruptFunc func(string) error disableStrictReconfigCheck bool epc *e2e.EtcdProcessCluster envMap map[string]string dialTimeout time.Duration testTimeout time.Duration quorum bool // if true, set up 3-node cluster and linearizable read interactive bool user string pass string initialCorruptCheck bool // dir that was used during the test dataDir string } type ctlOption func(*ctlCtx) func (cx *ctlCtx) applyOpts(opts []ctlOption) { for _, opt := range opts { opt(cx) } cx.initialCorruptCheck = true } func withCfg(cfg e2e.EtcdProcessClusterConfig) ctlOption { return func(cx *ctlCtx) { cx.cfg = cfg } } func withDefaultDialTimeout() ctlOption { return withDialTimeout(0) } func withDialTimeout(timeout time.Duration) ctlOption { return func(cx *ctlCtx) { cx.dialTimeout = timeout } } func withTestTimeout(timeout time.Duration) ctlOption { return func(cx *ctlCtx) { cx.testTimeout = timeout } } func withQuorum() ctlOption { return func(cx *ctlCtx) { cx.quorum = true } } func withInteractive() ctlOption { return func(cx *ctlCtx) { cx.interactive = true } } func withInitialCorruptCheck() ctlOption { return func(cx *ctlCtx) { cx.initialCorruptCheck = true } } func withCorruptFunc(f func(string) error) ctlOption { return func(cx *ctlCtx) { cx.corruptFunc = f } } func withFlagByEnv() ctlOption { return func(cx *ctlCtx) { cx.envMap = make(map[string]string) } } // This function must be called after the `withCfg`, otherwise its value // may be overwritten by `withCfg`. func withMaxConcurrentStreams(streams uint32) ctlOption { return func(cx *ctlCtx) { cx.cfg.ServerConfig.MaxConcurrentStreams = streams } } func withLogLevel(logLevel string) ctlOption { return func(cx *ctlCtx) { cx.cfg.ServerConfig.LogLevel = logLevel } } func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testCtlWithOffline(t, testFunc, nil, opts...) } func getDefaultCtlCtx(t *testing.T) ctlCtx { return ctlCtx{ t: t, cfg: *e2e.NewConfigAutoTLS(), dialTimeout: 7 * time.Second, } } func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) { e2e.BeforeTest(t) ret := getDefaultCtlCtx(t) ret.applyOpts(opts) if !ret.quorum { ret.cfg = *e2e.ConfigStandalone(ret.cfg) } ret.cfg.ServerConfig.StrictReconfigCheck = !ret.disableStrictReconfigCheck if ret.initialCorruptCheck { ret.cfg.ServerConfig.ExperimentalInitialCorruptCheck = ret.initialCorruptCheck } if testOfflineFunc != nil { ret.cfg.KeepDataDir = true } epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(&ret.cfg)) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } ret.epc = epc ret.dataDir = epc.Procs[0].Config().DataDirPath runCtlTest(t, testFunc, testOfflineFunc, ret) } func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), cx ctlCtx) { defer func() { if cx.envMap != nil { for k := range cx.envMap { os.Unsetenv(k) } cx.envMap = make(map[string]string) } if cx.epc != nil { cx.epc.Stop() cx.epc.Close() } }() donec := make(chan struct{}) go func() { defer close(donec) testFunc(cx) t.Log("---testFunc logic DONE") }() timeout := cx.getTestTimeout() select { case <-time.After(timeout): testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) case <-donec: } t.Log("closing test cluster...") assert.NoError(t, cx.epc.Stop()) assert.NoError(t, cx.epc.Close()) cx.epc = nil t.Log("closed test cluster...") if testOfflineFunc != nil { testOfflineFunc(cx) } } func (cx *ctlCtx) getTestTimeout() time.Duration { timeout := cx.testTimeout if timeout == 0 { timeout = 2*cx.dialTimeout + time.Second if cx.dialTimeout == 0 { timeout = 30 * time.Second } } return timeout } func (cx *ctlCtx) prefixArgs(eps []string) []string { fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") fmap["dial-timeout"] = cx.dialTimeout.String() if cx.epc.Cfg.Client.ConnectionType == e2e.ClientTLS { if cx.epc.Cfg.Client.AutoTLS { fmap["insecure-transport"] = "false" fmap["insecure-skip-tls-verify"] = "true" } else if cx.epc.Cfg.Client.RevokeCerts { fmap["cacert"] = e2e.CaPath fmap["cert"] = e2e.RevokedCertPath fmap["key"] = e2e.RevokedPrivateKeyPath } else { fmap["cacert"] = e2e.CaPath fmap["cert"] = e2e.CertPath fmap["key"] = e2e.PrivateKeyPath } } if cx.user != "" { fmap["user"] = cx.user + ":" + cx.pass } useEnv := cx.envMap != nil cmdArgs := []string{e2e.BinPath.Etcdctl} for k, v := range fmap { if useEnv { ek := flags.FlagToEnv("ETCDCTL", k) cx.envMap[ek] = v } else { cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) } } return cmdArgs } // PrefixArgs prefixes etcdctl command. // Make sure to unset environment variables after tests. func (cx *ctlCtx) PrefixArgs() []string { return cx.prefixArgs(cx.epc.EndpointsGRPC()) } // PrefixArgsUtl returns prefix of the command that is etcdutl // Please not thet 'utl' compatible commands does not consume --endpoints flag. func (cx *ctlCtx) PrefixArgsUtl() []string { return []string{e2e.BinPath.Etcdutl} } func isGRPCTimedout(err error) bool { return strings.Contains(err.Error(), "grpc: timed out trying to connect") }