mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Combine watch responses for concurrent client requests and find last revision based one time of response
Signed-off-by: ah8ad3 <ah8ad3@gmail.com>
This commit is contained in:
parent
ab6921bbce
commit
f7c6c33b19
@ -16,6 +16,7 @@ package robustness
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -142,25 +143,56 @@ external:
|
||||
}
|
||||
}
|
||||
|
||||
type timeLastRevision struct {
|
||||
time time.Duration
|
||||
lastRevision int64
|
||||
}
|
||||
|
||||
func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision {
|
||||
result := make(map[uint64][]timeLastRevision)
|
||||
for _, r := range reports {
|
||||
for _, op := range r.Watch {
|
||||
for _, resp := range op.Responses {
|
||||
if len(resp.Events) == 0 {
|
||||
continue
|
||||
}
|
||||
result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision})
|
||||
}
|
||||
}
|
||||
}
|
||||
for memberId, structs := range result {
|
||||
sort.Slice(structs, func(i, j int) bool {
|
||||
return structs[i].time < structs[j].time
|
||||
})
|
||||
result[memberId] = structs
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func validateWatchSequential(t *testing.T, reports []report.ClientReport) {
|
||||
combinedWatchResponses := combineWatchResponses(reports)
|
||||
for _, r := range reports {
|
||||
for _, op := range r.Watch {
|
||||
if op.Request.Revision != 0 {
|
||||
continue
|
||||
}
|
||||
lastEventRevision := make(map[uint64]int64)
|
||||
for _, resp := range op.Responses {
|
||||
if len(resp.Events) == 0 {
|
||||
continue
|
||||
}
|
||||
if _, ok := lastEventRevision[resp.MemberId]; !ok {
|
||||
lastEventRevision[resp.MemberId] = 1
|
||||
var lastMemberWatchRevision int64
|
||||
for i, c := range combinedWatchResponses[resp.MemberId] {
|
||||
// Reports are sorted by time, find first greater or equal and use previous one.
|
||||
if resp.Time >= c.time {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision
|
||||
}
|
||||
}
|
||||
firstEventRevision := resp.Events[0].Revision
|
||||
if firstEventRevision < lastEventRevision[resp.MemberId] {
|
||||
t.Errorf("Error watch sequential, expect: %v or higher, got: %v, member id: %v", lastEventRevision[resp.MemberId], firstEventRevision, resp.MemberId)
|
||||
if resp.Events[0].Revision < lastMemberWatchRevision {
|
||||
t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId)
|
||||
}
|
||||
lastEventRevision[resp.MemberId] = resp.Events[len(resp.Events)-1].Revision
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user