Merge f7c6c33b19eb4dca60599e9a01e95ab833c57bb0 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
Ahmad Zolfaghari 2024-09-26 22:00:06 +01:00 committed by GitHub
commit 25273b43fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 59 additions and 0 deletions

View File

@ -316,6 +316,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) model.WatchRe
}
resp.IsProgressNotify = r.IsProgressNotify()
resp.Revision = r.Header.Revision
resp.MemberId = r.Header.MemberId
err := r.Err()
if err != nil {
resp.Error = r.Err().Error()

View File

@ -101,6 +101,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
watchProgressNotifyEnabled := c.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
}
validateWatchSequential(t, r.Client)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

View File

@ -26,5 +26,6 @@ type WatchResponse struct {
IsProgressNotify bool
Revision int64
Time time.Duration
MemberId uint64
Error string
}

View File

@ -16,6 +16,7 @@ package robustness
import (
"context"
"sort"
"sync"
"testing"
"time"
@ -141,3 +142,58 @@ external:
t.Errorf("Progress notify does not match, expect: %v, got: %v", expectProgressNotify, gotProgressNotify)
}
}
type timeLastRevision struct {
time time.Duration
lastRevision int64
}
func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision {
result := make(map[uint64][]timeLastRevision)
for _, r := range reports {
for _, op := range r.Watch {
for _, resp := range op.Responses {
if len(resp.Events) == 0 {
continue
}
result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision})
}
}
}
for memberId, structs := range result {
sort.Slice(structs, func(i, j int) bool {
return structs[i].time < structs[j].time
})
result[memberId] = structs
}
return result
}
func validateWatchSequential(t *testing.T, reports []report.ClientReport) {
combinedWatchResponses := combineWatchResponses(reports)
for _, r := range reports {
for _, op := range r.Watch {
if op.Request.Revision != 0 {
continue
}
for _, resp := range op.Responses {
if len(resp.Events) == 0 {
continue
}
var lastMemberWatchRevision int64
for i, c := range combinedWatchResponses[resp.MemberId] {
// Reports are sorted by time, find first greater or equal and use previous one.
if resp.Time >= c.time {
if i == 0 {
continue
}
lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision
}
}
if resp.Events[0].Revision < lastMemberWatchRevision {
t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId)
}
}
}
}
}