From ceb9fe4822bda2552a6dc3430ba894b6679ccddc Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 14 Jul 2016 11:28:46 -0700 Subject: [PATCH] etcd-tester: stop stress before compact, fix races fix race condition between stresser cancel, start --- .../functional-tester/etcd-tester/stresser.go | 11 ++++---- tools/functional-tester/etcd-tester/tester.go | 28 +++++++++++++++---- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index f8d7818ee..c50c844c3 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -72,7 +72,6 @@ func (s *stresser) Stress() error { if err != nil { return fmt.Errorf("%v (%s)", err, s.Endpoint) } - defer conn.Close() ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -91,7 +90,7 @@ func (s *stresser) Stress() error { go s.run(ctx, kvc) } - <-ctx.Done() + plog.Printf("stresser %q is started", s.Endpoint) return nil } @@ -161,11 +160,13 @@ func (s *stresser) run(ctx context.Context, kvc pb.KVClient) { func (s *stresser) Cancel() { s.mu.Lock() - cancel, conn, wg := s.cancel, s.conn, s.wg + s.cancel() + s.conn.Close() + wg := s.wg s.mu.Unlock() - cancel() + wg.Wait() - conn.Close() + plog.Printf("stresser %q is canceled", s.Endpoint) } func (s *stresser) Report() (int, int) { diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index cc7799340..e1613da0c 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -143,7 +143,12 @@ func (tt *tester) updateRevision() error { func (tt *tester) checkConsistency() (failed bool, err error) { tt.cancelStressers() - defer tt.startStressers() + defer func() { + serr := tt.startStressers() + if err == nil { + err = serr + } + }() plog.Printf("%s updating current revisions...", tt.logPrefix()) var ( @@ -184,15 +189,23 @@ func (tt *tester) checkConsistency() (failed bool, err error) { return } -func (tt *tester) compact(rev int64, timeout time.Duration) error { +func (tt *tester) compact(rev int64, timeout time.Duration) (err error) { + tt.cancelStressers() + defer func() { + serr := tt.startStressers() + if err == nil { + err = serr + } + }() + plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev) - if err := tt.cluster.compactKV(rev, timeout); err != nil { + if err = tt.cluster.compactKV(rev, timeout); err != nil { return err } plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev) plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev) - if err := tt.cluster.checkCompact(rev); err != nil { + if err = tt.cluster.checkCompact(rev); err != nil { plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err) return err } @@ -257,10 +270,13 @@ func (tt *tester) cancelStressers() { plog.Printf("%s canceled stressers", tt.logPrefix()) } -func (tt *tester) startStressers() { +func (tt *tester) startStressers() error { plog.Printf("%s starting the stressers...", tt.logPrefix()) for _, s := range tt.cluster.Stressers { - go s.Stress() + if err := s.Stress(); err != nil { + return err + } } plog.Printf("%s started stressers", tt.logPrefix()) + return nil }