Merge pull request #17731 from serathius/robustness-wal-validate-watch

Robustness wal validate watch
This commit is contained in:
Marek Siarkowicz 2024-04-26 08:37:33 +02:00 committed by GitHub
commit b36d9b2156
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 447 additions and 394 deletions

View File

@ -19,82 +19,91 @@ import (
"strings"
)
func NewReplay(eventHistory []PersistedEvent) *EtcdReplay {
var lastEventRevision int64 = 1
for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
panic("Replay requires a complete event history")
func NewReplay(persistedRequests []EtcdRequest) *EtcdReplay {
state := freshEtcdState()
// Padding for index 0 and 1, so index matches revision.
revisionToEtcdState := []EtcdState{state, state}
var events []PersistedEvent
for _, request := range persistedRequests {
newState, response := state.Step(request)
if state.Revision != newState.Revision {
revisionToEtcdState = append(revisionToEtcdState, newState)
}
lastEventRevision = event.Revision
for _, e := range toWatchEvents(&state, request, response) {
events = append(events, e)
}
state = newState
}
return &EtcdReplay{
eventHistory: eventHistory,
revisionToEtcdState: revisionToEtcdState,
Events: events,
}
}
type EtcdReplay struct {
eventHistory []PersistedEvent
// Cached state and event index used for its calculation
cachedState *EtcdState
eventHistoryIndex int
revisionToEtcdState []EtcdState
Events []PersistedEvent
}
func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
if revision < 1 {
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
}
if r.cachedState == nil || r.cachedState.Revision > revision {
r.reset()
if int(revision) >= len(r.revisionToEtcdState) {
return EtcdState{}, fmt.Errorf("requested revision %d, higher than observed in replay %d", revision, len(r.revisionToEtcdState)-1)
}
return r.revisionToEtcdState[revision], nil
}
for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
nextRequest, nextRevision, nextIndex := r.next()
newState, _ := r.cachedState.Step(nextRequest)
if newState.Revision != nextRevision {
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent) {
for _, e := range r.Events {
if e.Revision < watch.Revision || !e.Match(watch) {
continue
}
r.cachedState = &newState
r.eventHistoryIndex = nextIndex
events = append(events, e)
}
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
}
return *r.cachedState, nil
return events
}
// reset rewinds the replay: the cached state becomes a fresh etcd state and
// the event history index goes back to the first event.
func (r *EtcdReplay) reset() {
	initial := freshEtcdState()
	r.cachedState, r.eventHistoryIndex = &initial, 0
}
func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
revision = r.eventHistory[r.eventHistoryIndex].Revision
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for index < len(r.eventHistory) && r.eventHistory[index].Revision == revision {
event := r.eventHistory[index]
switch event.Type {
case PutOperation:
operations = append(operations, EtcdOperation{
Type: event.Type,
Put: PutOptions{Key: event.Key, Value: event.Value},
})
func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) {
if request.Type != Txn || response.Error != "" {
return events
}
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
operations = append(operations, EtcdOperation{
Type: event.Type,
Delete: DeleteOptions{Key: event.Key},
})
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {
events = append(events, e)
}
case PutOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
index++
}
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: operations,
},
}, revision, index
return events
}
type WatchEvent struct {

View File

@ -181,7 +181,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.Compaction != nil:
return nil, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{}
txn := model.TxnRequest{
Conditions: []model.EtcdCondition{},
OperationsOnSuccess: []model.EtcdOperation{},
OperationsOnFailure: []model.EtcdOperation{},
}
for _, cmp := range raftReq.Txn.Compare {
txn.Conditions = append(txn.Conditions, model.EtcdCondition{
Key: string(cmp.Key),

View File

@ -52,7 +52,7 @@ func validateLinearizableOperationsAndVisualize(lg *zap.Logger, operations []por
}
}
func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, totalEventHistory []model.PersistedEvent) {
func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, persistedRequests []model.EtcdRequest) {
lg.Info("Validating serializable operations")
staleReads := filterSerializableReads(operations)
if len(staleReads) == 0 {
@ -61,7 +61,7 @@ func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []p
sort.Slice(staleReads, func(i, j int) bool {
return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision
})
replay := model.NewReplay(totalEventHistory)
replay := model.NewReplay(persistedRequests)
for _, read := range staleReads {
request := read.Input.(model.EtcdRequest)
response := read.Output.(model.MaybeEtcdResponse)

View File

@ -15,13 +15,12 @@
package validate
import (
"encoding/json"
"fmt"
"sort"
"testing"
"time"
"github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"go.etcd.io/etcd/tests/v3/robustness/model"
@ -30,7 +29,7 @@ import (
// ValidateAndReturnVisualize returns the visualize function, because porcupine.linearizationInfo, which is needed to generate the visualization, is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest, timeout time.Duration) (visualize func(basepath string) error) {
err := checkValidationAssumptions(reports)
err := checkValidationAssumptions(reports, persistedRequests)
if err != nil {
t.Fatalf("Broken validation assumptions: %s", err)
}
@ -40,21 +39,12 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report
t.Error("Failed linearization, skipping further validation")
return visualize
}
// TODO: Don't use watch events to get event history.
eventHistory, err := mergeWatchEventHistory(reports)
if err != nil {
t.Errorf("Failed merging watch history to create event history, err: %s", err)
err = validateWatch(lg, cfg, reports, nil)
if err != nil {
t.Errorf("Failed validating watch history, err: %s", err)
}
return visualize
}
err = validateWatch(lg, cfg, reports, eventHistory)
// TODO: Use requests from linearization instead of persisted requests from WAL.
err = validateWatch(lg, cfg, reports, persistedRequests)
if err != nil {
t.Errorf("Failed validating watch history, err: %s", err)
}
validateSerializableOperations(t, lg, patchedOperations, eventHistory)
validateSerializableOperations(t, lg, patchedOperations, persistedRequests)
return visualize
}
@ -62,7 +52,7 @@ type Config struct {
ExpectRevisionUnique bool
}
func checkValidationAssumptions(reports []report.ClientReport) error {
func checkValidationAssumptions(reports []report.ClientReport, persistedRequests []model.EtcdRequest) error {
err := validatePutOperationUnique(reports)
if err != nil {
return err
@ -71,11 +61,8 @@ func checkValidationAssumptions(reports []report.ClientReport) error {
if err != nil {
return err
}
err = validateLastOperationAndObservedInWatch(reports)
if err != nil {
return err
}
err = validateObservedAllRevisionsInWatch(reports)
err = validatePersistedRequestMatchClientRequests(reports, persistedRequests)
if err != nil {
return err
}
@ -129,60 +116,48 @@ func validateEmptyDatabaseAtStart(reports []report.ClientReport) error {
return fmt.Errorf("non empty database at start or first write didn't succeed, required by model implementation")
}
func validateLastOperationAndObservedInWatch(reports []report.ClientReport) error {
var lastOperation porcupine.Operation
func validatePersistedRequestMatchClientRequests(reports []report.ClientReport, persistedRequests []model.EtcdRequest) error {
persistedRequestSet := map[string]model.EtcdRequest{}
for _, request := range persistedRequests {
data, err := json.Marshal(request)
if err != nil {
return err
}
persistedRequestSet[string(data)] = request
}
clientRequests := map[string]porcupine.Operation{}
for _, r := range reports {
for _, op := range r.KeyValue {
if op.Call > lastOperation.Call {
lastOperation = op
request := op.Input.(model.EtcdRequest)
data, err := json.Marshal(request)
if err != nil {
return err
}
clientRequests[string(data)] = op
}
}
lastResponse := lastOperation.Output.(model.MaybeEtcdResponse)
if lastResponse.PartialResponse || lastResponse.Error != "" {
return fmt.Errorf("last operation %v failed, its success is required to validate watch", lastOperation)
}
for _, r := range reports {
for _, watch := range r.Watch {
for _, watchResp := range watch.Responses {
for _, e := range watchResp.Events {
if e.Revision == lastResponse.Revision {
return nil
}
}
}
}
}
return fmt.Errorf("revision from the last operation %d was not observed in watch, required to validate watch", lastResponse.Revision)
}
func validateObservedAllRevisionsInWatch(reports []report.ClientReport) error {
var maxRevision int64
for _, r := range reports {
for _, watch := range r.Watch {
for _, watchResp := range watch.Responses {
for _, e := range watchResp.Events {
if e.Revision > maxRevision {
maxRevision = e.Revision
}
}
}
for requestDump, request := range persistedRequestSet {
_, found := clientRequests[requestDump]
// We cannot validate if persisted leaseGrant was sent by client as failed leaseGrant will not return LeaseID to clients.
if request.Type == model.LeaseGrant {
continue
}
if !found {
return fmt.Errorf("request %+v was not sent by client, required to validate", requestDump)
}
}
observedRevisions := make([]bool, maxRevision+1)
for _, r := range reports {
for _, watch := range r.Watch {
for _, watchResp := range watch.Responses {
for _, e := range watchResp.Events {
observedRevisions[e.Revision] = true
}
}
for requestDump, op := range clientRequests {
request := op.Input.(model.EtcdRequest)
response := op.Output.(model.MaybeEtcdResponse)
if response.Error != "" || request.IsRead() {
continue
}
}
for i := 2; i < len(observedRevisions); i++ {
if !observedRevisions[i] {
return fmt.Errorf("didn't observe revision %d in watch, required to patch operation and validate serializable requests", i)
_, found := persistedRequestSet[requestDump]
if !found {
return fmt.Errorf("succesful client write %+v was not persisted, required to validate", requestDump)
}
}
return nil
@ -204,59 +179,3 @@ func validateNonConcurrentClientRequests(reports []report.ClientReport) error {
}
return nil
}
// mergeWatchEventHistory combines the watch events observed by all clients
// into a single event history ordered by revision.
//
// Events are walked in report/watch/response order and grouped into batches
// of consecutive events that share both a revision and a client ID. The first
// batch seen for a revision is stored; any later batch for the same revision
// must be identical to the stored one, otherwise an error describing the diff
// is returned.
func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) {
	type revisionEvents struct {
		events   []model.PersistedEvent
		revision int64
		clientID int
	}
	byRevision := map[int64]revisionEvents{}
	var (
		curClientID int
		curRevision int64
	)
	batch := []model.PersistedEvent{}

	// flush stores the current batch under its revision, or, when that
	// revision was already stored, verifies the batch matches what was stored.
	// Any txn that two watches observe differently is therefore reported as an error.
	// TODO: Implement merging events from multiple watches about single revision based on operations.
	flush := func() error {
		prev, seen := byRevision[curRevision]
		if !seen {
			byRevision[curRevision] = revisionEvents{clientID: curClientID, events: batch, revision: curRevision}
			return nil
		}
		if diff := cmp.Diff(prev.events, batch); diff != "" {
			return fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientID, curClientID, curRevision, diff)
		}
		return nil
	}

	for _, r := range reports {
		for _, op := range r.Watch {
			for _, resp := range op.Responses {
				for _, event := range resp.Events {
					// Same revision from the same client: keep extending the batch.
					if event.Revision == curRevision && curClientID == r.ClientID {
						batch = append(batch, event.PersistedEvent)
						continue
					}
					// Revision or client changed: close the previous batch and start a new one.
					if err := flush(); err != nil {
						return nil, err
					}
					curClientID = r.ClientID
					curRevision = event.Revision
					batch = []model.PersistedEvent{event.PersistedEvent}
				}
			}
		}
	}
	// Close the trailing batch that the loop left open.
	if err := flush(); err != nil {
		return nil, err
	}

	// Map iteration order is random, so order the batches by revision explicitly.
	ordered := make([]revisionEvents, 0, len(byRevision))
	for _, re := range byRevision {
		ordered = append(ordered, re)
	}
	sort.Slice(ordered, func(i, j int) bool {
		return ordered[i].revision < ordered[j].revision
	})

	var eventHistory []model.PersistedEvent
	for _, re := range ordered {
		eventHistory = append(eventHistory, re.events...)
	}
	return eventHistory, nil
}

View File

@ -61,11 +61,11 @@ func TestDataReports(t *testing.T) {
func TestValidateWatch(t *testing.T) {
tcs := []struct {
name string
config Config
reports []report.ClientReport
eventHistory []model.PersistedEvent
expectError string
name string
config Config
reports []report.ClientReport
persistedRequests []model.EtcdRequest
expectError string
}{
{
name: "Ordered, Unique - ordered unique events in one response - pass",
@ -88,6 +88,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
},
{
name: "Ordered, Unique - unique ordered events in separate response - pass",
@ -114,6 +118,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
},
{
name: "Ordered - unordered events in one response - fail",
@ -136,6 +144,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeOrdered.Error(),
},
{
@ -163,6 +175,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeOrdered.Error(),
},
{
@ -197,6 +213,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
},
{
name: "Unique - duplicated events in one response - fail",
@ -219,6 +239,9 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
expectError: errBrokeUnique.Error(),
},
{
@ -247,6 +270,9 @@ func TestValidateWatch(t *testing.T) {
},
},
expectError: errBrokeUnique.Error(),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
},
{
name: "Unique - duplicated events in watch requests - pass",
@ -272,7 +298,7 @@ func TestValidateWatch(t *testing.T) {
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("a", "2", 2, true),
putWatchEvent("a", "1", 2, true),
},
},
},
@ -280,6 +306,9 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
},
{
name: "Unique, Atomic - duplicated revision in one response - pass",
@ -302,6 +331,35 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
{
Type: model.Txn,
LeaseGrant: nil,
LeaseRevoke: nil,
Range: nil,
Txn: &model.TxnRequest{
Conditions: nil,
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "a",
Value: model.ToValueOrHash("1"),
},
},
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "b",
Value: model.ToValueOrHash("2"),
},
},
},
OperationsOnFailure: nil,
},
Defragment: nil,
},
},
},
{
name: "Unique - duplicated revision in separate watch request - pass",
@ -335,6 +393,9 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
},
{
name: "Unique revision - duplicated revision in one response - fail",
@ -358,6 +419,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeUnique.Error(),
},
{
@ -382,6 +447,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeUnique.Error(),
},
{
@ -409,6 +478,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeAtomic.Error(),
},
{
@ -438,10 +511,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -474,10 +547,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -499,10 +572,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -525,10 +598,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeReliable.Error(),
},
@ -564,10 +637,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -593,10 +666,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeReliable.Error(),
},
@ -631,10 +704,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -687,10 +760,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -719,10 +792,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeBookmarkable.Error(),
},
@ -756,10 +829,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeBookmarkable.Error(),
},
@ -786,8 +859,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{},
expectError: errBrokeBookmarkable.Error(),
persistedRequests: []model.EtcdRequest{},
expectError: errBrokeBookmarkable.Error(),
},
{
name: "Bookmarkable - progress notification lower than watch request - pass",
@ -809,8 +882,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
},
{
@ -836,7 +909,7 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{},
persistedRequests: []model.EtcdRequest{},
},
{
name: "Reliable - missing event before bookmark - fail",
@ -863,10 +936,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeReliable.Error(),
},
@ -894,10 +967,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
putRequest("c", "3"),
},
expectError: errBrokeReliable.Error(),
},
@ -926,10 +999,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("aa", "1", 2, true),
putPersistedEvent("ab", "2", 3, true),
putPersistedEvent("cc", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("aa", "1"),
putRequest("ab", "2"),
putRequest("cc", "3"),
},
expectError: errBrokeReliable.Error(),
},
@ -955,10 +1028,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: "",
},
@ -982,8 +1055,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
expectError: "",
},
@ -1008,8 +1081,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
expectError: "",
},
@ -1032,8 +1105,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
expectError: "",
},
@ -1061,8 +1134,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
expectError: "",
},
@ -1086,8 +1159,8 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
},
expectError: errBrokeReliable.Error(),
},
@ -1112,10 +1185,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeReliable.Error(),
},
@ -1140,10 +1213,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("ab", "2", 3, true),
putPersistedEvent("a", "3", 4, false),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("ab", "2"),
putRequest("a", "3"),
},
},
{
@ -1168,10 +1241,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("aa", "1", 2, true),
putPersistedEvent("bb", "2", 3, true),
putPersistedEvent("ac", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("aa", "1"),
putRequest("bb", "2"),
putRequest("ac", "3"),
},
},
{
@ -1195,10 +1268,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -1222,10 +1295,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -1254,10 +1327,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -1282,10 +1355,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -1309,10 +1382,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
},
{
@ -1338,10 +1411,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("bb", "2", 3, true),
putPersistedEvent("bc", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("bb", "2"),
putRequest("bc", "3"),
},
},
{
@ -1365,10 +1438,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("c", "3"),
},
expectError: errBrokeResumable.Error(),
},
@ -1385,7 +1458,7 @@ func TestValidateWatch(t *testing.T) {
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("b", "3", 4, true),
putWatchEvent("b", "3", 4, false),
},
},
},
@ -1393,10 +1466,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("b", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
putRequest("b", "3"),
},
expectError: errBrokeResumable.Error(),
},
@ -1422,10 +1495,10 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("bb", "2", 3, true),
putPersistedEvent("bc", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("bb", "2"),
putRequest("bc", "3"),
},
expectError: errBrokeResumable.Error(),
},
@ -1452,11 +1525,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
},
{
@ -1482,11 +1555,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
expectError: errBrokeIsCreate.Error(),
},
@ -1513,11 +1586,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
expectError: errBrokeIsCreate.Error(),
},
@ -1545,11 +1618,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
},
{
@ -1576,11 +1649,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
},
{
@ -1607,11 +1680,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
expectError: errBrokePrevKV.Error(),
},
@ -1639,11 +1712,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
expectError: errBrokePrevKV.Error(),
},
@ -1671,11 +1744,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
expectError: errBrokePrevKV.Error(),
},
@ -1703,11 +1776,11 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("a", "2", 3, false),
deletePersistedEvent("a", 4),
putPersistedEvent("a", "4", 5, true),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("a", "2"),
deleteRequest("a"),
putRequest("a", "4"),
},
expectError: errBrokePrevKV.Error(),
},
@ -1725,7 +1798,6 @@ func TestValidateWatch(t *testing.T) {
Events: []model.WatchEvent{
putWatchEvent("a", "1", 2, true),
putWatchEvent("b", "2", 3, true),
putWatchEvent("a", "3", 4, false),
},
},
},
@ -1733,10 +1805,9 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("a", "3", 4, false),
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeFilter.Error(),
},
@ -1763,17 +1834,17 @@ func TestValidateWatch(t *testing.T) {
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("aa", "1", 2, true),
putPersistedEvent("bb", "2", 3, true),
putPersistedEvent("ac", "3", 4, true),
persistedRequests: []model.EtcdRequest{
putRequest("aa", "1"),
putRequest("bb", "2"),
putRequest("ac", "3"),
},
expectError: errBrokeFilter.Error(),
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
err := validateWatch(zaptest.NewLogger(t), tc.config, tc.reports, tc.eventHistory)
err := validateWatch(zaptest.NewLogger(t), tc.config, tc.reports, tc.persistedRequests)
var errStr string
if err != nil {
errStr = err.Error()
@ -1838,3 +1909,48 @@ func deletePersistedEvent(key string, rev int64) model.PersistedEvent {
Revision: rev,
}
}
// putRequest returns a model.EtcdRequest of type model.Txn that has no
// conditions and a single on-success operation: a put of value under key.
// The value is wrapped with model.ToValueOrHash. Every other request field
// is left at its zero value.
func putRequest(key, value string) model.EtcdRequest {
	put := model.EtcdOperation{
		Type: model.PutOperation,
		Put: model.PutOptions{
			Key:   key,
			Value: model.ToValueOrHash(value),
		},
	}
	txn := &model.TxnRequest{
		OperationsOnSuccess: []model.EtcdOperation{put},
	}
	return model.EtcdRequest{
		Type: model.Txn,
		Txn:  txn,
	}
}
// deleteRequest returns a model.EtcdRequest of type model.Txn that has no
// conditions and a single on-success operation: a delete of key. Every other
// request field is left at its zero value.
func deleteRequest(key string) model.EtcdRequest {
	del := model.EtcdOperation{
		Type:   model.DeleteOperation,
		Delete: model.DeleteOptions{Key: key},
	}
	txn := &model.TxnRequest{
		OperationsOnSuccess: []model.EtcdOperation{del},
	}
	return model.EtcdRequest{
		Type: model.Txn,
		Txn:  txn,
	}
}

View File

@ -37,9 +37,10 @@ var (
errBrokeFilter = errors.New("event not matching watch filter")
)
func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.PersistedEvent) error {
func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest) error {
lg.Info("Validating watch")
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
replay := model.NewReplay(persistedRequests)
for _, r := range reports {
err := validateFilter(lg, r)
if err != nil {
@ -61,23 +62,21 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
if err != nil {
return err
}
if eventHistory != nil {
err = validateResumable(lg, eventHistory, r)
if err != nil {
return err
}
err = validateReliable(lg, eventHistory, r)
if err != nil {
return err
}
err = validatePrevKV(lg, r, eventHistory)
if err != nil {
return err
}
err = validateEventIsCreate(lg, r, eventHistory)
if err != nil {
return err
}
err = validateResumable(lg, replay, r)
if err != nil {
return err
}
err = validateReliable(lg, replay, r)
if err != nil {
return err
}
err = validatePrevKV(lg, replay, r)
if err != nil {
return err
}
err = validateIsCreate(lg, replay, r)
if err != nil {
return err
}
}
return nil
@ -182,10 +181,11 @@ func validateAtomic(lg *zap.Logger, report report.ClientReport) (err error) {
return err
}
func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) {
func validateReliable(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) {
for _, watch := range report.Watch {
firstRev := firstExpectedRevision(watch)
lastRev := lastRevision(watch)
events := replay.EventsForWatch(watch.Request)
wantEvents := []model.PersistedEvent{}
if firstRev != 0 {
for _, e := range events {
@ -214,22 +214,23 @@ func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report repo
return err
}
func validateResumable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) {
for _, op := range report.Watch {
if op.Request.Revision == 0 {
func validateResumable(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) {
for _, watch := range report.Watch {
if watch.Request.Revision == 0 {
continue
}
events := replay.EventsForWatch(watch.Request)
index := 0
for index < len(events) && (events[index].Revision < op.Request.Revision || !events[index].Match(op.Request)) {
for index < len(events) && (events[index].Revision < watch.Request.Revision || !events[index].Match(watch.Request)) {
index++
}
if index == len(events) {
continue
}
firstEvent := firstWatchEvent(op)
firstEvent := firstWatchEvent(watch)
// If the watch is resumable, the first event it gets should be the first event that happened at or after the requested revision.
if firstEvent != nil && events[index] != firstEvent.PersistedEvent {
lg.Error("Broke watch guarantee", zap.String("guarantee", "resumable"), zap.Int("client", report.ClientID), zap.Any("request", op.Request), zap.Any("got-event", *firstEvent), zap.Any("want-event", events[index]))
lg.Error("Broke watch guarantee", zap.String("guarantee", "resumable"), zap.Int("client", report.ClientID), zap.Any("request", watch.Request), zap.Any("got-event", *firstEvent), zap.Any("want-event", events[index]))
err = errBrokeResumable
}
}
@ -238,8 +239,7 @@ func validateResumable(lg *zap.Logger, events []model.PersistedEvent, report rep
// validatePrevKV ensures that a watch response (if configured with WithPrevKV()) returns
// the appropriate response.
func validatePrevKV(lg *zap.Logger, report report.ClientReport, history []model.PersistedEvent) (err error) {
replay := model.NewReplay(history)
func validatePrevKV(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) {
for _, op := range report.Watch {
if !op.Request.WithPrevKV {
continue
@ -247,7 +247,10 @@ func validatePrevKV(lg *zap.Logger, report report.ClientReport, history []model.
for _, resp := range op.Responses {
for _, event := range resp.Events {
// Get the state just before the current event.
state, _ := replay.StateForRevision(event.Revision - 1)
state, err2 := replay.StateForRevision(event.Revision - 1)
if err2 != nil {
panic(err2)
}
// TODO(MadhavJivrajani): check if compaction has been run as part
// of failpoint injection. If compaction has run, prevKV can be nil
// even if it is not a create event.
@ -271,13 +274,15 @@ func validatePrevKV(lg *zap.Logger, report report.ClientReport, history []model.
return err
}
func validateEventIsCreate(lg *zap.Logger, report report.ClientReport, history []model.PersistedEvent) (err error) {
replay := model.NewReplay(history)
func validateIsCreate(lg *zap.Logger, replay *model.EtcdReplay, report report.ClientReport) (err error) {
for _, op := range report.Watch {
for _, resp := range op.Responses {
for _, event := range resp.Events {
// Get the state just before the current event.
state, _ := replay.StateForRevision(event.Revision - 1)
state, err2 := replay.StateForRevision(event.Revision - 1)
if err2 != nil {
panic(err2)
}
// A create event will not have an entry in our history and a non-create
// event *should* have an entry in our history.
if _, prevKeyExists := state.KeyValues[event.Key]; event.IsCreate == prevKeyExists {