Merge pull request #16091 from serathius/robustness-stale-read-1

tests/robustness: Implement stale reads without validation
This commit is contained in:
Marek Siarkowicz 2023-06-19 11:21:48 +02:00 committed by GitHub
commit b7e7811ba4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 164 additions and 73 deletions

View File

@ -44,7 +44,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
func describeEtcdRequest(request EtcdRequest) string { func describeEtcdRequest(request EtcdRequest) string {
switch request.Type { switch request.Type {
case Range: case Range:
return describeRangeRequest(request.Range.Key, request.Range.RangeOptions) return describeRangeRequest(request.Range.Key, request.Range.Revision, request.Range.RangeOptions)
case Txn: case Txn:
onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess) onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess)
if len(request.Txn.Conditions) != 0 { if len(request.Txn.Conditions) != 0 {
@ -105,7 +105,7 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
func describeEtcdOperation(op EtcdOperation) string { func describeEtcdOperation(op EtcdOperation) string {
switch op.Type { switch op.Type {
case RangeOperation: case RangeOperation:
return describeRangeRequest(op.Key, op.RangeOptions) return describeRangeRequest(op.Key, 0, op.RangeOptions)
case PutOperation: case PutOperation:
if op.LeaseID != 0 { if op.LeaseID != 0 {
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID) return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
@ -118,8 +118,11 @@ func describeEtcdOperation(op EtcdOperation) string {
} }
} }
func describeRangeRequest(key string, opts RangeOptions) string { func describeRangeRequest(key string, revision int64, opts RangeOptions) string {
kwargs := []string{} kwargs := []string{}
if revision != 0 {
kwargs = append(kwargs, fmt.Sprintf("rev=%d", revision))
}
if opts.Limit != 0 { if opts.Limit != 0 {
kwargs = append(kwargs, fmt.Sprintf("limit=%d", opts.Limit)) kwargs = append(kwargs, fmt.Sprintf("limit=%d", opts.Limit))
} }

View File

@ -134,6 +134,11 @@ func TestModelDescribe(t *testing.T) {
resp: rangeResponse(nil, 0, 14), resp: rangeResponse(nil, 0, 14),
expectDescribe: `range("key14", limit=14) -> [], count: 0, rev: 14`, expectDescribe: `range("key14", limit=14) -> [], count: 0, rev: 14`,
}, },
{
req: staleRangeRequest("key15", true, 0, 15),
resp: rangeResponse(nil, 0, 15),
expectDescribe: `range("key15", rev=15) -> [], count: 0, rev: 15`,
},
} }
for _, tc := range tcs { for _, tc := range tcs {
assert.Equal(t, tc.expectDescribe, NonDeterministicModel.DescribeOperation(tc.req, tc.resp)) assert.Equal(t, tc.expectDescribe, NonDeterministicModel.DescribeOperation(tc.req, tc.resp))

View File

@ -16,6 +16,7 @@ package model
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"reflect" "reflect"
@ -93,8 +94,15 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
s.KeyValues = newKVs s.KeyValues = newKVs
switch request.Type { switch request.Type {
case Range: case Range:
resp := s.getRange(request.Range.Key, request.Range.RangeOptions) if request.Range.Revision == 0 || request.Range.Revision == s.Revision {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}} resp := s.getRange(request.Range.Key, request.Range.RangeOptions)
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}}
} else {
if request.Range.Revision > s.Revision {
return s, MaybeEtcdResponse{Err: EtcdFutureRevErr}
}
return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}}
}
case Txn: case Txn:
failure := false failure := false
for _, cond := range request.Txn.Conditions { for _, cond := range request.Txn.Conditions {
@ -245,8 +253,7 @@ type EtcdRequest struct {
type RangeRequest struct { type RangeRequest struct {
Key string Key string
RangeOptions RangeOptions
// TODO: Implement stale read using revision Revision int64
revision int64
} }
type RangeOptions struct { type RangeOptions struct {
@ -304,6 +311,8 @@ type MaybeEtcdResponse struct {
Err error Err error
} }
var EtcdFutureRevErr = errors.New("future rev")
type EtcdResponse struct { type EtcdResponse struct {
Txn *TxnResponse Txn *TxnResponse
Range *RangeResponse Range *RangeResponse

View File

@ -151,6 +151,34 @@ var commonTestScenarios = []modelTestCase{
{req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 3, 3), expectFailure: true}, {req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 3, 3), expectFailure: true},
}, },
}, },
{
name: "Stale Get doesn't need to match put if asking about old revision",
operations: []testOperation{
{req: putRequest("key", "1"), resp: putResponse(2)},
{req: staleGetRequest("key", 1), resp: getResponse("key", "2", 2, 2)},
{req: staleGetRequest("key", 1), resp: getResponse("key", "1", 2, 2)},
},
},
{
name: "Stale Get need to match put if asking about matching revision",
operations: []testOperation{
{req: putRequest("key", "1"), resp: putResponse(2)},
{req: staleGetRequest("key", 2), resp: getResponse("key", "1", 3, 2), expectFailure: true},
{req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3), expectFailure: true},
{req: staleGetRequest("key", 2), resp: getResponse("key", "2", 2, 2), expectFailure: true},
{req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 2)},
},
},
{
name: "Stale Get need to have a proper response revision",
operations: []testOperation{
{req: putRequest("key", "1"), resp: putResponse(2)},
{req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3), expectFailure: true},
{req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 2)},
{req: putRequest("key", "2"), resp: putResponse(3)},
{req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3)},
},
},
{ {
name: "Put must increase revision by 1", name: "Put must increase revision by 1",
operations: []testOperation{ operations: []testOperation{

View File

@ -54,16 +54,16 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
} }
} }
func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end time.Duration, resp *clientv3.GetResponse) { func (h *AppendableHistory) AppendRange(key string, withPrefix bool, revision int64, start, end time.Duration, resp *clientv3.GetResponse) {
var revision int64 var respRevision int64
if resp != nil && resp.Header != nil { if resp != nil && resp.Header != nil {
revision = resp.Header.Revision respRevision = resp.Header.Revision
} }
h.appendSuccessful(porcupine.Operation{ h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId, ClientId: h.streamId,
Input: rangeRequest(key, withPrefix, 0), Input: staleRangeRequest(key, withPrefix, 0, revision),
Call: start.Nanoseconds(), Call: start.Nanoseconds(),
Output: rangeResponse(resp.Kvs, resp.Count, revision), Output: rangeResponse(resp.Kvs, resp.Count, respRevision),
Return: end.Nanoseconds(), Return: end.Nanoseconds(),
}) })
} }
@ -340,8 +340,16 @@ func getRequest(key string) EtcdRequest {
return rangeRequest(key, false, 0) return rangeRequest(key, false, 0)
} }
func staleGetRequest(key string, revision int64) EtcdRequest {
return staleRangeRequest(key, false, 0, revision)
}
func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest { func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest {
return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}}} return staleRangeRequest(key, withPrefix, limit, 0)
}
func staleRangeRequest(key string, withPrefix bool, limit, revision int64) EtcdRequest {
return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}, Revision: revision}}
} }
func emptyGetResponse(revision int64) MaybeEtcdResponse { func emptyGetResponse(revision int64) MaybeEtcdResponse {

View File

@ -106,22 +106,25 @@ func (r ClientReport) WatchEventCount() int {
return count return count
} }
func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) {
resp, err := c.Range(ctx, key, false) resp, err := c.Range(ctx, key, false, revision)
if err != nil || len(resp.Kvs) == 0 { if err != nil {
return nil, err return nil, 0, err
} }
if len(resp.Kvs) == 1 { if len(resp.Kvs) == 1 {
return resp.Kvs[0], err kv = resp.Kvs[0]
} }
panic(fmt.Sprintf("Unexpected response size: %d", len(resp.Kvs))) return kv, resp.Header.Revision, nil
} }
func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) (*clientv3.GetResponse, error) { func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool, revision int64) (*clientv3.GetResponse, error) {
ops := []clientv3.OpOption{} ops := []clientv3.OpOption{}
if withPrefix { if withPrefix {
ops = append(ops, clientv3.WithPrefix()) ops = append(ops, clientv3.WithPrefix())
} }
if revision != 0 {
ops = append(ops, clientv3.WithRev(revision))
}
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
callTime := time.Since(c.baseTime) callTime := time.Since(c.baseTime)
@ -130,28 +133,28 @@ func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool
return nil, err return nil, err
} }
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendRange(key, withPrefix, callTime, returnTime, resp) c.operations.AppendRange(key, withPrefix, revision, callTime, returnTime, resp)
return resp, nil return resp, nil
} }
func (c *RecordingClient) Put(ctx context.Context, key, value string) error { func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) {
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
callTime := time.Since(c.baseTime) callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value) resp, err := c.client.Put(ctx, key, value)
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendPut(key, value, callTime, returnTime, resp, err) c.operations.AppendPut(key, value, callTime, returnTime, resp, err)
return err return resp, err
} }
func (c *RecordingClient) Delete(ctx context.Context, key string) error { func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) {
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
callTime := time.Since(c.baseTime) callTime := time.Since(c.baseTime)
resp, err := c.client.Delete(ctx, key) resp, err := c.client.Delete(ctx, key)
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendDelete(key, callTime, returnTime, resp, err) c.operations.AppendDelete(key, callTime, returnTime, resp, err)
return nil return resp, err
} }
func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) { func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
@ -171,31 +174,27 @@ func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, on
return resp, err return resp, err
} }
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
callTime := time.Since(c.baseTime) callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Grant(ctx, ttl) resp, err := c.client.Lease.Grant(ctx, ttl)
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendLeaseGrant(callTime, returnTime, resp, err) c.operations.AppendLeaseGrant(callTime, returnTime, resp, err)
var leaseId int64 return resp, err
if resp != nil {
leaseId = int64(resp.ID)
}
return leaseId, err
} }
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) (*clientv3.LeaseRevokeResponse, error) {
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
callTime := time.Since(c.baseTime) callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
return err return resp, err
} }
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) (*clientv3.PutResponse, error) {
opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
@ -203,17 +202,17 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st
resp, err := c.client.Put(ctx, key, value, opts) resp, err := c.client.Put(ctx, key, value, opts)
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err)
return err return resp, err
} }
func (c *RecordingClient) Defragment(ctx context.Context) error { func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) {
c.opMux.Lock() c.opMux.Lock()
defer c.opMux.Unlock() defer c.opMux.Unlock()
callTime := time.Since(c.baseTime) callTime := time.Since(c.baseTime)
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
returnTime := time.Since(c.baseTime) returnTime := time.Since(c.baseTime)
c.operations.AppendDefragment(callTime, returnTime, resp, err) c.operations.AppendDefragment(callTime, returnTime, resp, err)
return err return resp, err
} }
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan { func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan {

View File

@ -37,8 +37,9 @@ var (
keyCount: 10, keyCount: 10,
leaseTTL: DefaultLeaseTTL, leaseTTL: DefaultLeaseTTL,
largePutSize: 32769, largePutSize: 32769,
operations: []choiceWeight[etcdRequestType]{ requests: []choiceWeight[etcdRequestType]{
{choice: Get, weight: 50}, {choice: Get, weight: 25},
{choice: StaleGet, weight: 25},
{choice: Put, weight: 23}, {choice: Put, weight: 23},
{choice: LargePut, weight: 2}, {choice: LargePut, weight: 2},
{choice: Delete, weight: 5}, {choice: Delete, weight: 5},
@ -58,8 +59,9 @@ var (
keyCount: 10, keyCount: 10,
largePutSize: 32769, largePutSize: 32769,
leaseTTL: DefaultLeaseTTL, leaseTTL: DefaultLeaseTTL,
operations: []choiceWeight[etcdRequestType]{ requests: []choiceWeight[etcdRequestType]{
{choice: Get, weight: 50}, {choice: Get, weight: 25},
{choice: StaleGet, weight: 25},
{choice: Put, weight: 40}, {choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5}, {choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5}, {choice: LargePut, weight: 5},
@ -70,7 +72,7 @@ var (
type etcdTraffic struct { type etcdTraffic struct {
keyCount int keyCount int
operations []choiceWeight[etcdRequestType] requests []choiceWeight[etcdRequestType]
leaseTTL int64 leaseTTL int64
largePutSize int largePutSize int
} }
@ -83,6 +85,7 @@ type etcdRequestType string
const ( const (
Get etcdRequestType = "get" Get etcdRequestType = "get"
StaleGet etcdRequestType = "staleGet"
Put etcdRequestType = "put" Put etcdRequestType = "put"
LargePut etcdRequestType = "largePut" LargePut etcdRequestType = "largePut"
Delete etcdRequestType = "delete" Delete etcdRequestType = "delete"
@ -95,6 +98,8 @@ const (
func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
lastOperationSucceeded := true lastOperationSucceeded := true
var lastRev int64
var requestType etcdRequestType
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -105,47 +110,59 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
} }
key := fmt.Sprintf("%d", rand.Int()%t.keyCount) key := fmt.Sprintf("%d", rand.Int()%t.keyCount)
// Avoid multiple failed writes in a row // Avoid multiple failed writes in a row
if !lastOperationSucceeded { if lastOperationSucceeded {
_, err := t.Read(ctx, c, key) requestType = pickRandom(t.requests)
if err != nil { } else {
continue requestType = Get
}
limiter.Wait(ctx)
} }
err := t.RandomOperation(ctx, c, limiter, key, ids, lm) rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev)
lastOperationSucceeded = err == nil lastOperationSucceeded = err == nil
if err != nil { if err != nil {
continue continue
} }
if rev != 0 {
lastRev = rev
}
limiter.Wait(ctx) limiter.Wait(ctx)
} }
} }
func (t etcdTraffic) Read(ctx context.Context, c *RecordingClient, key string) (*mvccpb.KeyValue, error) { func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) {
getCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Get(getCtx, key)
cancel()
return resp, err
}
func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage) error {
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout) opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
var err error switch request {
switch pickRandom(t.operations) { case StaleGet:
_, rev, err = c.Get(opCtx, key, lastRev)
case Get: case Get:
_, err = c.Get(opCtx, key) _, rev, err = c.Get(opCtx, key, 0)
case Put: case Put:
err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId())) var resp *clientv3.PutResponse
resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
if resp != nil {
rev = resp.Header.Revision
}
case LargePut: case LargePut:
err = c.Put(opCtx, key, randString(t.largePutSize)) var resp *clientv3.PutResponse
resp, err = c.Put(opCtx, key, randString(t.largePutSize))
if resp != nil {
rev = resp.Header.Revision
}
case Delete: case Delete:
err = c.Delete(opCtx, key) var resp *clientv3.DeleteResponse
resp, err = c.Delete(opCtx, key)
if resp != nil {
rev = resp.Header.Revision
}
case MultiOpTxn: case MultiOpTxn:
_, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil) var resp *clientv3.TxnResponse
resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil)
if resp != nil {
rev = resp.Header.Revision
}
case CompareAndSet: case CompareAndSet:
var kv *mvccpb.KeyValue var kv *mvccpb.KeyValue
kv, err = c.Get(opCtx, key) kv, rev, err = c.Get(opCtx, key, 0)
if err == nil { if err == nil {
limiter.Wait(ctx) limiter.Wait(ctx)
var expectedRevision int64 var expectedRevision int64
@ -153,13 +170,22 @@ func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, li
expectedRevision = kv.ModRevision expectedRevision = kv.ModRevision
} }
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout) txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
_, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil) var resp *clientv3.TxnResponse
resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil)
txnCancel() txnCancel()
if resp != nil {
rev = resp.Header.Revision
}
} }
case PutWithLease: case PutWithLease:
leaseId := lm.LeaseId(c.id) leaseId := lm.LeaseId(c.id)
if leaseId == 0 { if leaseId == 0 {
leaseId, err = c.LeaseGrant(opCtx, t.leaseTTL) var resp *clientv3.LeaseGrantResponse
resp, err = c.LeaseGrant(opCtx, t.leaseTTL)
if resp != nil {
leaseId = int64(resp.ID)
rev = resp.ResponseHeader.Revision
}
if err == nil { if err == nil {
lm.AddLeaseId(c.id, leaseId) lm.AddLeaseId(c.id, leaseId)
limiter.Wait(ctx) limiter.Wait(ctx)
@ -167,25 +193,37 @@ func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, li
} }
if leaseId != 0 { if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) var resp *clientv3.PutResponse
resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId)
putCancel() putCancel()
if resp != nil {
rev = resp.Header.Revision
}
} }
case LeaseRevoke: case LeaseRevoke:
leaseId := lm.LeaseId(c.id) leaseId := lm.LeaseId(c.id)
if leaseId != 0 { if leaseId != 0 {
err = c.LeaseRevoke(opCtx, leaseId) var resp *clientv3.LeaseRevokeResponse
resp, err = c.LeaseRevoke(opCtx, leaseId)
//if LeaseRevoke has failed, do not remove the mapping. //if LeaseRevoke has failed, do not remove the mapping.
if err == nil { if err == nil {
lm.RemoveLeaseId(c.id) lm.RemoveLeaseId(c.id)
} }
if resp != nil {
rev = resp.Header.Revision
}
} }
case Defragment: case Defragment:
err = c.Defragment(opCtx) var resp *clientv3.DefragmentResponse
resp, err = c.Defragment(opCtx)
if resp != nil {
rev = resp.Header.Revision
}
default: default:
panic("invalid choice") panic("invalid choice")
} }
cancel() cancel()
return err return rev, err
} }
func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {

View File

@ -169,7 +169,7 @@ type kubernetesClient struct {
} }
func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) { func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) {
resp, err := k.client.Range(ctx, key, true) resp, err := k.client.Range(ctx, key, true, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -70,7 +70,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
// Ensure that last operation is succeeds // Ensure that last operation is succeeds
time.Sleep(time.Second) time.Sleep(time.Second)
err = cc.Put(ctx, "tombstone", "true") _, err = cc.Put(ctx, "tombstone", "true")
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -27,6 +27,7 @@ import (
// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private. // ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) { func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatch(t, cfg, reports) validateWatch(t, cfg, reports)
// TODO: Validate stale reads responses.
allOperations := operations(reports) allOperations := operations(reports)
watchEvents := uniqueWatchEvents(reports) watchEvents := uniqueWatchEvents(reports)
newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents) newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)