From 7d2b7e0d2389500bb011a34c6e9dc3f0999a4b48 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Tue, 9 Feb 2016 14:59:09 -0800
Subject: [PATCH] etcd-tester: close leaky gRPC connections

When closed, errors will be one of:

```
grpc.ErrorDesc(err) == context.Canceled.Error() ||
grpc.ErrorDesc(err) == context.DeadlineExceeded.Error() ||
grpc.ErrorDesc(err) == "transport is closing" ||
grpc.ErrorDesc(err) == "grpc: the client connection is closing"
```
---
 .../functional-tester/etcd-tester/stresser.go | 30 +++++++++----------
 tools/functional-tester/etcd-tester/tester.go |  7 +++--
 2 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go
index dc3c81712..f9b55a2e7 100644
--- a/tools/functional-tester/etcd-tester/stresser.go
+++ b/tools/functional-tester/etcd-tester/stresser.go
@@ -52,10 +52,8 @@ type stresser struct {
 
 	N int
 
-	mu      sync.Mutex
-	failure int
-	success int
-
+	mu sync.Mutex
+	wg *sync.WaitGroup
 	cancel func()
 	conn   *grpc.ClientConn
 }
@@ -65,17 +63,23 @@ func (s *stresser) Stress() error {
 	if err != nil {
 		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 	}
+	defer conn.Close()
 	ctx, cancel := context.WithCancel(context.Background())
 
+	wg := &sync.WaitGroup{}
+	wg.Add(s.N)
+
 	s.mu.Lock()
 	s.conn = conn
 	s.cancel = cancel
+	s.wg = wg
 	s.mu.Unlock()
 
 	kvc := pb.NewKVClient(conn)
 
 	for i := 0; i < s.N; i++ {
 		go func(i int) {
+			defer wg.Done()
 			for {
 				putctx, putcancel := context.WithTimeout(ctx, 5*time.Second)
 				_, err := kvc.Put(putctx, &pb.PutRequest{
@@ -83,16 +87,9 @@
 					Value: []byte(randStr(s.KeySize)),
 				})
 				putcancel()
-				if grpc.ErrorDesc(err) == context.Canceled.Error() {
+				if err != nil {
 					return
 				}
-				s.mu.Lock()
-				if err != nil {
-					s.failure++
-				} else {
-					s.success++
-				}
-				s.mu.Unlock()
 			}
 		}(i)
 	}
@@ -103,17 +100,18 @@
 
 func (s *stresser) Cancel() {
 	s.mu.Lock()
-	cancel, conn := s.cancel, s.conn
+	cancel, conn, wg := s.cancel, s.conn, s.wg
 	s.mu.Unlock()
 	cancel()
-	// TODO: wait for all routines to exit by adding a waitGroup before calling conn.Close()
+	wg.Wait()
 	conn.Close()
 }
 
-func (s *stresser) Report() (success int, failure int) {
+func (s *stresser) Report() (int, int) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	return s.success, s.failure
+	// TODO: find a better way to report v3 tests
+	return -1, -1
 }
 
 type stresserV2 struct {
diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go
index bfa1fec68..9f6c19571 100644
--- a/tools/functional-tester/etcd-tester/tester.go
+++ b/tools/functional-tester/etcd-tester/tester.go
@@ -218,10 +218,11 @@ func (c *cluster) getRevision() (map[string]int64, error) {
 		kvc := pb.NewKVClient(conn)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
+		cancel()
+		conn.Close()
 		if err != nil {
 			return nil, err
 		}
-		cancel()
 		revs[u] = resp.Header.Revision
 	}
 	return revs, nil
@@ -237,10 +238,11 @@ func (c *cluster) getKVHash() (map[string]int64, error) {
 		kvc := pb.NewKVClient(conn)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		resp, err := kvc.Hash(ctx, &pb.HashRequest{})
+		cancel()
+		conn.Close()
 		if err != nil {
 			return nil, err
 		}
-		cancel()
 		hashes[u] = int64(resp.Hash)
 	}
 	return hashes, nil
@@ -260,6 +262,7 @@ func (c *cluster) compactKV(rev int64) error {
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
 		cancel()
+		conn.Close()
 		if err == nil {
 			return nil
 		}
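
For reference, here is a short sketch, not part of the patch, of the pattern the change relies on: classify the four "closed" error descriptions listed in the commit message, and tear down in cancel -> wait -> close order so no goroutine is left using a closed *grpc.ClientConn. The package name and the helpers isClosedConnErr and teardown are hypothetical, shown only to illustrate the idea.

```
// Illustrative sketch only; not part of the patch.
package sketch

import (
	"sync"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

// isClosedConnErr reports whether err carries one of the descriptions the
// commit message says to expect once the context or connection is closed.
func isClosedConnErr(err error) bool {
	if err == nil {
		return false
	}
	switch grpc.ErrorDesc(err) {
	case context.Canceled.Error(),
		context.DeadlineExceeded.Error(),
		"transport is closing",
		"grpc: the client connection is closing":
		return true
	}
	return false
}

// teardown mirrors the order the patched Cancel() uses.
func teardown(cancel context.CancelFunc, wg *sync.WaitGroup, conn *grpc.ClientConn) {
	cancel()     // unblock in-flight Puts; they fail with context.Canceled
	wg.Wait()    // wait for every goroutine started by Stress() to return
	conn.Close() // safe to close now, so the connection cannot leak
}
```

This also explains why the stress loop now simply does `if err != nil { return }`: once Cancel() runs, any error returned from Put is one of the expected closed-connection errors above, so treating every error as a stop signal is sufficient and the per-key success/failure counters are dropped.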