From 09c462e2ea5bb1e7289c4f6f437e9ed40008b996 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sat, 24 Jun 2023 19:02:44 +0200 Subject: [PATCH] tests/robustness: Add resumable validation and improve reliable validation Signed-off-by: Marek Siarkowicz --- tests/robustness/model/replay.go | 16 ++++++ tests/robustness/traffic/client.go | 13 ++--- tests/robustness/validate/operations.go | 2 - tests/robustness/validate/watch.go | 70 +++++++++++++++++++++---- 4 files changed, 80 insertions(+), 21 deletions(-) diff --git a/tests/robustness/model/replay.go b/tests/robustness/model/replay.go index 38db6797f..d5e159e7b 100644 --- a/tests/robustness/model/replay.go +++ b/tests/robustness/model/replay.go @@ -16,6 +16,7 @@ package model import ( "fmt" + "strings" ) func NewReplay(eventHistory []WatchEvent) *EtcdReplay { @@ -106,3 +107,18 @@ type Event struct { Key string Value ValueOrHash } + +func (e Event) Match(request WatchRequest) bool { + if request.WithPrefix { + return strings.HasPrefix(e.Key, request.Key) + } else { + return e.Key == request.Key + } +} + +type WatchRequest struct { + Key string + Revision int64 + WithPrefix bool + WithProgressNotify bool +} diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 15bdea83b..faf8fcfb9 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -46,17 +46,10 @@ type RecordingClient struct { } type WatchOperation struct { - Request WatchRequest + Request model.WatchRequest Responses []WatchResponse } -type WatchRequest struct { - Key string - Revision int64 - WithPrefix bool - WithProgressNotify bool -} - type WatchResponse struct { Events []model.WatchEvent IsProgressNotify bool @@ -228,7 +221,7 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR } func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan { - request := WatchRequest{ + request := model.WatchRequest{ Key: key, Revision: rev, WithPrefix: withPrefix, @@ -238,7 +231,7 @@ func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, with } -func (c *RecordingClient) watch(ctx context.Context, request WatchRequest) clientv3.WatchChan { +func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest) clientv3.WatchChan { ops := []clientv3.OpOption{} if request.WithPrefix { ops = append(ops, clientv3.WithPrefix()) diff --git a/tests/robustness/validate/operations.go b/tests/robustness/validate/operations.go index b83189c5f..896c9d42a 100644 --- a/tests/robustness/validate/operations.go +++ b/tests/robustness/validate/operations.go @@ -45,8 +45,6 @@ func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []p t.Fatalf("Unknown Linearization") } lg.Info("Validating serializable operations") - // TODO: Use linearization result instead of event history to get order of events - // This is currently impossible as porcupine doesn't expose operation order created during linearization. validateSerializableOperations(t, operations, eventHistory) return visualize } diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index ab991e12c..9bb5c1f99 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -30,12 +30,15 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []m validateOrdered(t, r) validateUnique(t, cfg.ExpectRevisionUnique, r) validateAtomic(t, r) - // TODO: Validate Resumable validateBookmarkable(t, r) } + // TODO: Use linearization result instead of event history to get order of events + // This is currently impossible as porcupine doesn't expose operation order created during linearization. eventHistory := mergeWatchEventHistory(t, reports) - // TODO: Validate that each watch report is reliable, not only the longest one. - validateReliable(t, eventHistory) + for _, r := range reports { + validateReliable(t, eventHistory, r) + validateResumable(t, eventHistory, r) + } return eventHistory } @@ -107,16 +110,63 @@ func validateAtomic(t *testing.T, report traffic.ClientReport) { } } -func validateReliable(t *testing.T, events []model.WatchEvent) { - var lastEventRevision int64 = 1 - for _, event := range events { - if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { - t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, missing revisions from range: %d-%d", lastEventRevision, event.Revision) +func validateReliable(t *testing.T, events []model.WatchEvent, report traffic.ClientReport) { + for _, op := range report.Watch { + index := 0 + revision := firstRevision(op) + for index < len(events) && events[index].Revision < revision { + index++ + } + if index == len(events) { + continue + } + for _, resp := range op.Responses { + for _, event := range resp.Events { + if events[index].Match(op.Request) && events[index] != event { + t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, event missing: %+v", events[index]) + } + index++ + } } - lastEventRevision = event.Revision } } +func validateResumable(t *testing.T, events []model.WatchEvent, report traffic.ClientReport) { + for _, op := range report.Watch { + index := 0 + revision := op.Request.Revision + for index < len(events) && (events[index].Revision < revision || !events[index].Match(op.Request)) { + index++ + } + if index == len(events) { + continue + } + firstEvent := firstWatchEvent(op) + // If watch is resumable, first event it gets should the first event that happened after the requested revision. + if firstEvent != nil && events[index] != *firstEvent { + t.Errorf("Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window, watch request: %+v, event missing: %+v", op.Request, events[index]) + } + } +} + +func firstRevision(op traffic.WatchOperation) int64 { + for _, resp := range op.Responses { + for _, event := range resp.Events { + return event.Revision + } + } + return 0 +} + +func firstWatchEvent(op traffic.WatchOperation) *model.WatchEvent { + for _, resp := range op.Responses { + for _, event := range resp.Events { + return &event + } + } + return nil +} + func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []model.WatchEvent { type revisionEvents struct { events []model.WatchEvent @@ -135,6 +185,8 @@ func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []mode events = append(events, event) } else { if prev, found := revisionToEvents[lastRevision]; found { + // This assumes that there are txn that would be observed differently by two watches. + // TODO: Implement merging events from multiple watches about single revision based on operations. if diff := cmp.Diff(prev.events, events); diff != "" { t.Errorf("Events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff) }