From ad20230e07241f7c8af1750eefd4d3b09a58cc54 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 8 May 2023 19:37:15 +0200 Subject: [PATCH] test/robustness: Create dedicated traffic package Signed-off-by: Marek Siarkowicz --- tests/robustness/failpoints.go | 3 +- tests/robustness/linearizability_test.go | 109 +------ tests/robustness/traffic.go | 377 ----------------------- tests/robustness/{ => traffic}/client.go | 34 +- tests/robustness/traffic/etcd.go | 235 ++++++++++++++ tests/robustness/traffic/kubernetes.go | 138 +++++++++ tests/robustness/traffic/random.go | 49 +++ tests/robustness/traffic/traffic.go | 103 +++++++ 8 files changed, 558 insertions(+), 490 deletions(-) delete mode 100644 tests/robustness/traffic.go rename tests/robustness/{ => traffic}/client.go (83%) create mode 100644 tests/robustness/traffic/etcd.go create mode 100644 tests/robustness/traffic/kubernetes.go create mode 100644 tests/robustness/traffic/random.go create mode 100644 tests/robustness/traffic/traffic.go diff --git a/tests/robustness/failpoints.go b/tests/robustness/failpoints.go index 907d57692..a70a9fbb4 100644 --- a/tests/robustness/failpoints.go +++ b/tests/robustness/failpoints.go @@ -31,7 +31,8 @@ import ( ) const ( - triggerTimeout = time.Minute + triggerTimeout = time.Minute + waitBetweenFailpointTriggers = time.Second ) var ( diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index 5ecc9e2f2..a00846055 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -27,88 +27,7 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/model" -) - -const ( - // waitBetweenFailpointTriggers - waitBetweenFailpointTriggers = time.Second -) - -var ( - LowTraffic = trafficConfig{ - name: "LowTraffic", - minimalQPS: 100, - maximalQPS: 200, - clientCount: 8, - requestProgress: false, - traffic: etcdTraffic{ - keyCount: 10, - leaseTTL: DefaultLeaseTTL, - largePutSize: 32769, - writeChoices: []choiceWeight[etcdRequestType]{ - {choice: Put, weight: 45}, - {choice: LargePut, weight: 5}, - {choice: Delete, weight: 10}, - {choice: MultiOpTxn, weight: 10}, - {choice: PutWithLease, weight: 10}, - {choice: LeaseRevoke, weight: 10}, - {choice: CompareAndSet, weight: 10}, - }, - }, - } - HighTraffic = trafficConfig{ - name: "HighTraffic", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, - requestProgress: false, - traffic: etcdTraffic{ - keyCount: 10, - largePutSize: 32769, - leaseTTL: DefaultLeaseTTL, - writeChoices: []choiceWeight[etcdRequestType]{ - {choice: Put, weight: 85}, - {choice: MultiOpTxn, weight: 10}, - {choice: LargePut, weight: 5}, - }, - }, - } - KubernetesTraffic = trafficConfig{ - name: "Kubernetes", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, - traffic: kubernetesTraffic{ - averageKeyCount: 5, - resource: "pods", - namespace: "default", - writeChoices: []choiceWeight[KubernetesRequestType]{ - {choice: KubernetesUpdate, weight: 75}, - {choice: KubernetesDelete, weight: 15}, - {choice: KubernetesCreate, weight: 10}, - }, - }, - } - ReqProgTraffic = trafficConfig{ - name: "RequestProgressTraffic", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, - requestProgress: true, - traffic: etcdTraffic{ - keyCount: 10, - largePutSize: 8196, - leaseTTL: DefaultLeaseTTL, - writeChoices: []choiceWeight[etcdRequestType]{ - {choice: Put, weight: 95}, - {choice: LargePut, weight: 5}, - }, - }, - } - defaultTraffic = LowTraffic - trafficList = []trafficConfig{ - LowTraffic, HighTraffic, KubernetesTraffic, - } + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) func TestRobustness(t *testing.T) { @@ -121,12 +40,12 @@ func TestRobustness(t *testing.T) { name string failpoint Failpoint config e2e.EtcdProcessClusterConfig - traffic *trafficConfig + traffic *traffic.Config } scenarios := []scenario{} - for _, traffic := range trafficList { + for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} { scenarios = append(scenarios, scenario{ - name: "ClusterOfSize1/" + traffic.name, + name: "ClusterOfSize1/" + traffic.Name, failpoint: RandomFailpoint, traffic: &traffic, config: *e2e.NewConfig( @@ -149,7 +68,7 @@ func TestRobustness(t *testing.T) { clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100)) } scenarios = append(scenarios, scenario{ - name: "ClusterOfSize3/" + traffic.name, + name: "ClusterOfSize3/" + traffic.Name, failpoint: RandomFailpoint, traffic: &traffic, config: *e2e.NewConfig(clusterOfSize3Options...), @@ -174,7 +93,7 @@ func TestRobustness(t *testing.T) { scenarios = append(scenarios, scenario{ name: "Issue13766", failpoint: KillFailpoint, - traffic: &HighTraffic, + traffic: &traffic.HighTraffic, config: *e2e.NewConfig( e2e.WithSnapshotCount(100), ), @@ -182,7 +101,7 @@ func TestRobustness(t *testing.T) { scenarios = append(scenarios, scenario{ name: "Issue15220", failpoint: RandomFailpoint, - traffic: &ReqProgTraffic, + traffic: &traffic.ReqProgTraffic, config: *e2e.NewConfig( e2e.WithClusterSize(1), ), @@ -191,7 +110,7 @@ func TestRobustness(t *testing.T) { scenarios = append(scenarios, scenario{ name: "Issue15271", failpoint: BlackholeUntilSnapshot, - traffic: &HighTraffic, + traffic: &traffic.HighTraffic, config: *e2e.NewConfig( e2e.WithSnapshotCount(100), e2e.WithPeerProxy(true), @@ -201,7 +120,7 @@ func TestRobustness(t *testing.T) { } for _, scenario := range scenarios { if scenario.traffic == nil { - scenario.traffic = &defaultTraffic + scenario.traffic = &traffic.LowTraffic } t.Run(scenario.name, func(t *testing.T) { @@ -218,7 +137,7 @@ func TestRobustness(t *testing.T) { } } -func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *trafficConfig, failpoint FailpointConfig) { +func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *traffic.Config, failpoint FailpointConfig) { r := report{lg: lg} var err error r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config)) @@ -238,7 +157,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2 forcestopCluster(r.clus) watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0 - validateWatchResponses(t, r.clus, r.responses, traffic.requestProgress || watchProgressNotifyEnabled) + validateWatchResponses(t, r.clus, r.responses, traffic.RequestProgress || watchProgressNotifyEnabled) r.events = watchEvents(r.responses) validateEventsMatch(t, r.events) @@ -249,7 +168,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2 panicked = false } -func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) { +func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, tCfg traffic.Config, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) { g := errgroup.Group{} finishTraffic := make(chan struct{}) @@ -262,12 +181,12 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et maxRevisionChan := make(chan int64, 1) g.Go(func() error { defer close(maxRevisionChan) - operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic) + operations = traffic.SimulateTraffic(ctx, t, lg, clus, tCfg, finishTraffic) maxRevisionChan <- operationsMaxRevision(operations) return nil }) g.Go(func() error { - responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress) + responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, tCfg.RequestProgress) return nil }) g.Wait() diff --git a/tests/robustness/traffic.go b/tests/robustness/traffic.go deleted file mode 100644 index 9fb5335ae..000000000 --- a/tests/robustness/traffic.go +++ /dev/null @@ -1,377 +0,0 @@ -// Copyright 2022 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package robustness - -import ( - "context" - "fmt" - "math/rand" - "strings" - "sync" - "testing" - "time" - - "github.com/anishathalye/porcupine" - "go.uber.org/zap" - "golang.org/x/time/rate" - - "go.etcd.io/etcd/api/v3/mvccpb" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/pkg/v3/stringutil" - "go.etcd.io/etcd/tests/v3/framework/e2e" - "go.etcd.io/etcd/tests/v3/robustness/identity" - "go.etcd.io/etcd/tests/v3/robustness/model" -) - -var ( - DefaultLeaseTTL int64 = 7200 - RequestTimeout = 40 * time.Millisecond - MultiOpTxnOpCount = 4 -) - -func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig, finish <-chan struct{}) []porcupine.Operation { - mux := sync.Mutex{} - endpoints := clus.EndpointsGRPC() - - ids := identity.NewIdProvider() - lm := identity.NewLeaseIdStorage() - h := model.History{} - limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) - - startTime := time.Now() - cc, err := NewClient(endpoints, ids, startTime) - if err != nil { - t.Fatal(err) - } - defer cc.Close() - wg := sync.WaitGroup{} - for i := 0; i < config.clientCount; i++ { - wg.Add(1) - c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime) - if err != nil { - t.Fatal(err) - } - go func(c *recordingClient, clientId int) { - defer wg.Done() - defer c.Close() - - config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish) - mux.Lock() - h = h.Merge(c.history.History) - mux.Unlock() - }(c, i) - } - wg.Wait() - endTime := time.Now() - - // Ensure that last operation is succeeds - time.Sleep(time.Second) - err = cc.Put(ctx, "tombstone", "true") - if err != nil { - t.Error(err) - } - h = h.Merge(cc.history.History) - - operations := h.Operations() - lg.Info("Recorded operations", zap.Int("count", len(operations))) - - qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second) - lg.Info("Average traffic", zap.Float64("qps", qps)) - if qps < config.minimalQPS { - t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps) - } - return operations -} - -type trafficConfig struct { - name string - minimalQPS float64 - maximalQPS float64 - clientCount int - traffic Traffic - requestProgress bool // Request progress notifications while watching this traffic -} - -type Traffic interface { - Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) -} - -type etcdTraffic struct { - keyCount int - writeChoices []choiceWeight[etcdRequestType] - leaseTTL int64 - largePutSize int -} - -type etcdRequestType string - -const ( - Put etcdRequestType = "put" - LargePut etcdRequestType = "largePut" - Delete etcdRequestType = "delete" - MultiOpTxn etcdRequestType = "multiOpTxn" - PutWithLease etcdRequestType = "putWithLease" - LeaseRevoke etcdRequestType = "leaseRevoke" - CompareAndSet etcdRequestType = "compareAndSet" - Defragment etcdRequestType = "defragment" -) - -type kubernetesTraffic struct { - averageKeyCount int - resource string - namespace string - writeChoices []choiceWeight[KubernetesRequestType] -} - -type KubernetesRequestType string - -const ( - KubernetesUpdate KubernetesRequestType = "update" - KubernetesCreate KubernetesRequestType = "create" - KubernetesDelete KubernetesRequestType = "delete" -) - -func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { - for { - select { - case <-ctx.Done(): - return - case <-finish: - return - default: - } - objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true) - if err != nil { - continue - } - limiter.Wait(ctx) - err = t.Write(ctx, c, ids, objects) - if err != nil { - continue - } - limiter.Wait(ctx) - } -} - -func (t kubernetesTraffic) Write(ctx context.Context, c *recordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) { - writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - if len(objects) < t.averageKeyCount/2 { - err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) - } else { - randomPod := objects[rand.Intn(len(objects))] - if len(objects) > t.averageKeyCount*3/2 { - err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) - } else { - op := pickRandom(t.writeChoices) - switch op { - case KubernetesDelete: - err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) - case KubernetesUpdate: - err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision) - case KubernetesCreate: - err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) - default: - panic(fmt.Sprintf("invalid choice: %q", op)) - } - } - } - cancel() - return err -} - -func (t kubernetesTraffic) generateKey() string { - return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5)) -} - -func (t kubernetesTraffic) Range(ctx context.Context, c *recordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { - ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := c.Range(ctx, key, withPrefix) - cancel() - return resp, err -} - -func (t kubernetesTraffic) Create(ctx context.Context, c *recordingClient, key, value string) error { - return t.Update(ctx, c, key, value, 0) -} - -func (t kubernetesTraffic) Update(ctx context.Context, c *recordingClient, key, value string, expectedRevision int64) error { - ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - err := c.CompareRevisionAndPut(ctx, key, value, expectedRevision) - cancel() - return err -} - -func (t kubernetesTraffic) Delete(ctx context.Context, c *recordingClient, key string, expectedRevision int64) error { - ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - err := c.CompareRevisionAndDelete(ctx, key, expectedRevision) - cancel() - return err -} - -func (t etcdTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { - - for { - select { - case <-ctx.Done(): - return - case <-finish: - return - default: - } - key := fmt.Sprintf("%d", rand.Int()%t.keyCount) - // Execute one read per one write to avoid operation history include too many failed writes when etcd is down. - resp, err := t.Read(ctx, c, key) - if err != nil { - continue - } - limiter.Wait(ctx) - err = t.Write(ctx, c, limiter, key, ids, lm, clientId, resp) - if err != nil { - continue - } - limiter.Wait(ctx) - } -} - -func (t etcdTraffic) Read(ctx context.Context, c *recordingClient, key string) (*mvccpb.KeyValue, error) { - getCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := c.Get(getCtx, key) - cancel() - return resp, err -} - -func (t etcdTraffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error { - writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - - var err error - switch pickRandom(t.writeChoices) { - case Put: - err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId())) - case LargePut: - err = c.Put(writeCtx, key, randString(t.largePutSize)) - case Delete: - err = c.Delete(writeCtx, key) - case MultiOpTxn: - err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id)) - case CompareAndSet: - var expectRevision int64 - if lastValues != nil { - expectRevision = lastValues.ModRevision - } - err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision) - case PutWithLease: - leaseId := lm.LeaseId(cid) - if leaseId == 0 { - leaseId, err = c.LeaseGrant(writeCtx, t.leaseTTL) - if err == nil { - lm.AddLeaseId(cid, leaseId) - limiter.Wait(ctx) - } - } - if leaseId != 0 { - putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) - err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId) - putCancel() - } - case LeaseRevoke: - leaseId := lm.LeaseId(cid) - if leaseId != 0 { - err = c.LeaseRevoke(writeCtx, leaseId) - //if LeaseRevoke has failed, do not remove the mapping. - if err == nil { - lm.RemoveLeaseId(cid) - } - } - case Defragment: - err = c.Defragment(writeCtx) - default: - panic("invalid choice") - } - cancel() - return err -} - -func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { - keys := rand.Perm(t.keyCount) - opTypes := make([]model.OperationType, 4) - - atLeastOnePut := false - for i := 0; i < MultiOpTxnOpCount; i++ { - opTypes[i] = t.pickOperationType() - if opTypes[i] == model.Put { - atLeastOnePut = true - } - } - // Ensure at least one put to make operation unique - if !atLeastOnePut { - opTypes[0] = model.Put - } - - for i, opType := range opTypes { - key := fmt.Sprintf("%d", keys[i]) - switch opType { - case model.Range: - ops = append(ops, clientv3.OpGet(key)) - case model.Put: - value := fmt.Sprintf("%d", ids.RequestId()) - ops = append(ops, clientv3.OpPut(key, value)) - case model.Delete: - ops = append(ops, clientv3.OpDelete(key)) - default: - panic("unsuported choice type") - } - } - return ops -} - -func (t etcdTraffic) pickOperationType() model.OperationType { - roll := rand.Int() % 100 - if roll < 10 { - return model.Delete - } - if roll < 50 { - return model.Range - } - return model.Put -} - -func randString(size int) string { - data := strings.Builder{} - data.Grow(size) - for i := 0; i < size; i++ { - data.WriteByte(byte(int('a') + rand.Intn(26))) - } - return data.String() -} - -type choiceWeight[T any] struct { - choice T - weight int -} - -func pickRandom[T any](choices []choiceWeight[T]) T { - sum := 0 - for _, op := range choices { - sum += op.weight - } - roll := rand.Int() % sum - for _, op := range choices { - if roll < op.weight { - return op.choice - } - roll -= op.weight - } - panic("unexpected") -} diff --git a/tests/robustness/client.go b/tests/robustness/traffic/client.go similarity index 83% rename from tests/robustness/client.go rename to tests/robustness/traffic/client.go index b8486d1f9..c008e22ca 100644 --- a/tests/robustness/client.go +++ b/tests/robustness/traffic/client.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package robustness +package traffic import ( "context" @@ -27,13 +27,13 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/model" ) -type recordingClient struct { +type RecordingClient struct { client clientv3.Client history *model.AppendableHistory baseTime time.Time } -func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*recordingClient, error) { +func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Logger: zap.NewNop(), @@ -43,18 +43,18 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (* if err != nil { return nil, err } - return &recordingClient{ + return &RecordingClient{ client: *cc, history: model.NewAppendableHistory(ids), baseTime: baseTime, }, nil } -func (c *recordingClient) Close() error { +func (c *RecordingClient) Close() error { return c.client.Close() } -func (c *recordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { +func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { resp, err := c.Range(ctx, key, false) if err != nil || len(resp) == 0 { return nil, err @@ -65,7 +65,7 @@ func (c *recordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue panic(fmt.Sprintf("Unexpected response size: %d", len(resp))) } -func (c *recordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { +func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { callTime := time.Since(c.baseTime) ops := []clientv3.OpOption{} if withPrefix { @@ -80,7 +80,7 @@ func (c *recordingClient) Range(ctx context.Context, key string, withPrefix bool return resp.Kvs, nil } -func (c *recordingClient) Put(ctx context.Context, key, value string) error { +func (c *RecordingClient) Put(ctx context.Context, key, value string) error { callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) @@ -88,7 +88,7 @@ func (c *recordingClient) Put(ctx context.Context, key, value string) error { return err } -func (c *recordingClient) Delete(ctx context.Context, key string) error { +func (c *RecordingClient) Delete(ctx context.Context, key string) error { callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) @@ -96,7 +96,7 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error { return nil } -func (c *recordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error { +func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error { callTime := time.Since(c.baseTime) resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)).Commit() returnTime := time.Since(c.baseTime) @@ -104,7 +104,7 @@ func (c *recordingClient) CompareRevisionAndDelete(ctx context.Context, key stri return err } -func (c *recordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error { +func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error { callTime := time.Since(c.baseTime) resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)).Commit() returnTime := time.Since(c.baseTime) @@ -112,7 +112,7 @@ func (c *recordingClient) CompareRevisionAndPut(ctx context.Context, key, value return err } -func (c *recordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn { +func (c *RecordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn { txn := c.client.Txn(ctx) var cmp clientv3.Cmp if expectedRevision == 0 { @@ -127,7 +127,7 @@ func (c *recordingClient) compareRevisionTxn(ctx context.Context, key string, ex ) } -func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error { +func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error { callTime := time.Since(c.baseTime) txn := c.client.Txn(ctx) resp, err := txn.If( @@ -140,7 +140,7 @@ func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []cli return err } -func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { +func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) @@ -152,7 +152,7 @@ func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, err return leaseId, err } -func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { +func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) returnTime := time.Since(c.baseTime) @@ -160,7 +160,7 @@ func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error return err } -func (c *recordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { +func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { callTime := time.Since(c.baseTime) opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) resp, err := c.client.Put(ctx, key, value, opts) @@ -169,7 +169,7 @@ func (c *recordingClient) PutWithLease(ctx context.Context, key string, value st return err } -func (c *recordingClient) Defragment(ctx context.Context) error { +func (c *RecordingClient) Defragment(ctx context.Context) error { callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go new file mode 100644 index 000000000..9e3233e5f --- /dev/null +++ b/tests/robustness/traffic/etcd.go @@ -0,0 +1,235 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traffic + +import ( + "context" + "fmt" + "math/rand" + + "golang.org/x/time/rate" + + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/model" +) + +var ( + LowTraffic = Config{ + Name: "LowTraffic", + minimalQPS: 100, + maximalQPS: 200, + clientCount: 8, + traffic: etcdTraffic{ + keyCount: 10, + leaseTTL: DefaultLeaseTTL, + largePutSize: 32769, + writeChoices: []choiceWeight[etcdRequestType]{ + {choice: Put, weight: 45}, + {choice: LargePut, weight: 5}, + {choice: Delete, weight: 10}, + {choice: MultiOpTxn, weight: 10}, + {choice: PutWithLease, weight: 10}, + {choice: LeaseRevoke, weight: 10}, + {choice: CompareAndSet, weight: 10}, + }, + }, + } + ReqProgTraffic = Config{ + Name: "RequestProgressTraffic", + minimalQPS: 100, + maximalQPS: 200, + clientCount: 8, + RequestProgress: true, + traffic: etcdTraffic{ + keyCount: 10, + leaseTTL: DefaultLeaseTTL, + largePutSize: 32769, + writeChoices: []choiceWeight[etcdRequestType]{ + {choice: Put, weight: 45}, + {choice: LargePut, weight: 5}, + {choice: Delete, weight: 10}, + {choice: MultiOpTxn, weight: 10}, + {choice: PutWithLease, weight: 10}, + {choice: LeaseRevoke, weight: 10}, + {choice: CompareAndSet, weight: 10}, + }, + }, + } + HighTraffic = Config{ + Name: "HighTraffic", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + traffic: etcdTraffic{ + keyCount: 10, + largePutSize: 32769, + leaseTTL: DefaultLeaseTTL, + writeChoices: []choiceWeight[etcdRequestType]{ + {choice: Put, weight: 85}, + {choice: MultiOpTxn, weight: 10}, + {choice: LargePut, weight: 5}, + }, + }, + } +) + +type etcdTraffic struct { + keyCount int + writeChoices []choiceWeight[etcdRequestType] + leaseTTL int64 + largePutSize int +} + +type etcdRequestType string + +const ( + Put etcdRequestType = "put" + LargePut etcdRequestType = "largePut" + Delete etcdRequestType = "delete" + MultiOpTxn etcdRequestType = "multiOpTxn" + PutWithLease etcdRequestType = "putWithLease" + LeaseRevoke etcdRequestType = "leaseRevoke" + CompareAndSet etcdRequestType = "compareAndSet" + Defragment etcdRequestType = "defragment" +) + +func (t etcdTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { + + for { + select { + case <-ctx.Done(): + return + case <-finish: + return + default: + } + key := fmt.Sprintf("%d", rand.Int()%t.keyCount) + // Execute one read per one write to avoid operation history include too many failed writes when etcd is down. + resp, err := t.Read(ctx, c, key) + if err != nil { + continue + } + limiter.Wait(ctx) + err = t.Write(ctx, c, limiter, key, ids, lm, clientId, resp) + if err != nil { + continue + } + limiter.Wait(ctx) + } +} + +func (t etcdTraffic) Read(ctx context.Context, c *RecordingClient, key string) (*mvccpb.KeyValue, error) { + getCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := c.Get(getCtx, key) + cancel() + return resp, err +} + +func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error { + writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + + var err error + switch etcdRequestType(pickRandom(t.writeChoices)) { + case Put: + err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId())) + case LargePut: + err = c.Put(writeCtx, key, randString(t.largePutSize)) + case Delete: + err = c.Delete(writeCtx, key) + case MultiOpTxn: + err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id)) + case CompareAndSet: + var expectRevision int64 + if lastValues != nil { + expectRevision = lastValues.ModRevision + } + err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision) + case PutWithLease: + leaseId := lm.LeaseId(cid) + if leaseId == 0 { + leaseId, err = c.LeaseGrant(writeCtx, t.leaseTTL) + if err == nil { + lm.AddLeaseId(cid, leaseId) + limiter.Wait(ctx) + } + } + if leaseId != 0 { + putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) + err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId) + putCancel() + } + case LeaseRevoke: + leaseId := lm.LeaseId(cid) + if leaseId != 0 { + err = c.LeaseRevoke(writeCtx, leaseId) + //if LeaseRevoke has failed, do not remove the mapping. + if err == nil { + lm.RemoveLeaseId(cid) + } + } + case Defragment: + err = c.Defragment(writeCtx) + default: + panic("invalid choice") + } + cancel() + return err +} + +func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { + keys := rand.Perm(t.keyCount) + opTypes := make([]model.OperationType, 4) + + atLeastOnePut := false + for i := 0; i < MultiOpTxnOpCount; i++ { + opTypes[i] = t.pickOperationType() + if opTypes[i] == model.Put { + atLeastOnePut = true + } + } + // Ensure at least one put to make operation unique + if !atLeastOnePut { + opTypes[0] = model.Put + } + + for i, opType := range opTypes { + key := fmt.Sprintf("%d", keys[i]) + switch opType { + case model.Range: + ops = append(ops, clientv3.OpGet(key)) + case model.Put: + value := fmt.Sprintf("%d", ids.RequestId()) + ops = append(ops, clientv3.OpPut(key, value)) + case model.Delete: + ops = append(ops, clientv3.OpDelete(key)) + default: + panic("unsuported choice type") + } + } + return ops +} + +func (t etcdTraffic) pickOperationType() model.OperationType { + roll := rand.Int() % 100 + if roll < 10 { + return model.Delete + } + if roll < 50 { + return model.Range + } + return model.Put +} diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go new file mode 100644 index 000000000..376da6f55 --- /dev/null +++ b/tests/robustness/traffic/kubernetes.go @@ -0,0 +1,138 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traffic + +import ( + "context" + "fmt" + "math/rand" + + "golang.org/x/time/rate" + + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/pkg/v3/stringutil" + "go.etcd.io/etcd/tests/v3/robustness/identity" +) + +var ( + KubernetesTraffic = Config{ + Name: "Kubernetes", + minimalQPS: 200, + maximalQPS: 1000, + clientCount: 12, + traffic: kubernetesTraffic{ + averageKeyCount: 5, + resource: "pods", + namespace: "default", + writeChoices: []choiceWeight[KubernetesRequestType]{ + {choice: KubernetesUpdate, weight: 75}, + {choice: KubernetesDelete, weight: 15}, + {choice: KubernetesCreate, weight: 10}, + }, + }, + } +) + +type kubernetesTraffic struct { + averageKeyCount int + resource string + namespace string + writeChoices []choiceWeight[KubernetesRequestType] +} + +type KubernetesRequestType string + +const ( + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" + KubernetesDelete KubernetesRequestType = "delete" +) + +func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { + for { + select { + case <-ctx.Done(): + return + case <-finish: + return + default: + } + objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true) + if err != nil { + continue + } + limiter.Wait(ctx) + err = t.Write(ctx, c, ids, objects) + if err != nil { + continue + } + limiter.Wait(ctx) + } +} + +func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) { + writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + if len(objects) < t.averageKeyCount/2 { + err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) + } else { + randomPod := objects[rand.Intn(len(objects))] + if len(objects) > t.averageKeyCount*3/2 { + err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) + } else { + op := KubernetesRequestType(pickRandom(t.writeChoices)) + switch op { + case KubernetesDelete: + err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) + case KubernetesUpdate: + err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision) + case KubernetesCreate: + err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) + default: + panic(fmt.Sprintf("invalid choice: %q", op)) + } + } + } + cancel() + return err +} + +func (t kubernetesTraffic) generateKey() string { + return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5)) +} + +func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := c.Range(ctx, key, withPrefix) + cancel() + return resp, err +} + +func (t kubernetesTraffic) Create(ctx context.Context, c *RecordingClient, key, value string) error { + return t.Update(ctx, c, key, value, 0) +} + +func (t kubernetesTraffic) Update(ctx context.Context, c *RecordingClient, key, value string, expectedRevision int64) error { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) + err := c.CompareRevisionAndPut(ctx, key, value, expectedRevision) + cancel() + return err +} + +func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key string, expectedRevision int64) error { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) + err := c.CompareRevisionAndDelete(ctx, key, expectedRevision) + cancel() + return err +} diff --git a/tests/robustness/traffic/random.go b/tests/robustness/traffic/random.go new file mode 100644 index 000000000..974c34ebb --- /dev/null +++ b/tests/robustness/traffic/random.go @@ -0,0 +1,49 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traffic + +import ( + "math/rand" + "strings" +) + +func randString(size int) string { + data := strings.Builder{} + data.Grow(size) + for i := 0; i < size; i++ { + data.WriteByte(byte(int('a') + rand.Intn(26))) + } + return data.String() +} + +type choiceWeight[T any] struct { + choice T + weight int +} + +func pickRandom[T any](choices []choiceWeight[T]) T { + sum := 0 + for _, op := range choices { + sum += op.weight + } + roll := rand.Int() % sum + for _, op := range choices { + if roll < op.weight { + return op.choice + } + roll -= op.weight + } + panic("unexpected") +} diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go new file mode 100644 index 000000000..bf4c8513e --- /dev/null +++ b/tests/robustness/traffic/traffic.go @@ -0,0 +1,103 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traffic + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/anishathalye/porcupine" + "go.uber.org/zap" + "golang.org/x/time/rate" + + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/model" +) + +var ( + DefaultLeaseTTL int64 = 7200 + RequestTimeout = 40 * time.Millisecond + MultiOpTxnOpCount = 4 +) + +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}) []porcupine.Operation { + mux := sync.Mutex{} + endpoints := clus.EndpointsGRPC() + + ids := identity.NewIdProvider() + lm := identity.NewLeaseIdStorage() + h := model.History{} + limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) + + startTime := time.Now() + cc, err := NewClient(endpoints, ids, startTime) + if err != nil { + t.Fatal(err) + } + defer cc.Close() + wg := sync.WaitGroup{} + for i := 0; i < config.clientCount; i++ { + wg.Add(1) + c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime) + if err != nil { + t.Fatal(err) + } + go func(c *RecordingClient, clientId int) { + defer wg.Done() + defer c.Close() + + config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish) + mux.Lock() + h = h.Merge(c.history.History) + mux.Unlock() + }(c, i) + } + wg.Wait() + endTime := time.Now() + + // Ensure that last operation is succeeds + time.Sleep(time.Second) + err = cc.Put(ctx, "tombstone", "true") + if err != nil { + t.Error(err) + } + h = h.Merge(cc.history.History) + + operations := h.Operations() + lg.Info("Recorded operations", zap.Int("count", len(operations))) + + qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second) + lg.Info("Average traffic", zap.Float64("qps", qps)) + if qps < config.minimalQPS { + t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps) + } + return operations +} + +type Config struct { + Name string + minimalQPS float64 + maximalQPS float64 + clientCount int + traffic Traffic + RequestProgress bool // Request progress notifications while watching this traffic +} + +type Traffic interface { + Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) +}