tests/robustness: make merging histories work on []PersistedEvent

Event histories after merging serve as an authorotative list of
events that can be seen as ones persisted by etcd, we don't need
PrevValue as part of it.

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>
This commit is contained in:
Madhav Jivrajani 2024-02-14 15:44:08 +05:30
parent 83817ac786
commit 4fa07a1c8a
4 changed files with 17 additions and 31 deletions

View File

@ -19,7 +19,7 @@ import (
"strings" "strings"
) )
func NewReplay(eventHistory []WatchEvent) *EtcdReplay { func NewReplay(eventHistory []PersistedEvent) *EtcdReplay {
var lastEventRevision int64 = 1 var lastEventRevision int64 = 1
for _, event := range eventHistory { for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
@ -33,7 +33,7 @@ func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
} }
type EtcdReplay struct { type EtcdReplay struct {
eventHistory []WatchEvent eventHistory []PersistedEvent
// Cached state and event index used for it's calculation // Cached state and event index used for it's calculation
cachedState *EtcdState cachedState *EtcdState

View File

@ -52,7 +52,7 @@ func validateLinearizableOperationsAndVisualize(lg *zap.Logger, operations []por
} }
} }
func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, totalEventHistory []model.WatchEvent) { func validateSerializableOperations(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, totalEventHistory []model.PersistedEvent) {
lg.Info("Validating serializable operations") lg.Info("Validating serializable operations")
staleReads := filterSerializableReads(operations) staleReads := filterSerializableReads(operations)
if len(staleReads) == 0 { if len(staleReads) == 0 {

View File

@ -51,27 +51,27 @@ type Config struct {
ExpectRevisionUnique bool ExpectRevisionUnique bool
} }
func mergeWatchEventHistory(reports []report.ClientReport) ([]model.WatchEvent, error) { func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) {
type revisionEvents struct { type revisionEvents struct {
events []model.WatchEvent events []model.PersistedEvent
revision int64 revision int64
clientId int clientId int
} }
revisionToEvents := map[int64]revisionEvents{} revisionToEvents := map[int64]revisionEvents{}
var lastClientId = 0 var lastClientId = 0
var lastRevision int64 var lastRevision int64
events := []model.WatchEvent{} events := []model.PersistedEvent{}
for _, r := range reports { for _, r := range reports {
for _, op := range r.Watch { for _, op := range r.Watch {
for _, resp := range op.Responses { for _, resp := range op.Responses {
for _, event := range resp.Events { for _, event := range resp.Events {
if event.Revision == lastRevision && lastClientId == r.ClientId { if event.Revision == lastRevision && lastClientId == r.ClientId {
events = append(events, event) events = append(events, event.PersistedEvent)
} else { } else {
if prev, found := revisionToEvents[lastRevision]; found { if prev, found := revisionToEvents[lastRevision]; found {
// This assumes that there are txn that would be observed differently by two watches. // This assumes that there are txn that would be observed differently by two watches.
// TODO: Implement merging events from multiple watches about single revision based on operations. // TODO: Implement merging events from multiple watches about single revision based on operations.
if diff := cmp.Diff(prev.events, events, cmp.Comparer(compareWatchEvents)); diff != "" { if diff := cmp.Diff(prev.events, events); diff != "" {
return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff) return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff)
} }
} else { } else {
@ -79,14 +79,14 @@ func mergeWatchEventHistory(reports []report.ClientReport) ([]model.WatchEvent,
} }
lastClientId = r.ClientId lastClientId = r.ClientId
lastRevision = event.Revision lastRevision = event.Revision
events = []model.WatchEvent{event} events = []model.PersistedEvent{event.PersistedEvent}
} }
} }
} }
} }
} }
if prev, found := revisionToEvents[lastRevision]; found { if prev, found := revisionToEvents[lastRevision]; found {
if diff := cmp.Diff(prev.events, events, cmp.Comparer(compareWatchEvents)); diff != "" { if diff := cmp.Diff(prev.events, events); diff != "" {
return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff) return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff)
} }
} else { } else {
@ -100,23 +100,9 @@ func mergeWatchEventHistory(reports []report.ClientReport) ([]model.WatchEvent,
sort.Slice(allRevisionEvents, func(i, j int) bool { sort.Slice(allRevisionEvents, func(i, j int) bool {
return allRevisionEvents[i].revision < allRevisionEvents[j].revision return allRevisionEvents[i].revision < allRevisionEvents[j].revision
}) })
var eventHistory []model.WatchEvent var eventHistory []model.PersistedEvent
for _, revEvents := range allRevisionEvents { for _, revEvents := range allRevisionEvents {
eventHistory = append(eventHistory, revEvents.events...) eventHistory = append(eventHistory, revEvents.events...)
} }
return eventHistory, nil return eventHistory, nil
} }
func compareWatchEvents(x, y []model.WatchEvent) bool {
if len(x) != len(y) {
return false
}
for i := 0; i < len(x); i++ {
if x[i].PersistedEvent != y[i].PersistedEvent {
return false
}
}
return true
}

View File

@ -23,7 +23,7 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/report" "go.etcd.io/etcd/tests/v3/robustness/report"
) )
func validateWatch(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.WatchEvent) { func validateWatch(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.PersistedEvent) {
lg.Info("Validating watch") lg.Info("Validating watch")
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
for _, r := range reports { for _, r := range reports {
@ -106,7 +106,7 @@ func validateAtomic(t *testing.T, report report.ClientReport) {
} }
} }
func validateReliable(t *testing.T, events []model.WatchEvent, report report.ClientReport) { func validateReliable(t *testing.T, events []model.PersistedEvent, report report.ClientReport) {
for _, op := range report.Watch { for _, op := range report.Watch {
index := 0 index := 0
revision := firstRevision(op) revision := firstRevision(op)
@ -118,7 +118,7 @@ func validateReliable(t *testing.T, events []model.WatchEvent, report report.Cli
} }
for _, resp := range op.Responses { for _, resp := range op.Responses {
for _, event := range resp.Events { for _, event := range resp.Events {
if events[index].Match(op.Request) && events[index].PersistedEvent != event.PersistedEvent { if events[index].Match(op.Request) && events[index] != event.PersistedEvent {
t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, event missing: %+v, got: %+v", events[index], event) t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, event missing: %+v, got: %+v", events[index], event)
} }
index++ index++
@ -127,7 +127,7 @@ func validateReliable(t *testing.T, events []model.WatchEvent, report report.Cli
} }
} }
func validateResumable(t *testing.T, events []model.WatchEvent, report report.ClientReport) { func validateResumable(t *testing.T, events []model.PersistedEvent, report report.ClientReport) {
for _, op := range report.Watch { for _, op := range report.Watch {
index := 0 index := 0
revision := op.Request.Revision revision := op.Request.Revision
@ -139,7 +139,7 @@ func validateResumable(t *testing.T, events []model.WatchEvent, report report.Cl
} }
firstEvent := firstWatchEvent(op) firstEvent := firstWatchEvent(op)
// If watch is resumable, first event it gets should the first event that happened after the requested revision. // If watch is resumable, first event it gets should the first event that happened after the requested revision.
if firstEvent != nil && events[index].PersistedEvent != firstEvent.PersistedEvent { if firstEvent != nil && events[index] != firstEvent.PersistedEvent {
t.Errorf("Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window, watch request: %+v, event missing: %+v, got: %+v", op.Request, events[index], *firstEvent) t.Errorf("Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window, watch request: %+v, event missing: %+v, got: %+v", op.Request, events[index], *firstEvent)
} }
} }
@ -147,7 +147,7 @@ func validateResumable(t *testing.T, events []model.WatchEvent, report report.Cl
// validatePrevKV ensures that a watch response (if configured with WithPrevKV()) returns // validatePrevKV ensures that a watch response (if configured with WithPrevKV()) returns
// the appropriate response. // the appropriate response.
func validatePrevKV(t *testing.T, report report.ClientReport, history []model.WatchEvent) { func validatePrevKV(t *testing.T, report report.ClientReport, history []model.PersistedEvent) {
replay := model.NewReplay(history) replay := model.NewReplay(history)
for _, op := range report.Watch { for _, op := range report.Watch {
if !op.Request.WithPrevKV { if !op.Request.WithPrevKV {