Merge b2a21be6cc2b174ad5901995fab6534521e33be9 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
Marek Siarkowicz 2024-09-26 22:00:06 +01:00 committed by GitHub
commit 71c8f37482
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 360 additions and 238 deletions

View File

@ -46,56 +46,56 @@ func TestValidateSerializableOperations(t *testing.T) {
},
{
Input: rangeRequest("a", "z", 2, 0),
Output: rangeResponse(1, keyValue("a", "1", 2)),
Output: rangeResponse(1, keyValueRevision("a", "1", 2)),
},
{
Input: rangeRequest("a", "z", 3, 0),
Output: rangeResponse(2,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
),
},
{
Input: rangeRequest("a", "z", 4, 0),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValue("c", "3", 4),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("a", "z", 4, 3),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValue("c", "3", 4),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("a", "z", 4, 4),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValue("c", "3", 4),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("a", "z", 4, 2),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
),
},
{
Input: rangeRequest("b\x00", "z", 4, 2),
Output: rangeResponse(1,
keyValue("c", "3", 4),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("b", "", 4, 0),
Output: rangeResponse(1,
keyValue("b", "2", 3),
keyValueRevision("b", "2", 3),
),
},
{
@ -115,9 +115,9 @@ func TestValidateSerializableOperations(t *testing.T) {
{
Input: rangeRequest("a", "z", 4, 0),
Output: rangeResponse(3,
keyValue("c", "3", 4),
keyValue("b", "2", 3),
keyValue("a", "1", 2),
keyValueRevision("c", "3", 4),
keyValueRevision("b", "2", 3),
keyValueRevision("a", "1", 2),
),
},
},
@ -149,7 +149,7 @@ func TestValidateSerializableOperations(t *testing.T) {
{
Input: rangeRequest("a", "z", 2, 0),
Output: rangeResponse(3,
keyValue("b", "2", 3),
keyValueRevision("b", "2", 3),
),
},
},
@ -166,8 +166,8 @@ func TestValidateSerializableOperations(t *testing.T) {
{
Input: rangeRequest("a", "z", 2, 0),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
),
},
},
@ -284,7 +284,7 @@ func errorResponse(err error) model.MaybeEtcdResponse {
}
}
func keyValue(key, value string, rev int64) model.KeyValue {
func keyValueRevision(key, value string, rev int64) model.KeyValue {
return model.KeyValue{
Key: key,
ValueRevision: model.ValueRevision{

View File

@ -19,16 +19,17 @@ import (
"github.com/anishathalye/porcupine"
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/report"
)
func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation {
allOperations := relevantOperations(reports)
uniqueEvents := uniqueWatchEvents(reports)
operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests)
return patchOperations(allOperations, uniqueEvents, operationsReturnTime)
watchRevision := requestRevision(reports)
clientRequestsCount := countClientRequests(reports)
returnTime := returnTime(allOperations, reports, persistedRequests)
persistedRequestsCount := countPersistedRequests(persistedRequests)
return patchOperations(allOperations, clientRequestsCount, watchRevision, returnTime, persistedRequestsCount)
}
func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
@ -46,27 +47,8 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
return ops
}
func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]client.TimedWatchEvent {
persisted := map[model.Event]client.TimedWatchEvent{}
for _, r := range reports {
for _, op := range r.Watch {
for _, resp := range op.Responses {
for _, event := range resp.Events {
responseTime := resp.Time
if prev, found := persisted[event.Event]; found && prev.Time < responseTime {
responseTime = prev.Time
}
persisted[event.Event] = client.TimedWatchEvent{Time: responseTime, WatchEvent: event}
}
}
}
}
return persisted
}
func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation {
func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount *requestStats) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))
lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents)
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
@ -76,35 +58,62 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
newOperations = append(newOperations, op)
continue
}
var resourceVersion int64
if op.Call <= lastObservedOperation.Call {
matchingEvent := matchWatchEvent(request.Txn, watchEvents)
if matchingEvent != nil {
eventTime := matchingEvent.Time.Nanoseconds()
// Set revision and time based on watchEvent.
if eventTime < op.Return {
op.Return = eventTime
txnCanBeDiscarded := true
txnUniquellyPersisted := false
var txnRevision int64 = 0
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
unique := clientRequestCount.Put[kv] == 1
if unique {
revision, ok := watchRevision.Put[kv]
if ok {
txnRevision = revision
}
if t, ok := returnTime.Put[kv]; ok && t < op.Return {
op.Return = t
}
}
resourceVersion = matchingEvent.Revision
_, ok := persistedRequestCount.Put[kv]
if ok {
txnCanBeDiscarded = false
if unique {
txnUniquellyPersisted = true
}
}
case model.DeleteOperation:
unique := clientRequestCount.Delete[operation.Delete] == 1
if unique {
revision, ok := watchRevision.Delete[operation.Delete]
if ok {
txnRevision = revision
}
if t, ok := returnTime.Delete[operation.Delete]; ok && t < op.Return {
op.Return = t
}
}
_, ok := persistedRequestCount.Delete[operation.Delete]
if ok {
txnCanBeDiscarded = false
if unique {
txnUniquellyPersisted = true
}
}
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
}
}
persistedReturnTime := matchReturnTime(request, persistedOperations)
if persistedReturnTime != nil {
// Set return time based on persisted return time.
if *persistedReturnTime < op.Return {
op.Return = *persistedReturnTime
}
if txnCanBeDiscarded {
// Remove non persisted operations
continue
}
if isUniqueTxn(request.Txn) {
if persistedReturnTime == nil {
// Remove non persisted operations
continue
if txnUniquellyPersisted {
if txnRevision != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision}
} else {
if resourceVersion != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: resourceVersion}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
}
// Leave operation as it is as we cannot discard it.
@ -113,113 +122,32 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
return newOperations
}
func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent) porcupine.Operation {
var maxCallTime int64
var lastOperation porcupine.Operation
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
if request.Type != model.Txn {
continue
}
event := matchWatchEvent(request.Txn, watchEvents)
if event != nil && op.Call > maxCallTime {
maxCallTime = op.Call
lastOperation = op
}
func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) *requestStats {
earliestReturnTime := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
return lastOperation
}
func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]client.TimedWatchEvent) *client.TimedWatchEvent {
for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) {
if etcdOp.Type == model.PutOperation {
event, ok := watchEvents[model.Event{
Type: etcdOp.Type,
Key: etcdOp.Put.Key,
Value: etcdOp.Put.Value,
}]
if ok {
return &event
}
}
}
return nil
}
func isUniqueTxn(request *model.TxnRequest) bool {
return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure))
}
func hasWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation || etcdOp.Type == model.DeleteOperation {
return true
}
}
return false
}
func hasUniqueWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation {
return true
}
}
return false
}
func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 {
operationReturnTime := operationReturnTime(allOperations)
persisted := map[model.EtcdOperation]int64{}
lastReturnTime := maxReturnTime(operationReturnTime)
for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
switch request.Type {
case model.Txn:
hasPut := false
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
if _, found := persisted[op]; found {
panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op))
}
hasPut = true
persisted[op] = lastReturnTime
}
if hasPut {
newReturnTime := requestReturnTime(operationReturnTime, request)
if newReturnTime != -1 {
lastReturnTime = min(lastReturnTime, newReturnTime)
}
}
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return persisted
}
func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 {
newOperations := map[model.EtcdOperation]int64{}
for _, op := range operations {
var lastReturnTime int64 = 0
for _, op := range allOperations {
request := op.Input.(model.EtcdRequest)
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
switch etcdOp.Type {
case model.PutOperation:
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if t, ok := earliestReturnTime.Put[kv]; !ok || t > op.Return {
earliestReturnTime.Put[kv] = op.Return
}
case model.DeleteOperation:
if t, ok := earliestReturnTime.Delete[etcdOp.Delete]; !ok || t > op.Return {
earliestReturnTime.Delete[etcdOp.Delete] = op.Return
}
earliestReturnTime.Delete[etcdOp.Delete] = op.Return
case model.RangeOperation:
default:
panic(fmt.Sprintf("Unknown operation type: %q", etcdOp.Type))
}
if _, found := newOperations[etcdOp]; found {
panic("Unexpected duplicate event in persisted requests.")
}
newOperations[etcdOp] = op.Return
}
case model.Range:
case model.LeaseGrant:
@ -228,45 +156,154 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return newOperations
}
func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 {
var maxReturnTime int64
for _, returnTime := range operationTime {
if returnTime > maxReturnTime {
maxReturnTime = returnTime
if op.Return > lastReturnTime {
lastReturnTime = op.Return
}
}
return maxReturnTime
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
if t, ok := earliestReturnTime.Put[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime.Put[kv] = resp.Time.Nanoseconds()
}
case model.DeleteOperation:
del := model.DeleteOptions{Key: event.Key}
if t, ok := earliestReturnTime.Delete[del]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime.Delete[del] = resp.Time.Nanoseconds()
}
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
}
}
}
}
for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
switch request.Type {
case model.Txn:
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
switch op.Type {
case model.PutOperation:
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
returnTime, ok := earliestReturnTime.Put[kv]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime.Put[kv] = lastReturnTime
}
case model.DeleteOperation:
returnTime, ok := earliestReturnTime.Delete[op.Delete]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime.Delete[op.Delete] = lastReturnTime
}
case model.RangeOperation:
default:
panic(fmt.Sprintf("Unknown operation type: %q", op.Type))
}
}
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return earliestReturnTime
}
func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 {
func countClientRequests(reports []report.ClientReport) *requestStats {
counter := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
for _, client := range reports {
for _, op := range client.KeyValue {
request := op.Input.(model.EtcdRequest)
countRequest(counter, request)
}
}
return counter
}
func countPersistedRequests(requests []model.EtcdRequest) *requestStats {
counter := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
for _, request := range requests {
countRequest(counter, request)
}
return counter
}
func countRequest(counter *requestStats, request model.EtcdRequest) {
switch request.Type {
case model.Txn:
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if op.Type != model.PutOperation {
continue
}
if time, found := operationTime[op]; found {
return time
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
counter.Put[kv] += 1
case model.DeleteOperation:
counter.Delete[operation.Delete] += 1
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
}
}
return -1
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
case model.Defragment:
case model.Range:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
panic(fmt.Sprintf("unknown request type %q", request.Type))
}
}
func matchReturnTime(request model.EtcdRequest, persistedOperations map[model.EtcdOperation]int64) *int64 {
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
if returnTime, found := persistedOperations[etcdOp]; found {
return &returnTime
func requestRevision(reports []report.ClientReport) *requestStats {
requestRevision := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
requestRevision.Put[kv] = event.Revision
case model.DeleteOperation:
del := model.DeleteOptions{Key: event.Key}
requestRevision.Delete[del] = event.Revision
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
}
}
}
}
return nil
return requestRevision
}
type requestStats struct {
Put map[keyValue]int64
Delete map[model.DeleteOptions]int64
}
type keyValue struct {
Key string
Value model.ValueOrHash
}

