From 96987d8b5e6a2c1346e526266529d9d3eeef58a7 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 16 Jun 2023 11:05:10 +0200 Subject: [PATCH] tests/robustness: Implement stale reads without validation For now we just validate stale read revision, but not response content. Reason is that etcd model only stores latest version of keys, and no history like real etcd. Validating stale read contents needs to be done outside of model as storing whole history is just to costly for linearization validation. Signed-off-by: Marek Siarkowicz --- tests/robustness/model/describe.go | 9 +- tests/robustness/model/describe_test.go | 5 + tests/robustness/model/deterministic.go | 17 ++- tests/robustness/model/deterministic_test.go | 28 +++++ tests/robustness/model/history.go | 20 ++-- tests/robustness/traffic/client.go | 47 ++++---- tests/robustness/traffic/etcd.go | 106 +++++++++++++------ tests/robustness/traffic/kubernetes.go | 2 +- tests/robustness/traffic/traffic.go | 2 +- tests/robustness/validate/validate.go | 1 + 10 files changed, 164 insertions(+), 73 deletions(-) diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 03da88644..6cf370941 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -44,7 +44,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin func describeEtcdRequest(request EtcdRequest) string { switch request.Type { case Range: - return describeRangeRequest(request.Range.Key, request.Range.RangeOptions) + return describeRangeRequest(request.Range.Key, request.Range.Revision, request.Range.RangeOptions) case Txn: onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess) if len(request.Txn.Conditions) != 0 { @@ -105,7 +105,7 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string { func describeEtcdOperation(op EtcdOperation) string { switch op.Type { case RangeOperation: - return describeRangeRequest(op.Key, op.RangeOptions) + return describeRangeRequest(op.Key, 0, op.RangeOptions) case PutOperation: if op.LeaseID != 0 { return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID) @@ -118,8 +118,11 @@ func describeEtcdOperation(op EtcdOperation) string { } } -func describeRangeRequest(key string, opts RangeOptions) string { +func describeRangeRequest(key string, revision int64, opts RangeOptions) string { kwargs := []string{} + if revision != 0 { + kwargs = append(kwargs, fmt.Sprintf("rev=%d", revision)) + } if opts.Limit != 0 { kwargs = append(kwargs, fmt.Sprintf("limit=%d", opts.Limit)) } diff --git a/tests/robustness/model/describe_test.go b/tests/robustness/model/describe_test.go index fdc380b83..66ce0be6b 100644 --- a/tests/robustness/model/describe_test.go +++ b/tests/robustness/model/describe_test.go @@ -134,6 +134,11 @@ func TestModelDescribe(t *testing.T) { resp: rangeResponse(nil, 0, 14), expectDescribe: `range("key14", limit=14) -> [], count: 0, rev: 14`, }, + { + req: staleRangeRequest("key15", true, 0, 15), + resp: rangeResponse(nil, 0, 15), + expectDescribe: `range("key15", rev=15) -> [], count: 0, rev: 15`, + }, } for _, tc := range tcs { assert.Equal(t, tc.expectDescribe, NonDeterministicModel.DescribeOperation(tc.req, tc.resp)) diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index f466cae8b..3a88ab4a5 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -16,6 +16,7 @@ package model import ( "encoding/json" + "errors" "fmt" "hash/fnv" "reflect" @@ -93,8 +94,15 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) { s.KeyValues = newKVs switch request.Type { case Range: - resp := s.getRange(request.Range.Key, request.Range.RangeOptions) - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}} + if request.Range.Revision == 0 || request.Range.Revision == s.Revision { + resp := s.getRange(request.Range.Key, request.Range.RangeOptions) + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}} + } else { + if request.Range.Revision > s.Revision { + return s, MaybeEtcdResponse{Err: EtcdFutureRevErr} + } + return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}} + } case Txn: failure := false for _, cond := range request.Txn.Conditions { @@ -245,8 +253,7 @@ type EtcdRequest struct { type RangeRequest struct { Key string RangeOptions - // TODO: Implement stale read using revision - revision int64 + Revision int64 } type RangeOptions struct { @@ -304,6 +311,8 @@ type MaybeEtcdResponse struct { Err error } +var EtcdFutureRevErr = errors.New("future rev") + type EtcdResponse struct { Txn *TxnResponse Range *RangeResponse diff --git a/tests/robustness/model/deterministic_test.go b/tests/robustness/model/deterministic_test.go index ecdc00fe0..7a9ff0ab2 100644 --- a/tests/robustness/model/deterministic_test.go +++ b/tests/robustness/model/deterministic_test.go @@ -151,6 +151,34 @@ var commonTestScenarios = []modelTestCase{ {req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 3, 3), expectFailure: true}, }, }, + { + name: "Stale Get doesn't need to match put if asking about old revision", + operations: []testOperation{ + {req: putRequest("key", "1"), resp: putResponse(2)}, + {req: staleGetRequest("key", 1), resp: getResponse("key", "2", 2, 2)}, + {req: staleGetRequest("key", 1), resp: getResponse("key", "1", 2, 2)}, + }, + }, + { + name: "Stale Get need to match put if asking about matching revision", + operations: []testOperation{ + {req: putRequest("key", "1"), resp: putResponse(2)}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 3, 2), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "2", 2, 2), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 2)}, + }, + }, + { + name: "Stale Get need to have a proper response revision", + operations: []testOperation{ + {req: putRequest("key", "1"), resp: putResponse(2)}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 2)}, + {req: putRequest("key", "2"), resp: putResponse(3)}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3)}, + }, + }, { name: "Put must increase revision by 1", operations: []testOperation{ diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index bc5e7a040..5d5d40e1b 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -54,16 +54,16 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory { } } -func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end time.Duration, resp *clientv3.GetResponse) { - var revision int64 +func (h *AppendableHistory) AppendRange(key string, withPrefix bool, revision int64, start, end time.Duration, resp *clientv3.GetResponse) { + var respRevision int64 if resp != nil && resp.Header != nil { - revision = resp.Header.Revision + respRevision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ ClientId: h.streamId, - Input: rangeRequest(key, withPrefix, 0), + Input: staleRangeRequest(key, withPrefix, 0, revision), Call: start.Nanoseconds(), - Output: rangeResponse(resp.Kvs, resp.Count, revision), + Output: rangeResponse(resp.Kvs, resp.Count, respRevision), Return: end.Nanoseconds(), }) } @@ -340,8 +340,16 @@ func getRequest(key string) EtcdRequest { return rangeRequest(key, false, 0) } +func staleGetRequest(key string, revision int64) EtcdRequest { + return staleRangeRequest(key, false, 0, revision) +} + func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest { - return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}}} + return staleRangeRequest(key, withPrefix, limit, 0) +} + +func staleRangeRequest(key string, withPrefix bool, limit, revision int64) EtcdRequest { + return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}, Revision: revision}} } func emptyGetResponse(revision int64) MaybeEtcdResponse { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 5462f749e..98bc83f51 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -106,22 +106,25 @@ func (r ClientReport) WatchEventCount() int { return count } -func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { - resp, err := c.Range(ctx, key, false) - if err != nil || len(resp.Kvs) == 0 { - return nil, err +func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) { + resp, err := c.Range(ctx, key, false, revision) + if err != nil { + return nil, 0, err } if len(resp.Kvs) == 1 { - return resp.Kvs[0], err + kv = resp.Kvs[0] } - panic(fmt.Sprintf("Unexpected response size: %d", len(resp.Kvs))) + return kv, resp.Header.Revision, nil } -func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) (*clientv3.GetResponse, error) { +func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool, revision int64) (*clientv3.GetResponse, error) { ops := []clientv3.OpOption{} if withPrefix { ops = append(ops, clientv3.WithPrefix()) } + if revision != 0 { + ops = append(ops, clientv3.WithRev(revision)) + } c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) @@ -130,28 +133,28 @@ func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool return nil, err } returnTime := time.Since(c.baseTime) - c.operations.AppendRange(key, withPrefix, callTime, returnTime, resp) + c.operations.AppendRange(key, withPrefix, revision, callTime, returnTime, resp) return resp, nil } -func (c *RecordingClient) Put(ctx context.Context, key, value string) error { +func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) c.operations.AppendPut(key, value, callTime, returnTime, resp, err) - return err + return resp, err } -func (c *RecordingClient) Delete(ctx context.Context, key string) error { +func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) c.operations.AppendDelete(key, callTime, returnTime, resp, err) - return nil + return resp, err } func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) { @@ -171,31 +174,27 @@ func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, on return resp, err } -func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { +func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) c.operations.AppendLeaseGrant(callTime, returnTime, resp, err) - var leaseId int64 - if resp != nil { - leaseId = int64(resp.ID) - } - return leaseId, err + return resp, err } -func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { +func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) (*clientv3.LeaseRevokeResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) returnTime := time.Since(c.baseTime) c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) - return err + return resp, err } -func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { +func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) (*clientv3.PutResponse, error) { opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) c.opMux.Lock() defer c.opMux.Unlock() @@ -203,17 +202,17 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st resp, err := c.client.Put(ctx, key, value, opts) returnTime := time.Since(c.baseTime) c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) - return err + return resp, err } -func (c *RecordingClient) Defragment(ctx context.Context) error { +func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) c.operations.AppendDefragment(callTime, returnTime, resp, err) - return err + return resp, err } func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan { diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index f9361a2ae..cfce3845c 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -37,8 +37,9 @@ var ( keyCount: 10, leaseTTL: DefaultLeaseTTL, largePutSize: 32769, - operations: []choiceWeight[etcdRequestType]{ - {choice: Get, weight: 50}, + requests: []choiceWeight[etcdRequestType]{ + {choice: Get, weight: 25}, + {choice: StaleGet, weight: 25}, {choice: Put, weight: 23}, {choice: LargePut, weight: 2}, {choice: Delete, weight: 5}, @@ -58,8 +59,9 @@ var ( keyCount: 10, largePutSize: 32769, leaseTTL: DefaultLeaseTTL, - operations: []choiceWeight[etcdRequestType]{ - {choice: Get, weight: 50}, + requests: []choiceWeight[etcdRequestType]{ + {choice: Get, weight: 25}, + {choice: StaleGet, weight: 25}, {choice: Put, weight: 40}, {choice: MultiOpTxn, weight: 5}, {choice: LargePut, weight: 5}, @@ -70,7 +72,7 @@ var ( type etcdTraffic struct { keyCount int - operations []choiceWeight[etcdRequestType] + requests []choiceWeight[etcdRequestType] leaseTTL int64 largePutSize int } @@ -83,6 +85,7 @@ type etcdRequestType string const ( Get etcdRequestType = "get" + StaleGet etcdRequestType = "staleGet" Put etcdRequestType = "put" LargePut etcdRequestType = "largePut" Delete etcdRequestType = "delete" @@ -95,6 +98,8 @@ const ( func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { lastOperationSucceeded := true + var lastRev int64 + var requestType etcdRequestType for { select { case <-ctx.Done(): @@ -105,47 +110,59 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. } key := fmt.Sprintf("%d", rand.Int()%t.keyCount) // Avoid multiple failed writes in a row - if !lastOperationSucceeded { - _, err := t.Read(ctx, c, key) - if err != nil { - continue - } - limiter.Wait(ctx) + if lastOperationSucceeded { + requestType = pickRandom(t.requests) + } else { + requestType = Get } - err := t.RandomOperation(ctx, c, limiter, key, ids, lm) + rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev) lastOperationSucceeded = err == nil if err != nil { continue } + if rev != 0 { + lastRev = rev + } limiter.Wait(ctx) } } -func (t etcdTraffic) Read(ctx context.Context, c *RecordingClient, key string) (*mvccpb.KeyValue, error) { - getCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := c.Get(getCtx, key) - cancel() - return resp, err -} - -func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage) error { +func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) { opCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - var err error - switch pickRandom(t.operations) { + switch request { + case StaleGet: + _, rev, err = c.Get(opCtx, key, lastRev) case Get: - _, err = c.Get(opCtx, key) + _, rev, err = c.Get(opCtx, key, 0) case Put: - err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId())) + var resp *clientv3.PutResponse + resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId())) + if resp != nil { + rev = resp.Header.Revision + } case LargePut: - err = c.Put(opCtx, key, randString(t.largePutSize)) + var resp *clientv3.PutResponse + resp, err = c.Put(opCtx, key, randString(t.largePutSize)) + if resp != nil { + rev = resp.Header.Revision + } case Delete: - err = c.Delete(opCtx, key) + var resp *clientv3.DeleteResponse + resp, err = c.Delete(opCtx, key) + if resp != nil { + rev = resp.Header.Revision + } case MultiOpTxn: - _, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil) + var resp *clientv3.TxnResponse + resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil) + if resp != nil { + rev = resp.Header.Revision + } + case CompareAndSet: var kv *mvccpb.KeyValue - kv, err = c.Get(opCtx, key) + kv, rev, err = c.Get(opCtx, key, 0) if err == nil { limiter.Wait(ctx) var expectedRevision int64 @@ -153,13 +170,22 @@ func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, li expectedRevision = kv.ModRevision } txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout) - _, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil) + var resp *clientv3.TxnResponse + resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil) txnCancel() + if resp != nil { + rev = resp.Header.Revision + } } case PutWithLease: leaseId := lm.LeaseId(c.id) if leaseId == 0 { - leaseId, err = c.LeaseGrant(opCtx, t.leaseTTL) + var resp *clientv3.LeaseGrantResponse + resp, err = c.LeaseGrant(opCtx, t.leaseTTL) + if resp != nil { + leaseId = int64(resp.ID) + rev = resp.ResponseHeader.Revision + } if err == nil { lm.AddLeaseId(c.id, leaseId) limiter.Wait(ctx) @@ -167,25 +193,37 @@ func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, li } if leaseId != 0 { putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) - err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) + var resp *clientv3.PutResponse + resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) putCancel() + if resp != nil { + rev = resp.Header.Revision + } } case LeaseRevoke: leaseId := lm.LeaseId(c.id) if leaseId != 0 { - err = c.LeaseRevoke(opCtx, leaseId) + var resp *clientv3.LeaseRevokeResponse + resp, err = c.LeaseRevoke(opCtx, leaseId) //if LeaseRevoke has failed, do not remove the mapping. if err == nil { lm.RemoveLeaseId(c.id) } + if resp != nil { + rev = resp.Header.Revision + } } case Defragment: - err = c.Defragment(opCtx) + var resp *clientv3.DefragmentResponse + resp, err = c.Defragment(opCtx) + if resp != nil { + rev = resp.Header.Revision + } default: panic("invalid choice") } cancel() - return err + return rev, err } func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index d3d26d91d..b4f3f9f5e 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -169,7 +169,7 @@ type kubernetesClient struct { } func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) { - resp, err := k.client.Range(ctx, key, true) + resp, err := k.client.Range(ctx, key, true, 0) if err != nil { return nil, err } diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 02dade953..f2f6cbdeb 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -70,7 +70,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 // Ensure that last operation is succeeds time.Sleep(time.Second) - err = cc.Put(ctx, "tombstone", "true") + _, err = cc.Put(ctx, "tombstone", "true") if err != nil { t.Error(err) } diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index 7096dcde2..d7a9a806d 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -27,6 +27,7 @@ import ( // ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private. func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) { validateWatch(t, cfg, reports) + // TODO: Validate stale reads responses. allOperations := operations(reports) watchEvents := uniqueWatchEvents(reports) newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)