diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 324a2a29b..e2bbd3a60 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -15,28 +15,17 @@ package rafthttp import ( - "bytes" "errors" - "fmt" - "log" "net/http" "sync" "time" "github.com/coreos/etcd/etcdserver/stats" - "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" ) const ( - connPerSender = 4 - // senderBufSize is the size of sender buffer, which helps hold the - // temporary network latency. - // The size ensures that sender does not drop messages when the network - // is out of work for less than 1 second in good path. - senderBufSize = 64 - appRespBatchMs = 50 propBatchMs = 10 @@ -50,50 +39,35 @@ type peer struct { id types.ID cid types.ID - tr http.RoundTripper - r Raft - fs *stats.FollowerStats - errorc chan error + tr http.RoundTripper + // the url this sender post to + u string + r Raft + fs *stats.FollowerStats batcher *Batcher propBatcher *ProposalBatcher - q chan *raftpb.Message - stream *stream + pipeline *pipeline + stream *stream - // wait for the handling routines - wg sync.WaitGroup - - // the url this sender post to - u string - // if the last send was successful, the sender is active. - // Or it is inactive - active bool - errored error paused bool stopped bool } func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { - p := &peer{ + return &peer{ id: id, - active: true, + cid: cid, tr: tr, u: u, - cid: cid, r: r, fs: fs, + pipeline: newPipeline(tr, u, id, cid, fs, errorc), stream: &stream{}, - errorc: errorc, batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), - q: make(chan *raftpb.Message, senderBufSize), } - p.wg.Add(connPerSender) - for i := 0; i < connPerSender; i++ { - go p.handle() - } - return p } func (p *peer) Update(u string) { @@ -104,6 +78,7 @@ func (p *peer) Update(u string) { panic("peer: update a stopped peer") } p.u = u + p.pipeline.update(u) } // Send sends the data to the remote node. It is always non-blocking. @@ -134,14 +109,14 @@ func (p *peer) Send(m raftpb.Message) error { p.propBatcher.Batch(m) case canBatch(m) && p.stream.isOpen(): if !p.batcher.ShouldBatch(time.Now()) { - err = p.send(m) + err = p.pipeline.send(m) } case canUseStream(m): if ok := p.stream.write(m); !ok { - err = p.send(m) + err = p.pipeline.send(m) } default: - err = p.send(m) + err = p.pipeline.send(m) } // send out batched MsgProp if needed // TODO: it is triggered by all outcoming send now, and it needs @@ -150,111 +125,23 @@ func (p *peer) Send(m raftpb.Message) error { if !p.propBatcher.IsEmpty() { t := time.Now() if !p.propBatcher.ShouldBatch(t) { - p.send(p.propBatcher.Message) + p.pipeline.send(p.propBatcher.Message) p.propBatcher.Reset(t) } } return err } -func (p *peer) send(m raftpb.Message) error { - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - select { - case p.q <- &m: - return nil - default: - log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached", - m.Type, senderBufSize, p.u) - return fmt.Errorf("reach maximal serving") - } -} - // Stop performs any necessary finalization and terminates the peer // elegantly. func (p *peer) Stop() { - close(p.q) - p.wg.Wait() - p.Lock() defer p.Unlock() + p.pipeline.stop() p.stream.stop() p.stopped = true } -func (p *peer) handle() { - defer p.wg.Done() - for m := range p.q { - start := time.Now() - err := p.post(pbutil.MustMarshal(m)) - end := time.Now() - - p.Lock() - if err != nil { - if p.errored == nil || p.errored.Error() != err.Error() { - log.Printf("sender: error posting to %s: %v", p.id, err) - p.errored = err - } - if p.active { - log.Printf("sender: the connection with %s became inactive", p.id) - p.active = false - } - if m.Type == raftpb.MsgApp { - p.fs.Fail() - } - } else { - if !p.active { - log.Printf("sender: the connection with %s became active", p.id) - p.active = true - p.errored = nil - } - if m.Type == raftpb.MsgApp { - p.fs.Succ(end.Sub(start)) - } - } - p.Unlock() - } -} - -// post POSTs a data payload to a url. Returns nil if the POST succeeds, -// error on any failure. -func (p *peer) post(data []byte) error { - p.Lock() - req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data)) - p.Unlock() - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/protobuf") - req.Header.Set("X-Etcd-Cluster-ID", p.cid.String()) - resp, err := p.tr.RoundTrip(req) - if err != nil { - return err - } - resp.Body.Close() - - switch resp.StatusCode { - case http.StatusPreconditionFailed: - err := fmt.Errorf("conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid) - select { - case p.errorc <- err: - default: - } - return nil - case http.StatusForbidden: - err := fmt.Errorf("the member has been permanently removed from the cluster") - select { - case p.errorc <- err: - default: - } - return nil - case http.StatusNoContent: - return nil - default: - return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String()) - } -} - // attachStream attaches a streamSever to the peer. func (p *peer) attachStream(sw *streamWriter) error { p.Lock() diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go new file mode 100644 index 000000000..a1b002ffc --- /dev/null +++ b/rafthttp/pipeline.go @@ -0,0 +1,169 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "bytes" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +const ( + connPerPipeline = 4 + // pipelineBufSize is the size of pipeline buffer, which helps hold the + // temporary network latency. + // The size ensures that pipeline does not drop messages when the network + // is out of work for less than 1 second in good path. + pipelineBufSize = 64 +) + +type pipeline struct { + id types.ID + cid types.ID + + tr http.RoundTripper + // the url this pipeline sends to + u string + fs *stats.FollowerStats + errorc chan error + + q chan *raftpb.Message + // wait for the handling routines + wg sync.WaitGroup + sync.Mutex + // if the last send was successful, the pipeline is active. + // Or it is inactive + active bool + errored error +} + +func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, errorc chan error) *pipeline { + p := &pipeline{ + id: id, + cid: cid, + tr: tr, + u: u, + fs: fs, + errorc: errorc, + q: make(chan *raftpb.Message, pipelineBufSize), + active: true, + } + p.wg.Add(connPerPipeline) + for i := 0; i < connPerPipeline; i++ { + go p.handle() + } + return p +} + +func (p *pipeline) update(u string) { p.u = u } + +func (p *pipeline) send(m raftpb.Message) error { + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + select { + case p.q <- &m: + return nil + default: + log.Printf("pipeline: dropping %s because maximal number %d of pipeline buffer entries to %s has been reached", + m.Type, pipelineBufSize, p.u) + return fmt.Errorf("reach maximal serving") + } +} + +func (p *pipeline) stop() { + close(p.q) + p.wg.Wait() +} + +func (p *pipeline) handle() { + defer p.wg.Done() + for m := range p.q { + start := time.Now() + err := p.pipeline(pbutil.MustMarshal(m)) + end := time.Now() + + p.Lock() + if err != nil { + if p.errored == nil || p.errored.Error() != err.Error() { + log.Printf("pipeline: error posting to %s: %v", p.id, err) + p.errored = err + } + if p.active { + log.Printf("pipeline: the connection with %s became inactive", p.id) + p.active = false + } + if m.Type == raftpb.MsgApp { + p.fs.Fail() + } + } else { + if !p.active { + log.Printf("pipeline: the connection with %s became active", p.id) + p.active = true + p.errored = nil + } + if m.Type == raftpb.MsgApp { + p.fs.Succ(end.Sub(start)) + } + } + p.Unlock() + } +} + +// post POSTs a data payload to a url. Returns nil if the POST succeeds, +// error on any failure. +func (p *pipeline) pipeline(data []byte) error { + p.Lock() + req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data)) + p.Unlock() + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/protobuf") + req.Header.Set("X-Etcd-Cluster-ID", p.cid.String()) + resp, err := p.tr.RoundTrip(req) + if err != nil { + return err + } + resp.Body.Close() + + switch resp.StatusCode { + case http.StatusPreconditionFailed: + err := fmt.Errorf("conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid) + select { + case p.errorc <- err: + default: + } + return nil + case http.StatusForbidden: + err := fmt.Errorf("the member has been permanently removed from the cluster") + select { + case p.errorc <- err: + default: + } + return nil + case http.StatusNoContent: + return nil + default: + return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String()) + } +} diff --git a/rafthttp/peer_test.go b/rafthttp/pipeline_test.go similarity index 75% rename from rafthttp/peer_test.go rename to rafthttp/pipeline_test.go index 6aa77dcbd..ce738cb07 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/pipeline_test.go @@ -27,17 +27,17 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -// TestSenderSend tests that send func could post data using roundtripper +// TestPipelineSend tests that pipeline could send data using roundtripper // and increase success count in stats. -func TestSenderSend(t *testing.T) { +func TestPipelineSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) - if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { + if err := p.send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { t.Fatalf("unexpect send error: %v", err) } - p.Stop() + p.stop() if tr.Request() == nil { t.Errorf("sender fails to post the data") @@ -49,15 +49,15 @@ func TestSenderSend(t *testing.T) { } } -func TestSenderExceedMaximalServing(t *testing.T) { +func TestPipelineExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender - for i := 0; i < connPerSender+senderBufSize; i++ { - if err := p.Send(raftpb.Message{}); err != nil { + for i := 0; i < connPerPipeline+pipelineBufSize; i++ { + if err := p.send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } // force the sender to grab data @@ -65,7 +65,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { } // try to send a data when we are sure the buffer is full - if err := p.Send(raftpb.Message{}); err == nil { + if err := p.send(raftpb.Message{}); err == nil { t.Errorf("unexpect send success") } @@ -74,22 +74,22 @@ func TestSenderExceedMaximalServing(t *testing.T) { testutil.ForceGosched() // It could send new data after previous ones succeed - if err := p.Send(raftpb.Message{}); err != nil { + if err := p.send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } - p.Stop() + p.stop() } -// TestSenderSendFailed tests that when send func meets the post error, +// TestPipelineSendFailed tests that when send func meets the post error, // it increases fail count in stats. -func TestSenderSendFailed(t *testing.T) { +func TestPipelineSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - p := NewPeer(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) + p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) - if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { + if err := p.send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { t.Fatalf("unexpect Send error: %v", err) } - p.Stop() + p.stop() fs.Lock() defer fs.Unlock() @@ -98,13 +98,13 @@ func TestSenderSendFailed(t *testing.T) { } } -func TestSenderPost(t *testing.T) { +func TestPipelinePost(t *testing.T) { tr := &roundTripperRecorder{} - p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil) - if err := p.post([]byte("some data")); err != nil { + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, nil) + if err := p.pipeline([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } - p.Stop() + p.stop() if g := tr.Request().Method; g != "POST" { t.Errorf("method = %s, want %s", g, "POST") @@ -127,7 +127,7 @@ func TestSenderPost(t *testing.T) { } } -func TestSenderPostBad(t *testing.T) { +func TestPipelinePostBad(t *testing.T) { tests := []struct { u string code int @@ -142,9 +142,9 @@ func TestSenderPostBad(t *testing.T) { {"http://10.0.0.1", http.StatusCreated, nil}, } for i, tt := range tests { - p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, make(chan error)) - err := p.post([]byte("some data")) - p.Stop() + p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, make(chan error)) + err := p.pipeline([]byte("some data")) + p.stop() if err == nil { t.Errorf("#%d: err = nil, want not nil", i) @@ -152,7 +152,7 @@ func TestSenderPostBad(t *testing.T) { } } -func TestPeerPostErrorc(t *testing.T) { +func TestPipelinePostErrorc(t *testing.T) { tests := []struct { u string code int @@ -163,9 +163,9 @@ func TestPeerPostErrorc(t *testing.T) { } for i, tt := range tests { errorc := make(chan error, 1) - p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, errorc) - p.post([]byte("some data")) - p.Stop() + p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, errorc) + p.pipeline([]byte("some data")) + p.stop() select { case <-errorc: default: