test/robustness: Create dedicated traffic package

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-05-08 19:37:15 +02:00
parent f6161673af
commit ad20230e07
8 changed files with 558 additions and 490 deletions

View File

@ -31,7 +31,8 @@ import (
)
const (
triggerTimeout = time.Minute
triggerTimeout = time.Minute
waitBetweenFailpointTriggers = time.Second
)
var (

View File

@ -27,88 +27,7 @@ import (
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/model"
)
const (
// waitBetweenFailpointTriggers
waitBetweenFailpointTriggers = time.Second
)
var (
LowTraffic = trafficConfig{
name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
requestProgress: false,
traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 45},
{choice: LargePut, weight: 5},
{choice: Delete, weight: 10},
{choice: MultiOpTxn, weight: 10},
{choice: PutWithLease, weight: 10},
{choice: LeaseRevoke, weight: 10},
{choice: CompareAndSet, weight: 10},
},
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: false,
traffic: etcdTraffic{
keyCount: 10,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 85},
{choice: MultiOpTxn, weight: 10},
{choice: LargePut, weight: 5},
},
},
}
KubernetesTraffic = trafficConfig{
name: "Kubernetes",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: kubernetesTraffic{
averageKeyCount: 5,
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 75},
{choice: KubernetesDelete, weight: 15},
{choice: KubernetesCreate, weight: 10},
},
},
}
ReqProgTraffic = trafficConfig{
name: "RequestProgressTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: true,
traffic: etcdTraffic{
keyCount: 10,
largePutSize: 8196,
leaseTTL: DefaultLeaseTTL,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 95},
{choice: LargePut, weight: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
LowTraffic, HighTraffic, KubernetesTraffic,
}
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
func TestRobustness(t *testing.T) {
@ -121,12 +40,12 @@ func TestRobustness(t *testing.T) {
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
traffic *trafficConfig
traffic *traffic.Config
}
scenarios := []scenario{}
for _, traffic := range trafficList {
for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
scenarios = append(scenarios, scenario{
name: "ClusterOfSize1/" + traffic.name,
name: "ClusterOfSize1/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: &traffic,
config: *e2e.NewConfig(
@ -149,7 +68,7 @@ func TestRobustness(t *testing.T) {
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
}
scenarios = append(scenarios, scenario{
name: "ClusterOfSize3/" + traffic.name,
name: "ClusterOfSize3/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: &traffic,
config: *e2e.NewConfig(clusterOfSize3Options...),
@ -174,7 +93,7 @@ func TestRobustness(t *testing.T) {
scenarios = append(scenarios, scenario{
name: "Issue13766",
failpoint: KillFailpoint,
traffic: &HighTraffic,
traffic: &traffic.HighTraffic,
config: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
),
@ -182,7 +101,7 @@ func TestRobustness(t *testing.T) {
scenarios = append(scenarios, scenario{
name: "Issue15220",
failpoint: RandomFailpoint,
traffic: &ReqProgTraffic,
traffic: &traffic.ReqProgTraffic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
),
@ -191,7 +110,7 @@ func TestRobustness(t *testing.T) {
scenarios = append(scenarios, scenario{
name: "Issue15271",
failpoint: BlackholeUntilSnapshot,
traffic: &HighTraffic,
traffic: &traffic.HighTraffic,
config: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
e2e.WithPeerProxy(true),
@ -201,7 +120,7 @@ func TestRobustness(t *testing.T) {
}
for _, scenario := range scenarios {
if scenario.traffic == nil {
scenario.traffic = &defaultTraffic
scenario.traffic = &traffic.LowTraffic
}
t.Run(scenario.name, func(t *testing.T) {
@ -218,7 +137,7 @@ func TestRobustness(t *testing.T) {
}
}
func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *trafficConfig, failpoint FailpointConfig) {
func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *traffic.Config, failpoint FailpointConfig) {
r := report{lg: lg}
var err error
r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
@ -238,7 +157,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
forcestopCluster(r.clus)
watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.clus, r.responses, traffic.requestProgress || watchProgressNotifyEnabled)
validateWatchResponses(t, r.clus, r.responses, traffic.RequestProgress || watchProgressNotifyEnabled)
r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events)
@ -249,7 +168,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
panicked = false
}
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, tCfg traffic.Config, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
g := errgroup.Group{}
finishTraffic := make(chan struct{})
@ -262,12 +181,12 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
operations = traffic.SimulateTraffic(ctx, t, lg, clus, tCfg, finishTraffic)
maxRevisionChan <- operationsMaxRevision(operations)
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress)
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, tCfg.RequestProgress)
return nil
})
g.Wait()

View File

@ -1,377 +0,0 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package robustness
import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
"time"
"github.com/anishathalye/porcupine"
"go.uber.org/zap"
"golang.org/x/time/rate"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
)
var (
DefaultLeaseTTL int64 = 7200
RequestTimeout = 40 * time.Millisecond
MultiOpTxnOpCount = 4
)
func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig, finish <-chan struct{}) []porcupine.Operation {
mux := sync.Mutex{}
endpoints := clus.EndpointsGRPC()
ids := identity.NewIdProvider()
lm := identity.NewLeaseIdStorage()
h := model.History{}
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
startTime := time.Now()
cc, err := NewClient(endpoints, ids, startTime)
if err != nil {
t.Fatal(err)
}
defer cc.Close()
wg := sync.WaitGroup{}
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime)
if err != nil {
t.Fatal(err)
}
go func(c *recordingClient, clientId int) {
defer wg.Done()
defer c.Close()
config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish)
mux.Lock()
h = h.Merge(c.history.History)
mux.Unlock()
}(c, i)
}
wg.Wait()
endTime := time.Now()
// Ensure that last operation is succeeds
time.Sleep(time.Second)
err = cc.Put(ctx, "tombstone", "true")
if err != nil {
t.Error(err)
}
h = h.Merge(cc.history.History)
operations := h.Operations()
lg.Info("Recorded operations", zap.Int("count", len(operations)))
qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second)
lg.Info("Average traffic", zap.Float64("qps", qps))
if qps < config.minimalQPS {
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps)
}
return operations
}
type trafficConfig struct {
name string
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
requestProgress bool // Request progress notifications while watching this traffic
}
type Traffic interface {
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
}
type etcdTraffic struct {
keyCount int
writeChoices []choiceWeight[etcdRequestType]
leaseTTL int64
largePutSize int
}
type etcdRequestType string
const (
Put etcdRequestType = "put"
LargePut etcdRequestType = "largePut"
Delete etcdRequestType = "delete"
MultiOpTxn etcdRequestType = "multiOpTxn"
PutWithLease etcdRequestType = "putWithLease"
LeaseRevoke etcdRequestType = "leaseRevoke"
CompareAndSet etcdRequestType = "compareAndSet"
Defragment etcdRequestType = "defragment"
)
type kubernetesTraffic struct {
averageKeyCount int
resource string
namespace string
writeChoices []choiceWeight[KubernetesRequestType]
}
type KubernetesRequestType string
const (
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
)
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
for {
select {
case <-ctx.Done():
return
case <-finish:
return
default:
}
objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true)
if err != nil {
continue
}
limiter.Wait(ctx)
err = t.Write(ctx, c, ids, objects)
if err != nil {
continue
}
limiter.Wait(ctx)
}
}
func (t kubernetesTraffic) Write(ctx context.Context, c *recordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
if len(objects) < t.averageKeyCount/2 {
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId()))
} else {
randomPod := objects[rand.Intn(len(objects))]
if len(objects) > t.averageKeyCount*3/2 {
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
} else {
op := pickRandom(t.writeChoices)
switch op {
case KubernetesDelete:
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
case KubernetesUpdate:
err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision)
case KubernetesCreate:
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId()))
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
}
}
cancel()
return err
}
func (t kubernetesTraffic) generateKey() string {
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
}
func (t kubernetesTraffic) Range(ctx context.Context, c *recordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Range(ctx, key, withPrefix)
cancel()
return resp, err
}
func (t kubernetesTraffic) Create(ctx context.Context, c *recordingClient, key, value string) error {
return t.Update(ctx, c, key, value, 0)
}
func (t kubernetesTraffic) Update(ctx context.Context, c *recordingClient, key, value string, expectedRevision int64) error {
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
err := c.CompareRevisionAndPut(ctx, key, value, expectedRevision)
cancel()
return err
}
func (t kubernetesTraffic) Delete(ctx context.Context, c *recordingClient, key string, expectedRevision int64) error {
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
err := c.CompareRevisionAndDelete(ctx, key, expectedRevision)
cancel()
return err
}
func (t etcdTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
for {
select {
case <-ctx.Done():
return
case <-finish:
return
default:
}
key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
resp, err := t.Read(ctx, c, key)
if err != nil {
continue
}
limiter.Wait(ctx)
err = t.Write(ctx, c, limiter, key, ids, lm, clientId, resp)
if err != nil {
continue
}
limiter.Wait(ctx)
}
}
func (t etcdTraffic) Read(ctx context.Context, c *recordingClient, key string) (*mvccpb.KeyValue, error) {
getCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Get(getCtx, key)
cancel()
return resp, err
}
func (t etcdTraffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
var err error
switch pickRandom(t.writeChoices) {
case Put:
err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId()))
case LargePut:
err = c.Put(writeCtx, key, randString(t.largePutSize))
case Delete:
err = c.Delete(writeCtx, key)
case MultiOpTxn:
err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id))
case CompareAndSet:
var expectRevision int64
if lastValues != nil {
expectRevision = lastValues.ModRevision
}
err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision)
case PutWithLease:
leaseId := lm.LeaseId(cid)
if leaseId == 0 {
leaseId, err = c.LeaseGrant(writeCtx, t.leaseTTL)
if err == nil {
lm.AddLeaseId(cid, leaseId)
limiter.Wait(ctx)
}
}
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId)
putCancel()
}
case LeaseRevoke:
leaseId := lm.LeaseId(cid)
if leaseId != 0 {
err = c.LeaseRevoke(writeCtx, leaseId)
//if LeaseRevoke has failed, do not remove the mapping.
if err == nil {
lm.RemoveLeaseId(cid)
}
}
case Defragment:
err = c.Defragment(writeCtx)
default:
panic("invalid choice")
}
cancel()
return err
}
func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
keys := rand.Perm(t.keyCount)
opTypes := make([]model.OperationType, 4)
atLeastOnePut := false
for i := 0; i < MultiOpTxnOpCount; i++ {
opTypes[i] = t.pickOperationType()
if opTypes[i] == model.Put {
atLeastOnePut = true
}
}
// Ensure at least one put to make operation unique
if !atLeastOnePut {
opTypes[0] = model.Put
}
for i, opType := range opTypes {
key := fmt.Sprintf("%d", keys[i])
switch opType {
case model.Range:
ops = append(ops, clientv3.OpGet(key))
case model.Put:
value := fmt.Sprintf("%d", ids.RequestId())
ops = append(ops, clientv3.OpPut(key, value))
case model.Delete:
ops = append(ops, clientv3.OpDelete(key))
default:
panic("unsuported choice type")
}
}
return ops
}
func (t etcdTraffic) pickOperationType() model.OperationType {
roll := rand.Int() % 100
if roll < 10 {
return model.Delete
}
if roll < 50 {
return model.Range
}
return model.Put
}
func randString(size int) string {
data := strings.Builder{}
data.Grow(size)
for i := 0; i < size; i++ {
data.WriteByte(byte(int('a') + rand.Intn(26)))
}
return data.String()
}
type choiceWeight[T any] struct {
choice T
weight int
}
func pickRandom[T any](choices []choiceWeight[T]) T {
sum := 0
for _, op := range choices {
sum += op.weight
}
roll := rand.Int() % sum
for _, op := range choices {
if roll < op.weight {
return op.choice
}
roll -= op.weight
}
panic("unexpected")
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package robustness
package traffic
import (
"context"
@ -27,13 +27,13 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/model"
)
type recordingClient struct {
type RecordingClient struct {
client clientv3.Client
history *model.AppendableHistory
baseTime time.Time
}
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*recordingClient, error) {
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
@ -43,18 +43,18 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*
if err != nil {
return nil, err
}
return &recordingClient{
return &RecordingClient{
client: *cc,
history: model.NewAppendableHistory(ids),
baseTime: baseTime,
}, nil
}
func (c *recordingClient) Close() error {
func (c *RecordingClient) Close() error {
return c.client.Close()
}
func (c *recordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) {
func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) {
resp, err := c.Range(ctx, key, false)
if err != nil || len(resp) == 0 {
return nil, err
@ -65,7 +65,7 @@ func (c *recordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue
panic(fmt.Sprintf("Unexpected response size: %d", len(resp)))
}
func (c *recordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
callTime := time.Since(c.baseTime)
ops := []clientv3.OpOption{}
if withPrefix {
@ -80,7 +80,7 @@ func (c *recordingClient) Range(ctx context.Context, key string, withPrefix bool
return resp.Kvs, nil
}
func (c *recordingClient) Put(ctx context.Context, key, value string) error {
func (c *RecordingClient) Put(ctx context.Context, key, value string) error {
callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value)
returnTime := time.Since(c.baseTime)
@ -88,7 +88,7 @@ func (c *recordingClient) Put(ctx context.Context, key, value string) error {
return err
}
func (c *recordingClient) Delete(ctx context.Context, key string) error {
func (c *RecordingClient) Delete(ctx context.Context, key string) error {
callTime := time.Since(c.baseTime)
resp, err := c.client.Delete(ctx, key)
returnTime := time.Since(c.baseTime)
@ -96,7 +96,7 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error {
return nil
}
func (c *recordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error {
func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error {
callTime := time.Since(c.baseTime)
resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)).Commit()
returnTime := time.Since(c.baseTime)
@ -104,7 +104,7 @@ func (c *recordingClient) CompareRevisionAndDelete(ctx context.Context, key stri
return err
}
func (c *recordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error {
func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error {
callTime := time.Since(c.baseTime)
resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)).Commit()
returnTime := time.Since(c.baseTime)
@ -112,7 +112,7 @@ func (c *recordingClient) CompareRevisionAndPut(ctx context.Context, key, value
return err
}
func (c *recordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn {
func (c *RecordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn {
txn := c.client.Txn(ctx)
var cmp clientv3.Cmp
if expectedRevision == 0 {
@ -127,7 +127,7 @@ func (c *recordingClient) compareRevisionTxn(ctx context.Context, key string, ex
)
}
func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error {
func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error {
callTime := time.Since(c.baseTime)
txn := c.client.Txn(ctx)
resp, err := txn.If(
@ -140,7 +140,7 @@ func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []cli
return err
}
func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Grant(ctx, ttl)
returnTime := time.Since(c.baseTime)
@ -152,7 +152,7 @@ func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, err
return leaseId, err
}
func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error {
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error {
callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
returnTime := time.Since(c.baseTime)
@ -160,7 +160,7 @@ func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error
return err
}
func (c *recordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error {
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error {
callTime := time.Since(c.baseTime)
opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
resp, err := c.client.Put(ctx, key, value, opts)
@ -169,7 +169,7 @@ func (c *recordingClient) PutWithLease(ctx context.Context, key string, value st
return err
}
func (c *recordingClient) Defragment(ctx context.Context) error {
func (c *RecordingClient) Defragment(ctx context.Context) error {
callTime := time.Since(c.baseTime)
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
returnTime := time.Since(c.baseTime)

View File

@ -0,0 +1,235 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package traffic
import (
"context"
"fmt"
"math/rand"
"golang.org/x/time/rate"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
)
// Predefined traffic configurations that exercise the etcd key-value API
// through etcdTraffic.
var (
// LowTraffic uses 8 clients and the broadest write mix: puts, large puts,
// deletes, multi-operation transactions, puts with lease, lease revokes and
// compare-and-set. The weights sum to 100.
LowTraffic = Config{
Name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 45},
{choice: LargePut, weight: 5},
{choice: Delete, weight: 10},
{choice: MultiOpTxn, weight: 10},
{choice: PutWithLease, weight: 10},
{choice: LeaseRevoke, weight: 10},
{choice: CompareAndSet, weight: 10},
},
},
}
// ReqProgTraffic has the same QPS bounds, client count and write mix as
// LowTraffic, and additionally sets RequestProgress to true.
ReqProgTraffic = Config{
Name: "RequestProgressTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
RequestProgress: true,
traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 45},
{choice: LargePut, weight: 5},
{choice: Delete, weight: 10},
{choice: MultiOpTxn, weight: 10},
{choice: PutWithLease, weight: 10},
{choice: LeaseRevoke, weight: 10},
{choice: CompareAndSet, weight: 10},
},
},
}
// HighTraffic uses 12 clients with higher QPS bounds and a write mix limited
// to puts, multi-operation transactions and large puts. The weights sum to 100.
HighTraffic = Config{
Name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: etcdTraffic{
keyCount: 10,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 85},
{choice: MultiOpTxn, weight: 10},
{choice: LargePut, weight: 5},
},
},
}
)
// etcdTraffic describes a workload of single-key reads followed by a randomly
// chosen write request.
type etcdTraffic struct {
// keyCount bounds the key space: Run picks keys as rand.Int()%keyCount
// formatted as a decimal string.
keyCount int
// writeChoices is the weighted set of request types Write picks from.
writeChoices []choiceWeight[etcdRequestType]
// leaseTTL is the TTL passed to LeaseGrant when a client has no lease yet.
leaseTTL int64
// largePutSize is the length of the random value written by a LargePut.
largePutSize int
}
// etcdRequestType names a kind of write request that etcdTraffic.Write can issue.
type etcdRequestType string
const (
// Put writes a fresh request id as the value of the key.
Put etcdRequestType = "put"
// LargePut writes a random string of largePutSize bytes to the key.
LargePut etcdRequestType = "largePut"
// Delete removes the key.
Delete etcdRequestType = "delete"
// MultiOpTxn commits a transaction built by pickMultiTxnOps.
MultiOpTxn etcdRequestType = "multiOpTxn"
// PutWithLease writes the key attached to the client's lease, granting one first if needed.
PutWithLease etcdRequestType = "putWithLease"
// LeaseRevoke revokes the client's lease, if it has one.
LeaseRevoke etcdRequestType = "leaseRevoke"
// CompareAndSet puts the key only if its mod revision matches the last read value.
CompareAndSet etcdRequestType = "compareAndSet"
// Defragment issues a defragment request through the client.
Defragment etcdRequestType = "defragment"
)
// Run loops issuing one read followed by one write against a randomly chosen
// key until ctx is done or finish is signalled.
//
// Each successful request is followed by limiter.Wait; a failed read or write
// skips straight to the next iteration without waiting on the limiter.
func (t etcdTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
	for {
		// Non-blocking check of both stop signals before starting another round.
		select {
		case <-ctx.Done():
			return
		case <-finish:
			return
		default:
		}

		key := fmt.Sprintf("%d", rand.Int()%t.keyCount)

		// Pair every write with a preceding read so that, while etcd is down,
		// the operation history does not fill up with failed writes.
		lastValue, readErr := t.Read(ctx, c, key)
		if readErr != nil {
			continue
		}
		limiter.Wait(ctx)

		if writeErr := t.Write(ctx, c, limiter, key, ids, lm, clientId, lastValue); writeErr == nil {
			limiter.Wait(ctx)
		}
	}
}
// Read fetches key through c, bounding the request by RequestTimeout.
// It returns whatever c.Get returns.
func (t etcdTraffic) Read(ctx context.Context, c *RecordingClient, key string) (*mvccpb.KeyValue, error) {
	readCtx, release := context.WithTimeout(ctx, RequestTimeout)
	defer release()
	return c.Get(readCtx, key)
}
// Write issues one write request of a type picked at random from
// t.writeChoices and returns the error of that request.
//
// key is the target key, id supplies unique request ids used as values, lm and
// cid identify the lease owned by this client, and lastValues is the result of
// the preceding read (nil when the key did not exist). The request runs under
// a context limited by RequestTimeout. It panics on an unknown request type.
func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error {
	writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
	defer cancel()

	switch pickRandom(t.writeChoices) {
	case Put:
		return c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId()))
	case LargePut:
		return c.Put(writeCtx, key, randString(t.largePutSize))
	case Delete:
		return c.Delete(writeCtx, key)
	case MultiOpTxn:
		return c.Txn(writeCtx, nil, t.pickMultiTxnOps(id))
	case CompareAndSet:
		// Revision 0 is used when the preceding read found no value.
		var revision int64
		if lastValues != nil {
			revision = lastValues.ModRevision
		}
		return c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), revision)
	case PutWithLease:
		var err error
		lease := lm.LeaseId(cid)
		if lease == 0 {
			// No lease recorded for this client yet: grant one and remember it.
			lease, err = c.LeaseGrant(writeCtx, t.leaseTTL)
			if err == nil {
				lm.AddLeaseId(cid, lease)
				limiter.Wait(ctx)
			}
		}
		if lease != 0 {
			// The put gets its own timeout, independent of the grant above.
			putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
			err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), lease)
			putCancel()
		}
		return err
	case LeaseRevoke:
		lease := lm.LeaseId(cid)
		if lease == 0 {
			return nil
		}
		err := c.LeaseRevoke(writeCtx, lease)
		// Keep the mapping when the revoke failed.
		if err == nil {
			lm.RemoveLeaseId(cid)
		}
		return err
	case Defragment:
		return c.Defragment(writeCtx)
	default:
		panic("invalid choice")
	}
}
// pickMultiTxnOps builds the operations of one multi-operation transaction.
//
// It picks MultiOpTxnOpCount operation types at random, forces the first one
// to be a put when none was picked (so that every transaction is unique), and
// applies each operation to a distinct key taken from a random permutation of
// the key space. Put values are fresh request ids from ids.
//
// t.keyCount must be at least MultiOpTxnOpCount, because every operation
// indexes its own entry of the permutation.
func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
	keys := rand.Perm(t.keyCount)
	// Size the slice from MultiOpTxnOpCount so it always matches the loop
	// below, instead of a hard-coded length that can drift from the variable.
	opTypes := make([]model.OperationType, MultiOpTxnOpCount)

	atLeastOnePut := false
	for i := range opTypes {
		opTypes[i] = t.pickOperationType()
		if opTypes[i] == model.Put {
			atLeastOnePut = true
		}
	}
	// Ensure at least one put to make operation unique
	if !atLeastOnePut && len(opTypes) > 0 {
		opTypes[0] = model.Put
	}

	ops = make([]clientv3.Op, 0, len(opTypes))
	for i, opType := range opTypes {
		key := fmt.Sprintf("%d", keys[i])
		switch opType {
		case model.Range:
			ops = append(ops, clientv3.OpGet(key))
		case model.Put:
			value := fmt.Sprintf("%d", ids.RequestId())
			ops = append(ops, clientv3.OpPut(key, value))
		case model.Delete:
			ops = append(ops, clientv3.OpDelete(key))
		default:
			panic("unsuported choice type")
		}
	}
	return ops
}
// pickOperationType draws a number in [0, 100) and maps it to an operation
// type: below 10 is a delete, below 50 is a range, anything else is a put.
func (t etcdTraffic) pickOperationType() model.OperationType {
	switch roll := rand.Int() % 100; {
	case roll < 10:
		return model.Delete
	case roll < 50:
		return model.Range
	default:
		return model.Put
	}
}

View File

@ -0,0 +1,138 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package traffic
import (
"context"
"fmt"
"math/rand"
"golang.org/x/time/rate"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/robustness/identity"
)
var (
// KubernetesTraffic uses 12 clients running kubernetesTraffic against the
// "pods" resource in the "default" namespace. The write weights sum to 100,
// with updates as the dominant request.
KubernetesTraffic = Config{
Name: "Kubernetes",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: kubernetesTraffic{
averageKeyCount: 5,
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 75},
{choice: KubernetesDelete, weight: 15},
{choice: KubernetesCreate, weight: 10},
},
},
}
)
// kubernetesTraffic describes a workload that lists objects under a
// "/registry/<resource>/" prefix and then creates, updates or deletes one.
type kubernetesTraffic struct {
// averageKeyCount steers the object count: Write creates when fewer than
// averageKeyCount/2 objects exist and deletes when more than
// averageKeyCount*3/2 exist.
averageKeyCount int
// resource is the second path segment of generated keys.
resource string
// namespace is the third path segment of generated keys.
namespace string
// writeChoices is the weighted set of requests used when the object count
// is within the bounds described above.
writeChoices []choiceWeight[KubernetesRequestType]
}
// KubernetesRequestType names a kind of write request that
// kubernetesTraffic.Write can issue.
type KubernetesRequestType string
const (
// KubernetesUpdate rewrites a randomly chosen existing object, guarded by its mod revision.
KubernetesUpdate KubernetesRequestType = "update"
// KubernetesCreate writes an object under a newly generated key, expecting revision 0.
KubernetesCreate KubernetesRequestType = "create"
// KubernetesDelete removes a randomly chosen existing object, guarded by its mod revision.
KubernetesDelete KubernetesRequestType = "delete"
)
// Run loops listing every object under "/registry/<resource>/" and then
// issuing one write based on that listing, until ctx is done or finish is
// signalled.
//
// Each successful request is followed by limiter.Wait; a failed list or write
// moves on to the next iteration without waiting on the limiter.
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
	listPrefix := "/registry/" + t.resource + "/"
	for {
		// Non-blocking check of both stop signals before starting another round.
		select {
		case <-ctx.Done():
			return
		case <-finish:
			return
		default:
		}

		objects, listErr := t.Range(ctx, c, listPrefix, true)
		if listErr != nil {
			continue
		}
		limiter.Wait(ctx)

		if writeErr := t.Write(ctx, c, ids, objects); writeErr == nil {
			limiter.Wait(ctx)
		}
	}
}
// Write issues one write request chosen from the current listing in objects.
//
//   - With no objects, or fewer than averageKeyCount/2, it creates a new one.
//   - With more than averageKeyCount*3/2, it deletes a random one.
//   - Otherwise it picks create, update or delete from t.writeChoices and
//     applies it to a random object (or to a new key for create).
//
// Values are fresh request ids from ids, and updates and deletes are guarded
// by the mod revision seen in the listing. The request runs under a context
// limited by RequestTimeout. It panics on an unknown request type.
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) {
	writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
	defer cancel()

	// An empty listing must always create: there is nothing to pick below, and
	// rand.Intn(0) would panic when averageKeyCount/2 rounds down to zero.
	if len(objects) == 0 || len(objects) < t.averageKeyCount/2 {
		return t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId()))
	}

	randomPod := objects[rand.Intn(len(objects))]
	if len(objects) > t.averageKeyCount*3/2 {
		return t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
	}

	switch op := pickRandom(t.writeChoices); op {
	case KubernetesDelete:
		return t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
	case KubernetesUpdate:
		return t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision)
	case KubernetesCreate:
		return t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId()))
	default:
		panic(fmt.Sprintf("invalid choice: %q", op))
	}
}
// generateKey returns a new key of the form
// "/registry/<resource>/<namespace>/<suffix>", where the suffix is a
// 5-character string from stringutil.RandString.
func (t kubernetesTraffic) generateKey() string {
	suffix := stringutil.RandString(5)
	return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, suffix)
}
// Range reads key (or every key with that prefix when withPrefix is true)
// through c, bounding the request by RequestTimeout.
func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
	rangeCtx, release := context.WithTimeout(ctx, RequestTimeout)
	defer release()
	return c.Range(rangeCtx, key, withPrefix)
}
// Create writes value under key by delegating to Update with an expected
// revision of 0.
func (t kubernetesTraffic) Create(ctx context.Context, c *RecordingClient, key, value string) error {
	const expectedRevision int64 = 0
	return t.Update(ctx, c, key, value, expectedRevision)
}
// Update calls c.CompareRevisionAndPut for key with value and
// expectedRevision, bounding the request by RequestTimeout, and returns that
// call's error.
func (t kubernetesTraffic) Update(ctx context.Context, c *RecordingClient, key, value string, expectedRevision int64) error {
	putCtx, release := context.WithTimeout(ctx, RequestTimeout)
	defer release()
	return c.CompareRevisionAndPut(putCtx, key, value, expectedRevision)
}
// Delete calls c.CompareRevisionAndDelete for key with expectedRevision,
// bounding the request by RequestTimeout, and returns that call's error.
func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key string, expectedRevision int64) error {
	deleteCtx, release := context.WithTimeout(ctx, RequestTimeout)
	defer release()
	return c.CompareRevisionAndDelete(deleteCtx, key, expectedRevision)
}

