From b2a21be6cc2b174ad5901995fab6534521e33be9 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 27 Jun 2024 23:29:58 +0200 Subject: [PATCH] Patch delete request based on their uniqness in client requests Signed-off-by: Marek Siarkowicz --- tests/robustness/validate/patch_history.go | 171 +++++++++++------- .../robustness/validate/patch_history_test.go | 53 +++++- 2 files changed, 155 insertions(+), 69 deletions(-) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 3b7815963..842b55bd8 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -47,7 +47,7 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount map[keyValue]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount *requestStats) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) for _, op := range operations { @@ -58,41 +58,62 @@ func patchOperations(operations []porcupine.Operation, clientRequestCount, watch newOperations = append(newOperations, op) continue } - txnPersisted := false + txnCanBeDiscarded := true + txnUniquellyPersisted := false var txnRevision int64 = 0 for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { switch operation.Type { case model.PutOperation: kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - if count := clientRequestCount[kv]; count == 1 { - revision, ok := watchRevision[kv] + unique := clientRequestCount.Put[kv] == 1 + if unique { + revision, ok := watchRevision.Put[kv] if ok { txnRevision = revision } - if t, ok := returnTime[kv]; ok && t < op.Return { + if t, ok := returnTime.Put[kv]; ok && t < op.Return { op.Return = t } } - _, ok := persistedRequestCount[kv] + _, ok := persistedRequestCount.Put[kv] if ok { - txnPersisted = true + txnCanBeDiscarded = false + if unique { + txnUniquellyPersisted = true + } } case model.DeleteOperation: + unique := clientRequestCount.Delete[operation.Delete] == 1 + if unique { + revision, ok := watchRevision.Delete[operation.Delete] + if ok { + txnRevision = revision + } + if t, ok := returnTime.Delete[operation.Delete]; ok && t < op.Return { + op.Return = t + } + } + _, ok := persistedRequestCount.Delete[operation.Delete] + if ok { + txnCanBeDiscarded = false + if unique { + txnUniquellyPersisted = true + } + } case model.RangeOperation: default: panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } - if isUniqueTxn(request.Txn, clientRequestCount) { - if !txnPersisted { - // Remove non persisted operations - continue + if txnCanBeDiscarded { + // Remove non persisted operations + continue + } + if txnUniquellyPersisted { + if txnRevision != 0 { + op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision} } else { - if txnRevision != 0 { - op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision} - } else { - op.Output = model.MaybeEtcdResponse{Persisted: true} - } + op.Output = model.MaybeEtcdResponse{Persisted: true} } } // Leave operation as it is as we cannot discard it. @@ -101,45 +122,31 @@ func patchOperations(operations []porcupine.Operation, clientRequestCount, watch return newOperations } -func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool { - return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount) -} - -func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool { - hasUniqueWrite := false - hasWrite := false - for _, operation := range ops { - switch operation.Type { - case model.PutOperation: - hasWrite = true - kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - if count := clientRequestCount[kv]; count == 1 { - hasUniqueWrite = true - } - case model.DeleteOperation: - hasWrite = true - case model.RangeOperation: - default: - panic(fmt.Sprintf("unknown operation type %q", operation.Type)) - } +func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) *requestStats { + earliestReturnTime := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, } - return hasUniqueWrite || !hasWrite -} - -func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 { - earliestReturnTime := map[keyValue]int64{} var lastReturnTime int64 = 0 for _, op := range allOperations { request := op.Input.(model.EtcdRequest) switch request.Type { case model.Txn: for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if etcdOp.Type != model.PutOperation { - continue - } - kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} - if t, ok := earliestReturnTime[kv]; !ok || t > op.Return { - earliestReturnTime[kv] = op.Return + switch etcdOp.Type { + case model.PutOperation: + kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} + if t, ok := earliestReturnTime.Put[kv]; !ok || t > op.Return { + earliestReturnTime.Put[kv] = op.Return + } + case model.DeleteOperation: + if t, ok := earliestReturnTime.Delete[etcdOp.Delete]; !ok || t > op.Return { + earliestReturnTime.Delete[etcdOp.Delete] = op.Return + } + earliestReturnTime.Delete[etcdOp.Delete] = op.Return + case model.RangeOperation: + default: + panic(fmt.Sprintf("Unknown operation type: %q", etcdOp.Type)) } } case model.Range: @@ -163,10 +170,14 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo case model.RangeOperation: case model.PutOperation: kv := keyValue{Key: event.Key, Value: event.Value} - if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() { - earliestReturnTime[kv] = resp.Time.Nanoseconds() + if t, ok := earliestReturnTime.Put[kv]; !ok || t > resp.Time.Nanoseconds() { + earliestReturnTime.Put[kv] = resp.Time.Nanoseconds() } case model.DeleteOperation: + del := model.DeleteOptions{Key: event.Key} + if t, ok := earliestReturnTime.Delete[del]; !ok || t > resp.Time.Nanoseconds() { + earliestReturnTime.Delete[del] = resp.Time.Nanoseconds() + } default: panic(fmt.Sprintf("unknown event type %q", event.Type)) } @@ -181,14 +192,23 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo case model.Txn: lastReturnTime-- for _, op := range request.Txn.OperationsOnSuccess { - if op.Type != model.PutOperation { - continue - } - kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} - returnTime, ok := earliestReturnTime[kv] - if ok { - lastReturnTime = min(returnTime, lastReturnTime) - earliestReturnTime[kv] = lastReturnTime + switch op.Type { + case model.PutOperation: + kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} + returnTime, ok := earliestReturnTime.Put[kv] + if ok { + lastReturnTime = min(returnTime, lastReturnTime) + earliestReturnTime.Put[kv] = lastReturnTime + } + case model.DeleteOperation: + returnTime, ok := earliestReturnTime.Delete[op.Delete] + if ok { + lastReturnTime = min(returnTime, lastReturnTime) + earliestReturnTime.Delete[op.Delete] = lastReturnTime + } + case model.RangeOperation: + default: + panic(fmt.Sprintf("Unknown operation type: %q", op.Type)) } } case model.LeaseGrant: @@ -201,8 +221,11 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo return earliestReturnTime } -func countClientRequests(reports []report.ClientReport) map[keyValue]int64 { - counter := map[keyValue]int64{} +func countClientRequests(reports []report.ClientReport) *requestStats { + counter := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, + } for _, client := range reports { for _, op := range client.KeyValue { request := op.Input.(model.EtcdRequest) @@ -212,23 +235,27 @@ func countClientRequests(reports []report.ClientReport) map[keyValue]int64 { return counter } -func countPersistedRequests(requests []model.EtcdRequest) map[keyValue]int64 { - counter := map[keyValue]int64{} +func countPersistedRequests(requests []model.EtcdRequest) *requestStats { + counter := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, + } for _, request := range requests { countRequest(counter, request) } return counter } -func countRequest(counter map[keyValue]int64, request model.EtcdRequest) { +func countRequest(counter *requestStats, request model.EtcdRequest) { switch request.Type { case model.Txn: for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { switch operation.Type { case model.PutOperation: kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} - counter[kv] += 1 + counter.Put[kv] += 1 case model.DeleteOperation: + counter.Delete[operation.Delete] += 1 case model.RangeOperation: default: panic(fmt.Sprintf("unknown operation type %q", operation.Type)) @@ -244,8 +271,11 @@ func countRequest(counter map[keyValue]int64, request model.EtcdRequest) { } } -func requestRevision(reports []report.ClientReport) map[keyValue]int64 { - requestRevision := map[keyValue]int64{} +func requestRevision(reports []report.ClientReport) *requestStats { + requestRevision := &requestStats{ + Put: map[keyValue]int64{}, + Delete: map[model.DeleteOptions]int64{}, + } for _, client := range reports { for _, watch := range client.Watch { for _, resp := range watch.Responses { @@ -254,8 +284,10 @@ func requestRevision(reports []report.ClientReport) map[keyValue]int64 { case model.RangeOperation: case model.PutOperation: kv := keyValue{Key: event.Key, Value: event.Value} - requestRevision[kv] = event.Revision + requestRevision.Put[kv] = event.Revision case model.DeleteOperation: + del := model.DeleteOptions{Key: event.Key} + requestRevision.Delete[del] = event.Revision default: panic(fmt.Sprintf("unknown event type %q", event.Type)) } @@ -266,6 +298,11 @@ func requestRevision(reports []report.ClientReport) map[keyValue]int64 { return requestRevision } +type requestStats struct { + Put map[keyValue]int64 + Delete map[model.DeleteOptions]int64 +} + type keyValue struct { Key string Value model.ValueOrHash diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index b52a6be4b..2852654a4 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -141,7 +141,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put with lease remains if there is a matching event, return time untouched", + name: "failed put with lease remains if there is a matching persisted request, return time untouched", historyFunc: func(h *model.AppendableHistory) { h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed")) }, @@ -220,15 +220,64 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed delete remains, time untouched regardless of persisted event and watch", + name: "failed delete is dropped", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{}, + expectedRemainingOperations: []porcupine.Operation{}, + }, + { + name: "failed delete remains if there is a matching persisted request, time untouched", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), + }, + expectedRemainingOperations: []porcupine.Operation{ + {Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}}, + }, + }, + { + name: "failed delete remains if there is a matching persisted request, uniqueness allows return time to be based on next persisted request", historyFunc: func(h *model.AppendableHistory) { h.AppendDelete("key", 1, 2, nil, errors.New("failed")) h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil) }, persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), putRequest("key", "value"), }, + expectedRemainingOperations: []porcupine.Operation{ + {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}}, + {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, + }, + }, + { + name: "failed delete remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), + }, watchOperations: watchResponse(3, deleteEvent("key", 2)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, + }, + }, + { + name: "failed delete remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendDelete("key", 1, 2, nil, errors.New("failed")) + h.AppendDelete("key", 3, 4, &clientv3.DeleteResponse{}, nil) + }, + persistedRequest: []model.EtcdRequest{ + deleteRequest("key"), + deleteRequest("key"), + }, + watchOperations: watchResponse(3, deleteEvent("key", 2), deleteEvent("key", 3)), expectedRemainingOperations: []porcupine.Operation{ {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 4, Output: putResponse(model.EtcdOperationResult{})},