From 2de719dea47d02071a3f2a0da04d758edecd7868 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 7 Apr 2024 20:09:24 +0200 Subject: [PATCH] Use WAL persisted request to validate watch Signed-off-by: Marek Siarkowicz --- tests/robustness/model/replay.go | 123 +++--- tests/robustness/report/wal.go | 6 +- tests/robustness/validate/operations.go | 4 +- tests/robustness/validate/validate.go | 163 ++----- tests/robustness/validate/validate_test.go | 478 +++++++++++++-------- tests/robustness/validate/watch.go | 67 +-- 6 files changed, 447 insertions(+), 394 deletions(-) diff --git a/tests/robustness/model/replay.go b/tests/robustness/model/replay.go index 38968e351..d839c1976 100644 --- a/tests/robustness/model/replay.go +++ b/tests/robustness/model/replay.go @@ -19,82 +19,91 @@ import ( "strings" ) -func NewReplay(eventHistory []PersistedEvent) *EtcdReplay { - var lastEventRevision int64 = 1 - for _, event := range eventHistory { - if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { - panic("Replay requires a complete event history") +func NewReplay(persistedRequests []EtcdRequest) *EtcdReplay { + state := freshEtcdState() + // Padding for index 0 and 1, so index matches revision.. + revisionToEtcdState := []EtcdState{state, state} + var events []PersistedEvent + for _, request := range persistedRequests { + newState, response := state.Step(request) + if state.Revision != newState.Revision { + revisionToEtcdState = append(revisionToEtcdState, newState) } - lastEventRevision = event.Revision + for _, e := range toWatchEvents(&state, request, response) { + events = append(events, e) + } + state = newState } return &EtcdReplay{ - eventHistory: eventHistory, + revisionToEtcdState: revisionToEtcdState, + Events: events, } } type EtcdReplay struct { - eventHistory []PersistedEvent - - // Cached state and event index used for it's calculation - cachedState *EtcdState - eventHistoryIndex int + revisionToEtcdState []EtcdState + Events []PersistedEvent } func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) { - if revision < 1 { - return EtcdState{}, fmt.Errorf("invalid revision: %d", revision) - } - if r.cachedState == nil || r.cachedState.Revision > revision { - r.reset() + if int(revision) >= len(r.revisionToEtcdState) { + return EtcdState{}, fmt.Errorf("requested revision %d, higher than observed in replay %d", revision, len(r.revisionToEtcdState)-1) } + return r.revisionToEtcdState[revision], nil +} - for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision { - nextRequest, nextRevision, nextIndex := r.next() - newState, _ := r.cachedState.Step(nextRequest) - if newState.Revision != nextRevision { - return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision) +func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent) { + for _, e := range r.Events { + if e.Revision < watch.Revision || !e.Match(watch) { + continue } - r.cachedState = &newState - r.eventHistoryIndex = nextIndex + events = append(events, e) } - if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision { - return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision) - } - return *r.cachedState, nil + return events } -func (r *EtcdReplay) reset() { - state := freshEtcdState() - r.cachedState = &state - r.eventHistoryIndex = 0 -} - -func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) { - revision = r.eventHistory[r.eventHistoryIndex].Revision - index = r.eventHistoryIndex - operations := []EtcdOperation{} - for index < len(r.eventHistory) && r.eventHistory[index].Revision == revision { - event := r.eventHistory[index] - switch event.Type { - case PutOperation: - operations = append(operations, EtcdOperation{ - Type: event.Type, - Put: PutOptions{Key: event.Key, Value: event.Value}, - }) +func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) { + if request.Type != Txn || response.Error != "" { + return events + } + var ops []EtcdOperation + if response.Txn.Failure { + ops = request.Txn.OperationsOnFailure + } else { + ops = request.Txn.OperationsOnSuccess + } + for _, op := range ops { + switch op.Type { + case RangeOperation: case DeleteOperation: - operations = append(operations, EtcdOperation{ - Type: event.Type, - Delete: DeleteOptions{Key: event.Key}, - }) + e := PersistedEvent{ + Event: Event{ + Type: op.Type, + Key: op.Delete.Key, + }, + Revision: response.Revision, + } + if _, ok := prevState.KeyValues[op.Delete.Key]; ok { + events = append(events, e) + } + case PutOperation: + e := PersistedEvent{ + Event: Event{ + Type: op.Type, + Key: op.Put.Key, + Value: op.Put.Value, + }, + Revision: response.Revision, + } + if _, ok := prevState.KeyValues[op.Put.Key]; !ok { + e.IsCreate = true + } + events = append(events, e) + default: + panic(fmt.Sprintf("unsupported operation type: %v", op)) } - index++ } - return EtcdRequest{ - Type: Txn, - Txn: &TxnRequest{ - OperationsOnSuccess: operations, - }, - }, revision, index + return events } type WatchEvent struct { diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go index 47d3383c0..96af2748c 100644 --- a/tests/robustness/report/wal.go +++ b/tests/robustness/report/wal.go @@ -181,7 +181,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { case raftReq.Compaction != nil: return nil, nil case raftReq.Txn != nil: - txn := model.TxnRequest{} + txn := model.TxnRequest{ + Conditions: []model.EtcdCondition{}, + OperationsOnSuccess: []model.EtcdOperation{}, + OperationsOnFailure: []model.EtcdOperation{}, + } for _, cmp := range raftReq.Txn.Compare { txn.Conditions = append(txn.Conditions, model.EtcdCondition{ Key: string(cmp.Key), diff --git a/tests/robustness/validate/operations.go b/tests/robustness/validate/operations.go index 9f0baeeae..83cd653a1 100644 --- a/tests/robustness/validate/operations.go +++ b/tests/robustness/validate/operations.go @@ -52,7 +52,7 @@ func validateLinearizableOperationsAndVisualize(lg *zap.Logger, operations []por } } -func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, totalEventHistory []model.PersistedEvent) { +func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, persistedRequests []model.EtcdRequest) { lg.Info("Validating serializable operations") staleReads := filterSerializableReads(operations) if len(staleReads) == 0 { @@ -61,7 +61,7 @@ func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []p sort.Slice(staleReads, func(i, j int) bool { return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision }) - replay := model.NewReplay(totalEventHistory) + replay := model.NewReplay(persistedRequests) for _, read := range staleReads { request := read.Input.(model.EtcdRequest) response := read.Output.(model.MaybeEtcdResponse) diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index a1aa4ddb6..5c49ee685 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -15,13 +15,12 @@ package validate import ( + "encoding/json" "fmt" - "sort" "testing" "time" "github.com/anishathalye/porcupine" - "github.com/google/go-cmp/cmp" "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/robustness/model" @@ -30,7 +29,7 @@ import ( // ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private. func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest, timeout time.Duration) (visualize func(basepath string) error) { - err := checkValidationAssumptions(reports) + err := checkValidationAssumptions(reports, persistedRequests) if err != nil { t.Fatalf("Broken validation assumptions: %s", err) } @@ -40,21 +39,12 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report t.Error("Failed linearization, skipping further validation") return visualize } - // TODO: Don't use watch events to get event history. - eventHistory, err := mergeWatchEventHistory(reports) - if err != nil { - t.Errorf("Failed merging watch history to create event history, err: %s", err) - err = validateWatch(lg, cfg, reports, nil) - if err != nil { - t.Errorf("Failed validating watch history, err: %s", err) - } - return visualize - } - err = validateWatch(lg, cfg, reports, eventHistory) + // TODO: Use requests from linearization instead of persisted requests from WAL. + err = validateWatch(lg, cfg, reports, persistedRequests) if err != nil { t.Errorf("Failed validating watch history, err: %s", err) } - validateSerializableOperations(t, lg, patchedOperations, eventHistory) + validateSerializableOperations(t, lg, patchedOperations, persistedRequests) return visualize } @@ -62,7 +52,7 @@ type Config struct { ExpectRevisionUnique bool } -func checkValidationAssumptions(reports []report.ClientReport) error { +func checkValidationAssumptions(reports []report.ClientReport, persistedRequests []model.EtcdRequest) error { err := validatePutOperationUnique(reports) if err != nil { return err @@ -71,11 +61,8 @@ func checkValidationAssumptions(reports []report.ClientReport) error { if err != nil { return err } - err = validateLastOperationAndObservedInWatch(reports) - if err != nil { - return err - } - err = validateObservedAllRevisionsInWatch(reports) + + err = validatePersistedRequestMatchClientRequests(reports, persistedRequests) if err != nil { return err } @@ -129,60 +116,48 @@ func validateEmptyDatabaseAtStart(reports []report.ClientReport) error { return fmt.Errorf("non empty database at start or first write didn't succeed, required by model implementation") } -func validateLastOperationAndObservedInWatch(reports []report.ClientReport) error { - var lastOperation porcupine.Operation - +func validatePersistedRequestMatchClientRequests(reports []report.ClientReport, persistedRequests []model.EtcdRequest) error { + persistedRequestSet := map[string]model.EtcdRequest{} + for _, request := range persistedRequests { + data, err := json.Marshal(request) + if err != nil { + return err + } + persistedRequestSet[string(data)] = request + } + clientRequests := map[string]porcupine.Operation{} for _, r := range reports { for _, op := range r.KeyValue { - if op.Call > lastOperation.Call { - lastOperation = op + request := op.Input.(model.EtcdRequest) + data, err := json.Marshal(request) + if err != nil { + return err } + clientRequests[string(data)] = op } } - lastResponse := lastOperation.Output.(model.MaybeEtcdResponse) - if lastResponse.PartialResponse || lastResponse.Error != "" { - return fmt.Errorf("last operation %v failed, its success is required to validate watch", lastOperation) - } - for _, r := range reports { - for _, watch := range r.Watch { - for _, watchResp := range watch.Responses { - for _, e := range watchResp.Events { - if e.Revision == lastResponse.Revision { - return nil - } - } - } - } - } - return fmt.Errorf("revision from the last operation %d was not observed in watch, required to validate watch", lastResponse.Revision) -} -func validateObservedAllRevisionsInWatch(reports []report.ClientReport) error { - var maxRevision int64 - for _, r := range reports { - for _, watch := range r.Watch { - for _, watchResp := range watch.Responses { - for _, e := range watchResp.Events { - if e.Revision > maxRevision { - maxRevision = e.Revision - } - } - } + for requestDump, request := range persistedRequestSet { + _, found := clientRequests[requestDump] + // We cannot validate if persisted leaseGrant was sent by client as failed leaseGrant will not return LeaseID to clients. + if request.Type == model.LeaseGrant { + continue + } + + if !found { + return fmt.Errorf("request %+v was not sent by client, required to validate", requestDump) } } - observedRevisions := make([]bool, maxRevision+1) - for _, r := range reports { - for _, watch := range r.Watch { - for _, watchResp := range watch.Responses { - for _, e := range watchResp.Events { - observedRevisions[e.Revision] = true - } - } + + for requestDump, op := range clientRequests { + request := op.Input.(model.EtcdRequest) + response := op.Output.(model.MaybeEtcdResponse) + if response.Error != "" || request.IsRead() { + continue } - } - for i := 2; i < len(observedRevisions); i++ { - if !observedRevisions[i] { - return fmt.Errorf("didn't observe revision %d in watch, required to patch operation and validate serializable requests", i) + _, found := persistedRequestSet[requestDump] + if !found { + return fmt.Errorf("succesful client write %+v was not persisted, required to validate", requestDump) } } return nil @@ -204,59 +179,3 @@ func validateNonConcurrentClientRequests(reports []report.ClientReport) error { } return nil } - -func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) { - type revisionEvents struct { - events []model.PersistedEvent - revision int64 - clientID int - } - revisionToEvents := map[int64]revisionEvents{} - var lastClientID = 0 - var lastRevision int64 - events := []model.PersistedEvent{} - for _, r := range reports { - for _, op := range r.Watch { - for _, resp := range op.Responses { - for _, event := range resp.Events { - if event.Revision == lastRevision && lastClientID == r.ClientID { - events = append(events, event.PersistedEvent) - } else { - if prev, found := revisionToEvents[lastRevision]; found { - // This assumes that there are txn that would be observed differently by two watches. - // TODO: Implement merging events from multiple watches about single revision based on operations. - if diff := cmp.Diff(prev.events, events); diff != "" { - return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientID, lastClientID, lastRevision, diff) - } - } else { - revisionToEvents[lastRevision] = revisionEvents{clientID: lastClientID, events: events, revision: lastRevision} - } - lastClientID = r.ClientID - lastRevision = event.Revision - events = []model.PersistedEvent{event.PersistedEvent} - } - } - } - } - } - if prev, found := revisionToEvents[lastRevision]; found { - if diff := cmp.Diff(prev.events, events); diff != "" { - return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientID, lastClientID, lastRevision, diff) - } - } else { - revisionToEvents[lastRevision] = revisionEvents{clientID: lastClientID, events: events, revision: lastRevision} - } - - var allRevisionEvents []revisionEvents - for _, revEvents := range revisionToEvents { - allRevisionEvents = append(allRevisionEvents, revEvents) - } - sort.Slice(allRevisionEvents, func(i, j int) bool { - return allRevisionEvents[i].revision < allRevisionEvents[j].revision - }) - var eventHistory []model.PersistedEvent - for _, revEvents := range allRevisionEvents { - eventHistory = append(eventHistory, revEvents.events...) - } - return eventHistory, nil -} diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index 256b07898..4a01a55cd 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -61,11 +61,11 @@ func TestDataReports(t *testing.T) { func TestValidateWatch(t *testing.T) { tcs := []struct { - name string - config Config - reports []report.ClientReport - eventHistory []model.PersistedEvent - expectError string + name string + config Config + reports []report.ClientReport + persistedRequests []model.EtcdRequest + expectError string }{ { name: "Ordered, Unique - ordered unique events in one response - pass", @@ -88,6 +88,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, }, { name: "Ordered, Unique - unique ordered events in separate response - pass", @@ -114,6 +118,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, }, { name: "Ordered - unordered events in one response - fail", @@ -136,6 +144,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, expectError: errBrokeOrdered.Error(), }, { @@ -163,6 +175,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, expectError: errBrokeOrdered.Error(), }, { @@ -197,6 +213,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, }, { name: "Unique - duplicated events in one response - fail", @@ -219,6 +239,9 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + }, expectError: errBrokeUnique.Error(), }, { @@ -247,6 +270,9 @@ func TestValidateWatch(t *testing.T) { }, }, expectError: errBrokeUnique.Error(), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + }, }, { name: "Unique - duplicated events in watch requests - pass", @@ -272,7 +298,7 @@ func TestValidateWatch(t *testing.T) { Responses: []model.WatchResponse{ { Events: []model.WatchEvent{ - putWatchEvent("a", "2", 2, true), + putWatchEvent("a", "1", 2, true), }, }, }, @@ -280,6 +306,9 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + }, }, { name: "Unique, Atomic - duplicated revision in one response - pass", @@ -302,6 +331,35 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + { + Type: model.Txn, + LeaseGrant: nil, + LeaseRevoke: nil, + Range: nil, + Txn: &model.TxnRequest{ + Conditions: nil, + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "a", + Value: model.ToValueOrHash("1"), + }, + }, + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "b", + Value: model.ToValueOrHash("2"), + }, + }, + }, + OperationsOnFailure: nil, + }, + Defragment: nil, + }, + }, }, { name: "Unique - duplicated revision in separate watch request - pass", @@ -335,6 +393,9 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + }, }, { name: "Unique revision - duplicated revision in one response - fail", @@ -358,6 +419,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, expectError: errBrokeUnique.Error(), }, { @@ -382,6 +447,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, expectError: errBrokeUnique.Error(), }, { @@ -409,6 +478,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, expectError: errBrokeAtomic.Error(), }, { @@ -438,10 +511,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -474,10 +547,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -499,10 +572,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -525,10 +598,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeReliable.Error(), }, @@ -564,10 +637,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -593,10 +666,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeReliable.Error(), }, @@ -631,10 +704,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -687,10 +760,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -719,10 +792,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeBookmarkable.Error(), }, @@ -756,10 +829,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeBookmarkable.Error(), }, @@ -786,8 +859,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{}, - expectError: errBrokeBookmarkable.Error(), + persistedRequests: []model.EtcdRequest{}, + expectError: errBrokeBookmarkable.Error(), }, { name: "Bookmarkable - progress notification lower than watch request - pass", @@ -809,8 +882,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), }, }, { @@ -836,7 +909,7 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{}, + persistedRequests: []model.EtcdRequest{}, }, { name: "Reliable - missing event before bookmark - fail", @@ -863,10 +936,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeReliable.Error(), }, @@ -894,10 +967,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + putRequest("c", "3"), }, expectError: errBrokeReliable.Error(), }, @@ -926,10 +999,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("aa", "1", 2, true), - putPersistedEvent("ab", "2", 3, true), - putPersistedEvent("cc", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("aa", "1"), + putRequest("ab", "2"), + putRequest("cc", "3"), }, expectError: errBrokeReliable.Error(), }, @@ -955,10 +1028,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: "", }, @@ -982,8 +1055,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), }, expectError: "", }, @@ -1008,8 +1081,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), }, expectError: "", }, @@ -1032,8 +1105,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), }, expectError: "", }, @@ -1061,8 +1134,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), }, expectError: "", }, @@ -1086,8 +1159,8 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), }, expectError: errBrokeReliable.Error(), }, @@ -1112,10 +1185,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeReliable.Error(), }, @@ -1140,10 +1213,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("ab", "2", 3, true), - putPersistedEvent("a", "3", 4, false), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("ab", "2"), + putRequest("a", "3"), }, }, { @@ -1168,10 +1241,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("aa", "1", 2, true), - putPersistedEvent("bb", "2", 3, true), - putPersistedEvent("ac", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("aa", "1"), + putRequest("bb", "2"), + putRequest("ac", "3"), }, }, { @@ -1195,10 +1268,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -1222,10 +1295,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -1254,10 +1327,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -1282,10 +1355,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -1309,10 +1382,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, }, { @@ -1338,10 +1411,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("bb", "2", 3, true), - putPersistedEvent("bc", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("bb", "2"), + putRequest("bc", "3"), }, }, { @@ -1365,10 +1438,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("c", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("c", "3"), }, expectError: errBrokeResumable.Error(), }, @@ -1385,7 +1458,7 @@ func TestValidateWatch(t *testing.T) { Responses: []model.WatchResponse{ { Events: []model.WatchEvent{ - putWatchEvent("b", "3", 4, true), + putWatchEvent("b", "3", 4, false), }, }, }, @@ -1393,10 +1466,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("b", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + putRequest("b", "3"), }, expectError: errBrokeResumable.Error(), }, @@ -1422,10 +1495,10 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("bb", "2", 3, true), - putPersistedEvent("bc", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("bb", "2"), + putRequest("bc", "3"), }, expectError: errBrokeResumable.Error(), }, @@ -1452,11 +1525,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, }, { @@ -1482,11 +1555,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, expectError: errBrokeIsCreate.Error(), }, @@ -1513,11 +1586,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, expectError: errBrokeIsCreate.Error(), }, @@ -1545,11 +1618,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, }, { @@ -1576,11 +1649,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, }, { @@ -1607,11 +1680,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, expectError: errBrokePrevKV.Error(), }, @@ -1639,11 +1712,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, expectError: errBrokePrevKV.Error(), }, @@ -1671,11 +1744,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, expectError: errBrokePrevKV.Error(), }, @@ -1703,11 +1776,11 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("a", "2", 3, false), - deletePersistedEvent("a", 4), - putPersistedEvent("a", "4", 5, true), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("a", "2"), + deleteRequest("a"), + putRequest("a", "4"), }, expectError: errBrokePrevKV.Error(), }, @@ -1725,7 +1798,6 @@ func TestValidateWatch(t *testing.T) { Events: []model.WatchEvent{ putWatchEvent("a", "1", 2, true), putWatchEvent("b", "2", 3, true), - putWatchEvent("a", "3", 4, false), }, }, }, @@ -1733,10 +1805,9 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("a", "1", 2, true), - putPersistedEvent("b", "2", 3, true), - putPersistedEvent("a", "3", 4, false), + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), }, expectError: errBrokeFilter.Error(), }, @@ -1763,17 +1834,17 @@ func TestValidateWatch(t *testing.T) { }, }, }, - eventHistory: []model.PersistedEvent{ - putPersistedEvent("aa", "1", 2, true), - putPersistedEvent("bb", "2", 3, true), - putPersistedEvent("ac", "3", 4, true), + persistedRequests: []model.EtcdRequest{ + putRequest("aa", "1"), + putRequest("bb", "2"), + putRequest("ac", "3"), }, expectError: errBrokeFilter.Error(), }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - err := validateWatch(zaptest.NewLogger(t), tc.config, tc.reports, tc.eventHistory) + err := validateWatch(zaptest.NewLogger(t), tc.config, tc.reports, tc.persistedRequests) var errStr string if err != nil { errStr = err.Error() @@ -1838,3 +1909,48 @@ func deletePersistedEvent(key string, rev int64) model.PersistedEvent { Revision: rev, } } + +func putRequest(key, value string) model.EtcdRequest { + return model.EtcdRequest{ + Type: model.Txn, + LeaseGrant: nil, + LeaseRevoke: nil, + Range: nil, + Txn: &model.TxnRequest{ + Conditions: nil, + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: key, + Value: model.ToValueOrHash(value), + }, + }, + }, + OperationsOnFailure: nil, + }, + Defragment: nil, + } +} + +func deleteRequest(key string) model.EtcdRequest { + return model.EtcdRequest{ + Type: model.Txn, + LeaseGrant: nil, + LeaseRevoke: nil, + Range: nil, + Txn: &model.TxnRequest{ + Conditions: nil, + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.DeleteOperation, + Delete: model.DeleteOptions{ + Key: key, + }, + }, + }, + OperationsOnFailure: nil, + }, + Defragment: nil, + } +} diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index 4b248971f..1114a43e3 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -37,9 +37,10 @@ var ( errBrokeFilter = errors.New("event not matching watch filter") ) -func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.PersistedEvent) error { +func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest) error { lg.Info("Validating watch") // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis + replay := model.NewReplay(persistedRequests) for _, r := range reports { err := validateFilter(lg, r) if err != nil { @@ -61,23 +62,21 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev if err != nil { return err } - if eventHistory != nil { - err = validateResumable(lg, eventHistory, r) - if err != nil { - return err - } - err = validateReliable(lg, eventHistory, r) - if err != nil { - return err - } - err = validatePrevKV(lg, r, eventHistory) - if err != nil { - return err - } - err = validateEventIsCreate(lg, r, eventHistory) - if err != nil { - return err - } + err = validateResumable(lg, replay, r) + if err != nil { + return err + } + err = validateReliable(lg, replay, r) + if err != nil { + return err + } + err = validatePrevKV(lg, replay, r) + if err != nil { + return err + } + err = validateIsCreate(lg, replay, r) + if err != nil { + return err } } return nil @@ -182,10 +181,11 @@ func validateAtomic(lg *zap.Logger, report report.ClientReport) (err error) { return err } -func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) { +func validateReliable(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) { for _, watch := range report.Watch { firstRev := firstExpectedRevision(watch) lastRev := lastRevision(watch) + events := replay.EventsForWatch(watch.Request) wantEvents := []model.PersistedEvent{} if firstRev != 0 { for _, e := range events { @@ -214,22 +214,23 @@ func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report repo return err } -func validateResumable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) { - for _, op := range report.Watch { - if op.Request.Revision == 0 { +func validateResumable(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) { + for _, watch := range report.Watch { + if watch.Request.Revision == 0 { continue } + events := replay.EventsForWatch(watch.Request) index := 0 - for index < len(events) && (events[index].Revision < op.Request.Revision || !events[index].Match(op.Request)) { + for index < len(events) && (events[index].Revision < watch.Request.Revision || !events[index].Match(watch.Request)) { index++ } if index == len(events) { continue } - firstEvent := firstWatchEvent(op) + firstEvent := firstWatchEvent(watch) // If watch is resumable, first event it gets should the first event that happened after the requested revision. if firstEvent != nil && events[index] != firstEvent.PersistedEvent { - lg.Error("Broke watch guarantee", zap.String("guarantee", "resumable"), zap.Int("client", report.ClientID), zap.Any("request", op.Request), zap.Any("got-event", *firstEvent), zap.Any("want-event", events[index])) + lg.Error("Broke watch guarantee", zap.String("guarantee", "resumable"), zap.Int("client", report.ClientID), zap.Any("request", watch.Request), zap.Any("got-event", *firstEvent), zap.Any("want-event", events[index])) err = errBrokeResumable } } @@ -238,8 +239,7 @@ func validateResumable(lg *zap.Logger, events []model.PersistedEvent, report rep // validatePrevKV ensures that a watch response (if configured with WithPrevKV()) returns // the appropriate response. -func validatePrevKV(lg *zap.Logger, report report.ClientReport, history []model.PersistedEvent) (err error) { - replay := model.NewReplay(history) +func validatePrevKV(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) { for _, op := range report.Watch { if !op.Request.WithPrevKV { continue @@ -247,7 +247,10 @@ func validatePrevKV(lg *zap.Logger, report report.ClientReport, history []model. for _, resp := range op.Responses { for _, event := range resp.Events { // Get state state just before the current event. - state, _ := replay.StateForRevision(event.Revision - 1) + state, err2 := replay.StateForRevision(event.Revision - 1) + if err2 != nil { + panic(err2) + } // TODO(MadhavJivrajani): check if compaction has been run as part // of failpoint injection. If compaction has run, prevKV can be nil // even if it is not a create event. @@ -271,13 +274,15 @@ func validatePrevKV(lg *zap.Logger, report report.ClientReport, history []model. return err } -func validateEventIsCreate(lg *zap.Logger, report report.ClientReport, history []model.PersistedEvent) (err error) { - replay := model.NewReplay(history) +func validateIsCreate(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) { for _, op := range report.Watch { for _, resp := range op.Responses { for _, event := range resp.Events { // Get state state just before the current event. - state, _ := replay.StateForRevision(event.Revision - 1) + state, err2 := replay.StateForRevision(event.Revision - 1) + if err2 != nil { + panic(err2) + } // A create event will not have an entry in our history and a non-create // event *should* have an entry in our history. if _, prevKeyExists := state.KeyValues[event.Key]; event.IsCreate == prevKeyExists {