Avoid sending Compact request when LazyFS is enabled

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
commit 2e04ee77b6 (parent dd7a2a6237)
Author: Marek Siarkowicz, 2024-06-15 12:14:29 +02:00
12 changed files with 132 additions and 63 deletions

View File

@@ -31,6 +31,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/robustness/client"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
+	"go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
 
 var (
@@ -135,7 +136,7 @@ func (f memberReplace) Name() string {
 	return "MemberReplace"
 }
 
-func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
+func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
 	// a lower etcd version may not be able to join a cluster with higher cluster version.
 	return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd)
 }

View File

@@ -28,6 +28,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
+	"go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
 
 const (
@@ -54,10 +55,10 @@ var (
 	}
 )
 
-func PickRandom(clus *e2e.EtcdProcessCluster) (Failpoint, error) {
+func PickRandom(clus *e2e.EtcdProcessCluster, profile traffic.Profile) (Failpoint, error) {
 	availableFailpoints := make([]Failpoint, 0, len(allFailpoints))
 	for _, failpoint := range allFailpoints {
-		err := Validate(clus, failpoint)
+		err := Validate(clus, failpoint, profile)
 		if err != nil {
 			continue
 		}
@@ -69,16 +70,16 @@ func PickRandom(clus *e2e.EtcdProcessCluster) (Failpoint, error) {
 	return availableFailpoints[rand.Int()%len(availableFailpoints)], nil
 }
 
-func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
+func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint, profile traffic.Profile) error {
 	for _, proc := range clus.Procs {
-		if !failpoint.Available(*clus.Cfg, proc) {
+		if !failpoint.Available(*clus.Cfg, proc, profile) {
 			return fmt.Errorf("failpoint %q not available on %s", failpoint.Name(), proc.Config().Name)
 		}
 	}
 	return nil
 }
 
-func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) {
+func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*report.FailpointReport, error) {
 	ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
 	defer cancel()
 	var err error
@@ -99,8 +100,8 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
 	lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
 	end := time.Since(baseTime)
 
-	return &FailpointReport{
-		Injection: Injection{
+	return &report.FailpointReport{
+		FailpointInjection: report.FailpointInjection{
 			Start: start,
 			End:   end,
 			Name:  failpoint.Name(),
@@ -109,16 +110,6 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
 	}, nil
 }
 
-type FailpointReport struct {
-	Injection
-	Client []report.ClientReport
-}
-
-type Injection struct {
-	Start, End time.Duration
-	Name       string
-}
-
 func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error {
 	for i := 0; i < len(clus.Procs); i++ {
 		clusterClient, err := clientv3.New(clientv3.Config{
@@ -154,5 +145,5 @@ type Failpoint interface {
 }
 
 type AvailabilityChecker interface {
-	Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool
+	Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool
 }
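
Note: PickRandom and Validate now thread the traffic profile into every Available check, so a failpoint is only eligible when all members accept it under the current profile. Below is a self-contained sketch of that filter-then-pick flow; the types are simplified stand-ins, not the real etcd test interfaces (which also take the cluster config and member).

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// Profile mirrors only the field this commit adds.
type Profile struct{ ForbidCompaction bool }

type Failpoint interface {
	Name() string
	// Available reports whether the failpoint may run under this profile.
	Available(profile Profile) bool
}

type compactFailpoint struct{}

func (compactFailpoint) Name() string             { return "Compact" }
func (compactFailpoint) Available(p Profile) bool { return !p.ForbidCompaction }

type killFailpoint struct{}

func (killFailpoint) Name() string           { return "Kill" }
func (killFailpoint) Available(Profile) bool { return true }

// pickRandom mirrors PickRandom: filter by availability, then choose at random.
func pickRandom(all []Failpoint, profile Profile) (Failpoint, error) {
	available := make([]Failpoint, 0, len(all))
	for _, f := range all {
		if f.Available(profile) {
			available = append(available, f)
		}
	}
	if len(available) == 0 {
		return nil, errors.New("no failpoint available for this profile")
	}
	return available[rand.Intn(len(available))], nil
}

func main() {
	all := []Failpoint{compactFailpoint{}, killFailpoint{}}
	f, err := pickRandom(all, Profile{ForbidCompaction: true})
	if err != nil {
		panic(err)
	}
	fmt.Println("picked:", f.Name()) // always "Kill": Compact is filtered out
}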

View File

@@ -27,6 +27,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
+	"go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
 
 var (
@@ -147,11 +148,11 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster)
 	}
 }
 
-func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
+func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
 	if f.target == Follower && config.ClusterSize == 1 {
 		return false
 	}
-	if f.trigger != nil && !f.trigger.Available(config, member) {
+	if f.trigger != nil && !f.trigger.Available(config, member, profile) {
 		return false
 	}
 	memberFailpoints := member.Failpoints()
@@ -200,7 +201,7 @@ func (f killAndGofailSleep) Name() string {
 	return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time)
 }
 
-func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
+func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
 	if config.ClusterSize == 1 {
 		return false
 	}
@@ -238,7 +239,7 @@ func (f gofailSleepAndDeactivate) Name() string {
 	return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time)
 }
 
-func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
+func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
 	memberFailpoints := member.Failpoints()
 	if memberFailpoints == nil {
 		return false

View File

@@ -27,6 +27,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
+	"go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
 
 var (
@@ -67,6 +68,6 @@ func (f killFailpoint) Name() string {
 	return "Kill"
 }
 
-func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
+func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool {
 	return true
 }

View File

@@ -26,6 +26,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
+	"go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
 
 var (
@@ -56,7 +57,7 @@ func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e
 	return nil, Blackhole(ctx, t, member, clus, tb.waitTillSnapshot)
 }
 
-func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
+func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess, profile traffic.Profile) bool {
 	// Avoid triggering failpoint if waiting for failpoint would take too long to fit into timeout.
 	// Number of required entries for snapshot depends on etcd configuration.
 	if tb.waitTillSnapshot && (entriesToGuaranteeSnapshot(config) > 200 || !e2e.CouldSetSnapshotCatchupEntries(process.Config().ExecPath)) {
@@ -179,7 +180,7 @@ func (f delayPeerNetworkFailpoint) Name() string {
 	return "delayPeerNetwork"
 }
 
-func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
+func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool {
 	return config.ClusterSize > 1 && clus.PeerProxy() != nil
 }
@@ -213,6 +214,6 @@ func (f dropPeerNetworkFailpoint) Name() string {
 	return "dropPeerNetwork"
 }
 
-func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
+func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool {
 	return config.ClusterSize > 1 && clus.PeerProxy() != nil
 }

View File

@@ -25,6 +25,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/robustness/client"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
+	"go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
 
 type trigger interface {
@@ -47,7 +48,7 @@ func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.Etc
 	return nil, nil
 }
 
-func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
+func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool {
 	return true
 }
@@ -83,7 +84,10 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
 	return []report.ClientReport{cc.Report()}, nil
 }
 
-func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
+func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess, profile traffic.Profile) bool {
+	if profile.ForbidCompaction {
+		return false
+	}
 	// Since introduction of compaction into traffic, injecting compaction failpoints started interfeering with peer proxy.
 	// TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause.
 	if config.PeerProxy {

View File

@@ -75,7 +75,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
 	defer forcestopCluster(r.Cluster)
 
 	if s.failpoint == nil {
-		s.failpoint, err = failpoint.PickRandom(r.Cluster)
+		s.failpoint, err = failpoint.PickRandom(r.Cluster, s.profile)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -107,7 +107,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
 	defer cancel()
 	g := errgroup.Group{}
 	var operationReport, watchReport, failpointClientReport []report.ClientReport
-	failpointInjected := make(chan failpoint.Injection, 1)
+	failpointInjected := make(chan report.FailpointInjection, 1)
 
 	// using baseTime time-measuring operation to get monotonic clock reading
 	// see https://github.com/golang/go/blob/master/src/time/time.go#L17
@@ -125,7 +125,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
 		// Give some time for traffic to reach qps target after injecting failpoint.
 		time.Sleep(time.Second)
 		if fr != nil {
-			failpointInjected <- fr.Injection
+			failpointInjected <- fr.FailpointInjection
 			failpointClientReport = fr.Client
 		}
 		return nil
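
Note: this rewiring is why the report package now owns FailpointInjection. The test hands the struct from the failpoint goroutine to the traffic goroutine over a buffered channel, and traffic can no longer import the failpoint package: that would become an import cycle now that failpoint imports traffic for Profile. A minimal, self-contained sketch of the handoff, with simplified stand-ins (the real loop also propagates client reports and timeouts):

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// FailpointInjection mirrors the report struct: when injection ran, and its name.
type FailpointInjection struct {
	Start, End time.Duration
	Name       string
}

func main() {
	baseTime := time.Now()
	// Buffer of 1 lets the injector finish even if the traffic side reads late.
	failpointInjected := make(chan FailpointInjection, 1)

	g := errgroup.Group{}
	g.Go(func() error { // failpoint goroutine
		start := time.Since(baseTime)
		time.Sleep(10 * time.Millisecond) // stand-in for triggering the failpoint
		failpointInjected <- FailpointInjection{Start: start, End: time.Since(baseTime), Name: "Kill"}
		close(failpointInjected)
		return nil
	})
	g.Go(func() error { // traffic goroutine
		fr, ok := <-failpointInjected
		if !ok {
			return fmt.Errorf("failpoint was not injected")
		}
		fmt.Printf("failpoint %s injected between %v and %v\n", fr.Name, fr.Start, fr.End)
		return nil
	})
	if err := g.Wait(); err != nil {
		panic(err)
	}
}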

View File

@@ -0,0 +1,29 @@
+// Copyright 2024 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package report
+
+import (
+	"time"
+)
+
+type FailpointReport struct {
+	FailpointInjection
+	Client []ClientReport
+}
+
+type FailpointInjection struct {
+	Start, End time.Duration
+	Name       string
+}

View File

@@ -28,24 +28,29 @@ import (
 )
 
 type TrafficProfile struct {
+	Name    string
 	Traffic traffic.Traffic
 	Profile traffic.Profile
 }
 
 var trafficProfiles = []TrafficProfile{
 	{
+		Name:    "EtcdHighTraffic",
 		Traffic: traffic.EtcdPut,
 		Profile: traffic.HighTrafficProfile,
 	},
 	{
+		Name:    "EtcdTrafficDeleteLeases",
 		Traffic: traffic.EtcdPutDeleteLease,
 		Profile: traffic.LowTraffic,
 	},
 	{
+		Name:    "KubernetesHighTraffic",
 		Traffic: traffic.Kubernetes,
 		Profile: traffic.HighTrafficProfile,
 	},
 	{
+		Name:    "KubernetesLowTraffic",
 		Traffic: traffic.Kubernetes,
 		Profile: traffic.LowTraffic,
 	},
@@ -61,7 +66,6 @@ type testScenario struct {
 }
 
 func exploratoryScenarios(_ *testing.T) []testScenario {
-	enableLazyFS := e2e.BinPath.LazyFSAvailable()
 	randomizableOptions := []e2e.EPClusterOption{
 		options.WithClusterOptionGroups(
 			options.ClusterOptions{options.WithTickMs(29), options.WithElectionMs(271)},
@@ -101,23 +105,9 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
 	}
 	scenarios := []testScenario{}
 	for _, tp := range trafficProfiles {
-		name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1")
+		name := filepath.Join(tp.Name, "ClusterOfSize1")
 		clusterOfSize1Options := baseOptions
 		clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
-		// Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS.
-		if enableLazyFS && tp.Profile.MinimalQPS <= 100 {
-			// Set CompactionBatchLimit to default when LazyFS is enabled, because frequent compaction uses a lot of CPU too.
-			lazyFSOptions := append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true), e2e.WithCompactionBatchLimit(1000))
-			scenarios = append(scenarios, testScenario{
-				name:    filepath.Join(name, "LazyFS"),
-				traffic: tp.Traffic,
-				profile: tp.Profile,
-				cluster: *e2e.NewConfig(lazyFSOptions...),
-			})
-			// Smaller CompactionBatchLimit without LazyFS to test Compact.
-			clusterOfSize1Options = append(clusterOfSize1Options, options.WithCompactionBatchLimit(10, 100))
-			name = filepath.Join(name, "Compact")
-		}
 		scenarios = append(scenarios, testScenario{
 			name:    name,
 			traffic: tp.Traffic,
@@ -127,7 +117,7 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
 	}
 
 	for _, tp := range trafficProfiles {
-		name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3")
+		name := filepath.Join(tp.Name, "ClusterOfSize3")
 		clusterOfSize3Options := baseOptions
 		clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithIsPeerTLS(true))
 		clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithPeerProxy(true))
@@ -141,6 +131,25 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
 			cluster: *e2e.NewConfig(clusterOfSize3Options...),
 		})
 	}
+	if e2e.BinPath.LazyFSAvailable() {
+		newScenarios := scenarios
+		for _, s := range scenarios {
+			// LazyFS increases the load on CPU, so we run it with more lightweight case.
+			if s.profile.MinimalQPS <= 100 && s.cluster.ClusterSize == 1 {
+				lazyfsCluster := s.cluster
+				lazyfsCluster.LazyFSEnabled = true
+				newScenarios = append(newScenarios, testScenario{
+					name:      filepath.Join(s.name, "LazyFS"),
+					failpoint: s.failpoint,
+					cluster:   lazyfsCluster,
+					traffic:   s.traffic,
+					profile:   s.profile.WithoutCompaction(),
+					watch:     s.watch,
+				})
+			}
+		}
+		scenarios = newScenarios
+	}
 	return scenarios
 }
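
Note: the LazyFS scenarios are no longer built inline; they are derived from the already-assembled list by copying each lightweight single-member scenario and forbidding compaction in its profile. A standalone sketch of that derive-and-append pattern, with simplified stand-in types (the real code copies the full e2e cluster config):

package main

import (
	"fmt"
	"path/filepath"
)

type Profile struct {
	MinimalQPS       float64
	ForbidCompaction bool
}

// WithoutCompaction returns a copy of the profile with compaction forbidden.
func (p Profile) WithoutCompaction() Profile {
	p.ForbidCompaction = true
	return p
}

type scenario struct {
	name        string
	clusterSize int
	lazyFS      bool
	profile     Profile
}

func main() {
	scenarios := []scenario{
		{name: "EtcdHighTraffic/ClusterOfSize1", clusterSize: 1, profile: Profile{MinimalQPS: 200}},
		{name: "KubernetesLowTraffic/ClusterOfSize1", clusterSize: 1, profile: Profile{MinimalQPS: 100}},
	}
	// Append derived variants to a second slice header so the range over the
	// original list never iterates over its own output.
	newScenarios := scenarios
	for _, s := range scenarios {
		if s.profile.MinimalQPS <= 100 && s.clusterSize == 1 {
			lazy := s
			lazy.name = filepath.Join(s.name, "LazyFS")
			lazy.lazyFS = true
			lazy.profile = s.profile.WithoutCompaction()
			newScenarios = append(newScenarios, lazy)
		}
	}
	for _, s := range newScenarios {
		fmt.Println(s.name, "lazyFS:", s.lazyFS, "forbidCompaction:", s.profile.ForbidCompaction)
	}
}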

View File

@@ -29,7 +29,7 @@ import (
 )
 
 var (
-	EtcdPutDeleteLease = etcdTraffic{
+	EtcdPutDeleteLease Traffic = etcdTraffic{
 		keyCount:     10,
 		leaseTTL:     DefaultLeaseTTL,
 		largePutSize: 32769,
@@ -48,7 +48,7 @@ var (
 			{choice: Compact, weight: 5},
 		},
 	}
-	EtcdPut = etcdTraffic{
+	EtcdPut Traffic = etcdTraffic{
 		keyCount:     10,
 		largePutSize: 32769,
 		leaseTTL:     DefaultLeaseTTL,
@@ -72,6 +72,21 @@ type etcdTraffic struct {
 	largePutSize int
 }
 
+func (t etcdTraffic) WithoutCompact() Traffic {
+	requests := make([]choiceWeight[etcdRequestType], 0, len(t.requests))
+	for _, request := range t.requests {
+		if request.choice != Compact {
+			requests = append(requests, request)
+		}
+	}
+	return etcdTraffic{
+		keyCount:     t.keyCount,
+		requests:     requests,
+		leaseTTL:     t.leaseTTL,
+		largePutSize: t.largePutSize,
+	}
+}
+
 func (t etcdTraffic) ExpectUniqueRevision() bool {
 	return false
 }
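
Note: both WithoutCompact implementations (here and in the Kubernetes traffic below) have the same shape: rebuild the weighted request slice without the compaction choice and return a fresh value, leaving the original traffic untouched. A generic sketch of that filter; choiceWeight here is a stand-in mirroring the shape of the traffic package's unexported type:

package main

import "fmt"

// choiceWeight mirrors the shape of the traffic package's weighted choices.
type choiceWeight[T comparable] struct {
	choice T
	weight int
}

// without returns a copy of choices with every entry equal to drop removed.
func without[T comparable](choices []choiceWeight[T], drop T) []choiceWeight[T] {
	out := make([]choiceWeight[T], 0, len(choices))
	for _, c := range choices {
		if c.choice != drop {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	requests := []choiceWeight[string]{
		{choice: "Put", weight: 50},
		{choice: "Delete", weight: 10},
		{choice: "Compact", weight: 5},
	}
	fmt.Println(without(requests, "Compact")) // [{Put 50} {Delete 10}]
}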

View File

@@ -32,7 +32,7 @@ import (
 )
 
 var (
-	Kubernetes = kubernetesTraffic{
+	Kubernetes Traffic = kubernetesTraffic{
 		averageKeyCount: 10,
 		resource:        "pods",
 		namespace:       "default",
@@ -52,12 +52,23 @@ type kubernetesTraffic struct {
 	writeChoices []choiceWeight[KubernetesRequestType]
 }
 
-func (t kubernetesTraffic) ExpectUniqueRevision() bool {
-	return true
+func (t kubernetesTraffic) WithoutCompact() Traffic {
+	wcs := make([]choiceWeight[KubernetesRequestType], 0, len(t.writeChoices))
+	for _, wc := range t.writeChoices {
+		if wc.choice != KubernetesCompact {
+			wcs = append(wcs, wc)
+		}
+	}
+	return kubernetesTraffic{
+		averageKeyCount: t.averageKeyCount,
+		resource:        t.resource,
+		namespace:       t.namespace,
+		writeChoices:    wcs,
+	}
 }
 
-func (t kubernetesTraffic) Name() string {
-	return "Kubernetes"
+func (t kubernetesTraffic) ExpectUniqueRevision() bool {
+	return true
 }
 
 func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {

View File

@@ -25,7 +25,6 @@ import (
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/client"
-	"go.etcd.io/etcd/tests/v3/robustness/failpoint"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
 	"go.etcd.io/etcd/tests/v3/robustness/model"
 	"go.etcd.io/etcd/tests/v3/robustness/report"
@@ -38,14 +37,12 @@ var (
 	MultiOpTxnOpCount = 4
 
 	LowTraffic = Profile{
-		Name:                           "LowTraffic",
 		MinimalQPS:                     100,
 		MaximalQPS:                     200,
 		ClientCount:                    8,
 		MaxNonUniqueRequestConcurrency: 3,
 	}
 	HighTrafficProfile = Profile{
-		Name:        "HighTraffic",
 		MinimalQPS:  200,
 		MaximalQPS:  1000,
 		ClientCount: 12,
@@ -53,7 +50,7 @@ var (
 	}
 )
 
-func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.Injection, baseTime time.Time, ids identity.Provider) []report.ClientReport {
+func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan report.FailpointInjection, baseTime time.Time, ids identity.Provider) []report.ClientReport {
 	mux := sync.Mutex{}
 	endpoints := clus.EndpointsGRPC()
@@ -61,6 +58,10 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 	reports := []report.ClientReport{}
 	limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200)
 
+	if profile.ForbidCompaction {
+		traffic = traffic.WithoutCompact()
+	}
+
 	cc, err := client.NewRecordingClient(endpoints, ids, baseTime)
 	if err != nil {
 		t.Fatal(err)
@@ -92,7 +93,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 			mux.Unlock()
 		}(c)
 	}
-	var fr *failpoint.Injection
+	var fr *report.FailpointInjection
 	select {
 	case frp, ok := <-failpointInjected:
 		if !ok {
@@ -165,15 +166,20 @@ func (ts *trafficStats) QPS() float64 {
 }
 
 type Profile struct {
-	Name                           string
 	MinimalQPS                     float64
 	MaximalQPS                     float64
 	MaxNonUniqueRequestConcurrency int
 	ClientCount                    int
+	ForbidCompaction               bool
+}
+
+func (p Profile) WithoutCompaction() Profile {
+	p.ForbidCompaction = true
+	return p
 }
 
 type Traffic interface {
 	Run(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
 	ExpectUniqueRevision() bool
-	Name() string
+	WithoutCompact() Traffic
 }
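
Note: two switches now cooperate. Profile.WithoutCompaction marks the profile, which keeps the Compact failpoint from being picked, and SimulateTraffic swaps in traffic.WithoutCompact() before starting clients, so failpoint selection and client traffic observe the same flag. A compact sketch of that double dispatch with stand-in types (Describe is a hypothetical helper; real signatures carry clients, limiters, and reports):

package main

import "fmt"

type Profile struct{ ForbidCompaction bool }

func (p Profile) WithoutCompaction() Profile {
	p.ForbidCompaction = true
	return p
}

type Traffic interface {
	WithoutCompact() Traffic
	Describe() string
}

type etcdTraffic struct{ compact bool }

// Value receiver: mutating t changes only the copy being returned.
func (t etcdTraffic) WithoutCompact() Traffic { t.compact = false; return t }

func (t etcdTraffic) Describe() string {
	if t.compact {
		return "puts+compactions"
	}
	return "puts only"
}

// simulateTraffic mirrors the swap at the top of SimulateTraffic.
func simulateTraffic(profile Profile, traffic Traffic) {
	if profile.ForbidCompaction {
		traffic = traffic.WithoutCompact()
	}
	fmt.Println("running traffic:", traffic.Describe())
}

func main() {
	simulateTraffic(Profile{}, etcdTraffic{compact: true})                     // puts+compactions
	simulateTraffic(Profile{}.WithoutCompaction(), etcdTraffic{compact: true}) // puts only
}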
} }