// Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00).
// 205 lines, 6.4 KiB, Go.
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
//go:build !cluster_proxy
|
|
|
|
package e2e
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
"google.golang.org/grpc"
|
|
_ "google.golang.org/grpc/health"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.etcd.io/etcd/tests/v3/framework/config"
|
|
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
|
)
|
|
|
|
const (
	// Client connection settings, kept in sync with how kubernetes uses etcd:
	// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71

	// keepaliveTime is passed to the client as DialKeepAliveTime.
	keepaliveTime = 30 * time.Second
	// keepaliveTimeout is passed to the client as DialKeepAliveTimeout.
	keepaliveTimeout = 10 * time.Second
	// dialTimeout is passed to the client as DialTimeout.
	dialTimeout = 20 * time.Second

	// clientRuntime is how long the test client keeps issuing requests.
	clientRuntime = 10 * time.Second
	// requestTimeout is the deadline applied to each individual request.
	requestTimeout = 100 * time.Millisecond
)

func TestFailoverOnDefrag(t *testing.T) {
|
|
tcs := []struct {
|
|
name string
|
|
clusterOptions []e2e.EPClusterOption
|
|
gRPCDialOptions []grpc.DialOption
|
|
|
|
// common assertion
|
|
expectedMinQPS float64
|
|
// happy case assertion
|
|
expectedMaxFailureRate float64
|
|
// negative case assertion
|
|
expectedMinFailureRate float64
|
|
}{
|
|
{
|
|
name: "defrag failover happy case",
|
|
clusterOptions: []e2e.EPClusterOption{
|
|
e2e.WithClusterSize(3),
|
|
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
|
|
e2e.WithGoFailEnabled(true),
|
|
},
|
|
gRPCDialOptions: []grpc.DialOption{
|
|
grpc.WithDisableServiceConfig(),
|
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
|
|
},
|
|
expectedMinQPS: 20,
|
|
expectedMaxFailureRate: 0.01,
|
|
},
|
|
{
|
|
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
|
|
clusterOptions: []e2e.EPClusterOption{
|
|
e2e.WithClusterSize(3),
|
|
e2e.WithExperimentalStopGRPCServiceOnDefrag(false),
|
|
e2e.WithGoFailEnabled(true),
|
|
},
|
|
gRPCDialOptions: []grpc.DialOption{
|
|
grpc.WithDisableServiceConfig(),
|
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
|
|
},
|
|
expectedMinQPS: 20,
|
|
expectedMinFailureRate: 0.25,
|
|
},
|
|
{
|
|
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
|
|
clusterOptions: []e2e.EPClusterOption{
|
|
e2e.WithClusterSize(3),
|
|
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
|
|
e2e.WithGoFailEnabled(true),
|
|
},
|
|
expectedMinQPS: 20,
|
|
expectedMinFailureRate: 0.25,
|
|
},
|
|
{
|
|
name: "defrag failover happy case with feature gate",
|
|
clusterOptions: []e2e.EPClusterOption{
|
|
e2e.WithClusterSize(3),
|
|
e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true),
|
|
e2e.WithGoFailEnabled(true),
|
|
},
|
|
gRPCDialOptions: []grpc.DialOption{
|
|
grpc.WithDisableServiceConfig(),
|
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
|
|
},
|
|
expectedMinQPS: 20,
|
|
expectedMaxFailureRate: 0.01,
|
|
},
|
|
{
|
|
name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to false",
|
|
clusterOptions: []e2e.EPClusterOption{
|
|
e2e.WithClusterSize(3),
|
|
e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", false),
|
|
e2e.WithGoFailEnabled(true),
|
|
},
|
|
gRPCDialOptions: []grpc.DialOption{
|
|
grpc.WithDisableServiceConfig(),
|
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
|
|
},
|
|
expectedMinQPS: 20,
|
|
expectedMinFailureRate: 0.25,
|
|
},
|
|
{
|
|
name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to true and client health check disabled",
|
|
clusterOptions: []e2e.EPClusterOption{
|
|
e2e.WithClusterSize(3),
|
|
e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true),
|
|
e2e.WithGoFailEnabled(true),
|
|
},
|
|
expectedMinQPS: 20,
|
|
expectedMinFailureRate: 0.25,
|
|
},
|
|
}
|
|
|
|
for _, tc := range tcs {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
e2e.BeforeTest(t)
|
|
clus, cerr := e2e.NewEtcdProcessCluster(context.TODO(), t, tc.clusterOptions...)
|
|
require.NoError(t, cerr)
|
|
t.Cleanup(func() { clus.Stop() })
|
|
|
|
endpoints := clus.EndpointsGRPC()
|
|
|
|
requestVolume, successfulRequestCount := 0, 0
|
|
start := time.Now()
|
|
g := new(errgroup.Group)
|
|
g.Go(func() (lastErr error) {
|
|
clusterClient, cerr := clientv3.New(clientv3.Config{
|
|
DialTimeout: dialTimeout,
|
|
DialKeepAliveTime: keepaliveTime,
|
|
DialKeepAliveTimeout: keepaliveTimeout,
|
|
Endpoints: endpoints,
|
|
DialOptions: tc.gRPCDialOptions,
|
|
})
|
|
if cerr != nil {
|
|
return cerr
|
|
}
|
|
defer clusterClient.Close()
|
|
|
|
timeout := time.After(clientRuntime)
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
return lastErr
|
|
default:
|
|
}
|
|
getContext, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
_, err := clusterClient.Get(getContext, "health")
|
|
cancel()
|
|
requestVolume++
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
successfulRequestCount++
|
|
}
|
|
})
|
|
triggerDefrag(t, clus.Procs[0])
|
|
|
|
err := g.Wait()
|
|
if err != nil {
|
|
t.Logf("etcd client failed to fail over, error (%v)", err)
|
|
}
|
|
|
|
qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second)
|
|
failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume)
|
|
t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps)
|
|
|
|
require.GreaterOrEqual(t, qps, tc.expectedMinQPS)
|
|
if tc.expectedMaxFailureRate != 0.0 {
|
|
require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate)
|
|
}
|
|
if tc.expectedMinFailureRate != 0.0 {
|
|
require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func triggerDefrag(t *testing.T, member e2e.EtcdProcess) {
|
|
require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`))
|
|
require.NoError(t, member.Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute}))
|
|
}
|