// Copyright 2023 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build !cluster_proxy package e2e import ( "context" "testing" "time" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" _ "google.golang.org/grpc/health" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" ) const ( // in sync with how kubernetes uses etcd // https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71 keepaliveTime = 30 * time.Second keepaliveTimeout = 10 * time.Second dialTimeout = 20 * time.Second clientRuntime = 10 * time.Second requestTimeout = 100 * time.Millisecond ) func TestFailoverOnDefrag(t *testing.T) { tcs := []struct { name string clusterOptions []e2e.EPClusterOption gRPCDialOptions []grpc.DialOption // common assertion expectedMinQPS float64 // happy case assertion expectedMaxFailureRate float64 // negative case assertion expectedMinFailureRate float64 }{ { name: "defrag failover happy case", clusterOptions: []e2e.EPClusterOption{ e2e.WithClusterSize(3), e2e.WithExperimentalStopGRPCServiceOnDefrag(true), e2e.WithGoFailEnabled(true), }, gRPCDialOptions: []grpc.DialOption{ grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), }, expectedMinQPS: 20, expectedMaxFailureRate: 0.01, }, { name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false", clusterOptions: []e2e.EPClusterOption{ e2e.WithClusterSize(3), e2e.WithExperimentalStopGRPCServiceOnDefrag(false), e2e.WithGoFailEnabled(true), }, gRPCDialOptions: []grpc.DialOption{ grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), }, expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, { name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled", clusterOptions: []e2e.EPClusterOption{ e2e.WithClusterSize(3), e2e.WithExperimentalStopGRPCServiceOnDefrag(true), e2e.WithGoFailEnabled(true), }, expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, { name: "defrag failover happy case with feature gate", clusterOptions: []e2e.EPClusterOption{ e2e.WithClusterSize(3), e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true), e2e.WithGoFailEnabled(true), }, gRPCDialOptions: []grpc.DialOption{ grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), }, expectedMinQPS: 20, expectedMaxFailureRate: 0.01, }, { name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to false", clusterOptions: []e2e.EPClusterOption{ e2e.WithClusterSize(3), e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", false), e2e.WithGoFailEnabled(true), }, gRPCDialOptions: []grpc.DialOption{ grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), }, expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, { name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to true and client health check disabled", clusterOptions: []e2e.EPClusterOption{ e2e.WithClusterSize(3), e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true), e2e.WithGoFailEnabled(true), }, expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { e2e.BeforeTest(t) clus, cerr := e2e.NewEtcdProcessCluster(context.TODO(), t, tc.clusterOptions...) require.NoError(t, cerr) t.Cleanup(func() { clus.Stop() }) endpoints := clus.EndpointsGRPC() requestVolume, successfulRequestCount := 0, 0 start := time.Now() g := new(errgroup.Group) g.Go(func() (lastErr error) { clusterClient, cerr := clientv3.New(clientv3.Config{ DialTimeout: dialTimeout, DialKeepAliveTime: keepaliveTime, DialKeepAliveTimeout: keepaliveTimeout, Endpoints: endpoints, DialOptions: tc.gRPCDialOptions, }) if cerr != nil { return cerr } defer clusterClient.Close() timeout := time.After(clientRuntime) for { select { case <-timeout: return lastErr default: } getContext, cancel := context.WithTimeout(context.Background(), requestTimeout) _, err := clusterClient.Get(getContext, "health") cancel() requestVolume++ if err != nil { lastErr = err continue } successfulRequestCount++ } }) triggerDefrag(t, clus.Procs[0]) err := g.Wait() if err != nil { t.Logf("etcd client failed to fail over, error (%v)", err) } qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second) failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume) t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps) require.GreaterOrEqual(t, qps, tc.expectedMinQPS) if tc.expectedMaxFailureRate != 0.0 { require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate) } if tc.expectedMinFailureRate != 0.0 { require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate) } }) } } func triggerDefrag(t *testing.T, member e2e.EtcdProcess) { require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`)) require.NoError(t, member.Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute})) }