Merge pull request #17835 from serathius/robustness-watch-error

Handle watch responses with error
This commit is contained in:
Marek Siarkowicz 2024-04-22 14:44:00 +02:00 committed by GitHub
commit 7d1d4cb3a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 43 additions and 6 deletions

View File

@ -26,4 +26,5 @@ type WatchResponse struct {
IsProgressNotify bool
Revision int64
Time time.Duration
Error string
}

View File

@ -253,6 +253,10 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) model.WatchRe
}
resp.IsProgressNotify = r.IsProgressNotify()
resp.Revision = r.Header.Revision
err := r.Err()
if err != nil {
resp.Error = r.Err().Error()
}
return resp
}

View File

@ -1228,6 +1228,38 @@ func TestValidateWatch(t *testing.T) {
putPersistedEvent("c", "3", 4, true),
},
},
{
name: "Reliable - ignore empty last error response - pass",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
WithPrefix: true,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("a", "1", 2, true),
putWatchEvent("b", "2", 3, true),
putWatchEvent("c", "3", 4, true),
},
},
{
Revision: 5,
Error: "etcdserver: mvcc: required revision has been compacted",
},
},
},
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
},
},
{
name: "Resumable - watch revision from middle event - pass",
reports: []report.ClientReport{

View File

@ -307,13 +307,13 @@ func firstExpectedRevision(op model.WatchOperation) int64 {
}
func lastRevision(op model.WatchOperation) int64 {
if len(op.Responses) > 0 {
lastResp := op.Responses[len(op.Responses)-1]
if lastResp.IsProgressNotify {
return lastResp.Revision
for i := len(op.Responses) - 1; i >= 0; i-- {
resp := op.Responses[i]
if resp.IsProgressNotify {
return resp.Revision
}
if len(lastResp.Events) > 0 {
lastEvent := lastResp.Events[len(lastResp.Events)-1]
if len(resp.Events) > 0 {
lastEvent := resp.Events[len(resp.Events)-1]
return lastEvent.Revision
}
}