mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17810 from serathius/robustness-revisions-between-progress
Validate revisions between progress notify
This commit is contained in:
commit
a097a3b39d
@ -412,7 +412,40 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
expectError: errBrokeAtomic.Error(),
|
expectError: errBrokeAtomic.Error(),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Resumable, Reliable, Bookmarkable - all events with bookmark - pass",
|
name: "Resumable, Reliable, Bookmarkable - all events with watch revision and bookmark - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Revision: 2,
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Events: []model.WatchEvent{
|
||||||
|
putWatchEvent("a", "1", 2, true),
|
||||||
|
putWatchEvent("b", "2", 3, true),
|
||||||
|
putWatchEvent("c", "3", 4, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 4,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
putPersistedEvent("b", "2", 3, true),
|
||||||
|
putPersistedEvent("c", "3", 4, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Resumable, Reliable, Bookmarkable - all events with only bookmarks - pass",
|
||||||
reports: []report.ClientReport{
|
reports: []report.ClientReport{
|
||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
@ -421,6 +454,10 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Revision: 1,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
putWatchEvent("a", "1", 2, true),
|
putWatchEvent("a", "1", 2, true),
|
||||||
@ -493,7 +530,112 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
putPersistedEvent("b", "2", 3, true),
|
putPersistedEvent("b", "2", 3, true),
|
||||||
putPersistedEvent("c", "3", 4, true),
|
putPersistedEvent("c", "3", 4, true),
|
||||||
},
|
},
|
||||||
expectError: errBrokeBookmarkable.Error(),
|
expectError: errBrokeReliable.Error(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Resumable, Reliable, Bookmarkable - unmatched events with watch revision - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "d",
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Revision: 2,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 3,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 3,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 4,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
putPersistedEvent("b", "2", 3, true),
|
||||||
|
putPersistedEvent("c", "3", 4, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Resumable, Reliable, Bookmarkable - empty events between progress notifies - fail",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Revision: 1,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 4,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
putPersistedEvent("b", "2", 3, true),
|
||||||
|
putPersistedEvent("c", "3", 4, true),
|
||||||
|
},
|
||||||
|
expectError: errBrokeReliable.Error(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Resumable, Reliable, Bookmarkable - unmatched events between progress notifies - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "d",
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Revision: 2,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 3,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 3,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Revision: 4,
|
||||||
|
IsProgressNotify: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
putPersistedEvent("b", "2", 3, true),
|
||||||
|
putPersistedEvent("c", "3", 4, true),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Bookmarkable - revision non decreasing - pass",
|
name: "Bookmarkable - revision non decreasing - pass",
|
||||||
@ -622,7 +764,82 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
expectError: errBrokeBookmarkable.Error(),
|
expectError: errBrokeBookmarkable.Error(),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Bookmarkable - missing event before bookmark - fail",
|
name: "Bookmarkable - progress precedes other progress - fail",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{},
|
||||||
|
expectError: errBrokeBookmarkable.Error(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Bookmarkable - progress notification lower than watch request - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Revision: 3,
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Bookmarkable - empty event history - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Reliable - missing event before bookmark - fail",
|
||||||
reports: []report.ClientReport{
|
reports: []report.ClientReport{
|
||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
@ -651,10 +868,10 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
putPersistedEvent("b", "2", 3, true),
|
putPersistedEvent("b", "2", 3, true),
|
||||||
putPersistedEvent("c", "3", 4, true),
|
putPersistedEvent("c", "3", 4, true),
|
||||||
},
|
},
|
||||||
expectError: errBrokeBookmarkable.Error(),
|
expectError: errBrokeReliable.Error(),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Bookmarkable - missing event matching watch before bookmark - fail",
|
name: "Reliable - missing event matching watch before bookmark - fail",
|
||||||
reports: []report.ClientReport{
|
reports: []report.ClientReport{
|
||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
@ -682,10 +899,10 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
putPersistedEvent("a", "2", 3, false),
|
putPersistedEvent("a", "2", 3, false),
|
||||||
putPersistedEvent("c", "3", 4, true),
|
putPersistedEvent("c", "3", 4, true),
|
||||||
},
|
},
|
||||||
expectError: errBrokeBookmarkable.Error(),
|
expectError: errBrokeReliable.Error(),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Bookmarkable - missing event matching watch with prefix before bookmark - fail",
|
name: "Reliable - missing event matching watch with prefix before bookmark - fail",
|
||||||
reports: []report.ClientReport{
|
reports: []report.ClientReport{
|
||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
@ -714,7 +931,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
putPersistedEvent("ab", "2", 3, true),
|
putPersistedEvent("ab", "2", 3, true),
|
||||||
putPersistedEvent("cc", "3", 4, true),
|
putPersistedEvent("cc", "3", 4, true),
|
||||||
},
|
},
|
||||||
expectError: errBrokeBookmarkable.Error(),
|
expectError: errBrokeReliable.Error(),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Reliable - all events history - pass",
|
name: "Reliable - all events history - pass",
|
||||||
@ -745,6 +962,135 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectError: "",
|
expectError: "",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Reliable - single revision - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Events: []model.WatchEvent{
|
||||||
|
putWatchEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
expectError: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Reliable - single revision with watch revision - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Events: []model.WatchEvent{
|
||||||
|
putWatchEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
expectError: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Reliable - missing single revision with watch revision - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Events: []model.WatchEvent{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
expectError: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Reliable - single revision with progress notify - pass",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
Events: []model.WatchEvent{
|
||||||
|
putWatchEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
expectError: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Reliable - single revision missing with progress notify - fail",
|
||||||
|
reports: []report.ClientReport{
|
||||||
|
{
|
||||||
|
Watch: []model.WatchOperation{
|
||||||
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
Responses: []model.WatchResponse{
|
||||||
|
{
|
||||||
|
IsProgressNotify: true,
|
||||||
|
Revision: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
eventHistory: []model.PersistedEvent{
|
||||||
|
putPersistedEvent("a", "1", 2, true),
|
||||||
|
},
|
||||||
|
expectError: errBrokeReliable.Error(),
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "Reliable - missing middle event - fail",
|
name: "Reliable - missing middle event - fail",
|
||||||
reports: []report.ClientReport{
|
reports: []report.ClientReport{
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||||
@ -56,8 +57,12 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
err = validateBookmarkable(lg, r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if eventHistory != nil {
|
if eventHistory != nil {
|
||||||
err = validateBookmarkable(lg, eventHistory, r)
|
err = validateResumable(lg, eventHistory, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -65,10 +70,6 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = validateResumable(lg, eventHistory, r)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = validatePrevKV(lg, r, eventHistory)
|
err = validatePrevKV(lg, r, eventHistory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -96,36 +97,26 @@ func validateFilter(lg *zap.Logger, report report.ClientReport) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateBookmarkable(lg *zap.Logger, eventHistory []model.PersistedEvent, report report.ClientReport) (err error) {
|
func validateBookmarkable(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||||
for _, op := range report.Watch {
|
for _, op := range report.Watch {
|
||||||
var lastProgressNotifyRevision int64
|
var lastProgressNotifyRevision int64
|
||||||
var gotEventBeforeProgressNotify *model.PersistedEvent
|
var lastEventRevision int64
|
||||||
for _, resp := range op.Responses {
|
for _, resp := range op.Responses {
|
||||||
for _, event := range resp.Events {
|
for _, event := range resp.Events {
|
||||||
if event.Revision <= lastProgressNotifyRevision {
|
if event.Revision <= lastProgressNotifyRevision {
|
||||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", event.Revision))
|
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", event.Revision))
|
||||||
err = errBrokeBookmarkable
|
err = errBrokeBookmarkable
|
||||||
}
|
}
|
||||||
gotEventBeforeProgressNotify = &event.PersistedEvent
|
lastEventRevision = event.Revision
|
||||||
}
|
}
|
||||||
if resp.IsProgressNotify {
|
if resp.IsProgressNotify {
|
||||||
if gotEventBeforeProgressNotify != nil || op.Request.Revision != 0 {
|
if resp.Revision < lastProgressNotifyRevision {
|
||||||
var wantEventBeforeProgressNotify *model.PersistedEvent
|
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", resp.Revision))
|
||||||
for _, ev := range eventHistory {
|
err = errBrokeBookmarkable
|
||||||
if ev.Revision < op.Request.Revision {
|
}
|
||||||
continue
|
if resp.Revision < lastEventRevision {
|
||||||
}
|
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.Int64("revision", resp.Revision))
|
||||||
if ev.Revision > resp.Revision {
|
err = errBrokeBookmarkable
|
||||||
break
|
|
||||||
}
|
|
||||||
if ev.Match(op.Request) {
|
|
||||||
wantEventBeforeProgressNotify = &ev
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if diff := cmp.Diff(wantEventBeforeProgressNotify, gotEventBeforeProgressNotify); diff != "" {
|
|
||||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "bookmarkable"), zap.Int("client", report.ClientID), zap.String("diff", diff))
|
|
||||||
err = errBrokeBookmarkable
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
lastProgressNotifyRevision = resp.Revision
|
lastProgressNotifyRevision = resp.Revision
|
||||||
}
|
}
|
||||||
@ -192,24 +183,33 @@ func validateAtomic(lg *zap.Logger, report report.ClientReport) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) {
|
func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report report.ClientReport) (err error) {
|
||||||
for _, op := range report.Watch {
|
for _, watch := range report.Watch {
|
||||||
index := 0
|
firstRev := firstExpectedRevision(watch)
|
||||||
revision := firstRevision(op)
|
lastRev := lastRevision(watch)
|
||||||
for index < len(events) && events[index].Revision < revision {
|
wantEvents := []model.PersistedEvent{}
|
||||||
index++
|
if firstRev != 0 {
|
||||||
}
|
for _, e := range events {
|
||||||
if index == len(events) {
|
if e.Revision < firstRev {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, resp := range op.Responses {
|
if e.Revision > lastRev {
|
||||||
for _, event := range resp.Events {
|
break
|
||||||
if events[index].Match(op.Request) && (events[index].Event != event.PersistedEvent.Event || events[index].Revision != event.PersistedEvent.Revision) {
|
}
|
||||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.Any("missing-event", events[index]))
|
if e.Match(watch.Request) {
|
||||||
err = errBrokeReliable
|
wantEvents = append(wantEvents, e)
|
||||||
}
|
}
|
||||||
index++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
gotEvents := make([]model.PersistedEvent, 0)
|
||||||
|
for _, resp := range watch.Responses {
|
||||||
|
for _, event := range resp.Events {
|
||||||
|
gotEvents = append(gotEvents, event.PersistedEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(wantEvents, gotEvents, cmpopts.IgnoreFields(model.PersistedEvent{}, "IsCreate")); diff != "" {
|
||||||
|
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.String("diff", diff))
|
||||||
|
err = errBrokeReliable
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -290,10 +290,31 @@ func validateEventIsCreate(lg *zap.Logger, report report.ClientReport, history [
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func firstRevision(op model.WatchOperation) int64 {
|
func firstExpectedRevision(op model.WatchOperation) int64 {
|
||||||
for _, resp := range op.Responses {
|
if op.Request.Revision != 0 {
|
||||||
for _, event := range resp.Events {
|
return op.Request.Revision
|
||||||
return event.Revision
|
}
|
||||||
|
if len(op.Responses) > 0 {
|
||||||
|
firstResp := op.Responses[0]
|
||||||
|
if firstResp.IsProgressNotify {
|
||||||
|
return firstResp.Revision + 1
|
||||||
|
}
|
||||||
|
if len(firstResp.Events) > 0 {
|
||||||
|
return firstResp.Events[0].Revision
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func lastRevision(op model.WatchOperation) int64 {
|
||||||
|
if len(op.Responses) > 0 {
|
||||||
|
lastResp := op.Responses[len(op.Responses)-1]
|
||||||
|
if lastResp.IsProgressNotify {
|
||||||
|
return lastResp.Revision
|
||||||
|
}
|
||||||
|
if len(lastResp.Events) > 0 {
|
||||||
|
lastEvent := lastResp.Events[len(lastResp.Events)-1]
|
||||||
|
return lastEvent.Revision
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user