Merge pull request #18135 from fuweid/fix-17968

tests/robustness: unlock Delete/LeaseRevoke ops
This commit is contained in:
Marek Siarkowicz 2024-06-07 10:22:53 +02:00 committed by GitHub
commit d9dcf62558
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 124 additions and 66 deletions

View File

@ -16,6 +16,7 @@ package robustness
import (
"context"
"math/rand"
"testing"
"time"
@ -36,6 +37,7 @@ import (
var testRunner = framework.E2eTestRunner
func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())
testRunner.TestMain(m)
}

View File

@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"hash/fnv"
"maps"
"reflect"
"sort"
@ -75,6 +76,20 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
}
func (s EtcdState) DeepCopy() EtcdState {
newState := EtcdState{Revision: s.Revision}
newState.KeyValues = maps.Clone(s.KeyValues)
newState.KeyLeases = maps.Clone(s.KeyLeases)
newLeases := map[int64]EtcdLease{}
for key, val := range s.Leases {
newLeases[key] = val.DeepCopy()
}
newState.Leases = newLeases
return newState
}
func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
@ -86,25 +101,22 @@ func freshEtcdState() EtcdState {
// Step handles a successful request, returning updated state and response it would generate.
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
newKVs := map[string]ValueRevision{}
for k, v := range s.KeyValues {
newKVs[k] = v
}
s.KeyValues = newKVs
newState := s.DeepCopy()
switch request.Type {
case Range:
if request.Range.Revision == 0 || request.Range.Revision == s.Revision {
resp := s.getRange(request.Range.RangeOptions)
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}}
if request.Range.Revision == 0 || request.Range.Revision == newState.Revision {
resp := newState.getRange(request.Range.RangeOptions)
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: newState.Revision}}
}
if request.Range.Revision > s.Revision {
return s, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
if request.Range.Revision > newState.Revision {
return newState, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
}
return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}}
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}}
case Txn:
failure := false
for _, cond := range request.Txn.Conditions {
if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
if val := newState.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
failure = true
break
}
@ -119,27 +131,27 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
switch op.Type {
case RangeOperation:
opResp[i] = EtcdOperationResult{
RangeResponse: s.getRange(op.Range),
RangeResponse: newState.getRange(op.Range),
}
case PutOperation:
_, leaseExists := s.Leases[op.Put.LeaseID]
_, leaseExists := newState.Leases[op.Put.LeaseID]
if op.Put.LeaseID != 0 && !leaseExists {
break
}
s.KeyValues[op.Put.Key] = ValueRevision{
newState.KeyValues[op.Put.Key] = ValueRevision{
Value: op.Put.Value,
ModRevision: s.Revision + 1,
ModRevision: newState.Revision + 1,
}
increaseRevision = true
s = detachFromOldLease(s, op.Put.Key)
newState = detachFromOldLease(newState, op.Put.Key)
if leaseExists {
s = attachToNewLease(s, op.Put.LeaseID, op.Put.Key)
newState = attachToNewLease(newState, op.Put.LeaseID, op.Put.Key)
}
case DeleteOperation:
if _, ok := s.KeyValues[op.Delete.Key]; ok {
delete(s.KeyValues, op.Delete.Key)
if _, ok := newState.KeyValues[op.Delete.Key]; ok {
delete(newState.KeyValues, op.Delete.Key)
increaseRevision = true
s = detachFromOldLease(s, op.Delete.Key)
newState = detachFromOldLease(newState, op.Delete.Key)
opResp[i].Deleted = 1
}
default:
@ -147,37 +159,37 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
}
}
if increaseRevision {
s.Revision++
newState.Revision++
}
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: s.Revision}}
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: newState.Revision}}
case LeaseGrant:
lease := EtcdLease{
LeaseID: request.LeaseGrant.LeaseID,
Keys: map[string]struct{}{},
}
s.Leases[request.LeaseGrant.LeaseID] = lease
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseGrant: &LeaseGrantReponse{}}}
newState.Leases[request.LeaseGrant.LeaseID] = lease
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseGrant: &LeaseGrantReponse{}}}
case LeaseRevoke:
//Delete the keys attached to the lease
keyDeleted := false
for key := range s.Leases[request.LeaseRevoke.LeaseID].Keys {
for key := range newState.Leases[request.LeaseRevoke.LeaseID].Keys {
//same as delete.
if _, ok := s.KeyValues[key]; ok {
if _, ok := newState.KeyValues[key]; ok {
if !keyDeleted {
keyDeleted = true
}
delete(s.KeyValues, key)
delete(s.KeyLeases, key)
delete(newState.KeyValues, key)
delete(newState.KeyLeases, key)
}
}
//delete the lease
delete(s.Leases, request.LeaseRevoke.LeaseID)
delete(newState.Leases, request.LeaseRevoke.LeaseID)
if keyDeleted {
s.Revision++
newState.Revision++
}
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: newState.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
@ -377,6 +389,13 @@ type EtcdLease struct {
Keys map[string]struct{}
}
func (el EtcdLease) DeepCopy() EtcdLease {
return EtcdLease{
LeaseID: el.LeaseID,
Keys: maps.Clone(el.Keys),
}
}
type ValueRevision struct {
Value ValueOrHash
ModRevision int64

View File

@ -16,6 +16,7 @@ package model
import (
"fmt"
"sort"
"strings"
)
@ -63,44 +64,72 @@ func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent
}
func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) {
if request.Type != Txn || response.Error != "" {
if response.Error != "" {
return events
}
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {
switch request.Type {
case Txn:
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {
events = append(events, e)
}
case PutOperation:
_, leaseExists := prevState.Leases[op.Put.LeaseID]
if op.Put.LeaseID != 0 && !leaseExists {
break
}
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
case PutOperation:
}
case LeaseRevoke:
deletedKeys := []string{}
for key := range prevState.Leases[request.LeaseRevoke.LeaseID].Keys {
if _, ok := prevState.KeyValues[key]; ok {
deletedKeys = append(deletedKeys, key)
}
}
sort.Strings(deletedKeys)
for _, key := range deletedKeys {
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
Type: DeleteOperation,
Key: key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
}
return events

View File

@ -172,7 +172,7 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.LeaseRevoke != nil:
return &model.EtcdRequest{
Type: model.LeaseRevoke,
LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseGrant.ID},
LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseRevoke.ID},
}, nil
case raftReq.LeaseGrant != nil:
return &model.EtcdRequest{

View File

@ -115,10 +115,12 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter
return
default:
}
shouldReturn := false
// Avoid multiple failed writes in a row
if lastOperationSucceeded {
choices := t.requests
if !nonUniqueWriteLimiter.Take() {
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
choices = filterOutNonUniqueEtcdWrites(choices)
}
requestType = pickRandom(choices)
@ -126,7 +128,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter
requestType = Get
}
rev, err := client.Request(ctx, requestType, lastRev)
if requestType == Delete || requestType == LeaseRevoke {
if shouldReturn {
nonUniqueWriteLimiter.Return()
}
lastOperationSucceeded = err == nil

View File

@ -154,15 +154,16 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
} else {
shouldReturn := false
choices := t.writeChoices
if !nonUniqueWriteLimiter.Take() {
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
choices = filterOutNonUniqueKubernetesWrites(t.writeChoices)
}
op := pickRandom(choices)
switch op {
case KubernetesDelete:
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
@ -170,6 +171,9 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
if shouldReturn {
nonUniqueWriteLimiter.Return()
}
}
}
if err != nil {

View File

@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
}
case model.LeaseGrant:
case model.LeaseRevoke:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
}
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}