diff --git a/etcdserver/sendhub.go b/etcdserver/sendhub.go index a34befdd4..b062393d9 100644 --- a/etcdserver/sendhub.go +++ b/etcdserver/sendhub.go @@ -113,7 +113,7 @@ func (h *sendHub) Add(m *Member) { } u.Path = path.Join(u.Path, raftPrefix) fs := h.ls.Follower(m.ID.String()) - s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), h.p, fs, h.shouldstop) + s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop) h.senders[m.ID] = s } diff --git a/rafthttp/sender.go b/rafthttp/sender.go index 515226b0e..412849546 100644 --- a/rafthttp/sender.go +++ b/rafthttp/sender.go @@ -65,8 +65,10 @@ type Sender interface { Resume() } -func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { +func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { s := &sender{ + id: id, + active: true, tr: tr, u: u, cid: cid, @@ -85,9 +87,10 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st } type sender struct { + id types.ID + cid types.ID + tr http.RoundTripper - u string - cid types.ID p Processor fs *stats.FollowerStats shouldstop chan struct{} @@ -99,9 +102,16 @@ type sender struct { strmSrvMu sync.Mutex q chan []byte - paused bool - mu sync.RWMutex - wg sync.WaitGroup + // wait for the handling routines + wg sync.WaitGroup + + mu sync.RWMutex + u string // the url this sender posts to + // if the last send was successful, this sender is active. 
+ // Or it is inactive + active bool + errored error + paused bool } func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) { @@ -258,12 +268,27 @@ func (s *sender) handle() { start := time.Now() err := s.post(d) end := time.Now() + + s.mu.Lock() if err != nil { + if s.errored == nil || s.errored.Error() != err.Error() { + log.Printf("sender: error posting to %s: %v", s.id, err) + s.errored = err + } + if s.active { + log.Printf("sender: the connection with %s becomes inactive", s.id) + s.active = false + } s.fs.Fail() - log.Printf("sender: %v", err) - continue + } else { + if !s.active { + log.Printf("sender: the connection with %s becomes active", s.id) + s.active = true + s.errored = nil + } + s.fs.Succ(end.Sub(start)) } - s.fs.Succ(end.Sub(start)) + s.mu.Unlock() } } @@ -274,13 +299,13 @@ func (s *sender) post(data []byte) error { req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) s.mu.RUnlock() if err != nil { - return fmt.Errorf("new request to %s error: %v", s.u, err) + return err } req.Header.Set("Content-Type", "application/protobuf") req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) resp, err := s.tr.RoundTrip(req) if err != nil { - return fmt.Errorf("error posting to %q: %v", req.URL.String(), err) + return err } resp.Body.Close() @@ -290,15 +315,15 @@ func (s *sender) post(data []byte) error { case s.shouldstop <- struct{}{}: default: } - log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) + log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) return nil case http.StatusForbidden: select { case s.shouldstop <- struct{}{}: default: } - log.Println("etcdserver: this member has been permanently removed from the cluster") - log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member 
ID") + log.Println("rafthttp: this member has been permanently removed from the cluster") + log.Println("rafthttp: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") return nil case http.StatusNoContent: return nil diff --git a/rafthttp/sender_test.go b/rafthttp/sender_test.go index d47e86a94..2181c4ca6 100644 --- a/rafthttp/sender_test.go +++ b/rafthttp/sender_test.go @@ -34,7 +34,7 @@ import ( func TestSenderSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil) + s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect send error: %v", err) @@ -54,7 +54,7 @@ func TestSenderSend(t *testing.T) { func TestSenderExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil) + s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender @@ -86,7 +86,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { // it increases fail count in stats. 
func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil) + s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect Send error: %v", err) @@ -102,7 +102,7 @@ func TestSenderSendFailed(t *testing.T) { func TestSenderPost(t *testing.T) { tr := &roundTripperRecorder{} - s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, nil, nil) + s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil) if err := s.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } @@ -145,7 +145,7 @@ func TestSenderPostBad(t *testing.T) { } for i, tt := range tests { shouldstop := make(chan struct{}) - s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) err := s.post([]byte("some data")) s.Stop() @@ -166,7 +166,7 @@ func TestSenderPostShouldStop(t *testing.T) { } for i, tt := range tests { shouldstop := make(chan struct{}, 1) - s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) s.post([]byte("some data")) s.Stop() select {