Merge pull request #15293 from serathius/linearizability-bookmark

Codify kubernetes using response header revision as bookmark
This commit is contained in:
Marek Siarkowicz 2023-02-15 11:36:03 +01:00 committed by GitHub
commit 25eb025fc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -83,9 +83,10 @@ func validateWatchResponses(t *testing.T, responses [][]watchResponse, expectPro
func validateMemberWatchResponses(t *testing.T, responses []watchResponse, expectProgressNotify bool) {
var (
gotProgressNotify = false
lastEventModRevision int64 = 1 // The event.Kv.ModRevision in the latest event.
lastHeadRevision int64 = 1 // The resp.Header.Revision in last watch response.
gotProgressNotify = false
lastEventModRevision int64 = 1 // The event.Kv.ModRevision in the latest event.
lastHeadRevision int64 = 1 // The resp.Header.Revision in last watch response.
lastProgressNotifyRevision int64 = 0 // The resp.Header.Revision in the last progress notify watch response.
)
for _, resp := range responses {
if resp.Header.Revision < lastHeadRevision {
@ -104,6 +105,9 @@ func validateMemberWatchResponses(t *testing.T, responses []watchResponse, expec
}
for _, event := range resp.Events {
if event.Kv.ModRevision <= lastProgressNotifyRevision {
t.Errorf("BROKE: etcd will not send progress notification to a watcher until it has synced all events. So a watcher will never receive an event with a lower `ModRevision` than the last progressNotification's revision, eventRevision: %d, progressNotifyRevision: %d", event.Kv.ModRevision, lastProgressNotifyRevision)
}
if event.Kv.ModRevision != lastEventModRevision && event.Kv.ModRevision != lastEventModRevision+1 {
t.Errorf("Event mod revision should stay the same or increment by one, last: %d, current: %d", lastEventModRevision, event.Kv.ModRevision)
}
@ -115,6 +119,9 @@ func validateMemberWatchResponses(t *testing.T, responses []watchResponse, expec
lastEventModRevision, resp.Header.Revision)
}
if resp.IsProgressNotify() {
lastProgressNotifyRevision = resp.Header.Revision
}
lastHeadRevision = resp.Header.Revision
}
if gotProgressNotify != expectProgressNotify {