mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17759 from serathius/robustness-assumptions
Add explicit checks for assumptions in robustness test validation
This commit is contained in:
commit
2e6eebef85
@ -64,6 +64,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cc.Close()
|
||||
// Ensure that first operation succeeds
|
||||
_, err = cc.Put(ctx, "start", "true")
|
||||
if err != nil {
|
||||
t.Fatalf("First operation failed, validation requires first operation to succeed, err: %s", err)
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
|
||||
for i := 0; i < profile.ClientCount; i++ {
|
||||
@ -85,11 +90,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
wg.Wait()
|
||||
endTime := time.Now()
|
||||
|
||||
// Ensure that last operation is succeeds
|
||||
time.Sleep(time.Second)
|
||||
// Ensure that last operation succeeds
|
||||
_, err = cc.Put(ctx, "tombstone", "true")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatalf("Last operation failed, validation requires last operation to succeed, err: %s", err)
|
||||
}
|
||||
reports = append(reports, cc.Report())
|
||||
|
||||
|
@ -30,6 +30,10 @@ import (
|
||||
|
||||
// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
|
||||
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) {
|
||||
err := checkValidationAssumptions(reports)
|
||||
if err != nil {
|
||||
t.Fatalf("Broken validation assumptions: %s", err)
|
||||
}
|
||||
patchedOperations := patchedOperationHistory(reports)
|
||||
linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout)
|
||||
if linearizable != porcupine.Ok {
|
||||
@ -52,6 +56,149 @@ type Config struct {
|
||||
ExpectRevisionUnique bool
|
||||
}
|
||||
|
||||
func checkValidationAssumptions(reports []report.ClientReport) error {
|
||||
err := validatePutOperationUnique(reports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateEmptyDatabaseAtStart(reports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateLastOperationAndObservedInWatch(reports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateObservedAllRevisionsInWatch(reports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateNonConcurrentClientRequests(reports)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validatePutOperationUnique(reports []report.ClientReport) error {
|
||||
type KV struct {
|
||||
Key string
|
||||
Value model.ValueOrHash
|
||||
}
|
||||
putValue := map[KV]struct{}{}
|
||||
for _, r := range reports {
|
||||
for _, op := range r.KeyValue {
|
||||
request := op.Input.(model.EtcdRequest)
|
||||
if request.Type != model.Txn {
|
||||
continue
|
||||
}
|
||||
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
|
||||
if op.Type != model.PutOperation {
|
||||
continue
|
||||
}
|
||||
kv := KV{
|
||||
Key: op.Put.Key,
|
||||
Value: op.Put.Value,
|
||||
}
|
||||
if _, ok := putValue[kv]; ok {
|
||||
return fmt.Errorf("non unique put %v, required to patch operation history", kv)
|
||||
}
|
||||
putValue[kv] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateEmptyDatabaseAtStart(reports []report.ClientReport) error {
|
||||
for _, r := range reports {
|
||||
for _, op := range r.KeyValue {
|
||||
request := op.Input.(model.EtcdRequest)
|
||||
response := op.Output.(model.MaybeEtcdResponse)
|
||||
if response.Revision == 2 && !request.IsRead() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("non empty database at start or first write didn't succeed, required by model implementation")
|
||||
}
|
||||
|
||||
func validateLastOperationAndObservedInWatch(reports []report.ClientReport) error {
|
||||
var lastOperation porcupine.Operation
|
||||
|
||||
for _, r := range reports {
|
||||
for _, op := range r.KeyValue {
|
||||
if op.Call > lastOperation.Call {
|
||||
lastOperation = op
|
||||
}
|
||||
}
|
||||
}
|
||||
lastResponse := lastOperation.Output.(model.MaybeEtcdResponse)
|
||||
if lastResponse.PartialResponse || lastResponse.Error != "" {
|
||||
return fmt.Errorf("last operation %v failed, its success is required to validate watch", lastOperation)
|
||||
}
|
||||
for _, r := range reports {
|
||||
for _, watch := range r.Watch {
|
||||
for _, watchResp := range watch.Responses {
|
||||
for _, e := range watchResp.Events {
|
||||
if e.Revision == lastResponse.Revision {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("revision from the last operation %d was not observed in watch, required to validate watch", lastResponse.Revision)
|
||||
}
|
||||
|
||||
func validateObservedAllRevisionsInWatch(reports []report.ClientReport) error {
|
||||
var maxRevision int64
|
||||
for _, r := range reports {
|
||||
for _, watch := range r.Watch {
|
||||
for _, watchResp := range watch.Responses {
|
||||
for _, e := range watchResp.Events {
|
||||
if e.Revision > maxRevision {
|
||||
maxRevision = e.Revision
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
observedRevisions := make([]bool, maxRevision+1)
|
||||
for _, r := range reports {
|
||||
for _, watch := range r.Watch {
|
||||
for _, watchResp := range watch.Responses {
|
||||
for _, e := range watchResp.Events {
|
||||
observedRevisions[e.Revision] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := 2; i < len(observedRevisions); i++ {
|
||||
if !observedRevisions[i] {
|
||||
return fmt.Errorf("didn't observe revision %d in watch, required to patch operation and validate serializable requests", i)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNonConcurrentClientRequests(reports []report.ClientReport) error {
|
||||
lastClientRequestReturn := map[int]int64{}
|
||||
for _, r := range reports {
|
||||
for _, op := range r.KeyValue {
|
||||
lastRequest := lastClientRequestReturn[op.ClientId]
|
||||
if op.Call <= lastRequest {
|
||||
return fmt.Errorf("client %d has concurrent request, required for operation linearization", op.ClientId)
|
||||
}
|
||||
if op.Return <= op.Call {
|
||||
return fmt.Errorf("operation %v ends before it starts, required for operation linearization", op)
|
||||
}
|
||||
lastClientRequestReturn[op.ClientId] = op.Return
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) {
|
||||
type revisionEvents struct {
|
||||
events []model.PersistedEvent
|
||||
|
Loading…
x
Reference in New Issue
Block a user