From 5b84526e9a5ddb55ce3db61d62839bfec8459957 Mon Sep 17 00:00:00 2001 From: Geeta Gharpure Date: Thu, 29 Dec 2022 16:08:36 -0800 Subject: [PATCH] Add support for lease api to linearizability tests Signed-off-by: Geeta Gharpure --- tests/linearizability/client.go | 29 +++++ tests/linearizability/history.go | 81 ++++++++++++ tests/linearizability/lease_ids.go | 53 ++++++++ tests/linearizability/linearizability_test.go | 3 +- tests/linearizability/model.go | 110 +++++++++++++++-- tests/linearizability/model_test.go | 116 ++++++++++++++++++ tests/linearizability/traffic.go | 30 ++++- 7 files changed, 409 insertions(+), 13 deletions(-) create mode 100644 tests/linearizability/lease_ids.go diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index b5a65df78..f4d45f045 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -94,3 +94,32 @@ func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err) return err } + +func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { + callTime := time.Now() + resp, err := c.client.Lease.Grant(ctx, ttl) + returnTime := time.Now() + c.history.AppendLeaseGrant(callTime, returnTime, resp, err) + var leaseId int64 + if resp != nil { + leaseId = int64(resp.ID) + } + return leaseId, err +} + +func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { + callTime := time.Now() + resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) + returnTime := time.Now() + c.history.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) + return err +} + +func (c *recordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { + callTime := time.Now() + opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) + resp, err := c.client.Put(ctx, key, value, opts) + returnTime := time.Now() + c.history.AppendPutWithLease(key, value, int64(leaseId), callTime, returnTime, resp, err) + return err +} diff --git a/tests/linearizability/history.go b/tests/linearizability/history.go index 4ec0f8161..d0a3888a9 100644 --- a/tests/linearizability/history.go +++ b/tests/linearizability/history.go @@ -78,6 +78,67 @@ func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, r }) } +func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) { + request := putWithLeaseRequest(key, value, leaseID) + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: putResponse(revision), + Return: end.UnixNano(), + }) +} + +func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) { + var leaseID int64 + if resp != nil { + leaseID = int64(resp.ID) + } + request := leaseGrantRequest(leaseID) + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.ResponseHeader != nil { + revision = resp.ResponseHeader.Revision + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: leaseGrantResponse(revision), + Return: end.UnixNano(), + }) +} + +func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) { + request := leaseRevokeRequest(id) + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: leaseRevokeResponse(revision), + Return: end.UnixNano(), + }) +} + func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) { request := deleteRequest(key) if err != nil { @@ -171,6 +232,26 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse { return EtcdResponse{Result: result, TxnFailure: !succeeded, Revision: revision} } +func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest { + return EtcdRequest{Ops: []EtcdOperation{{Type: PutWithLease, Key: key, Value: value, LeaseID: leaseID}}} +} + +func leaseGrantRequest(leaseID int64) EtcdRequest { + return EtcdRequest{Ops: []EtcdOperation{{Type: LeaseGrant, LeaseID: leaseID}}} +} + +func leaseGrantResponse(revision int64) EtcdResponse { + return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision} +} + +func leaseRevokeRequest(leaseID int64) EtcdRequest { + return EtcdRequest{Ops: []EtcdOperation{{Type: LeaseRevoke, LeaseID: leaseID}}} +} + +func leaseRevokeResponse(revision int64) EtcdResponse { + return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision} +} + type history struct { successful []porcupine.Operation // failed requests are kept separate as we don't know return time of failed operations. diff --git a/tests/linearizability/lease_ids.go b/tests/linearizability/lease_ids.go new file mode 100644 index 000000000..0a15da793 --- /dev/null +++ b/tests/linearizability/lease_ids.go @@ -0,0 +1,53 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "sync" +) + +type clientId2LeaseIdMapper interface { + LeaseId(int) int64 + AddLeaseId(int, int64) + RemoveLeaseId(int) +} + +func newClientId2LeaseIdMapper() clientId2LeaseIdMapper { + return &atomicClientId2LeaseIdMapper{m: map[int]int64{}} +} + +type atomicClientId2LeaseIdMapper struct { + sync.RWMutex + // m is used to store clientId to leaseId mapping. + m map[int]int64 +} + +func (lm *atomicClientId2LeaseIdMapper) LeaseId(clientId int) int64 { + lm.RLock() + defer lm.RUnlock() + return lm.m[clientId] +} + +func (lm *atomicClientId2LeaseIdMapper) AddLeaseId(clientId int, leaseId int64) { + lm.Lock() + defer lm.Unlock() + lm.m[clientId] = leaseId +} + +func (lm *atomicClientId2LeaseIdMapper) RemoveLeaseId(clientId int) { + lm.Lock() + defer lm.Unlock() + delete(lm.m, clientId) +} diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 405ae21a1..5b559a628 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -156,6 +156,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu endpoints := clus.EndpointsV3() ids := newIdProvider() + lm := newClientId2LeaseIdMapper() h := history{} limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) @@ -172,7 +173,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu defer wg.Done() defer c.Close() - config.traffic.Run(ctx, c, limiter, ids) + config.traffic.Run(ctx, c, limiter, ids, lm) mux.Lock() h = h.Merge(c.history.history) mux.Unlock() diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go index 6a01180a4..2309eeaa5 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model.go @@ -26,10 +26,13 @@ import ( type OperationType string const ( - Get OperationType = "get" - Put OperationType = "put" - Delete OperationType = "delete" - Txn OperationType = "txn" + Get OperationType = "get" + Put OperationType = "put" + Delete OperationType = "delete" + Txn OperationType = "txn" + PutWithLease OperationType = "putWithLease" + LeaseGrant OperationType = "leaseGrant" + LeaseRevoke OperationType = "leaseRevoke" ) type EtcdRequest struct { @@ -43,9 +46,10 @@ type EtcdCondition struct { } type EtcdOperation struct { - Type OperationType - Key string - Value string + Type OperationType + Key string + Value string + LeaseID int64 } type EtcdResponse struct { @@ -60,11 +64,20 @@ type EtcdOperationResult struct { Deleted int64 } +var leased = struct{}{} + +type EtcdLease struct { + LeaseID int64 + Keys map[string]struct{} +} + type PossibleStates []EtcdState type EtcdState struct { Revision int64 KeyValues map[string]string + KeyLeases map[string]int64 + Leases map[int64]EtcdLease } var etcdModel = porcupine.Model{ @@ -139,6 +152,12 @@ func describeEtcdOperation(op EtcdOperation) string { return fmt.Sprintf("delete(%q)", op.Key) case Txn: return "" + case LeaseGrant: + return fmt.Sprintf("leaseGrant(%d)", op.LeaseID) + case LeaseRevoke: + return fmt.Sprintf("leaseRevoke(%d)", op.LeaseID) + case PutWithLease: + return fmt.Sprintf("putWithLease(%q, %q, %d)", op.Key, op.Value, op.LeaseID) default: return fmt.Sprintf("", op.Type) } @@ -157,6 +176,12 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s return fmt.Sprintf("deleted: %d", resp.Deleted) case Txn: return "" + case LeaseGrant: + return fmt.Sprintf("ok") + case LeaseRevoke: + return fmt.Sprintf("ok") + case PutWithLease: + return fmt.Sprintf("ok") default: return fmt.Sprintf("", op) } @@ -183,6 +208,8 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState { state := EtcdState{ Revision: response.Revision, KeyValues: map[string]string{}, + KeyLeases: map[string]int64{}, + Leases: map[int64]EtcdLease{}, } if response.TxnFailure { return state @@ -197,6 +224,24 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState { case Put: state.KeyValues[op.Key] = op.Value case Delete: + case PutWithLease: + if _, ok := state.Leases[op.LeaseID]; ok { + state.KeyValues[op.Key] = op.Value + //detach from old lease id but we dont expect that at init + if _, ok := state.KeyLeases[op.Key]; ok { + panic("old lease id found at init") + } + //attach to new lease id + state.KeyLeases[op.Key] = op.LeaseID + state.Leases[op.LeaseID].Keys[op.Key] = leased + } + case LeaseGrant: + lease := EtcdLease{ + LeaseID: op.LeaseID, + Keys: map[string]struct{}{}, + } + state.Leases[op.LeaseID] = lease + case LeaseRevoke: default: panic("Unknown operation") } @@ -244,6 +289,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc s.KeyValues = newKVs opResp := make([]EtcdOperationResult, len(request.Ops)) increaseRevision := false + for i, op := range request.Ops { switch op.Type { case Get: @@ -251,18 +297,68 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc case Put: s.KeyValues[op.Key] = op.Value increaseRevision = true + s = detachFromOldLease(s, op) case Delete: if _, ok := s.KeyValues[op.Key]; ok { delete(s.KeyValues, op.Key) increaseRevision = true + s = detachFromOldLease(s, op) opResp[i].Deleted = 1 } + case PutWithLease: + if _, ok := s.Leases[op.LeaseID]; ok { + //handle put op. + s.KeyValues[op.Key] = op.Value + increaseRevision = true + s = detachFromOldLease(s, op) + s = attachToNewLease(s, op) + } + case LeaseRevoke: + //Delete the keys attached to the lease + keyDeleted := false + for key, _ := range s.Leases[op.LeaseID].Keys { + //same as delete. + if _, ok := s.KeyValues[key]; ok { + if !keyDeleted { + keyDeleted = true + } + delete(s.KeyValues, key) + delete(s.KeyLeases, key) + } + } + //delete the lease + delete(s.Leases, op.LeaseID) + if keyDeleted { + increaseRevision = true + } + case LeaseGrant: + lease := EtcdLease{ + LeaseID: op.LeaseID, + Keys: map[string]struct{}{}, + } + s.Leases[op.LeaseID] = lease default: panic("unsupported operation") } } + if increaseRevision { s.Revision += 1 } + return s, EtcdResponse{Result: opResp, Revision: s.Revision} } + +func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState { + if oldLeaseId, ok := s.KeyLeases[op.Key]; ok { + delete(s.Leases[oldLeaseId].Keys, op.Key) + delete(s.KeyLeases, op.Key) + } + return s +} + +func attachToNewLease(s EtcdState, op EtcdOperation) EtcdState { + s.KeyLeases[op.Key] = op.LeaseID + s.Leases[op.LeaseID].Keys[op.Key] = leased + return s +} diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model_test.go index 067046cbd..9a82ff611 100644 --- a/tests/linearizability/model_test.go +++ b/tests/linearizability/model_test.go @@ -402,6 +402,122 @@ func TestModelStep(t *testing.T) { {req: txnRequest("key", "8", "10"), resp: txnResponse(false, 9)}, }, }, + { + name: "Put with valid lease id should succeed. Put with invalid lease id should fail", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3), failure: true}, + {req: getRequest("key"), resp: getResponse("2", 2)}, + }, + }, + { + name: "Put with valid lease id should succeed. Put with expired lease id should fail", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: getRequest("key"), resp: getResponse("2", 2)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, + {req: putWithLeaseRequest("key", "4", 1), resp: putResponse(4), failure: true}, + {req: getRequest("key"), resp: getResponse("", 3)}, + }, + }, + { + name: "Revoke should increment the revision", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, + {req: getRequest("key"), resp: getResponse("", 3)}, + }, + }, + { + name: "Put following a PutWithLease will detach the key from the lease", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: putRequest("key", "3"), resp: putResponse(3)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, + {req: getRequest("key"), resp: getResponse("3", 3)}, + }, + }, + { + name: "Change lease. Revoking older lease should not increment revision", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: leaseGrantRequest(2), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, + {req: getRequest("key"), resp: getResponse("3", 3)}, + {req: leaseRevokeRequest(2), resp: leaseRevokeResponse(4)}, + {req: getRequest("key"), resp: getResponse("", 4)}, + }, + }, + { + name: "Update key with same lease", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: putWithLeaseRequest("key", "3", 1), resp: putResponse(3)}, + {req: getRequest("key"), resp: getResponse("3", 3)}, + }, + }, + { + name: "Deleting a leased key - revoke should not increment revision", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)}, + {req: deleteRequest("key"), resp: deleteResponse(1, 3)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(4), failure: true}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, + }, + }, + { + name: "Lease a few keys - revoke should increment revision only once", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key1", "1", 1), resp: putResponse(2)}, + {req: putWithLeaseRequest("key2", "2", 1), resp: putResponse(3)}, + {req: putWithLeaseRequest("key3", "3", 1), resp: putResponse(4)}, + {req: putWithLeaseRequest("key4", "4", 1), resp: putResponse(5)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(6)}, + }, + }, + { + name: "Lease some keys then delete some of them. Revoke should increment revision since some keys were still leased", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key1", "1", 1), resp: putResponse(2)}, + {req: putWithLeaseRequest("key2", "2", 1), resp: putResponse(3)}, + {req: putWithLeaseRequest("key3", "3", 1), resp: putResponse(4)}, + {req: putWithLeaseRequest("key4", "4", 1), resp: putResponse(5)}, + {req: deleteRequest("key1"), resp: deleteResponse(1, 6)}, + {req: deleteRequest("key3"), resp: deleteResponse(1, 7)}, + {req: deleteRequest("key4"), resp: deleteResponse(1, 8)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)}, + {req: deleteRequest("key2"), resp: deleteResponse(0, 9)}, + {req: getRequest("key1"), resp: getResponse("", 9)}, + {req: getRequest("key2"), resp: getResponse("", 9)}, + {req: getRequest("key3"), resp: getResponse("", 9)}, + {req: getRequest("key4"), resp: getResponse("", 9)}, + }, + }, + { + name: "Lease some keys then delete all of them. Revoke should not increment", + operations: []testOperation{ + {req: leaseGrantRequest(1), resp: leaseGrantResponse(1)}, + {req: putWithLeaseRequest("key1", "1", 1), resp: putResponse(2)}, + {req: putWithLeaseRequest("key2", "2", 1), resp: putResponse(3)}, + {req: putWithLeaseRequest("key3", "3", 1), resp: putResponse(4)}, + {req: putWithLeaseRequest("key4", "4", 1), resp: putResponse(5)}, + {req: deleteRequest("key1"), resp: deleteResponse(1, 6)}, + {req: deleteRequest("key2"), resp: deleteResponse(1, 7)}, + {req: deleteRequest("key3"), resp: deleteResponse(1, 8)}, + {req: deleteRequest("key4"), resp: deleteResponse(1, 9)}, + {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)}, + }, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go index e4ab5410d..8c7b1f969 100644 --- a/tests/linearizability/traffic.go +++ b/tests/linearizability/traffic.go @@ -26,16 +26,18 @@ import ( ) var ( - DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, writes: []opChance{{operation: Put, chance: 60}, {operation: Delete, chance: 20}, {operation: Txn, chance: 20}}} + DefaultLeaseTTL int64 = 7200 + DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []opChance{{operation: Put, chance: 50}, {operation: Delete, chance: 10}, {operation: PutWithLease, chance: 10}, {operation: LeaseRevoke, chance: 10}, {operation: Txn, chance: 20}}} ) type Traffic interface { - Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) + Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper) } type readWriteSingleKey struct { keyCount int writes []opChance + leaseTTL int64 } type opChance struct { @@ -43,7 +45,7 @@ type opChance struct { chance int } -func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) { +func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper) { for { select { @@ -58,7 +60,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter continue } // Provide each write with unique id to make it easier to validate operation history. - t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), resp) + t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, c.history.id, resp) } } @@ -72,7 +74,7 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite return resp, err } -func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lastValues []*mvccpb.KeyValue) error { +func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm clientId2LeaseIdMapper, cid int, lastValues []*mvccpb.KeyValue) error { putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) var err error @@ -87,6 +89,24 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit expectValue = string(lastValues[0].Value) } err = c.Txn(putCtx, key, expectValue, newValue) + case PutWithLease: + leaseId := lm.LeaseId(cid) + if leaseId == 0 { + leaseId, err = c.LeaseGrant(ctx, t.leaseTTL) + lm.AddLeaseId(cid, leaseId) + } + if leaseId != 0 { + err = c.PutWithLease(putCtx, key, newValue, leaseId) + } + case LeaseRevoke: + leaseId := lm.LeaseId(cid) + if leaseId != 0 { + err = c.LeaseRevoke(putCtx, leaseId) + //if LeaseRevoke has failed, do not remove the mapping. + if err == nil { + lm.RemoveLeaseId(cid) + } + } default: panic("invalid operation") }