Handle watch responses with error

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2024-04-21 20:49:46 +02:00
parent a097a3b39d
commit fa9e9504ad
4 changed files with 43 additions and 6 deletions

View File

@ -26,4 +26,5 @@ type WatchResponse struct {
IsProgressNotify bool IsProgressNotify bool
Revision int64 Revision int64
Time time.Duration Time time.Duration
Error string
} }

View File

@ -253,6 +253,10 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) model.WatchRe
} }
resp.IsProgressNotify = r.IsProgressNotify() resp.IsProgressNotify = r.IsProgressNotify()
resp.Revision = r.Header.Revision resp.Revision = r.Header.Revision
err := r.Err()
if err != nil {
resp.Error = r.Err().Error()
}
return resp return resp
} }

View File

@ -1228,6 +1228,38 @@ func TestValidateWatch(t *testing.T) {
putPersistedEvent("c", "3", 4, true), putPersistedEvent("c", "3", 4, true),
}, },
}, },
{
name: "Reliable - ignore empty last error response - pass",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
WithPrefix: true,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("a", "1", 2, true),
putWatchEvent("b", "2", 3, true),
putWatchEvent("c", "3", 4, true),
},
},
{
Revision: 5,
Error: "etcdserver: mvcc: required revision has been compacted",
},
},
},
},
},
},
eventHistory: []model.PersistedEvent{
putPersistedEvent("a", "1", 2, true),
putPersistedEvent("b", "2", 3, true),
putPersistedEvent("c", "3", 4, true),
},
},
{ {
name: "Resumable - watch revision from middle event - pass", name: "Resumable - watch revision from middle event - pass",
reports: []report.ClientReport{ reports: []report.ClientReport{

View File

@ -307,13 +307,13 @@ func firstExpectedRevision(op model.WatchOperation) int64 {
} }
func lastRevision(op model.WatchOperation) int64 { func lastRevision(op model.WatchOperation) int64 {
if len(op.Responses) > 0 { for i := len(op.Responses) - 1; i >= 0; i-- {
lastResp := op.Responses[len(op.Responses)-1] resp := op.Responses[i]
if lastResp.IsProgressNotify { if resp.IsProgressNotify {
return lastResp.Revision return resp.Revision
} }
if len(lastResp.Events) > 0 { if len(resp.Events) > 0 {
lastEvent := lastResp.Events[len(lastResp.Events)-1] lastEvent := resp.Events[len(resp.Events)-1]
return lastEvent.Revision return lastEvent.Revision
} }
} }