mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests/robustness: Add resumable validation and improve reliable validation
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
3ea0cb8d9c
commit
09c462e2ea
@ -16,6 +16,7 @@ package model
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
|
||||
@ -106,3 +107,18 @@ type Event struct {
|
||||
Key string
|
||||
Value ValueOrHash
|
||||
}
|
||||
|
||||
func (e Event) Match(request WatchRequest) bool {
|
||||
if request.WithPrefix {
|
||||
return strings.HasPrefix(e.Key, request.Key)
|
||||
} else {
|
||||
return e.Key == request.Key
|
||||
}
|
||||
}
|
||||
|
||||
type WatchRequest struct {
|
||||
Key string
|
||||
Revision int64
|
||||
WithPrefix bool
|
||||
WithProgressNotify bool
|
||||
}
|
||||
|
@ -46,17 +46,10 @@ type RecordingClient struct {
|
||||
}
|
||||
|
||||
type WatchOperation struct {
|
||||
Request WatchRequest
|
||||
Request model.WatchRequest
|
||||
Responses []WatchResponse
|
||||
}
|
||||
|
||||
type WatchRequest struct {
|
||||
Key string
|
||||
Revision int64
|
||||
WithPrefix bool
|
||||
WithProgressNotify bool
|
||||
}
|
||||
|
||||
type WatchResponse struct {
|
||||
Events []model.WatchEvent
|
||||
IsProgressNotify bool
|
||||
@ -228,7 +221,7 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan {
|
||||
request := WatchRequest{
|
||||
request := model.WatchRequest{
|
||||
Key: key,
|
||||
Revision: rev,
|
||||
WithPrefix: withPrefix,
|
||||
@ -238,7 +231,7 @@ func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, with
|
||||
|
||||
}
|
||||
|
||||
func (c *RecordingClient) watch(ctx context.Context, request WatchRequest) clientv3.WatchChan {
|
||||
func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest) clientv3.WatchChan {
|
||||
ops := []clientv3.OpOption{}
|
||||
if request.WithPrefix {
|
||||
ops = append(ops, clientv3.WithPrefix())
|
||||
|
@ -45,8 +45,6 @@ func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []p
|
||||
t.Fatalf("Unknown Linearization")
|
||||
}
|
||||
lg.Info("Validating serializable operations")
|
||||
// TODO: Use linearization result instead of event history to get order of events
|
||||
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
|
||||
validateSerializableOperations(t, operations, eventHistory)
|
||||
return visualize
|
||||
}
|
||||
|
@ -30,12 +30,15 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []m
|
||||
validateOrdered(t, r)
|
||||
validateUnique(t, cfg.ExpectRevisionUnique, r)
|
||||
validateAtomic(t, r)
|
||||
// TODO: Validate Resumable
|
||||
validateBookmarkable(t, r)
|
||||
}
|
||||
// TODO: Use linearization result instead of event history to get order of events
|
||||
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
|
||||
eventHistory := mergeWatchEventHistory(t, reports)
|
||||
// TODO: Validate that each watch report is reliable, not only the longest one.
|
||||
validateReliable(t, eventHistory)
|
||||
for _, r := range reports {
|
||||
validateReliable(t, eventHistory, r)
|
||||
validateResumable(t, eventHistory, r)
|
||||
}
|
||||
return eventHistory
|
||||
}
|
||||
|
||||
@ -107,16 +110,63 @@ func validateAtomic(t *testing.T, report traffic.ClientReport) {
|
||||
}
|
||||
}
|
||||
|
||||
func validateReliable(t *testing.T, events []model.WatchEvent) {
|
||||
var lastEventRevision int64 = 1
|
||||
for _, event := range events {
|
||||
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
|
||||
t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, missing revisions from range: %d-%d", lastEventRevision, event.Revision)
|
||||
func validateReliable(t *testing.T, events []model.WatchEvent, report traffic.ClientReport) {
|
||||
for _, op := range report.Watch {
|
||||
index := 0
|
||||
revision := firstRevision(op)
|
||||
for index < len(events) && events[index].Revision < revision {
|
||||
index++
|
||||
}
|
||||
if index == len(events) {
|
||||
continue
|
||||
}
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
if events[index].Match(op.Request) && events[index] != event {
|
||||
t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, event missing: %+v", events[index])
|
||||
}
|
||||
index++
|
||||
}
|
||||
}
|
||||
lastEventRevision = event.Revision
|
||||
}
|
||||
}
|
||||
|
||||
func validateResumable(t *testing.T, events []model.WatchEvent, report traffic.ClientReport) {
|
||||
for _, op := range report.Watch {
|
||||
index := 0
|
||||
revision := op.Request.Revision
|
||||
for index < len(events) && (events[index].Revision < revision || !events[index].Match(op.Request)) {
|
||||
index++
|
||||
}
|
||||
if index == len(events) {
|
||||
continue
|
||||
}
|
||||
firstEvent := firstWatchEvent(op)
|
||||
// If watch is resumable, first event it gets should the first event that happened after the requested revision.
|
||||
if firstEvent != nil && events[index] != *firstEvent {
|
||||
t.Errorf("Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window, watch request: %+v, event missing: %+v", op.Request, events[index])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func firstRevision(op traffic.WatchOperation) int64 {
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
return event.Revision
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func firstWatchEvent(op traffic.WatchOperation) *model.WatchEvent {
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
return &event
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []model.WatchEvent {
|
||||
type revisionEvents struct {
|
||||
events []model.WatchEvent
|
||||
@ -135,6 +185,8 @@ func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []mode
|
||||
events = append(events, event)
|
||||
} else {
|
||||
if prev, found := revisionToEvents[lastRevision]; found {
|
||||
// This assumes that there are txn that would be observed differently by two watches.
|
||||
// TODO: Implement merging events from multiple watches about single revision based on operations.
|
||||
if diff := cmp.Diff(prev.events, events); diff != "" {
|
||||
t.Errorf("Events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user