tests/robustness: Expect revisions to be unique for Kubernetes Traffic

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-05-16 12:34:01 +02:00
parent f3c9db9c46
commit 4872b679a5
5 changed files with 42 additions and 23 deletions

View File

@ -65,7 +65,10 @@ func TestRobustness(t *testing.T) {
name: "ClusterOfSize3/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: traffic,
cluster: *e2e.NewConfig(clusterOfSize3Options...),
watch: watchConfig{
expectUniqueRevision: traffic.Traffic.ExpectUniqueRevision(),
},
cluster: *e2e.NewConfig(clusterOfSize3Options...),
})
}
scenarios = append(scenarios, testScenario{
@ -157,7 +160,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.clientReports, s.watch.requestProgress || watchProgressNotifyEnabled)
r.visualizeHistory = validateCorrectness(t, lg, r.clientReports)
r.visualizeHistory = validateCorrectness(t, lg, s.watch, r.clientReports)
panicked = false
}
@ -211,8 +214,8 @@ func forcestopCluster(clus *e2e.EtcdProcessCluster) error {
return clus.ConcurrentStop()
}
func validateCorrectness(t *testing.T, lg *zap.Logger, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatchCorrectness(t, reports)
func validateCorrectness(t *testing.T, lg *zap.Logger, cfg watchConfig, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatchCorrectness(t, cfg, reports)
operations := operationsFromClientReports(reports)
return model.ValidateOperationHistoryAndReturnVisualize(t, lg, operations)
}

View File

@ -33,7 +33,7 @@ var (
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: etcdTraffic{
Traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
@ -53,7 +53,7 @@ var (
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: etcdTraffic{
Traffic: etcdTraffic{
keyCount: 10,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
@ -73,6 +73,10 @@ type etcdTraffic struct {
largePutSize int
}
func (t etcdTraffic) ExpectUniqueRevision() bool {
return false
}
type etcdRequestType string
const (

View File

@ -36,7 +36,7 @@ var (
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: kubernetesTraffic{
Traffic: kubernetesTraffic{
averageKeyCount: 5,
resource: "pods",
namespace: "default",
@ -56,6 +56,10 @@ type kubernetesTraffic struct {
writeChoices []choiceWeight[KubernetesRequestType]
}
func (t kubernetesTraffic) ExpectUniqueRevision() bool {
return true
}
func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
kc := &kubernetesClient{client: c}
s := newStorage()

View File

@ -59,7 +59,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done()
defer c.Close()
config.traffic.Run(ctx, c, limiter, ids, lm, finish)
config.Traffic.Run(ctx, c, limiter, ids, lm, finish)
mux.Lock()
reports = append(reports, c.Report())
mux.Unlock()
@ -95,9 +95,10 @@ type Config struct {
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
Traffic Traffic
}
type Traffic interface {
Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
ExpectUniqueRevision() bool
}

View File

@ -74,7 +74,8 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
}
type watchConfig struct {
requestProgress bool
requestProgress bool
expectUniqueRevision bool
}
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
@ -140,14 +141,14 @@ func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 {
return maxRevision
}
func validateWatchCorrectness(t *testing.T, reports []traffic.ClientReport) {
func validateWatchCorrectness(t *testing.T, cfg watchConfig, reports []traffic.ClientReport) {
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
for _, r := range reports {
validateOrdered(t, r)
validateUnique(t, r)
validateUnique(t, cfg.expectUniqueRevision, r)
validateAtomic(t, r)
// TODO: Validate Resumable
validateBookmarkable(t, r)
// TODO: Validate presumable
}
validateEventsMatch(t, reports)
// Expects that longest history encompasses all events.
@ -202,19 +203,25 @@ func validateOrdered(t *testing.T, report traffic.ClientReport) {
}
}
func validateUnique(t *testing.T, report traffic.ClientReport) {
type revisionKey struct {
revision int64
key string
}
uniqueOperations := map[revisionKey]struct{}{}
func validateUnique(t *testing.T, expectUniqueRevision bool, report traffic.ClientReport) {
uniqueOperations := map[interface{}]struct{}{}
for _, resp := range report.Watch {
for _, event := range resp.Events {
rk := revisionKey{key: event.Op.Key, revision: event.Revision}
if _, found := uniqueOperations[rk]; found {
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", rk.key, rk.revision, report.ClientId)
var key interface{}
if expectUniqueRevision {
key = event.Revision
} else {
key = struct {
revision int64
key string
}{event.Revision, event.Op.Key}
}
uniqueOperations[rk] = struct{}{}
if _, found := uniqueOperations[key]; found {
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Op.Key, event.Revision, report.ClientId)
}
uniqueOperations[key] = struct{}{}
}
}
}