View File

@ -0,0 +1,49 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package traffic
import (
"math/rand"
"strings"
)
// randString returns a string of exactly size bytes, each one a lowercase
// ASCII letter ('a'..'z') drawn from the package-level math/rand source.
func randString(size int) string {
	var sb strings.Builder
	sb.Grow(size)
	for remaining := size; remaining > 0; remaining-- {
		letter := 'a' + byte(rand.Intn(26))
		sb.WriteByte(letter)
	}
	return sb.String()
}
// choiceWeight pairs a candidate value with the relative weight pickRandom
// uses when selecting among candidates.
type choiceWeight[T any] struct {
	choice T
	weight int
}

// pickRandom returns the choice of one element of choices, selected at random
// with probability proportional to its weight. Elements with weight 0 are
// never selected.
//
// It panics with a descriptive message if any weight is negative or if the
// weights do not add up to a positive number (which includes a nil or empty
// choices slice); both indicate a misconfigured weight table.
func pickRandom[T any](choices []choiceWeight[T]) T {
	total := 0
	for _, candidate := range choices {
		if candidate.weight < 0 {
			panic("pickRandom: choice weights must not be negative")
		}
		total += candidate.weight
	}
	if total <= 0 {
		panic("pickRandom: total weight of choices must be positive")
	}

	// rand.Intn draws uniformly from [0, total), avoiding the modulo bias of
	// rand.Int() % total.
	roll := rand.Intn(total)
	for _, candidate := range choices {
		if roll < candidate.weight {
			return candidate.choice
		}
		roll -= candidate.weight
	}
	// Not reachable: roll starts below total and every weight is subtracted
	// in turn, so some candidate must have matched above.
	panic("unexpected")
}

