From fc1863086ce39a7f4b83230a5bb3d8d4cc37e5ac Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 6 Jun 2024 13:22:10 +0800 Subject: [PATCH] tests/robustness: unlock Delete/LeaseRevoke ops We should return token to that bucket if `nonUniqueWriteLimiter.Take()` return true. After unlock Delete/LeaseRevoke ops, the model should be updated for replay function. There are two updates for `toWatchEvents`. 1. When leaveRevokes op has deleted few keys, we should generate `delete-operation` events based on alphabetical order of deleted keys. 2. When putWithLease op hits non-exist lease, we should ignore that update event. Signed-off-by: Wei Fu --- tests/robustness/main_test.go | 2 + tests/robustness/model/deterministic.go | 85 +++++++++++++--------- tests/robustness/model/replay.go | 85 +++++++++++++++------- tests/robustness/report/wal.go | 2 +- tests/robustness/traffic/etcd.go | 6 +- tests/robustness/traffic/kubernetes.go | 8 +- tests/robustness/validate/patch_history.go | 2 + 7 files changed, 124 insertions(+), 66 deletions(-) diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index 1d078574f..a4b6d763a 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "math/rand" "testing" "time" @@ -36,6 +37,7 @@ import ( var testRunner = framework.E2eTestRunner func TestMain(m *testing.M) { + rand.Seed(time.Now().UnixNano()) testRunner.TestMain(m) } diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index f59bb91ab..57da61372 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "hash/fnv" + "maps" "reflect" "sort" @@ -75,6 +76,20 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState } +func (s EtcdState) DeepCopy() EtcdState { + newState := EtcdState{Revision: s.Revision} + + newState.KeyValues = maps.Clone(s.KeyValues) + newState.KeyLeases = maps.Clone(s.KeyLeases) + + newLeases := map[int64]EtcdLease{} + for key, val := range s.Leases { + newLeases[key] = val.DeepCopy() + } + newState.Leases = newLeases + return newState +} + func freshEtcdState() EtcdState { return EtcdState{ Revision: 1, @@ -86,25 +101,22 @@ func freshEtcdState() EtcdState { // Step handles a successful request, returning updated state and response it would generate. func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { - newKVs := map[string]ValueRevision{} - for k, v := range s.KeyValues { - newKVs[k] = v - } - s.KeyValues = newKVs + newState := s.DeepCopy() + switch request.Type { case Range: - if request.Range.Revision == 0 || request.Range.Revision == s.Revision { - resp := s.getRange(request.Range.RangeOptions) - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}} + if request.Range.Revision == 0 || request.Range.Revision == newState.Revision { + resp := newState.getRange(request.Range.RangeOptions) + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: newState.Revision}} } - if request.Range.Revision > s.Revision { - return s, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()} + if request.Range.Revision > newState.Revision { + return newState, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()} } - return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}} + return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}} case Txn: failure := false for _, cond := range request.Txn.Conditions { - if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision { + if val := newState.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision { failure = true break } @@ -119,27 +131,27 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { switch op.Type { case RangeOperation: opResp[i] = EtcdOperationResult{ - RangeResponse: s.getRange(op.Range), + RangeResponse: newState.getRange(op.Range), } case PutOperation: - _, leaseExists := s.Leases[op.Put.LeaseID] + _, leaseExists := newState.Leases[op.Put.LeaseID] if op.Put.LeaseID != 0 && !leaseExists { break } - s.KeyValues[op.Put.Key] = ValueRevision{ + newState.KeyValues[op.Put.Key] = ValueRevision{ Value: op.Put.Value, - ModRevision: s.Revision + 1, + ModRevision: newState.Revision + 1, } increaseRevision = true - s = detachFromOldLease(s, op.Put.Key) + newState = detachFromOldLease(newState, op.Put.Key) if leaseExists { - s = attachToNewLease(s, op.Put.LeaseID, op.Put.Key) + newState = attachToNewLease(newState, op.Put.LeaseID, op.Put.Key) } case DeleteOperation: - if _, ok := s.KeyValues[op.Delete.Key]; ok { - delete(s.KeyValues, op.Delete.Key) + if _, ok := newState.KeyValues[op.Delete.Key]; ok { + delete(newState.KeyValues, op.Delete.Key) increaseRevision = true - s = detachFromOldLease(s, op.Delete.Key) + newState = detachFromOldLease(newState, op.Delete.Key) opResp[i].Deleted = 1 } default: @@ -147,37 +159,37 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { } } if increaseRevision { - s.Revision++ + newState.Revision++ } - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: s.Revision}} + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: newState.Revision}} case LeaseGrant: lease := EtcdLease{ LeaseID: request.LeaseGrant.LeaseID, Keys: map[string]struct{}{}, } - s.Leases[request.LeaseGrant.LeaseID] = lease - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseGrant: &LeaseGrantReponse{}}} + newState.Leases[request.LeaseGrant.LeaseID] = lease + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseGrant: &LeaseGrantReponse{}}} case LeaseRevoke: //Delete the keys attached to the lease keyDeleted := false - for key := range s.Leases[request.LeaseRevoke.LeaseID].Keys { + for key := range newState.Leases[request.LeaseRevoke.LeaseID].Keys { //same as delete. - if _, ok := s.KeyValues[key]; ok { + if _, ok := newState.KeyValues[key]; ok { if !keyDeleted { keyDeleted = true } - delete(s.KeyValues, key) - delete(s.KeyLeases, key) + delete(newState.KeyValues, key) + delete(newState.KeyLeases, key) } } //delete the lease - delete(s.Leases, request.LeaseRevoke.LeaseID) + delete(newState.Leases, request.LeaseRevoke.LeaseID) if keyDeleted { - s.Revision++ + newState.Revision++ } - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}} + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseRevoke: &LeaseRevokeResponse{}}} case Defragment: - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}} + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: newState.Revision}} default: panic(fmt.Sprintf("Unknown request type: %v", request.Type)) } @@ -377,6 +389,13 @@ type EtcdLease struct { Keys map[string]struct{} } +func (el EtcdLease) DeepCopy() EtcdLease { + return EtcdLease{ + LeaseID: el.LeaseID, + Keys: maps.Clone(el.Keys), + } +} + type ValueRevision struct { Value ValueOrHash ModRevision int64 diff --git a/tests/robustness/model/replay.go b/tests/robustness/model/replay.go index d839c1976..b8ea78468 100644 --- a/tests/robustness/model/replay.go +++ b/tests/robustness/model/replay.go @@ -16,6 +16,7 @@ package model import ( "fmt" + "sort" "strings" ) @@ -63,44 +64,72 @@ func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent } func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) { - if request.Type != Txn || response.Error != "" { + if response.Error != "" { return events } - var ops []EtcdOperation - if response.Txn.Failure { - ops = request.Txn.OperationsOnFailure - } else { - ops = request.Txn.OperationsOnSuccess - } - for _, op := range ops { - switch op.Type { - case RangeOperation: - case DeleteOperation: - e := PersistedEvent{ - Event: Event{ - Type: op.Type, - Key: op.Delete.Key, - }, - Revision: response.Revision, - } - if _, ok := prevState.KeyValues[op.Delete.Key]; ok { + + switch request.Type { + case Txn: + var ops []EtcdOperation + if response.Txn.Failure { + ops = request.Txn.OperationsOnFailure + } else { + ops = request.Txn.OperationsOnSuccess + } + for _, op := range ops { + switch op.Type { + case RangeOperation: + case DeleteOperation: + e := PersistedEvent{ + Event: Event{ + Type: op.Type, + Key: op.Delete.Key, + }, + Revision: response.Revision, + } + if _, ok := prevState.KeyValues[op.Delete.Key]; ok { + events = append(events, e) + } + case PutOperation: + _, leaseExists := prevState.Leases[op.Put.LeaseID] + if op.Put.LeaseID != 0 && !leaseExists { + break + } + + e := PersistedEvent{ + Event: Event{ + Type: op.Type, + Key: op.Put.Key, + Value: op.Put.Value, + }, + Revision: response.Revision, + } + if _, ok := prevState.KeyValues[op.Put.Key]; !ok { + e.IsCreate = true + } events = append(events, e) + default: + panic(fmt.Sprintf("unsupported operation type: %v", op)) } - case PutOperation: + } + case LeaseRevoke: + deletedKeys := []string{} + for key := range prevState.Leases[request.LeaseRevoke.LeaseID].Keys { + if _, ok := prevState.KeyValues[key]; ok { + deletedKeys = append(deletedKeys, key) + } + } + + sort.Strings(deletedKeys) + for _, key := range deletedKeys { e := PersistedEvent{ Event: Event{ - Type: op.Type, - Key: op.Put.Key, - Value: op.Put.Value, + Type: DeleteOperation, + Key: key, }, Revision: response.Revision, } - if _, ok := prevState.KeyValues[op.Put.Key]; !ok { - e.IsCreate = true - } events = append(events, e) - default: - panic(fmt.Sprintf("unsupported operation type: %v", op)) } } return events diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go index c1d8e5dec..9ef212332 100644 --- a/tests/robustness/report/wal.go +++ b/tests/robustness/report/wal.go @@ -172,7 +172,7 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { case raftReq.LeaseRevoke != nil: return &model.EtcdRequest{ Type: model.LeaseRevoke, - LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseGrant.ID}, + LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseRevoke.ID}, }, nil case raftReq.LeaseGrant != nil: return &model.EtcdRequest{ diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 4e80c633d..44ad21ddf 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -115,10 +115,12 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter return default: } + shouldReturn := false + // Avoid multiple failed writes in a row if lastOperationSucceeded { choices := t.requests - if !nonUniqueWriteLimiter.Take() { + if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn { choices = filterOutNonUniqueEtcdWrites(choices) } requestType = pickRandom(choices) @@ -126,7 +128,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter requestType = Get } rev, err := client.Request(ctx, requestType, lastRev) - if requestType == Delete || requestType == LeaseRevoke { + if shouldReturn { nonUniqueWriteLimiter.Return() } lastOperationSucceeded = err == nil diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 7ba278a90..4fbf8d0a7 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -154,15 +154,16 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids _, err = kc.OptimisticDelete(writeCtx, key, rev) nonUniqueWriteLimiter.Return() } else { + shouldReturn := false + choices := t.writeChoices - if !nonUniqueWriteLimiter.Take() { + if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn { choices = filterOutNonUniqueKubernetesWrites(t.writeChoices) } op := pickRandom(choices) switch op { case KubernetesDelete: _, err = kc.OptimisticDelete(writeCtx, key, rev) - nonUniqueWriteLimiter.Return() case KubernetesUpdate: _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev) case KubernetesCreate: @@ -170,6 +171,9 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids default: panic(fmt.Sprintf("invalid choice: %q", op)) } + if shouldReturn { + nonUniqueWriteLimiter.Return() + } } } if err != nil { diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 44185de99..03e23e5bc 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste } } case model.LeaseGrant: + case model.LeaseRevoke: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } @@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati } case model.Range: case model.LeaseGrant: + case model.LeaseRevoke: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) }