From 0ee0da4d1e191bb087d5d8fde36d438e961a7249 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 23 Jun 2024 13:28:14 +0200 Subject: [PATCH 1/7] Don't validate watch patching time as we no longer use watch to decide to discard Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/patch_history.go | 34 +++++----------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index b77a61e92..9a85d2dea 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -66,7 +66,6 @@ func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]client.Tim func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) - lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents) for _, op := range operations { request := op.Input.(model.EtcdRequest) @@ -77,16 +76,14 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve continue } var resourceVersion int64 - if op.Call <= lastObservedOperation.Call { - matchingEvent := matchWatchEvent(request.Txn, watchEvents) - if matchingEvent != nil { - eventTime := matchingEvent.Time.Nanoseconds() - // Set revision and time based on watchEvent. - if eventTime < op.Return { - op.Return = eventTime - } - resourceVersion = matchingEvent.Revision + matchingEvent := matchWatchEvent(request.Txn, watchEvents) + if matchingEvent != nil { + eventTime := matchingEvent.Time.Nanoseconds() + // Set revision and time based on watchEvent. + if eventTime < op.Return { + op.Return = eventTime } + resourceVersion = matchingEvent.Revision } persistedReturnTime := matchReturnTime(request, persistedOperations) if persistedReturnTime != nil { @@ -113,23 +110,6 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve return newOperations } -func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent) porcupine.Operation { - var maxCallTime int64 - var lastOperation porcupine.Operation - for _, op := range operations { - request := op.Input.(model.EtcdRequest) - if request.Type != model.Txn { - continue - } - event := matchWatchEvent(request.Txn, watchEvents) - if event != nil && op.Call > maxCallTime { - maxCallTime = op.Call - lastOperation = op - } - } - return lastOperation -} - func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]client.TimedWatchEvent) *client.TimedWatchEvent { for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) { if etcdOp.Type == model.PutOperation { From a47ad53548c02d35ce85a8433754813541fbe7dc Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 23 Jun 2024 13:48:14 +0200 Subject: [PATCH 2/7] Refactor patchOperations to handle each operation separetly, which is useful to implement patching for non-put Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/patch_history.go | 79 +++++++++------------- 1 file changed, 33 insertions(+), 46 deletions(-) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 9a85d2dea..c79f049aa 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -75,30 +75,45 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve newOperations = append(newOperations, op) continue } - var resourceVersion int64 - matchingEvent := matchWatchEvent(request.Txn, watchEvents) - if matchingEvent != nil { - eventTime := matchingEvent.Time.Nanoseconds() - // Set revision and time based on watchEvent. - if eventTime < op.Return { - op.Return = eventTime - } - resourceVersion = matchingEvent.Revision - } - persistedReturnTime := matchReturnTime(request, persistedOperations) - if persistedReturnTime != nil { - // Set return time based on persisted return time. - if *persistedReturnTime < op.Return { - op.Return = *persistedReturnTime + txnPersisted := false + var txnRevision int64 = 0 + for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + switch operation.Type { + case model.PutOperation: + event, ok := watchEvents[model.Event{ + Type: operation.Type, + Key: operation.Put.Key, + Value: operation.Put.Value, + }] + if ok { + eventTime := event.Time.Nanoseconds() + // Set revision and time based on watchEvent. + if eventTime < op.Return { + op.Return = eventTime + } + txnRevision = event.Revision + } + persistedReturnTime, ok := persistedOperations[operation] + if ok { + // Set return time based on persisted return time. + if persistedReturnTime < op.Return { + op.Return = persistedReturnTime + } + txnPersisted = true + } + case model.DeleteOperation: + case model.RangeOperation: + default: + panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } if isUniqueTxn(request.Txn) { - if persistedReturnTime == nil { + if !txnPersisted { // Remove non persisted operations continue } else { - if resourceVersion != 0 { - op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: resourceVersion} + if txnRevision != 0 { + op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision} } else { op.Output = model.MaybeEtcdResponse{Persisted: true} } @@ -110,22 +125,6 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve return newOperations } -func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]client.TimedWatchEvent) *client.TimedWatchEvent { - for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) { - if etcdOp.Type == model.PutOperation { - event, ok := watchEvents[model.Event{ - Type: etcdOp.Type, - Key: etcdOp.Put.Key, - Value: etcdOp.Put.Value, - }] - if ok { - return &event - } - } - } - return nil -} - func isUniqueTxn(request *model.TxnRequest) bool { return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure)) } @@ -238,15 +237,3 @@ func requestReturnTime(operationTime map[model.EtcdOperation]int64, request mode panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } } - -func matchReturnTime(request model.EtcdRequest, persistedOperations map[model.EtcdOperation]int64) *int64 { - for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if etcdOp.Type != model.PutOperation { - continue - } - if returnTime, found := persistedOperations[etcdOp]; found { - return &returnTime - } - } - return nil -} From 1dc21656a9aa0d5ffd73e4b4f5828e8068edc097 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 23 Jun 2024 14:31:15 +0200 Subject: [PATCH 3/7] Unify patchOperation arguments to be map from keyValue to int64 Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/operations_test.go | 46 ++++---- tests/robustness/validate/patch_history.go | 108 ++++++++++++------- 2 files changed, 92 insertions(+), 62 deletions(-) diff --git a/tests/robustness/validate/operations_test.go b/tests/robustness/validate/operations_test.go index 3117322c7..c6b7afab9 100644 --- a/tests/robustness/validate/operations_test.go +++ b/tests/robustness/validate/operations_test.go @@ -46,56 +46,56 @@ func TestValidateSerializableOperations(t *testing.T) { }, { Input: rangeRequest("a", "z", 2, 0), - Output: rangeResponse(1, keyValue("a", "1", 2)), + Output: rangeResponse(1, keyValueRevision("a", "1", 2)), }, { Input: rangeRequest("a", "z", 3, 0), Output: rangeResponse(2, - keyValue("a", "1", 2), - keyValue("b", "2", 3), + keyValueRevision("a", "1", 2), + keyValueRevision("b", "2", 3), ), }, { Input: rangeRequest("a", "z", 4, 0), Output: rangeResponse(3, - keyValue("a", "1", 2), - keyValue("b", "2", 3), - keyValue("c", "3", 4), + keyValueRevision("a", "1", 2), + keyValueRevision("b", "2", 3), + keyValueRevision("c", "3", 4), ), }, { Input: rangeRequest("a", "z", 4, 3), Output: rangeResponse(3, - keyValue("a", "1", 2), - keyValue("b", "2", 3), - keyValue("c", "3", 4), + keyValueRevision("a", "1", 2), + keyValueRevision("b", "2", 3), + keyValueRevision("c", "3", 4), ), }, { Input: rangeRequest("a", "z", 4, 4), Output: rangeResponse(3, - keyValue("a", "1", 2), - keyValue("b", "2", 3), - keyValue("c", "3", 4), + keyValueRevision("a", "1", 2), + keyValueRevision("b", "2", 3), + keyValueRevision("c", "3", 4), ), }, { Input: rangeRequest("a", "z", 4, 2), Output: rangeResponse(3, - keyValue("a", "1", 2), - keyValue("b", "2", 3), + keyValueRevision("a", "1", 2), + keyValueRevision("b", "2", 3), ), }, { Input: rangeRequest("b\x00", "z", 4, 2), Output: rangeResponse(1, - keyValue("c", "3", 4), + keyValueRevision("c", "3", 4), ), }, { Input: rangeRequest("b", "", 4, 0), Output: rangeResponse(1, - keyValue("b", "2", 3), + keyValueRevision("b", "2", 3), ), }, { @@ -115,9 +115,9 @@ func TestValidateSerializableOperations(t *testing.T) { { Input: rangeRequest("a", "z", 4, 0), Output: rangeResponse(3, - keyValue("c", "3", 4), - keyValue("b", "2", 3), - keyValue("a", "1", 2), + keyValueRevision("c", "3", 4), + keyValueRevision("b", "2", 3), + keyValueRevision("a", "1", 2), ), }, }, @@ -149,7 +149,7 @@ func TestValidateSerializableOperations(t *testing.T) { { Input: rangeRequest("a", "z", 2, 0), Output: rangeResponse(3, - keyValue("b", "2", 3), + keyValueRevision("b", "2", 3), ), }, }, @@ -166,8 +166,8 @@ func TestValidateSerializableOperations(t *testing.T) { { Input: rangeRequest("a", "z", 2, 0), Output: rangeResponse(3, - keyValue("a", "1", 2), - keyValue("b", "2", 3), + keyValueRevision("a", "1", 2), + keyValueRevision("b", "2", 3), ), }, }, @@ -284,7 +284,7 @@ func errorResponse(err error) model.MaybeEtcdResponse { } } -func keyValue(key, value string, rev int64) model.KeyValue { +func keyValueRevision(key, value string, rev int64) model.KeyValue { return model.KeyValue{ Key: key, ValueRevision: model.ValueRevision{ diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index c79f049aa..1daf2f7e1 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -19,16 +19,16 @@ import ( "github.com/anishathalye/porcupine" - "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/report" ) func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { allOperations := relevantOperations(reports) - uniqueEvents := uniqueWatchEvents(reports) - operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests) - return patchOperations(allOperations, uniqueEvents, operationsReturnTime) + watchRevision := requestRevision(reports) + watchReturnTime := watchReturnTime(reports) + persistedReturnTime := persistedReturnTime(allOperations, persistedRequests) + return patchOperations(allOperations, watchRevision, watchReturnTime, persistedReturnTime) } func relevantOperations(reports []report.ClientReport) []porcupine.Operation { @@ -46,25 +46,7 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]client.TimedWatchEvent { - persisted := map[model.Event]client.TimedWatchEvent{} - for _, r := range reports { - for _, op := range r.Watch { - for _, resp := range op.Responses { - for _, event := range resp.Events { - responseTime := resp.Time - if prev, found := persisted[event.Event]; found && prev.Time < responseTime { - responseTime = prev.Time - } - persisted[event.Event] = client.TimedWatchEvent{Time: responseTime, WatchEvent: event} - } - } - } - } - return persisted -} - -func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, watchRevision, watchReturnTime, persistedReturnTime map[keyValue]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) for _, op := range operations { @@ -80,20 +62,15 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { switch operation.Type { case model.PutOperation: - event, ok := watchEvents[model.Event{ - Type: operation.Type, - Key: operation.Put.Key, - Value: operation.Put.Value, - }] + kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} + revision, ok := watchRevision[kv] if ok { - eventTime := event.Time.Nanoseconds() - // Set revision and time based on watchEvent. - if eventTime < op.Return { - op.Return = eventTime - } - txnRevision = event.Revision + txnRevision = revision } - persistedReturnTime, ok := persistedOperations[operation] + if t, ok := watchReturnTime[kv]; ok && t < op.Return { + op.Return = t + } + persistedReturnTime, ok := persistedReturnTime[kv] if ok { // Set return time based on persisted return time. if persistedReturnTime < op.Return { @@ -147,9 +124,9 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation) bool { return false } -func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 { +func persistedReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[keyValue]int64 { operationReturnTime := operationReturnTime(allOperations) - persisted := map[model.EtcdOperation]int64{} + persisted := map[keyValue]int64{} lastReturnTime := maxReturnTime(operationReturnTime) @@ -163,11 +140,12 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste if op.Type != model.PutOperation { continue } - if _, found := persisted[op]; found { + kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} + if _, found := persisted[kv]; found { panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op)) } hasPut = true - persisted[op] = lastReturnTime + persisted[kv] = lastReturnTime } if hasPut { newReturnTime := requestReturnTime(operationReturnTime, request) @@ -237,3 +215,55 @@ func requestReturnTime(operationTime map[model.EtcdOperation]int64, request mode panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } } + +func watchReturnTime(reports []report.ClientReport) map[keyValue]int64 { + earliestTime := map[keyValue]int64{} + for _, client := range reports { + for _, watch := range client.Watch { + for _, resp := range watch.Responses { + + for _, event := range resp.Events { + switch event.Type { + case model.RangeOperation: + case model.PutOperation: + kv := keyValue{Key: event.Key, Value: event.Value} + if t, ok := earliestTime[kv]; !ok || t > resp.Time.Nanoseconds() { + earliestTime[kv] = resp.Time.Nanoseconds() + } + case model.DeleteOperation: + default: + panic(fmt.Sprintf("unknown event type %q", event.Type)) + } + } + } + } + } + return earliestTime +} + +func requestRevision(reports []report.ClientReport) map[keyValue]int64 { + requestRevision := map[keyValue]int64{} + for _, client := range reports { + for _, watch := range client.Watch { + for _, resp := range watch.Responses { + for _, event := range resp.Events { + switch event.Type { + case model.RangeOperation: + case model.PutOperation: + kv := keyValue{Key: event.Key, Value: event.Value} + requestRevision[kv] = event.Revision + case model.DeleteOperation: + default: + panic(fmt.Sprintf("unknown event type %q", event.Type)) + } + } + } + } + } + return requestRevision +} + +type keyValue struct { + Key string + Value model.ValueOrHash +} From eeeda9e6892836ca98955fe0cd2d75a7c093c5c2 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 23 Jun 2024 23:08:07 +0200 Subject: [PATCH 4/7] Unify calculating returnTime Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/patch_history.go | 161 ++++++++++----------- 1 file changed, 74 insertions(+), 87 deletions(-) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 1daf2f7e1..ccd4c1e81 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -26,9 +26,9 @@ import ( func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { allOperations := relevantOperations(reports) watchRevision := requestRevision(reports) - watchReturnTime := watchReturnTime(reports) - persistedReturnTime := persistedReturnTime(allOperations, persistedRequests) - return patchOperations(allOperations, watchRevision, watchReturnTime, persistedReturnTime) + returnTime := returnTime(allOperations, reports, persistedRequests) + persistedRequestsCount := countPersistedRequests(persistedRequests) + return patchOperations(allOperations, watchRevision, returnTime, persistedRequestsCount) } func relevantOperations(reports []report.ClientReport) []porcupine.Operation { @@ -46,7 +46,7 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func patchOperations(operations []porcupine.Operation, watchRevision, watchReturnTime, persistedReturnTime map[keyValue]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, watchRevision, returnTime, persistedRequestCount map[keyValue]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) for _, op := range operations { @@ -67,15 +67,11 @@ func patchOperations(operations []porcupine.Operation, watchRevision, watchRetur if ok { txnRevision = revision } - if t, ok := watchReturnTime[kv]; ok && t < op.Return { + if t, ok := returnTime[kv]; ok && t < op.Return { op.Return = t } - persistedReturnTime, ok := persistedReturnTime[kv] + _, ok = persistedRequestCount[kv] if ok { - // Set return time based on persisted return time. - if persistedReturnTime < op.Return { - op.Return = persistedReturnTime - } txnPersisted = true } case model.DeleteOperation: @@ -124,48 +120,10 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation) bool { return false } -func persistedReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[keyValue]int64 { - operationReturnTime := operationReturnTime(allOperations) - persisted := map[keyValue]int64{} - - lastReturnTime := maxReturnTime(operationReturnTime) - - for i := len(persistedRequests) - 1; i >= 0; i-- { - request := persistedRequests[i] - switch request.Type { - case model.Txn: - hasPut := false - lastReturnTime-- - for _, op := range request.Txn.OperationsOnSuccess { - if op.Type != model.PutOperation { - continue - } - kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} - if _, found := persisted[kv]; found { - panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op)) - } - hasPut = true - persisted[kv] = lastReturnTime - } - if hasPut { - newReturnTime := requestReturnTime(operationReturnTime, request) - if newReturnTime != -1 { - lastReturnTime = min(lastReturnTime, newReturnTime) - } - } - case model.LeaseGrant: - case model.LeaseRevoke: - case model.Compact: - default: - panic(fmt.Sprintf("Unknown request type: %q", request.Type)) - } - } - return persisted -} - -func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 { - newOperations := map[model.EtcdOperation]int64{} - for _, op := range operations { +func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 { + earliestReturnTime := map[keyValue]int64{} + var lastReturnTime int64 = 0 + for _, op := range allOperations { request := op.Input.(model.EtcdRequest) switch request.Type { case model.Txn: @@ -173,10 +131,11 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati if etcdOp.Type != model.PutOperation { continue } - if _, found := newOperations[etcdOp]; found { + kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} + if _, found := earliestReturnTime[kv]; found { panic("Unexpected duplicate event in persisted requests.") } - newOperations[etcdOp] = op.Return + earliestReturnTime[kv] = op.Return } case model.Range: case model.LeaseGrant: @@ -185,39 +144,11 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } - } - return newOperations -} - -func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 { - var maxReturnTime int64 - for _, returnTime := range operationTime { - if returnTime > maxReturnTime { - maxReturnTime = returnTime + if op.Return > lastReturnTime { + lastReturnTime = op.Return } } - return maxReturnTime -} -func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 { - switch request.Type { - case model.Txn: - for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if op.Type != model.PutOperation { - continue - } - if time, found := operationTime[op]; found { - return time - } - } - return -1 - default: - panic(fmt.Sprintf("Unknown request type: %q", request.Type)) - } -} - -func watchReturnTime(reports []report.ClientReport) map[keyValue]int64 { - earliestTime := map[keyValue]int64{} for _, client := range reports { for _, watch := range client.Watch { for _, resp := range watch.Responses { @@ -227,8 +158,8 @@ func watchReturnTime(reports []report.ClientReport) map[keyValue]int64 { case model.RangeOperation: case model.PutOperation: kv := keyValue{Key: event.Key, Value: event.Value} - if t, ok := earliestTime[kv]; !ok || t > resp.Time.Nanoseconds() { - earliestTime[kv] = resp.Time.Nanoseconds() + if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() { + earliestReturnTime[kv] = resp.Time.Nanoseconds() } case model.DeleteOperation: default: @@ -238,7 +169,63 @@ func watchReturnTime(reports []report.ClientReport) map[keyValue]int64 { } } } - return earliestTime + + for i := len(persistedRequests) - 1; i >= 0; i-- { + request := persistedRequests[i] + switch request.Type { + case model.Txn: + lastReturnTime-- + for _, op := range request.Txn.OperationsOnSuccess { + if op.Type != model.PutOperation { + continue + } + kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} + returnTime, ok := earliestReturnTime[kv] + if ok { + lastReturnTime = min(returnTime, lastReturnTime) + earliestReturnTime[kv] = lastReturnTime + } + } + case model.LeaseGrant: + case model.LeaseRevoke: + case model.Compact: + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } + } + return earliestReturnTime +} + +func countPersistedRequests(requests []model.EtcdRequest) map[keyValue]int64 { + counter := map[keyValue]int64{} + for _, request := range requests { + countRequest(counter, request) + } + return counter +} + +func countRequest(counter map[keyValue]int64, request model.EtcdRequest) { + switch request.Type { + case model.Txn: + for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + switch operation.Type { + case model.PutOperation: + kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} + counter[kv] += 1 + case model.DeleteOperation: + case model.RangeOperation: + default: + panic(fmt.Sprintf("unknown operation type %q", operation.Type)) + } + } + case model.LeaseGrant: + case model.LeaseRevoke: + case model.Compact: + case model.Defragment: + case model.Range: + default: + panic(fmt.Sprintf("unknown request type %q", request.Type)) + } } func requestRevision(reports []report.ClientReport) map[keyValue]int64 { From 8ffe92169f358fa260bd87114ff2b610da827d3c Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 27 Jun 2024 22:36:45 +0200 Subject: [PATCH 5/7] Refactor watch response util functions for unit tests Signed-off-by: Marek Siarkowicz --- .../robustness/validate/patch_history_test.go | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index ff18097ad..202d6411a 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -93,7 +93,7 @@ func TestPatchHistory(t *testing.T) { persistedRequest: []model.EtcdRequest{ putRequest("key", "value"), }, - watchOperations: watchPutEvent("key", "value", 2, 3), + watchOperations: watchResponse(3, putEvent("key", "value", 2)), expectedRemainingOperations: []porcupine.Operation{ {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, }, @@ -183,7 +183,7 @@ func TestPatchHistory(t *testing.T) { persistedRequest: []model.EtcdRequest{ putRequest("key", "value"), }, - watchOperations: watchDeleteEvent("key", 2, 3), + watchOperations: watchResponse(3, deleteEvent("key", 2)), expectedRemainingOperations: []porcupine.Operation{ {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, @@ -325,49 +325,40 @@ func putResponse(result ...model.EtcdOperationResult) model.MaybeEtcdResponse { return model.MaybeEtcdResponse{EtcdResponse: model.EtcdResponse{Txn: &model.TxnResponse{Results: result}}} } -func watchPutEvent(key, value string, revision, responseTime int64) []model.WatchOperation { +func watchResponse(responseTime int64, events ...model.WatchEvent) []model.WatchOperation { return []model.WatchOperation{ { Responses: []model.WatchResponse{ { - Time: time.Duration(responseTime), - Events: []model.WatchEvent{ - { - PersistedEvent: model.PersistedEvent{ - Event: model.Event{ - Type: model.PutOperation, - Key: key, - Value: model.ToValueOrHash(value), - }, - Revision: revision, - }, - }, - }, + Time: time.Duration(responseTime), + Events: events, }, }, }, } } -func watchDeleteEvent(key string, revision, responseTime int64) []model.WatchOperation { - return []model.WatchOperation{ - { - Responses: []model.WatchResponse{ - { - Time: time.Duration(responseTime), - Events: []model.WatchEvent{ - { - PersistedEvent: model.PersistedEvent{ - Event: model.Event{ - Type: model.DeleteOperation, - Key: key, - }, - Revision: revision, - }, - }, - }, - }, +func putEvent(key, value string, revision int64) model.WatchEvent { + return model.WatchEvent{ + PersistedEvent: model.PersistedEvent{ + Event: model.Event{ + Type: model.PutOperation, + Key: key, + Value: model.ToValueOrHash(value), }, + Revision: revision, + }, + } +} + +func deleteEvent(key string, revision int64) model.WatchEvent { + return model.WatchEvent{ + PersistedEvent: model.PersistedEvent{ + Event: model.Event{ + Type: model.DeleteOperation, + Key: key, + }, + Revision: revision, }, } } From a7c38430ad39eda0bf01025a0873534e82837644 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 23 Jun 2024 23:28:44 +0200 Subject: [PATCH 6/7] Use client request count for uniqness Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/patch_history.go | 74 +++++++++++-------- .../robustness/validate/patch_history_test.go | 51 ++++++++++++- 2 files changed, 93 insertions(+), 32 deletions(-) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index ccd4c1e81..3b7815963 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -26,9 +26,10 @@ import ( func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { allOperations := relevantOperations(reports) watchRevision := requestRevision(reports) + clientRequestsCount := countClientRequests(reports) returnTime := returnTime(allOperations, reports, persistedRequests) persistedRequestsCount := countPersistedRequests(persistedRequests) - return patchOperations(allOperations, watchRevision, returnTime, persistedRequestsCount) + return patchOperations(allOperations, clientRequestsCount, watchRevision, returnTime, persistedRequestsCount) } func relevantOperations(reports []report.ClientReport) []porcupine.Operation { @@ -46,7 +47,7 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func patchOperations(operations []porcupine.Operation, watchRevision, returnTime, persistedRequestCount map[keyValue]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount map[keyValue]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) for _, op := range operations { @@ -63,14 +64,16 @@ func patchOperations(operations []porcupine.Operation, watchRevision, returnTime switch operation.Type { case model.PutOperation: kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - revision, ok := watchRevision[kv] - if ok { - txnRevision = revision + if count := clientRequestCount[kv]; count == 1 { + revision, ok := watchRevision[kv] + if ok { + txnRevision = revision + } + if t, ok := returnTime[kv]; ok && t < op.Return { + op.Return = t + } } - if t, ok := returnTime[kv]; ok && t < op.Return { - op.Return = t - } - _, ok = persistedRequestCount[kv] + _, ok := persistedRequestCount[kv] if ok { txnPersisted = true } @@ -80,7 +83,7 @@ func patchOperations(operations []porcupine.Operation, watchRevision, returnTime panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } - if isUniqueTxn(request.Txn) { + if isUniqueTxn(request.Txn, clientRequestCount) { if !txnPersisted { // Remove non persisted operations continue @@ -98,26 +101,29 @@ func patchOperations(operations []porcupine.Operation, watchRevision, returnTime return newOperations } -func isUniqueTxn(request *model.TxnRequest) bool { - return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure)) +func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool { + return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount) } -func hasWriteOperation(ops []model.EtcdOperation) bool { - for _, etcdOp := range ops { - if etcdOp.Type == model.PutOperation || etcdOp.Type == model.DeleteOperation { - return true +func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool { + hasUniqueWrite := false + hasWrite := false + for _, operation := range ops { + switch operation.Type { + case model.PutOperation: + hasWrite = true + kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} + if count := clientRequestCount[kv]; count == 1 { + hasUniqueWrite = true + } + case model.DeleteOperation: + hasWrite = true + case model.RangeOperation: + default: + panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } - return false -} - -func hasUniqueWriteOperation(ops []model.EtcdOperation) bool { - for _, etcdOp := range ops { - if etcdOp.Type == model.PutOperation { - return true - } - } - return false + return hasUniqueWrite || !hasWrite } func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 { @@ -132,10 +138,9 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo continue } kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} - if _, found := earliestReturnTime[kv]; found { - panic("Unexpected duplicate event in persisted requests.") + if t, ok := earliestReturnTime[kv]; !ok || t > op.Return { + earliestReturnTime[kv] = op.Return } - earliestReturnTime[kv] = op.Return } case model.Range: case model.LeaseGrant: @@ -196,6 +201,17 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo return earliestReturnTime } +func countClientRequests(reports []report.ClientReport) map[keyValue]int64 { + counter := map[keyValue]int64{} + for _, client := range reports { + for _, op := range client.KeyValue { + request := op.Input.(model.EtcdRequest) + countRequest(counter, request) + } + } + return counter +} + func countPersistedRequests(requests []model.EtcdRequest) map[keyValue]int64 { counter := map[keyValue]int64{} for _, request := range requests { diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index 202d6411a..b52a6be4b 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -71,7 +71,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put remains if there is a matching event, return time based on next persisted request", + name: "failed put remains if there is a matching event, uniqueness allows for return time to be based on next persisted request", historyFunc: func(h *model.AppendableHistory) { h.AppendPut("key1", "value", 1, 2, nil, errors.New("failed")) h.AppendPut("key2", "value", 3, 4, &clientv3.PutResponse{}, nil) @@ -86,7 +86,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put remains if there is a matching event, revision and return time based on watch", + name: "failed put remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch", historyFunc: func(h *model.AppendableHistory) { h.AppendPut("key", "value", 1, 2, nil, errors.New("failed")) }, @@ -98,6 +98,22 @@ func TestPatchHistory(t *testing.T) { {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, }, }, + { + name: "failed put remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendPut("key", "value", 1, 2, nil, errors.New("failed")) + h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil) + }, + persistedRequest: []model.EtcdRequest{ + putRequest("key", "value"), + putRequest("key", "value"), + }, + watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, + {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, + }, + }, { name: "failed put is dropped if event has different key", historyFunc: func(h *model.AppendableHistory) { @@ -137,7 +153,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put with lease remains if there is a matching event, return time based on next persisted request", + name: "failed put with lease remains if there is a matching event, uniqueness allows return time to be based on next persisted request", historyFunc: func(h *model.AppendableHistory) { h.AppendPutWithLease("key1", "value", 123, 1, 2, nil, errors.New("failed")) h.AppendPutWithLease("key2", "value", 234, 3, 4, &clientv3.PutResponse{}, nil) @@ -151,6 +167,35 @@ func TestPatchHistory(t *testing.T) { {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, }, }, + { + name: "failed put with lease remains if there is a matching event, uniqueness allows for revision and return time to be based on watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{ + putRequestWithLease("key", "value", 123), + }, + watchOperations: watchResponse(3, putEvent("key", "value", 2)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, + }, + }, + { + name: "failed put with lease remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed")) + h.AppendPutWithLease("key", "value", 321, 3, 4, &clientv3.PutResponse{}, nil) + }, + persistedRequest: []model.EtcdRequest{ + putRequestWithLease("key", "value", 123), + putRequestWithLease("key", "value", 321), + }, + watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, + {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, + }, + }, { name: "failed put is dropped", historyFunc: func(h *model.AppendableHistory) { From b2a21be6cc2b174ad5901995fab6534521e33be9 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 27 Jun 2024 23:29:58 +0200 Subject: [PATCH 7/7] Patch delete request based on their uniqness in client requests Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/patch_history.go | 171 +++++++++++------- .../robustness/validate/patch_history_test.go | 53 +++++- 2 files changed, 155 insertions(+), 69 deletions(-) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 3b7815963..842b55bd8 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -47,7 +47,7 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount map[keyValue]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount *requestStats) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) for _, op := range operations { @@ -58,41 +58,62 @@ func patchOperations(operations []porcupine.Operation, clientRequestCount, watch newOperations = append(newOperations, op) continue } - txnPersisted := false + txnCanBeDiscarded := true + txnUniquellyPersisted := false var txnRevision int64 = 0 for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { switch operation.Type { case model.PutOperation: kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - if count := clientRequestCount[kv]; count == 1 { - revision, ok := watchRevision[kv] + unique := clientRequestCount.Put[kv] == 1 + if unique { + revision, ok := watchRevision.Put[kv] if ok { txnRevision = revision } - if t, ok := returnTime[kv]; ok && t < op.Return { + if t, ok := returnTime.Put[kv]; ok && t < op.Return { op.Return = t } } - _, ok := persistedRequestCount[kv] + _, ok := persistedRequestCount.Put[kv] if ok { - txnPersisted = true + txnCanBeDiscarded = false + if unique { + txnUniquellyPersisted = true + } } case model.DeleteOperation: + unique := clientRequestCount.Delete[operation.Delete] == 1 + if unique { + revision, ok := watchRevision.Delete[operation.Delete] + if ok { + txnRevision = revision + } + if t, ok := returnTime.Delete[operation.Delete]; ok && t < op.Return { + op.Return = t + } + } + _, ok := persistedRequestCount.Delete[operation.Delete] + if ok { + txnCanBeDiscarded = false + if unique { + txnUniquellyPersisted = true + } + } case model.RangeOperation: default: panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } - if isUniqueTxn(request.Txn, clientRequestCount) { - if !txnPersisted { - // Remove non persisted operations - continue + if txnCanBeDiscarded { + // Remove non persisted operations + continue + } + if txnUniquellyPersisted { + if txnRevision != 0 { + op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision} } else { - if txnRevision != 0 { - op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision} - } else { - op.Output = model.MaybeEtcdResponse{Persisted: true} - } + op.Output = model.MaybeEtcdResponse{Persisted: true} } } // Leave operation as it is as we cannot discard it. @@ -101,45 +122,31 @@ func patchOperations(operations []porcupine.Operation, clientRequestCount, watch return newOperations } -func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool { - return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount) -} - -func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool { - hasUniqueWrite := false - hasWrite := false - for _, operation := range ops { - switch operation.Type { - case model.PutOperation: - hasWrite = true - kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - if count := clientRequestCount[kv]; count == 1 { - hasUniqueWrite = true - } - case model.DeleteOperation: - hasWrite = true - case model.RangeOperation: - default: - panic(fmt.Sprintf("unknown operation type %q", operation.Type)) - } +func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) *requestStats { + earliestReturnTime := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, } - return hasUniqueWrite || !hasWrite -} - -func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 { - earliestReturnTime := map[keyValue]int64{} var lastReturnTime int64 = 0 for _, op := range allOperations { request := op.Input.(model.EtcdRequest) switch request.Type { case model.Txn: for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if etcdOp.Type != model.PutOperation { - continue - } - kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} - if t, ok := earliestReturnTime[kv]; !ok || t > op.Return { - earliestReturnTime[kv] = op.Return + switch etcdOp.Type { + case model.PutOperation: + kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} + if t, ok := earliestReturnTime.Put[kv]; !ok || t > op.Return { + earliestReturnTime.Put[kv] = op.Return + } + case model.DeleteOperation: + if t, ok := earliestReturnTime.Delete[etcdOp.Delete]; !ok || t > op.Return { + earliestReturnTime.Delete[etcdOp.Delete] = op.Return + } + earliestReturnTime.Delete[etcdOp.Delete] = op.Return + case model.RangeOperation: + default: + panic(fmt.Sprintf("Unknown operation type: %q", etcdOp.Type)) } } case model.Range: @@ -163,10 +170,14 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo case model.RangeOperation: case model.PutOperation: kv := keyValue{Key: event.Key, Value: event.Value} - if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() { - earliestReturnTime[kv] = resp.Time.Nanoseconds() + if t, ok := earliestReturnTime.Put[kv]; !ok || t > resp.Time.Nanoseconds() { + earliestReturnTime.Put[kv] = resp.Time.Nanoseconds() } case model.DeleteOperation: + del := model.DeleteOptions{Key: event.Key} + if t, ok := earliestReturnTime.Delete[del]; !ok || t > resp.Time.Nanoseconds() { + earliestReturnTime.Delete[del] = resp.Time.Nanoseconds() + } default: panic(fmt.Sprintf("unknown event type %q", event.Type)) } @@ -181,14 +192,23 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo case model.Txn: lastReturnTime-- for _, op := range request.Txn.OperationsOnSuccess { - if op.Type != model.PutOperation { - continue - } - kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} - returnTime, ok := earliestReturnTime[kv] - if ok { - lastReturnTime = min(returnTime, lastReturnTime) - earliestReturnTime[kv] = lastReturnTime + switch op.Type { + case model.PutOperation: + kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} + returnTime, ok := earliestReturnTime.Put[kv] + if ok { + lastReturnTime = min(returnTime, lastReturnTime) + earliestReturnTime.Put[kv] = lastReturnTime + } + case model.DeleteOperation: + returnTime, ok := earliestReturnTime.Delete[op.Delete] + if ok { + lastReturnTime = min(returnTime, lastReturnTime) + earliestReturnTime.Delete[op.Delete] = lastReturnTime + } + case model.RangeOperation: + default: + panic(fmt.Sprintf("Unknown operation type: %q", op.Type)) } } case model.LeaseGrant: @@ -201,8 +221,11 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo return earliestReturnTime } -func countClientRequests(reports []report.ClientReport) map[keyValue]int64 { - counter := map[keyValue]int64{} +func countClientRequests(reports []report.ClientReport) *requestStats { + counter := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, + } for _, client := range reports { for _, op := range client.KeyValue { request := op.Input.(model.EtcdRequest) @@ -212,23 +235,27 @@ func countClientRequests(reports []report.ClientReport) map[keyValue]int64 { return counter } -func countPersistedRequests(requests []model.EtcdRequest) map[keyValue]int64 { - counter := map[keyValue]int64{} +func countPersistedRequests(requests []model.EtcdRequest) *requestStats { + counter := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, + } for _, request := range requests { countRequest(counter, request) } return counter } -func countRequest(counter map[keyValue]int64, request model.EtcdRequest) { +func countRequest(counter *requestStats, request model.EtcdRequest) { switch request.Type { case model.Txn: for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { switch operation.Type { case model.PutOperation: kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - counter[kv] += 1 + counter.Put[kv] += 1 case model.DeleteOperation: + counter.Delete[operation.Delete] += 1 case model.RangeOperation: default: panic(fmt.Sprintf("unknown operation type %q", operation.Type)) @@ -244,8 +271,11 @@ func countRequest(counter map[keyValue]int64, request model.EtcdRequest) { } } -func requestRevision(reports []report.ClientReport) map[keyValue]int64 { - requestRevision := map[keyValue]int64{} +func requestRevision(reports []report.ClientReport) *requestStats { + requestRevision := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, + } for _, client := range reports { for _, watch := range client.Watch { for _, resp := range watch.Responses { @@ -254,8 +284,10 @@ func requestRevision(reports []report.ClientReport) map[keyValue]int64 { case model.RangeOperation: case model.PutOperation: kv := keyValue{Key: event.Key, Value: event.Value} - requestRevision[kv] = event.Revision + requestRevision.Put[kv] = event.Revision case model.DeleteOperation: + del := model.DeleteOptions{Key: event.Key} + requestRevision.Delete[del] = event.Revision default: panic(fmt.Sprintf("unknown event type %q", event.Type)) } @@ -266,6 +298,11 @@ func requestRevision(reports []report.ClientReport) map[keyValue]int64 { return requestRevision } +type requestStats struct { + Put map[keyValue]int64 + Delete map[model.DeleteOptions]int64 +} + type keyValue struct { Key string Value model.ValueOrHash diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index b52a6be4b..2852654a4 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -141,7 +141,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put with lease remains if there is a matching event, return time untouched", + name: "failed put with lease remains if there is a matching persisted request, return time untouched", historyFunc: func(h *model.AppendableHistory) { h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed")) }, @@ -220,15 +220,64 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed delete remains, time untouched regardless of persisted event and watch", + name: "failed delete is dropped", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{}, + expectedRemainingOperations: []porcupine.Operation{}, + }, + { + name: "failed delete remains if there is a matching persisted request, time untouched", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), + }, + expectedRemainingOperations: []porcupine.Operation{ + {Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}}, + }, + }, + { + name: "failed delete remains if there is a matching persisted request, uniqueness allows return time to be based on next persisted request", historyFunc: func(h *model.AppendableHistory) { h.AppendDelete("key", 1, 2, nil, errors.New("failed")) h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil) }, persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), putRequest("key", "value"), }, + expectedRemainingOperations: []porcupine.Operation{ + {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}}, + {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, + }, + }, + { + name: "failed delete remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), + }, watchOperations: watchResponse(3, deleteEvent("key", 2)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, + }, + }, + { + name: "failed delete remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + h.AppendDelete("key", 3, 4, &clientv3.DeleteResponse{}, nil) + }, + persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), + deleteRequest("key"), + }, + watchOperations: watchResponse(3, deleteEvent("key", 2), deleteEvent("key", 3)), expectedRemainingOperations: []porcupine.Operation{ {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 4, Output: putResponse(model.EtcdOperationResult{})},