diff --git a/etcdserver/sender.go b/etcdserver/sender.go index 2e3f17575..4feeaae81 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -134,6 +134,7 @@ type sender struct { fs *stats.FollowerStats q chan []byte mu sync.RWMutex + wg sync.WaitGroup } func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats) *sender { @@ -144,6 +145,7 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS fs: fs, q: make(chan []byte), } + s.wg.Add(connPerSender) for i := 0; i < connPerSender; i++ { go s.handle() } @@ -162,9 +164,11 @@ func (s *sender) send(data []byte) error { func (s *sender) stop() { close(s.q) + s.wg.Wait() } func (s *sender) handle() { + defer s.wg.Done() for d := range s.q { start := time.Now() err := s.post(d) diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index 7827010f0..2be6fdfdb 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -102,9 +102,6 @@ func TestSenderSend(t *testing.T) { t.Fatalf("unexpect send error: %v", err) } s.stop() - // wait for goroutines end - // TODO: elegant stop - time.Sleep(10 * time.Millisecond) if tr.Request() == nil { t.Errorf("sender fails to post the data") @@ -155,9 +152,6 @@ func TestSenderSendFailed(t *testing.T) { t.Fatalf("unexpect send error: %v", err) } s.stop() - // wait for goroutines end - // TODO: elegant stop - time.Sleep(10 * time.Millisecond) fs.Lock() defer fs.Unlock()