mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #15078 from serathius/linearizability-patch-operations
tests: Use watch events to patch history to speed up linearization
This commit is contained in:
commit
2b45023364
@ -197,19 +197,23 @@ func getRequest(key string) EtcdRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getResponse(value string, revision int64) EtcdResponse {
|
func getResponse(value string, revision int64) EtcdResponse {
|
||||||
return EtcdResponse{Result: []EtcdOperationResult{{Value: value}}, Revision: revision}
|
return EtcdResponse{OpsResult: []EtcdOperationResult{{Value: value}}, Revision: revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
func failedResponse(err error) EtcdResponse {
|
func failedResponse(err error) EtcdResponse {
|
||||||
return EtcdResponse{Err: err}
|
return EtcdResponse{Err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func unknownResponse(revision int64) EtcdResponse {
|
||||||
|
return EtcdResponse{ResultUnknown: true, Revision: revision}
|
||||||
|
}
|
||||||
|
|
||||||
func putRequest(key, value string) EtcdRequest {
|
func putRequest(key, value string) EtcdRequest {
|
||||||
return EtcdRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}}
|
return EtcdRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func putResponse(revision int64) EtcdResponse {
|
func putResponse(revision int64) EtcdResponse {
|
||||||
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
|
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteRequest(key string) EtcdRequest {
|
func deleteRequest(key string) EtcdRequest {
|
||||||
@ -217,7 +221,7 @@ func deleteRequest(key string) EtcdRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func deleteResponse(deleted int64, revision int64) EtcdResponse {
|
func deleteResponse(deleted int64, revision int64) EtcdResponse {
|
||||||
return EtcdResponse{Result: []EtcdOperationResult{{Deleted: deleted}}, Revision: revision}
|
return EtcdResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}, Revision: revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
func txnRequest(key, expectValue, newValue string) EtcdRequest {
|
func txnRequest(key, expectValue, newValue string) EtcdRequest {
|
||||||
@ -229,7 +233,7 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
|
|||||||
if succeeded {
|
if succeeded {
|
||||||
result = []EtcdOperationResult{{}}
|
result = []EtcdOperationResult{{}}
|
||||||
}
|
}
|
||||||
return EtcdResponse{Result: result, TxnFailure: !succeeded, Revision: revision}
|
return EtcdResponse{OpsResult: result, TxnResult: !succeeded, Revision: revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
|
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
|
||||||
@ -241,7 +245,7 @@ func leaseGrantRequest(leaseID int64) EtcdRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func leaseGrantResponse(revision int64) EtcdResponse {
|
func leaseGrantResponse(revision int64) EtcdResponse {
|
||||||
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
|
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
func leaseRevokeRequest(leaseID int64) EtcdRequest {
|
func leaseRevokeRequest(leaseID int64) EtcdRequest {
|
||||||
@ -249,7 +253,7 @@ func leaseRevokeRequest(leaseID int64) EtcdRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func leaseRevokeResponse(revision int64) EtcdResponse {
|
func leaseRevokeResponse(revision int64) EtcdResponse {
|
||||||
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
|
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
type history struct {
|
type history struct {
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@ -25,6 +26,7 @@ import (
|
|||||||
|
|
||||||
"github.com/anishathalye/porcupine"
|
"github.com/anishathalye/porcupine"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
@ -102,7 +104,9 @@ func TestLinearizability(t *testing.T) {
|
|||||||
clientCount: 8,
|
clientCount: 8,
|
||||||
traffic: DefaultTraffic,
|
traffic: DefaultTraffic,
|
||||||
})
|
})
|
||||||
validateEventsMatch(t, events)
|
longestHistory, remainingEvents := pickLongestHistory(events)
|
||||||
|
validateEventsMatch(t, longestHistory, remainingEvents)
|
||||||
|
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
|
||||||
checkOperationsAndPersistResults(t, operations, clus)
|
checkOperationsAndPersistResults(t, operations, clus)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -133,6 +137,75 @@ func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProces
|
|||||||
return operations, events
|
return operations, events
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
|
||||||
|
newOperations := make([]porcupine.Operation, 0, len(operations))
|
||||||
|
persisted := map[EtcdOperation]watchEvent{}
|
||||||
|
for _, op := range watchEvents {
|
||||||
|
persisted[op.Op] = op
|
||||||
|
}
|
||||||
|
lastObservedEventTime := watchEvents[len(watchEvents)-1].Time
|
||||||
|
|
||||||
|
for _, op := range operations {
|
||||||
|
resp := op.Output.(EtcdResponse)
|
||||||
|
if resp.Err == nil || op.Call > lastObservedEventTime.UnixNano() {
|
||||||
|
// No need to patch successfully requests and cannot patch requests outside observed window.
|
||||||
|
newOperations = append(newOperations, op)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
event, hasUniqueWriteOperation := matchWatchEvent(op, persisted)
|
||||||
|
if event != nil {
|
||||||
|
// Set revision and time based on watchEvent.
|
||||||
|
op.Return = event.Time.UnixNano()
|
||||||
|
op.Output = EtcdResponse{
|
||||||
|
Revision: event.Revision,
|
||||||
|
ResultUnknown: true,
|
||||||
|
}
|
||||||
|
newOperations = append(newOperations, op)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if hasWriteOperation(op) && !hasUniqueWriteOperation {
|
||||||
|
// Leave operation as it is as we cannot match non-unique operations to watch events.
|
||||||
|
newOperations = append(newOperations, op)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Remove non persisted operations
|
||||||
|
}
|
||||||
|
return newOperations
|
||||||
|
}
|
||||||
|
|
||||||
|
func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) {
|
||||||
|
request := op.Input.(EtcdRequest)
|
||||||
|
for _, etcdOp := range request.Ops {
|
||||||
|
if isWrite(etcdOp.Type) && inUnique(etcdOp.Type) {
|
||||||
|
// We expect all put to be unique as they write unique value.
|
||||||
|
hasUniqueWriteOperation = true
|
||||||
|
opType := etcdOp.Type
|
||||||
|
if opType == PutWithLease {
|
||||||
|
opType = Put
|
||||||
|
}
|
||||||
|
event, ok := watchEvents[EtcdOperation{
|
||||||
|
Type: opType,
|
||||||
|
Key: etcdOp.Key,
|
||||||
|
Value: etcdOp.Value,
|
||||||
|
}]
|
||||||
|
if ok {
|
||||||
|
return &event, hasUniqueWriteOperation
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, hasUniqueWriteOperation
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasWriteOperation(op porcupine.Operation) bool {
|
||||||
|
request := op.Input.(EtcdRequest)
|
||||||
|
for _, etcdOp := range request.Ops {
|
||||||
|
if isWrite(etcdOp.Type) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
||||||
var err error
|
var err error
|
||||||
successes := 0
|
successes := 0
|
||||||
@ -213,20 +286,18 @@ type trafficConfig struct {
|
|||||||
traffic Traffic
|
traffic Traffic
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateEventsMatch(t *testing.T, ops [][]watchEvent) {
|
func pickLongestHistory(ops [][]watchEvent) (longest []watchEvent, rest [][]watchEvent) {
|
||||||
// Move longest history to ops[0]
|
sort.Slice(ops, func(i, j int) bool {
|
||||||
maxLength := len(ops[0])
|
return len(ops[i]) > len(ops[j])
|
||||||
for i := 1; i < len(ops); i++ {
|
})
|
||||||
if len(ops[i]) > maxLength {
|
return ops[0], ops[1:]
|
||||||
maxLength = len(ops[i])
|
}
|
||||||
ops[0], ops[i] = ops[i], ops[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 1; i < len(ops); i++ {
|
func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]watchEvent) {
|
||||||
length := len(ops[i])
|
for i := 0; i < len(other); i++ {
|
||||||
|
length := len(other[i])
|
||||||
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
|
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
|
||||||
if diff := cmp.Diff(ops[0][:length], ops[i][:length]); diff != "" {
|
if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
|
||||||
t.Errorf("Events in watches do not match, %s", diff)
|
t.Errorf("Events in watches do not match, %s", diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,14 @@ const (
|
|||||||
LeaseRevoke OperationType = "leaseRevoke"
|
LeaseRevoke OperationType = "leaseRevoke"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func isWrite(t OperationType) bool {
|
||||||
|
return t == Put || t == Delete || t == PutWithLease || t == LeaseRevoke || t == LeaseGrant
|
||||||
|
}
|
||||||
|
|
||||||
|
func inUnique(t OperationType) bool {
|
||||||
|
return t == Put || t == PutWithLease
|
||||||
|
}
|
||||||
|
|
||||||
type EtcdRequest struct {
|
type EtcdRequest struct {
|
||||||
Conds []EtcdCondition
|
Conds []EtcdCondition
|
||||||
Ops []EtcdOperation
|
Ops []EtcdOperation
|
||||||
@ -55,8 +63,13 @@ type EtcdOperation struct {
|
|||||||
type EtcdResponse struct {
|
type EtcdResponse struct {
|
||||||
Err error
|
Err error
|
||||||
Revision int64
|
Revision int64
|
||||||
TxnFailure bool
|
ResultUnknown bool
|
||||||
Result []EtcdOperationResult
|
TxnResult bool
|
||||||
|
OpsResult []EtcdOperationResult
|
||||||
|
}
|
||||||
|
|
||||||
|
func Match(r1, r2 EtcdResponse) bool {
|
||||||
|
return ((r1.ResultUnknown || r2.ResultUnknown) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2)
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdOperationResult struct {
|
type EtcdOperationResult struct {
|
||||||
@ -70,7 +83,6 @@ type EtcdLease struct {
|
|||||||
LeaseID int64
|
LeaseID int64
|
||||||
Keys map[string]struct{}
|
Keys map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type PossibleStates []EtcdState
|
type PossibleStates []EtcdState
|
||||||
|
|
||||||
type EtcdState struct {
|
type EtcdState struct {
|
||||||
@ -131,12 +143,15 @@ func describeEtcdResponse(ops []EtcdOperation, response EtcdResponse) string {
|
|||||||
if response.Err != nil {
|
if response.Err != nil {
|
||||||
return fmt.Sprintf("err: %q", response.Err)
|
return fmt.Sprintf("err: %q", response.Err)
|
||||||
}
|
}
|
||||||
if response.TxnFailure {
|
if response.ResultUnknown {
|
||||||
|
return fmt.Sprintf("unknown, rev: %d", response.Revision)
|
||||||
|
}
|
||||||
|
if response.TxnResult {
|
||||||
return fmt.Sprintf("txn failed, rev: %d", response.Revision)
|
return fmt.Sprintf("txn failed, rev: %d", response.Revision)
|
||||||
}
|
}
|
||||||
respDescription := make([]string, len(response.Result))
|
respDescription := make([]string, len(response.OpsResult))
|
||||||
for i := range response.Result {
|
for i := range response.OpsResult {
|
||||||
respDescription[i] = describeEtcdOperationResponse(ops[i].Type, response.Result[i])
|
respDescription[i] = describeEtcdOperationResponse(ops[i].Type, response.OpsResult[i])
|
||||||
}
|
}
|
||||||
respDescription = append(respDescription, fmt.Sprintf("rev: %d", response.Revision))
|
respDescription = append(respDescription, fmt.Sprintf("rev: %d", response.Revision))
|
||||||
return strings.Join(respDescription, ", ")
|
return strings.Join(respDescription, ", ")
|
||||||
@ -190,7 +205,7 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
|
|||||||
func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
|
func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
|
||||||
if len(states) == 0 {
|
if len(states) == 0 {
|
||||||
// states were not initialized
|
// states were not initialized
|
||||||
if response.Err != nil {
|
if response.Err != nil || response.ResultUnknown {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
return true, PossibleStates{initState(request, response)}
|
return true, PossibleStates{initState(request, response)}
|
||||||
@ -211,11 +226,11 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
|
|||||||
KeyLeases: map[string]int64{},
|
KeyLeases: map[string]int64{},
|
||||||
Leases: map[int64]EtcdLease{},
|
Leases: map[int64]EtcdLease{},
|
||||||
}
|
}
|
||||||
if response.TxnFailure {
|
if response.TxnResult {
|
||||||
return state
|
return state
|
||||||
}
|
}
|
||||||
for i, op := range request.Ops {
|
for i, op := range request.Ops {
|
||||||
opResp := response.Result[i]
|
opResp := response.OpsResult[i]
|
||||||
switch op.Type {
|
switch op.Type {
|
||||||
case Get:
|
case Get:
|
||||||
if opResp.Value != "" {
|
if opResp.Value != "" {
|
||||||
@ -263,7 +278,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo
|
|||||||
newStates := make(PossibleStates, 0, len(states))
|
newStates := make(PossibleStates, 0, len(states))
|
||||||
for _, s := range states {
|
for _, s := range states {
|
||||||
newState, expectResponse := applyRequestToSingleState(s, request)
|
newState, expectResponse := applyRequestToSingleState(s, request)
|
||||||
if reflect.DeepEqual(expectResponse, response) {
|
if Match(expectResponse, response) {
|
||||||
newStates = append(newStates, newState)
|
newStates = append(newStates, newState)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -280,7 +295,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !success {
|
if !success {
|
||||||
return s, EtcdResponse{Revision: s.Revision, TxnFailure: true}
|
return s, EtcdResponse{Revision: s.Revision, TxnResult: true}
|
||||||
}
|
}
|
||||||
newKVs := map[string]string{}
|
newKVs := map[string]string{}
|
||||||
for k, v := range s.KeyValues {
|
for k, v := range s.KeyValues {
|
||||||
@ -346,7 +361,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
|
|||||||
s.Revision += 1
|
s.Revision += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
return s, EtcdResponse{Result: opResp, Revision: s.Revision}
|
return s, EtcdResponse{OpsResult: opResp, Revision: s.Revision}
|
||||||
}
|
}
|
||||||
|
|
||||||
func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState {
|
func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState {
|
||||||
|
@ -569,6 +569,11 @@ func TestModelDescribe(t *testing.T) {
|
|||||||
resp: failedResponse(errors.New("failed")),
|
resp: failedResponse(errors.New("failed")),
|
||||||
expectDescribe: `put("key4", "4") -> err: "failed"`,
|
expectDescribe: `put("key4", "4") -> err: "failed"`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
req: putRequest("key4b", "4b"),
|
||||||
|
resp: unknownResponse(42),
|
||||||
|
expectDescribe: `put("key4b", "4b") -> unknown, rev: 42`,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
req: deleteRequest("key5"),
|
req: deleteRequest("key5"),
|
||||||
resp: deleteResponse(1, 5),
|
resp: deleteResponse(1, 5),
|
||||||
@ -599,3 +604,150 @@ func TestModelDescribe(t *testing.T) {
|
|||||||
assert.Equal(t, tc.expectDescribe, etcdModel.DescribeOperation(tc.req, tc.resp))
|
assert.Equal(t, tc.expectDescribe, etcdModel.DescribeOperation(tc.req, tc.resp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestModelResponseMatch(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
resp1 EtcdResponse
|
||||||
|
resp2 EtcdResponse
|
||||||
|
expectMatch bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
resp1: getResponse("a", 1),
|
||||||
|
resp2: getResponse("a", 1),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: getResponse("a", 1),
|
||||||
|
resp2: getResponse("b", 1),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: getResponse("a", 1),
|
||||||
|
resp2: getResponse("a", 2),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: getResponse("a", 1),
|
||||||
|
resp2: failedResponse(errors.New("failed request")),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: getResponse("a", 1),
|
||||||
|
resp2: unknownResponse(1),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: getResponse("a", 1),
|
||||||
|
resp2: unknownResponse(0),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: putResponse(3),
|
||||||
|
resp2: putResponse(3),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: putResponse(3),
|
||||||
|
resp2: putResponse(4),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: putResponse(3),
|
||||||
|
resp2: failedResponse(errors.New("failed request")),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: putResponse(3),
|
||||||
|
resp2: unknownResponse(3),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: putResponse(3),
|
||||||
|
resp2: unknownResponse(0),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(1, 5),
|
||||||
|
resp2: deleteResponse(1, 5),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(1, 5),
|
||||||
|
resp2: deleteResponse(0, 5),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(1, 5),
|
||||||
|
resp2: deleteResponse(1, 6),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(1, 5),
|
||||||
|
resp2: failedResponse(errors.New("failed request")),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(1, 5),
|
||||||
|
resp2: unknownResponse(5),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(0, 5),
|
||||||
|
resp2: unknownResponse(0),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(1, 5),
|
||||||
|
resp2: unknownResponse(0),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: deleteResponse(0, 5),
|
||||||
|
resp2: unknownResponse(2),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(false, 7),
|
||||||
|
resp2: txnResponse(false, 7),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(true, 7),
|
||||||
|
resp2: txnResponse(false, 7),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(false, 7),
|
||||||
|
resp2: txnResponse(false, 8),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(false, 7),
|
||||||
|
resp2: failedResponse(errors.New("failed request")),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(true, 7),
|
||||||
|
resp2: unknownResponse(7),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(false, 7),
|
||||||
|
resp2: unknownResponse(7),
|
||||||
|
expectMatch: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(true, 7),
|
||||||
|
resp2: unknownResponse(0),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
resp1: txnResponse(false, 7),
|
||||||
|
resp2: unknownResponse(0),
|
||||||
|
expectMatch: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, tc := range tcs {
|
||||||
|
assert.Equal(t, tc.expectMatch, Match(tc.resp1, tc.resp2), "%d %+v %+v", i, tc.resp1, tc.resp2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -67,6 +67,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
|
|||||||
}
|
}
|
||||||
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) {
|
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) {
|
||||||
lastRevision = resp.Header.Revision
|
lastRevision = resp.Header.Revision
|
||||||
|
time := time.Now()
|
||||||
for _, event := range resp.Events {
|
for _, event := range resp.Events {
|
||||||
var op OperationType
|
var op OperationType
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
@ -76,10 +77,13 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
|
|||||||
op = Delete
|
op = Delete
|
||||||
}
|
}
|
||||||
events = append(events, watchEvent{
|
events = append(events, watchEvent{
|
||||||
Op: op,
|
Time: time,
|
||||||
|
Revision: event.Kv.ModRevision,
|
||||||
|
Op: EtcdOperation{
|
||||||
|
Type: op,
|
||||||
Key: string(event.Kv.Key),
|
Key: string(event.Kv.Key),
|
||||||
Value: string(event.Kv.Value),
|
Value: string(event.Kv.Value),
|
||||||
Revision: event.Kv.ModRevision,
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if resp.Err() != nil {
|
if resp.Err() != nil {
|
||||||
@ -90,8 +94,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
|
|||||||
}
|
}
|
||||||
|
|
||||||
type watchEvent struct {
|
type watchEvent struct {
|
||||||
Op OperationType
|
Op EtcdOperation
|
||||||
Key string
|
|
||||||
Value string
|
|
||||||
Revision int64
|
Revision int64
|
||||||
|
Time time.Time
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user