Merge pull request #4470 from gyuho/test

etcd-tester: close leaky gRPC connections
This commit is contained in:
Gyu-Ho Lee 2016-02-09 16:30:09 -08:00
commit bb504fdf23
2 changed files with 19 additions and 18 deletions

View File

@ -52,10 +52,8 @@ type stresser struct {
N int
mu sync.Mutex
failure int
success int
mu sync.Mutex
wg *sync.WaitGroup
cancel func()
conn *grpc.ClientConn
}
@ -65,17 +63,23 @@ func (s *stresser) Stress() error {
if err != nil {
return fmt.Errorf("%v (%s)", err, s.Endpoint)
}
defer conn.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(s.N)
s.mu.Lock()
s.conn = conn
s.cancel = cancel
s.wg = wg
s.mu.Unlock()
kvc := pb.NewKVClient(conn)
for i := 0; i < s.N; i++ {
go func(i int) {
defer wg.Done()
for {
putctx, putcancel := context.WithTimeout(ctx, 5*time.Second)
_, err := kvc.Put(putctx, &pb.PutRequest{
@ -83,16 +87,9 @@ func (s *stresser) Stress() error {
Value: []byte(randStr(s.KeySize)),
})
putcancel()
if grpc.ErrorDesc(err) == context.Canceled.Error() {
if err != nil {
return
}
s.mu.Lock()
if err != nil {
s.failure++
} else {
s.success++
}
s.mu.Unlock()
}
}(i)
}
@ -103,17 +100,18 @@ func (s *stresser) Stress() error {
func (s *stresser) Cancel() {
s.mu.Lock()
cancel, conn := s.cancel, s.conn
cancel, conn, wg := s.cancel, s.conn, s.wg
s.mu.Unlock()
cancel()
// TODO: wait for all routines to exit by adding a waitGroup before calling conn.Close()
wg.Wait()
conn.Close()
}
func (s *stresser) Report() (success int, failure int) {
func (s *stresser) Report() (int, int) {
s.mu.Lock()
defer s.mu.Unlock()
return s.success, s.failure
// TODO: find a better way to report v3 tests
return -1, -1
}
type stresserV2 struct {

View File

@ -218,10 +218,11 @@ func (c *cluster) getRevision() (map[string]int64, error) {
kvc := pb.NewKVClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
cancel()
conn.Close()
if err != nil {
return nil, err
}
cancel()
revs[u] = resp.Header.Revision
}
return revs, nil
@ -237,10 +238,11 @@ func (c *cluster) getKVHash() (map[string]int64, error) {
kvc := pb.NewKVClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := kvc.Hash(ctx, &pb.HashRequest{})
cancel()
conn.Close()
if err != nil {
return nil, err
}
cancel()
hashes[u] = int64(resp.Hash)
}
return hashes, nil
@ -260,6 +262,7 @@ func (c *cluster) compactKV(rev int64) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
cancel()
conn.Close()
if err == nil {
return nil
}