From f7c6c33b19eb4dca60599e9a01e95ab833c57bb0 Mon Sep 17 00:00:00 2001 From: ah8ad3 Date: Wed, 3 Jul 2024 10:07:24 +0330 Subject: [PATCH] Combine watch responses for concurrent client requests and find last revision based one time of response Signed-off-by: ah8ad3 --- tests/robustness/watch.go | 46 +++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 1453b71f6..254d81164 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "sort" "sync" "testing" "time" @@ -142,25 +143,56 @@ external: } } +type timeLastRevision struct { + time time.Duration + lastRevision int64 +} + +func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision { + result := make(map[uint64][]timeLastRevision) + for _, r := range reports { + for _, op := range r.Watch { + for _, resp := range op.Responses { + if len(resp.Events) == 0 { + continue + } + result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision}) + } + } + } + for memberId, structs := range result { + sort.Slice(structs, func(i, j int) bool { + return structs[i].time < structs[j].time + }) + result[memberId] = structs + } + return result +} + func validateWatchSequential(t *testing.T, reports []report.ClientReport) { + combinedWatchResponses := combineWatchResponses(reports) for _, r := range reports { for _, op := range r.Watch { if op.Request.Revision != 0 { continue } - lastEventRevision := make(map[uint64]int64) for _, resp := range op.Responses { if len(resp.Events) == 0 { continue } - if _, ok := lastEventRevision[resp.MemberId]; !ok { - lastEventRevision[resp.MemberId] = 1 + var lastMemberWatchRevision int64 + for i, c := range combinedWatchResponses[resp.MemberId] { + // Reports are sorted by time, find first greater or equal and use previous one. + if resp.Time >= c.time { + if i == 0 { + continue + } + lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision + } } - firstEventRevision := resp.Events[0].Revision - if firstEventRevision < lastEventRevision[resp.MemberId] { - t.Errorf("Error watch sequential, expect: %v or higher, got: %v, member id: %v", lastEventRevision[resp.MemberId], firstEventRevision, resp.MemberId) + if resp.Events[0].Revision < lastMemberWatchRevision { + t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId) } - lastEventRevision[resp.MemberId] = resp.Events[len(resp.Events)-1].Revision } } }