View File

@ -0,0 +1,103 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package traffic
import (
"context"
"sync"
"testing"
"time"
"github.com/anishathalye/porcupine"
"go.uber.org/zap"
"golang.org/x/time/rate"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
)
// Package-level defaults used by the traffic generators.
var (
	// DefaultLeaseTTL is the lease TTL used by the predefined traffic
	// configurations. NOTE(review): presumably expressed in seconds — confirm
	// against the lease client call that consumes it.
	DefaultLeaseTTL int64 = 7200
	// RequestTimeout is the per-request deadline; traffic code passes it to
	// context.WithTimeout around individual client calls.
	RequestTimeout time.Duration = 40 * time.Millisecond
	// MultiOpTxnOpCount presumably sets how many operations a multi-op
	// transaction contains — its use is not visible from this declaration.
	MultiOpTxnOpCount int = 4
)
// SimulateTraffic runs config.traffic against clus from config.clientCount
// concurrent clients and returns every operation those clients recorded.
//
// Client i talks only to endpoint i modulo the number of gRPC endpoints. All
// clients share one rate limiter capped at config.maximalQPS, one identity
// provider and one lease-ID storage. Once every client's Run has returned, a
// final "tombstone" put is sent through a client connected to all endpoints
// and added to the history.
//
// The test is aborted with t.Fatal if a client cannot be created. It is
// marked failed with t.Error if the final put fails, or with t.Errorf if the
// average rate (recorded operations divided by the time from start until all
// clients returned) is below config.minimalQPS.
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}) []porcupine.Operation {
	var (
		historyMu sync.Mutex // guards combined while clients merge into it
		workers   sync.WaitGroup
	)
	endpoints := clus.EndpointsGRPC()
	ids := identity.NewIdProvider()
	lm := identity.NewLeaseIdStorage()
	combined := model.History{}
	limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)

	startTime := time.Now()
	finalClient, err := NewClient(endpoints, ids, startTime)
	if err != nil {
		t.Fatal(err)
	}
	defer finalClient.Close()

	for clientID := 0; clientID < config.clientCount; clientID++ {
		workers.Add(1)
		endpoint := endpoints[clientID%len(endpoints)]
		client, err := NewClient([]string{endpoint}, ids, startTime)
		if err != nil {
			t.Fatal(err)
		}
		go func(client *RecordingClient, clientID int) {
			defer workers.Done()
			defer client.Close()

			config.traffic.Run(ctx, clientID, client, limiter, ids, lm, finish)

			historyMu.Lock()
			combined = combined.Merge(client.history.History)
			historyMu.Unlock()
		}(client, clientID)
	}
	workers.Wait()
	endTime := time.Now()

	// Pause before the closing write so that the last recorded operation is
	// one that succeeds.
	time.Sleep(time.Second)
	if err := finalClient.Put(ctx, "tombstone", "true"); err != nil {
		t.Error(err)
	}
	combined = combined.Merge(finalClient.history.History)

	operations := combined.Operations()
	lg.Info("Recorded operations", zap.Int("count", len(operations)))

	elapsed := endTime.Sub(startTime)
	qps := float64(len(operations)) / float64(elapsed) * float64(time.Second)
	lg.Info("Average traffic", zap.Float64("qps", qps))
	if qps < config.minimalQPS {
		t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps)
	}
	return operations
}
// Config describes one traffic profile consumed by SimulateTraffic.
type Config struct {
// Name is the label of the profile.
Name string
// minimalQPS is the lowest average request rate SimulateTraffic accepts;
// a lower measured rate is reported as a test error.
minimalQPS float64
// maximalQPS is the rate handed to the limiter shared by all clients.
maximalQPS float64
// clientCount is the number of concurrent clients SimulateTraffic starts,
// each in its own goroutine.
clientCount int
// traffic is the generator whose Run method every client executes.
traffic Traffic
RequestProgress bool // Request progress notifications while watching this traffic
}
// Traffic is a request generator driven by SimulateTraffic, which calls Run
// once per client, each call in its own goroutine.
type Traffic interface {
// Run issues requests through c. SimulateTraffic passes the same limiter,
// ids and lm to every client and blocks until all Run calls have returned,
// so implementations are expected to return once ctx is done or finish is
// signalled.
Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
}