mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17806 from serathius/robustness-watch-filter
Add filter validation to ensure watch only includes events within selector
This commit is contained in:
commit
6f6647271f
@ -73,6 +73,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -92,6 +95,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -115,6 +121,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -135,6 +144,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -159,6 +171,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "b",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -168,6 +183,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -186,6 +204,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -206,6 +227,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -230,6 +254,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -239,6 +266,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -257,6 +287,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -276,6 +309,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -285,6 +321,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -304,6 +343,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -325,6 +367,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -345,6 +390,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -370,7 +418,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -403,7 +450,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -430,7 +476,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Revision: 2,
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -458,7 +503,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -515,7 +559,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -550,7 +593,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -588,7 +630,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -687,7 +728,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -717,7 +757,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -801,7 +840,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -831,7 +869,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -859,7 +896,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
Revision: 3,
|
||||
},
|
||||
@ -944,7 +980,6 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "",
|
||||
WithPrefix: true,
|
||||
Revision: 3,
|
||||
},
|
||||
@ -1029,6 +1064,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -1056,6 +1094,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -1084,6 +1125,9 @@ func TestValidateWatch(t *testing.T) {
|
||||
{
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
{
|
||||
Events: []model.WatchEvent{
|
||||
@ -1113,6 +1157,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
WithPrevKV: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -1143,6 +1188,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
WithPrevKV: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -1173,6 +1219,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
WithPrevKV: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -1204,6 +1251,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
WithPrevKV: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -1235,6 +1283,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
Key: "a",
|
||||
WithPrevKV: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -1266,6 +1315,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
Watch: []model.WatchOperation{
|
||||
{
|
||||
Request: model.WatchRequest{
|
||||
WithPrefix: true,
|
||||
WithPrevKV: true,
|
||||
},
|
||||
Responses: []model.WatchResponse{
|
||||
@ -1317,8 +1367,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
putPersistedEvent("b", "2", 3, true),
|
||||
putPersistedEvent("a", "3", 4, false),
|
||||
},
|
||||
// TODO: Test should fail as there is an event not matching selector
|
||||
expectError: "",
|
||||
expectError: errBrokeFilter.Error(),
|
||||
},
|
||||
{
|
||||
name: "Filter - event not matching the watch with prefix - fail",
|
||||
@ -1348,8 +1397,7 @@ func TestValidateWatch(t *testing.T) {
|
||||
putPersistedEvent("bb", "2", 3, true),
|
||||
putPersistedEvent("ac", "3", 4, true),
|
||||
},
|
||||
// TODO: Test should fail as there is an event not matching selector
|
||||
expectError: "",
|
||||
expectError: errBrokeFilter.Error(),
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
|
@ -32,13 +32,18 @@ var (
|
||||
errBrokeResumable = errors.New("broke Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window")
|
||||
errBrokePrevKV = errors.New("incorrect event prevValue")
|
||||
errBrokeIsCreate = errors.New("incorrect event IsCreate")
|
||||
errBrokeFilter = errors.New("event not matching watch filter")
|
||||
)
|
||||
|
||||
func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.PersistedEvent) error {
|
||||
lg.Info("Validating watch")
|
||||
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
|
||||
for _, r := range reports {
|
||||
err := validateOrdered(lg, r)
|
||||
err := validateFilter(lg, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateOrdered(lg, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -76,6 +81,20 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateFilter(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||
for _, watch := range report.Watch {
|
||||
for _, resp := range watch.Responses {
|
||||
for _, event := range resp.Events {
|
||||
if !event.Match(watch.Request) {
|
||||
lg.Error("event not matching event filter", zap.Int("client", report.ClientID), zap.Any("request", watch.Request), zap.Any("event", event))
|
||||
err = errBrokeFilter
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func validateBookmarkable(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||
for _, op := range report.Watch {
|
||||
var lastProgressNotifyRevision int64
|
||||
@ -163,7 +182,7 @@ func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report repo
|
||||
}
|
||||
for _, resp := range op.Responses {
|
||||
for _, event := range resp.Events {
|
||||
if events[index].Match(op.Request) && events[index] != event.PersistedEvent {
|
||||
if events[index].Match(op.Request) && (events[index].Event != event.PersistedEvent.Event || events[index].Revision != event.PersistedEvent.Revision) {
|
||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.Any("missing-event", events[index]))
|
||||
err = errBrokeReliable
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user