From ad688b2a85e192d31872e77c5d3d10d09e396889 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 27 Mar 2023 20:41:39 +0200 Subject: [PATCH] tests: Ensure watch catches all events generated in traffic Signed-off-by: Marek Siarkowicz --- tests/robustness/linearizability_test.go | 17 +++++-- tests/robustness/watch.go | 65 ++++++++++++++++++++---- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index 6cc65a467..8fabd7415 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -202,21 +202,32 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et trafficCancel() return nil }) - watchCtx, watchCancel := context.WithCancel(ctx) + maxRevisionChan := make(chan int64, 1) g.Go(func() error { operations = simulateTraffic(trafficCtx, t, lg, clus, traffic) time.Sleep(time.Second) - watchCancel() + maxRevisionChan <- operationsMaxRevision(operations) return nil }) g.Go(func() error { - responses = collectClusterWatchEvents(watchCtx, t, lg, clus) + responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan) return nil }) g.Wait() return operations, responses } +func operationsMaxRevision(operations []porcupine.Operation) int64 { + var maxRevision int64 + for _, op := range operations { + revision := op.Output.(model.EtcdResponse).Revision + if revision > maxRevision { + maxRevision = revision + } + } + return maxRevision +} + // forcestopCluster stops the etcd member with signal kill. 
func forcestopCluster(clus *e2e.EtcdProcessCluster) error { for _, member := range clus.Procs { diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 0a3910433..0cd46e08a 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -31,10 +31,11 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/model" ) -func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchResponse { +func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64) [][]watchResponse { mux := sync.Mutex{} var wg sync.WaitGroup memberResponses := make([][]watchResponse, len(clus.Procs)) + memberMaxRevisionChans := make([]chan int64, len(clus.Procs)) for i, member := range clus.Procs { c, err := clientv3.New(clientv3.Config{ Endpoints: member.EndpointsV3(), @@ -45,39 +46,81 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger if err != nil { t.Fatal(err) } - + memberChan := make(chan int64, 1) + memberMaxRevisionChans[i] = memberChan wg.Add(1) go func(i int, c *clientv3.Client) { defer wg.Done() defer c.Close() - responses := watchMember(ctx, lg, c) + responses := watchMember(ctx, t, c, memberChan) mux.Lock() memberResponses[i] = responses mux.Unlock() }(i, c) } + wg.Add(1) + go func() { + defer wg.Done() + maxRevision := <-maxRevisionChan + for _, memberChan := range memberMaxRevisionChans { + memberChan <- maxRevision + } + }() wg.Wait() return memberResponses } -func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) { +// watchMember collects all responses until the context is cancelled or it has observed the revision provided via maxRevisionChan. 
+func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) { + var maxRevision int64 = 0 var lastRevision int64 = 0 + ctx, cancel := context.WithCancel(ctx) + defer cancel() + watch := c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(1), clientv3.WithProgressNotify()) for { select { case <-ctx.Done(): + revision := watchResponsesMaxRevision(resps) + if maxRevision == 0 { + t.Errorf("Client didn't collect all events, max revision not set") + } + if revision < maxRevision { + t.Errorf("Client didn't collect all events, revision got %d, expected: %d", revision, maxRevision) + } return resps - default: - } - for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) { - resps = append(resps, watchResponse{resp, time.Now()}) - lastRevision = resp.Header.Revision - if resp.Err() != nil { - lg.Info("Watch error", zap.Error(resp.Err())) + case maxRevision = <-maxRevisionChan: + if lastRevision >= maxRevision { + cancel() + } + case resp := <-watch: + if resp.Err() == nil { + resps = append(resps, watchResponse{resp, time.Now()}) + } else if !resp.Canceled { + t.Errorf("Watch stream received error, err %v", resp.Err()) + } + // Assumes that we track all events as we watch all keys. 
+ if len(resp.Events) > 0 { + lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision + } + if maxRevision != 0 && lastRevision >= maxRevision { + cancel() } } } } +func watchResponsesMaxRevision(responses []watchResponse) int64 { + var maxRevision int64 + for _, response := range responses { + for _, event := range response.Events { + if event.Kv.ModRevision > maxRevision { + maxRevision = event.Kv.ModRevision + } + } + } + return maxRevision +} + func validateWatchResponses(t *testing.T, responses [][]watchResponse, expectProgressNotify bool) { for _, memberResponses := range responses { validateMemberWatchResponses(t, memberResponses, expectProgressNotify)