sender: support elegant stop

This commit is contained in:
Yicheng Qin 2014-11-13 17:06:21 -08:00
parent e66bda957b
commit 12dba7d413
2 changed files with 4 additions and 6 deletions

View File

@ -134,6 +134,7 @@ type sender struct {
fs *stats.FollowerStats fs *stats.FollowerStats
q chan []byte q chan []byte
mu sync.RWMutex mu sync.RWMutex
wg sync.WaitGroup
} }
func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats) *sender { func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats) *sender {
@ -144,6 +145,7 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS
fs: fs, fs: fs,
q: make(chan []byte), q: make(chan []byte),
} }
s.wg.Add(connPerSender)
for i := 0; i < connPerSender; i++ { for i := 0; i < connPerSender; i++ {
go s.handle() go s.handle()
} }
@ -162,9 +164,11 @@ func (s *sender) send(data []byte) error {
func (s *sender) stop() { func (s *sender) stop() {
close(s.q) close(s.q)
s.wg.Wait()
} }
func (s *sender) handle() { func (s *sender) handle() {
defer s.wg.Done()
for d := range s.q { for d := range s.q {
start := time.Now() start := time.Now()
err := s.post(d) err := s.post(d)

View File

@ -102,9 +102,6 @@ func TestSenderSend(t *testing.T) {
t.Fatalf("unexpect send error: %v", err) t.Fatalf("unexpect send error: %v", err)
} }
s.stop() s.stop()
// wait for goroutines end
// TODO: elegant stop
time.Sleep(10 * time.Millisecond)
if tr.Request() == nil { if tr.Request() == nil {
t.Errorf("sender fails to post the data") t.Errorf("sender fails to post the data")
@ -155,9 +152,6 @@ func TestSenderSendFailed(t *testing.T) {
t.Fatalf("unexpect send error: %v", err) t.Fatalf("unexpect send error: %v", err)
} }
s.stop() s.stop()
// wait for goroutines end
// TODO: elegant stop
time.Sleep(10 * time.Millisecond)
fs.Lock() fs.Lock()
defer fs.Unlock() defer fs.Unlock()