diff --git a/tests/linearizability/history.go b/tests/linearizability/history.go index d0a3888a9..f61673099 100644 --- a/tests/linearizability/history.go +++ b/tests/linearizability/history.go @@ -197,19 +197,23 @@ func getRequest(key string) EtcdRequest { } func getResponse(value string, revision int64) EtcdResponse { - return EtcdResponse{Result: []EtcdOperationResult{{Value: value}}, Revision: revision} + return EtcdResponse{OpsResult: []EtcdOperationResult{{Value: value}}, Revision: revision} } func failedResponse(err error) EtcdResponse { return EtcdResponse{Err: err} } +func unknownResponse(revision int64) EtcdResponse { + return EtcdResponse{ResultUnknown: true, Revision: revision} +} + func putRequest(key, value string) EtcdRequest { return EtcdRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}} } func putResponse(revision int64) EtcdResponse { - return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision} + return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision} } func deleteRequest(key string) EtcdRequest { @@ -217,7 +221,7 @@ func deleteRequest(key string) EtcdRequest { } func deleteResponse(deleted int64, revision int64) EtcdResponse { - return EtcdResponse{Result: []EtcdOperationResult{{Deleted: deleted}}, Revision: revision} + return EtcdResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}, Revision: revision} } func txnRequest(key, expectValue, newValue string) EtcdRequest { @@ -229,7 +233,7 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse { if succeeded { result = []EtcdOperationResult{{}} } - return EtcdResponse{Result: result, TxnFailure: !succeeded, Revision: revision} + return EtcdResponse{OpsResult: result, TxnResult: !succeeded, Revision: revision} } func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest { @@ -241,7 +245,7 @@ func leaseGrantRequest(leaseID int64) EtcdRequest { } func leaseGrantResponse(revision int64) EtcdResponse { - return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision} + return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision} } func leaseRevokeRequest(leaseID int64) EtcdRequest { @@ -249,7 +253,7 @@ func leaseRevokeRequest(leaseID int64) EtcdRequest { } func leaseRevokeResponse(revision int64) EtcdResponse { - return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision} + return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision} } type history struct { diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 4735b4816..23f199bb8 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -18,6 +18,7 @@ import ( "context" "os" "path/filepath" + "sort" "strings" "sync" "testing" @@ -25,6 +26,7 @@ import ( "github.com/anishathalye/porcupine" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" @@ -102,7 +104,9 @@ func TestLinearizability(t *testing.T) { clientCount: 8, traffic: DefaultTraffic, }) - validateEventsMatch(t, events) + longestHistory, remainingEvents := pickLongestHistory(events) + validateEventsMatch(t, longestHistory, remainingEvents) + operations = patchOperationBasedOnWatchEvents(operations, longestHistory) checkOperationsAndPersistResults(t, operations, clus) }) } @@ -133,6 +137,75 @@ func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProces return operations, events } +func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation { + newOperations := make([]porcupine.Operation, 0, len(operations)) + persisted := map[EtcdOperation]watchEvent{} + for _, op := range watchEvents { + persisted[op.Op] = op + } + lastObservedEventTime := watchEvents[len(watchEvents)-1].Time + + for _, op := range operations { + resp := op.Output.(EtcdResponse) + if resp.Err == nil || op.Call > lastObservedEventTime.UnixNano() { + // No need to patch successfully requests and cannot patch requests outside observed window. + newOperations = append(newOperations, op) + continue + } + event, hasUniqueWriteOperation := matchWatchEvent(op, persisted) + if event != nil { + // Set revision and time based on watchEvent. + op.Return = event.Time.UnixNano() + op.Output = EtcdResponse{ + Revision: event.Revision, + ResultUnknown: true, + } + newOperations = append(newOperations, op) + continue + } + if hasWriteOperation(op) && !hasUniqueWriteOperation { + // Leave operation as it is as we cannot match non-unique operations to watch events. + newOperations = append(newOperations, op) + continue + } + // Remove non persisted operations + } + return newOperations +} + +func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) { + request := op.Input.(EtcdRequest) + for _, etcdOp := range request.Ops { + if isWrite(etcdOp.Type) && inUnique(etcdOp.Type) { + // We expect all put to be unique as they write unique value. + hasUniqueWriteOperation = true + opType := etcdOp.Type + if opType == PutWithLease { + opType = Put + } + event, ok := watchEvents[EtcdOperation{ + Type: opType, + Key: etcdOp.Key, + Value: etcdOp.Value, + }] + if ok { + return &event, hasUniqueWriteOperation + } + } + } + return nil, hasUniqueWriteOperation +} + +func hasWriteOperation(op porcupine.Operation) bool { + request := op.Input.(EtcdRequest) + for _, etcdOp := range request.Ops { + if isWrite(etcdOp.Type) { + return true + } + } + return false +} + func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) { var err error successes := 0 @@ -213,20 +286,18 @@ type trafficConfig struct { traffic Traffic } -func validateEventsMatch(t *testing.T, ops [][]watchEvent) { - // Move longest history to ops[0] - maxLength := len(ops[0]) - for i := 1; i < len(ops); i++ { - if len(ops[i]) > maxLength { - maxLength = len(ops[i]) - ops[0], ops[i] = ops[i], ops[0] - } - } +func pickLongestHistory(ops [][]watchEvent) (longest []watchEvent, rest [][]watchEvent) { + sort.Slice(ops, func(i, j int) bool { + return len(ops[i]) > len(ops[j]) + }) + return ops[0], ops[1:] +} - for i := 1; i < len(ops); i++ { - length := len(ops[i]) +func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]watchEvent) { + for i := 0; i < len(other); i++ { + length := len(other[i]) // We compare prefix of watch events, as we are not guaranteed to collect all events from each node. - if diff := cmp.Diff(ops[0][:length], ops[i][:length]); diff != "" { + if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" { t.Errorf("Events in watches do not match, %s", diff) } } diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go index 2309eeaa5..738890a74 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model.go @@ -35,6 +35,14 @@ const ( LeaseRevoke OperationType = "leaseRevoke" ) +func isWrite(t OperationType) bool { + return t == Put || t == Delete || t == PutWithLease || t == LeaseRevoke || t == LeaseGrant +} + +func inUnique(t OperationType) bool { + return t == Put || t == PutWithLease +} + type EtcdRequest struct { Conds []EtcdCondition Ops []EtcdOperation @@ -53,10 +61,15 @@ type EtcdOperation struct { } type EtcdResponse struct { - Err error - Revision int64 - TxnFailure bool - Result []EtcdOperationResult + Err error + Revision int64 + ResultUnknown bool + TxnResult bool + OpsResult []EtcdOperationResult +} + +func Match(r1, r2 EtcdResponse) bool { + return ((r1.ResultUnknown || r2.ResultUnknown) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2) } type EtcdOperationResult struct { @@ -70,7 +83,6 @@ type EtcdLease struct { LeaseID int64 Keys map[string]struct{} } - type PossibleStates []EtcdState type EtcdState struct { @@ -131,12 +143,15 @@ func describeEtcdResponse(ops []EtcdOperation, response EtcdResponse) string { if response.Err != nil { return fmt.Sprintf("err: %q", response.Err) } - if response.TxnFailure { + if response.ResultUnknown { + return fmt.Sprintf("unknown, rev: %d", response.Revision) + } + if response.TxnResult { return fmt.Sprintf("txn failed, rev: %d", response.Revision) } - respDescription := make([]string, len(response.Result)) - for i := range response.Result { - respDescription[i] = describeEtcdOperationResponse(ops[i].Type, response.Result[i]) + respDescription := make([]string, len(response.OpsResult)) + for i := range response.OpsResult { + respDescription[i] = describeEtcdOperationResponse(ops[i].Type, response.OpsResult[i]) } respDescription = append(respDescription, fmt.Sprintf("rev: %d", response.Revision)) return strings.Join(respDescription, ", ") @@ -190,7 +205,7 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) { if len(states) == 0 { // states were not initialized - if response.Err != nil { + if response.Err != nil || response.ResultUnknown { return true, nil } return true, PossibleStates{initState(request, response)} @@ -211,11 +226,11 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState { KeyLeases: map[string]int64{}, Leases: map[int64]EtcdLease{}, } - if response.TxnFailure { + if response.TxnResult { return state } for i, op := range request.Ops { - opResp := response.Result[i] + opResp := response.OpsResult[i] switch op.Type { case Get: if opResp.Value != "" { @@ -263,7 +278,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo newStates := make(PossibleStates, 0, len(states)) for _, s := range states { newState, expectResponse := applyRequestToSingleState(s, request) - if reflect.DeepEqual(expectResponse, response) { + if Match(expectResponse, response) { newStates = append(newStates, newState) } } @@ -280,7 +295,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc } } if !success { - return s, EtcdResponse{Revision: s.Revision, TxnFailure: true} + return s, EtcdResponse{Revision: s.Revision, TxnResult: true} } newKVs := map[string]string{} for k, v := range s.KeyValues { @@ -346,7 +361,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc s.Revision += 1 } - return s, EtcdResponse{Result: opResp, Revision: s.Revision} + return s, EtcdResponse{OpsResult: opResp, Revision: s.Revision} } func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState { diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model_test.go index 9a82ff611..6ca882ce1 100644 --- a/tests/linearizability/model_test.go +++ b/tests/linearizability/model_test.go @@ -569,6 +569,11 @@ func TestModelDescribe(t *testing.T) { resp: failedResponse(errors.New("failed")), expectDescribe: `put("key4", "4") -> err: "failed"`, }, + { + req: putRequest("key4b", "4b"), + resp: unknownResponse(42), + expectDescribe: `put("key4b", "4b") -> unknown, rev: 42`, + }, { req: deleteRequest("key5"), resp: deleteResponse(1, 5), @@ -599,3 +604,150 @@ func TestModelDescribe(t *testing.T) { assert.Equal(t, tc.expectDescribe, etcdModel.DescribeOperation(tc.req, tc.resp)) } } + +func TestModelResponseMatch(t *testing.T) { + tcs := []struct { + resp1 EtcdResponse + resp2 EtcdResponse + expectMatch bool + }{ + { + resp1: getResponse("a", 1), + resp2: getResponse("a", 1), + expectMatch: true, + }, + { + resp1: getResponse("a", 1), + resp2: getResponse("b", 1), + expectMatch: false, + }, + { + resp1: getResponse("a", 1), + resp2: getResponse("a", 2), + expectMatch: false, + }, + { + resp1: getResponse("a", 1), + resp2: failedResponse(errors.New("failed request")), + expectMatch: false, + }, + { + resp1: getResponse("a", 1), + resp2: unknownResponse(1), + expectMatch: true, + }, + { + resp1: getResponse("a", 1), + resp2: unknownResponse(0), + expectMatch: false, + }, + { + resp1: putResponse(3), + resp2: putResponse(3), + expectMatch: true, + }, + { + resp1: putResponse(3), + resp2: putResponse(4), + expectMatch: false, + }, + { + resp1: putResponse(3), + resp2: failedResponse(errors.New("failed request")), + expectMatch: false, + }, + { + resp1: putResponse(3), + resp2: unknownResponse(3), + expectMatch: true, + }, + { + resp1: putResponse(3), + resp2: unknownResponse(0), + expectMatch: false, + }, + { + resp1: deleteResponse(1, 5), + resp2: deleteResponse(1, 5), + expectMatch: true, + }, + { + resp1: deleteResponse(1, 5), + resp2: deleteResponse(0, 5), + expectMatch: false, + }, + { + resp1: deleteResponse(1, 5), + resp2: deleteResponse(1, 6), + expectMatch: false, + }, + { + resp1: deleteResponse(1, 5), + resp2: failedResponse(errors.New("failed request")), + expectMatch: false, + }, + { + resp1: deleteResponse(1, 5), + resp2: unknownResponse(5), + expectMatch: true, + }, + { + resp1: deleteResponse(0, 5), + resp2: unknownResponse(0), + expectMatch: false, + }, + { + resp1: deleteResponse(1, 5), + resp2: unknownResponse(0), + expectMatch: false, + }, + { + resp1: deleteResponse(0, 5), + resp2: unknownResponse(2), + expectMatch: false, + }, + { + resp1: txnResponse(false, 7), + resp2: txnResponse(false, 7), + expectMatch: true, + }, + { + resp1: txnResponse(true, 7), + resp2: txnResponse(false, 7), + expectMatch: false, + }, + { + resp1: txnResponse(false, 7), + resp2: txnResponse(false, 8), + expectMatch: false, + }, + { + resp1: txnResponse(false, 7), + resp2: failedResponse(errors.New("failed request")), + expectMatch: false, + }, + { + resp1: txnResponse(true, 7), + resp2: unknownResponse(7), + expectMatch: true, + }, + { + resp1: txnResponse(false, 7), + resp2: unknownResponse(7), + expectMatch: true, + }, + { + resp1: txnResponse(true, 7), + resp2: unknownResponse(0), + expectMatch: false, + }, + { + resp1: txnResponse(false, 7), + resp2: unknownResponse(0), + expectMatch: false, + }, + } + for i, tc := range tcs { + assert.Equal(t, tc.expectMatch, Match(tc.resp1, tc.resp2), "%d %+v %+v", i, tc.resp1, tc.resp2) + } +} diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go index c0c279e81..28fa017bb 100644 --- a/tests/linearizability/watch.go +++ b/tests/linearizability/watch.go @@ -67,6 +67,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli } for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) { lastRevision = resp.Header.Revision + time := time.Now() for _, event := range resp.Events { var op OperationType switch event.Type { @@ -76,10 +77,13 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli op = Delete } events = append(events, watchEvent{ - Op: op, - Key: string(event.Kv.Key), - Value: string(event.Kv.Value), + Time: time, Revision: event.Kv.ModRevision, + Op: EtcdOperation{ + Type: op, + Key: string(event.Kv.Key), + Value: string(event.Kv.Value), + }, }) } if resp.Err() != nil { @@ -90,8 +94,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli } type watchEvent struct { - Op OperationType - Key string - Value string + Op EtcdOperation Revision int64 + Time time.Time }