mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #15575 from serathius/ensure-watch
tests: Ensure watch catches all events generated in traffic
This commit is contained in:
commit
4340cbb4aa
@ -202,21 +202,32 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
|
||||
trafficCancel()
|
||||
return nil
|
||||
})
|
||||
watchCtx, watchCancel := context.WithCancel(ctx)
|
||||
maxRevisionChan := make(chan int64, 1)
|
||||
g.Go(func() error {
|
||||
operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
|
||||
time.Sleep(time.Second)
|
||||
watchCancel()
|
||||
maxRevisionChan <- operationsMaxRevision(operations)
|
||||
return nil
|
||||
})
|
||||
g.Go(func() error {
|
||||
responses = collectClusterWatchEvents(watchCtx, t, lg, clus)
|
||||
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
|
||||
return nil
|
||||
})
|
||||
g.Wait()
|
||||
return operations, responses
|
||||
}
|
||||
|
||||
func operationsMaxRevision(operations []porcupine.Operation) int64 {
|
||||
var maxRevision int64
|
||||
for _, op := range operations {
|
||||
revision := op.Output.(model.EtcdResponse).Revision
|
||||
if revision > maxRevision {
|
||||
maxRevision = revision
|
||||
}
|
||||
}
|
||||
return maxRevision
|
||||
}
|
||||
|
||||
// forcestopCluster stops the etcd member with signal kill.
|
||||
func forcestopCluster(clus *e2e.EtcdProcessCluster) error {
|
||||
for _, member := range clus.Procs {
|
||||
|
@ -31,10 +31,11 @@ import (
|
||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||
)
|
||||
|
||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchResponse {
|
||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64) [][]watchResponse {
|
||||
mux := sync.Mutex{}
|
||||
var wg sync.WaitGroup
|
||||
memberResponses := make([][]watchResponse, len(clus.Procs))
|
||||
memberMaxRevisionChans := make([]chan int64, len(clus.Procs))
|
||||
for i, member := range clus.Procs {
|
||||
c, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: member.EndpointsV3(),
|
||||
@ -45,39 +46,81 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
memberChan := make(chan int64, 1)
|
||||
memberMaxRevisionChans[i] = memberChan
|
||||
wg.Add(1)
|
||||
go func(i int, c *clientv3.Client) {
|
||||
defer wg.Done()
|
||||
defer c.Close()
|
||||
responses := watchMember(ctx, lg, c)
|
||||
responses := watchMember(ctx, t, c, memberChan)
|
||||
mux.Lock()
|
||||
memberResponses[i] = responses
|
||||
mux.Unlock()
|
||||
}(i, c)
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
maxRevision := <-maxRevisionChan
|
||||
for _, memberChan := range memberMaxRevisionChans {
|
||||
memberChan <- maxRevision
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
return memberResponses
|
||||
}
|
||||
|
||||
func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) {
|
||||
// watchMember collects all responses until context is cancelled or has observed revision provided via maxRevisionChan.
|
||||
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) {
|
||||
var maxRevision int64 = 0
|
||||
var lastRevision int64 = 0
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
watch := c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(1), clientv3.WithProgressNotify())
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
revision := watchResponsesMaxRevision(resps)
|
||||
if maxRevision == 0 {
|
||||
t.Errorf("Client didn't collect all events, max revision not set")
|
||||
}
|
||||
if revision < maxRevision {
|
||||
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", revision, maxRevision)
|
||||
}
|
||||
return resps
|
||||
default:
|
||||
}
|
||||
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) {
|
||||
resps = append(resps, watchResponse{resp, time.Now()})
|
||||
lastRevision = resp.Header.Revision
|
||||
if resp.Err() != nil {
|
||||
lg.Info("Watch error", zap.Error(resp.Err()))
|
||||
case maxRevision = <-maxRevisionChan:
|
||||
if lastRevision >= maxRevision {
|
||||
cancel()
|
||||
}
|
||||
case resp := <-watch:
|
||||
if resp.Err() == nil {
|
||||
resps = append(resps, watchResponse{resp, time.Now()})
|
||||
} else if !resp.Canceled {
|
||||
t.Errorf("Watch stream received error, err %v", resp.Err())
|
||||
}
|
||||
// Assumes that we track all events as we watch all keys.
|
||||
if len(resp.Events) > 0 {
|
||||
lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision
|
||||
}
|
||||
if maxRevision != 0 && lastRevision >= maxRevision {
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchResponsesMaxRevision(responses []watchResponse) int64 {
|
||||
var maxRevision int64
|
||||
for _, response := range responses {
|
||||
for _, event := range response.Events {
|
||||
if event.Kv.ModRevision > maxRevision {
|
||||
maxRevision = event.Kv.ModRevision
|
||||
}
|
||||
}
|
||||
}
|
||||
return maxRevision
|
||||
}
|
||||
|
||||
func validateWatchResponses(t *testing.T, responses [][]watchResponse, expectProgressNotify bool) {
|
||||
for _, memberResponses := range responses {
|
||||
validateMemberWatchResponses(t, memberResponses, expectProgressNotify)
|
||||
|
Loading…
x
Reference in New Issue
Block a user