mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Validate delivery of events between progress notifies
Simplifying bookmarkable to just validate revision order between events and progress notifies. Use reliable to validate if events are missing, but still report broken resumable if first event after revision is missing. It's easier to have one place that validates event slices. Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
2ff45f8fc6
commit
964680c8d0
@ -412,7 +412,40 @@ func TestValidateWatch(t *testing.T) {
|
||||
expectError: errBrokeAtomic.Error(),
|
||||
},
|
||||
{
|
||||
name: "Resumable, Reliable, Bookmarkable - all events with bookmark - pass",
|
||||
name: "Resumable, Reliable, Bookmarkable - all events with watch revision and bookmark - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Revision: 2,
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
putWatchEvent("a", "1", 2, true),
|
||||
putWatchEvent("b", "2", 3, true),
|
||||
putWatchEvent("c", "3", 4, true),
|
||||
},
|
||||
},
|
||||
{
|
||||
Revision: 4,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Resumable, Reliable, Bookmarkable - all events with only bookmarks - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
@ -421,6 +454,10 @@ func TestValidateWatch(t *testing.T) {
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Revision: 1,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
putWatchEvent("a", "1", 2, true),
|
||||
@ -493,7 +530,112 @@ func TestValidateWatch(t *testing.T) {
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
expectError: errBrokeBookmarkable.Error(),
|
||||
expectError: errBrokeReliable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Resumable, Reliable, Bookmarkable - unmatched events with watch revision - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "d",
|
||||
Revision: 2,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Revision: 2,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 3,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 3,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 4,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Resumable, Reliable, Bookmarkable - empty events between progress notifies - fail",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Revision: 1,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 4,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
expectError: errBrokeReliable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Resumable, Reliable, Bookmarkable - unmatched events between progress notifies - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "d",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Revision: 2,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 3,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 3,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
{
|
||||
Revision: 4,
|
||||
IsProgressNotify: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Bookmarkable - revision non decreasing - pass",
|
||||
@ -622,7 +764,82 @@ func TestValidateWatch(t *testing.T) {
|
||||
expectError: errBrokeBookmarkable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Bookmarkable - missing event before bookmark - fail",
|
||||
name: "Bookmarkable - progress precedes other progress - fail",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 2,
|
||||
},
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{},
|
||||
expectError: errBrokeBookmarkable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Bookmarkable - progress notification lower than watch request - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Revision: 3,
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Bookmarkable - empty event history - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 1,
|
||||
},
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{},
|
||||
},
|
||||
{
|
||||
name: "Reliable - missing event before bookmark - fail",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
@ -651,10 +868,10 @@ func TestValidateWatch(t *testing.T) {
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
expectError: errBrokeBookmarkable.Error(),
|
||||
expectError: errBrokeReliable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Bookmarkable - missing event matching watch before bookmark - fail",
|
||||
name: "Reliable - missing event matching watch before bookmark - fail",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
@ -682,10 +899,10 @@ func TestValidateWatch(t *testing.T) {
|
||||
putPersistedEvent("a", "2", 3, false),
|
||||
putPersistedEvent("c", "3", 4, true),
|
||||
},
|
||||
expectError: errBrokeBookmarkable.Error(),
|
||||
expectError: errBrokeReliable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Bookmarkable - missing event matching watch with prefix before bookmark - fail",
|
||||
name: "Reliable - missing event matching watch with prefix before bookmark - fail",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
@ -714,7 +931,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
putPersistedEvent("ab", "2", 3, true),
|
||||
putPersistedEvent("cc", "3", 4, true),
|
||||
},
|
||||
expectError: errBrokeBookmarkable.Error(),
|
||||
expectError: errBrokeReliable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Reliable - all events history - pass",
|
||||
@ -745,6 +962,135 @@ func TestValidateWatch(t *testing.T) {
|
||||
},
|
||||
expectError: "",
|
||||
},
|
||||
{
|
||||
name: "Reliable - single revision - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
putWatchEvent("a", "1", 2, true),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
},
|
||||
expectError: "",
|
||||
},
|
||||
{
|
||||
name: "Reliable - single revision with watch revision - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
Revision: 2,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
putWatchEvent("a", "1", 2, true),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
},
|
||||
expectError: "",
|
||||
},
|
||||
{
|
||||
name: "Reliable - missing single revision with watch revision - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
Revision: 2,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
},
|
||||
expectError: "",
|
||||
},
|
||||
{
|
||||
name: "Reliable - single revision with progress notify - pass",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
putWatchEvent("a", "1", 2, true),
|
||||
},
|
||||
},
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
},
|
||||
expectError: "",
|
||||
},
|
||||
{
|
||||
name: "Reliable - single revision missing with progress notify - fail",
|
||||
reports: []report.ClientReport{
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
Revision: 2,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
IsProgressNotify: true,
|
||||
Revision: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
eventHistory: []model.PersistedEvent{
|
||||
putPersistedEvent("a", "1", 2, true),
|
||||
},
|
||||
expectError: errBrokeReliable.Error(),
|
||||
},
|
||||
{
|
||||
name: "Reliable - missing middle event - fail",
|
||||
reports: []report.ClientReport{
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||
@ -56,8 +57,12 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateBookmarkable(lg, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if eventHistory != nil {
|
||||
err = validateBookmarkable(lg, eventHistory, r)
|
||||
err = validateResumable(lg, eventHistory, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -65,10 +70,6 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateResumable(lg, eventHistory, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validatePrevKV(lg, r, eventHistory)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -96,36 +97,26 @@ func validateFilter(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func validateBookmarkable(lg *zap.Logger, eventHistory []model.PersistedEvent, report report.ClientReport) (err error) {
|
||||
func validateBookmarkable(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||
for _, op := range report.Watch {
|
||||
var lastProgressNotifyRevision int64
|
||||
var gotEventBeforeProgressNotify *model.PersistedEvent
|
||||
var lastEventRevision int64
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
if event.Revision <= lastProgressNotifyRevision {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", event.Revision))
|
||||
err = errBrokeBookmarkable
|
||||
}
|
||||
gotEventBeforeProgressNotify = &event.PersistedEvent
|
||||
lastEventRevision = event.Revision
|
||||
}
|
||||
if resp.IsProgressNotify {
|
||||
if gotEventBeforeProgressNotify != nil || op.Request.Revision != 0 {
|
||||
var wantEventBeforeProgressNotify *model.PersistedEvent
|
||||
for _, ev := range eventHistory {
|
||||
if ev.Revision < op.Request.Revision {
|
||||
continue
|
||||
}
|
||||
if ev.Revision > resp.Revision {
|
||||
break
|
||||
}
|
||||
if ev.Match(op.Request) {
|
||||
wantEventBeforeProgressNotify = &ev
|
||||
}
|
||||
}
|
||||
if diff := cmp.Diff(wantEventBeforeProgressNotify, gotEventBeforeProgressNotify); diff != "" {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.String("diff", diff))
|
||||
err = errBrokeBookmarkable
|
||||
}
|
||||
if resp.Revision < lastProgressNotifyRevision {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", resp.Revision))
|
||||
err = errBrokeBookmarkable
|
||||
}
|
||||
if resp.Revision < lastEventRevision {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", resp.Revision))
|
||||
err = errBrokeBookmarkable
|
||||
}
|
||||
lastProgressNotifyRevision = resp.Revision
|
||||
}
|
||||
@ -192,24 +183,33 @@ func validateAtomic(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||
}
|
||||
|
||||
func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) {
|
||||
for _, op := range report.Watch {
|
||||
index := 0
|
||||
revision := firstRevision(op)
|
||||
for index < len(events) && events[index].Revision < revision {
|
||||
index++
|
||||
}
|
||||
if index == len(events) {
|
||||
continue
|
||||
}
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
if events[index].Match(op.Request) && (events[index].Event != event.PersistedEvent.Event || events[index].Revision != event.PersistedEvent.Revision) {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.Any("missing-event", events[index]))
|
||||
err = errBrokeReliable
|
||||
for _, watch := range report.Watch {
|
||||
firstRev := firstExpectedRevision(watch)
|
||||
lastRev := lastRevision(watch)
|
||||
wantEvents := []model.PersistedEvent{}
|
||||
if firstRev != 0 {
|
||||
for _, e := range events {
|
||||
if e.Revision < firstRev {
|
||||
continue
|
||||
}
|
||||
if e.Revision > lastRev {
|
||||
break
|
||||
}
|
||||
if e.Match(watch.Request) {
|
||||
wantEvents = append(wantEvents, e)
|
||||
}
|
||||
index++
|
||||
}
|
||||
}
|
||||
gotEvents := make([]model.PersistedEvent, 0)
|
||||
for _, resp := range watch.Responses {
|
||||
for _, event := range resp.Events {
|
||||
gotEvents = append(gotEvents, event.PersistedEvent)
|
||||
}
|
||||
}
|
||||
if diff := cmp.Diff(wantEvents, gotEvents, cmpopts.IgnoreFields(model.PersistedEvent{}, "IsCreate")); diff != "" {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.String("diff", diff))
|
||||
err = errBrokeReliable
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -288,10 +288,31 @@ func validateEventIsCreate(lg *zap.Logger, report report.ClientReport, history [
|
||||
return err
|
||||
}
|
||||
|
||||
func firstRevision(op model.WatchOperation) int64 {
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
return event.Revision
|
||||
func firstExpectedRevision(op model.WatchOperation) int64 {
|
||||
if op.Request.Revision != 0 {
|
||||
return op.Request.Revision
|
||||
}
|
||||
if len(op.Responses) > 0 {
|
||||
firstResp := op.Responses[0]
|
||||
if firstResp.IsProgressNotify {
|
||||
return firstResp.Revision + 1
|
||||
}
|
||||
if len(firstResp.Events) > 0 {
|
||||
return firstResp.Events[0].Revision
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func lastRevision(op model.WatchOperation) int64 {
|
||||
if len(op.Responses) > 0 {
|
||||
lastResp := op.Responses[len(op.Responses)-1]
|
||||
if lastResp.IsProgressNotify {
|
||||
return lastResp.Revision
|
||||
}
|
||||
if len(lastResp.Events) > 0 {
|
||||
lastEvent := lastResp.Events[len(lastResp.Events)-1]
|
||||
return lastEvent.Revision
|
||||
}
|
||||
}
|
||||
return 0
|
||||
|
Loading…
x
Reference in New Issue
Block a user