View File

@ -71,7 +71,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put remains if there is a matching event, return time based on next persisted request",
name: "failed put remains if there is a matching event, uniqueness allows for return time to be based on next persisted request",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPut("key1", "value", 1, 2, nil, errors.New("failed"))
h.AppendPut("key2", "value", 3, 4, &clientv3.PutResponse{}, nil)
@ -86,18 +86,34 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put remains if there is a matching event, revision and return time based on watch",
name: "failed put remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPut("key", "value", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
putRequest("key", "value"),
},
watchOperations: watchPutEvent("key", "value", 2, 3),
watchOperations: watchResponse(3, putEvent("key", "value", 2)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
name: "failed put remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPut("key", "value", 1, 2, nil, errors.New("failed"))
h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
putRequest("key", "value"),
putRequest("key", "value"),
},
watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed put is dropped if event has different key",
historyFunc: func(h *model.AppendableHistory) {
@ -125,7 +141,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put with lease remains if there is a matching event, return time untouched",
name: "failed put with lease remains if there is a matching persisted request, return time untouched",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed"))
},
@ -137,7 +153,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put with lease remains if there is a matching event, return time based on next persisted request",
name: "failed put with lease remains if there is a matching event, uniqueness allows return time to be based on next persisted request",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key1", "value", 123, 1, 2, nil, errors.New("failed"))
h.AppendPutWithLease("key2", "value", 234, 3, 4, &clientv3.PutResponse{}, nil)
@ -151,6 +167,35 @@ func TestPatchHistory(t *testing.T) {
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed put with lease remains if there is a matching event, uniqueness allows for revision and return time to be based on watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
putRequestWithLease("key", "value", 123),
},
watchOperations: watchResponse(3, putEvent("key", "value", 2)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
name: "failed put with lease remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed"))
h.AppendPutWithLease("key", "value", 321, 3, 4, &clientv3.PutResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
putRequestWithLease("key", "value", 123),
putRequestWithLease("key", "value", 321),
},
watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed put is dropped",
historyFunc: func(h *model.AppendableHistory) {
@ -175,15 +220,64 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed delete remains, time untouched regardless of persisted event and watch",
name: "failed delete is dropped",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{},
expectedRemainingOperations: []porcupine.Operation{},
},
{
name: "failed delete remains if there is a matching persisted request, time untouched",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
},
},
{
name: "failed delete remains if there is a matching persisted request, uniqueness allows return time to be based on next persisted request",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
putRequest("key", "value"),
},
watchOperations: watchDeleteEvent("key", 2, 3),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed delete remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
},
watchOperations: watchResponse(3, deleteEvent("key", 2)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
name: "failed delete remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
h.AppendDelete("key", 3, 4, &clientv3.DeleteResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
deleteRequest("key"),
},
watchOperations: watchResponse(3, deleteEvent("key", 2), deleteEvent("key", 3)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
@ -325,49 +419,40 @@ func putResponse(result ...model.EtcdOperationResult) model.MaybeEtcdResponse {
return model.MaybeEtcdResponse{EtcdResponse: model.EtcdResponse{Txn: &model.TxnResponse{Results: result}}}
}
func watchPutEvent(key, value string, revision, responseTime int64) []model.WatchOperation {
func watchResponse(responseTime int64, events ...model.WatchEvent) []model.WatchOperation {
return []model.WatchOperation{
{
Responses: []model.WatchResponse{
{
Time: time.Duration(responseTime),
Events: []model.WatchEvent{
{
PersistedEvent: model.PersistedEvent{
Event: model.Event{
Type: model.PutOperation,
Key: key,
Value: model.ToValueOrHash(value),
},
Revision: revision,
},
},
},
Time: time.Duration(responseTime),
Events: events,
},
},
},
}
}
func watchDeleteEvent(key string, revision, responseTime int64) []model.WatchOperation {
return []model.WatchOperation{
{
Responses: []model.WatchResponse{
{
Time: time.Duration(responseTime),
Events: []model.WatchEvent{
{
PersistedEvent: model.PersistedEvent{
Event: model.Event{
Type: model.DeleteOperation,
Key: key,
},
Revision: revision,
},
},
},
},
func putEvent(key, value string, revision int64) model.WatchEvent {
return model.WatchEvent{
PersistedEvent: model.PersistedEvent{
Event: model.Event{
Type: model.PutOperation,
Key: key,
Value: model.ToValueOrHash(value),
},
Revision: revision,
},
}
}
func deleteEvent(key string, revision int64) model.WatchEvent {
return model.WatchEvent{
PersistedEvent: model.PersistedEvent{
Event: model.Event{
Type: model.DeleteOperation,
Key: key,
},
Revision: revision,
},
}
}