From 84fbf7aab58a13848fb5c342a25852874a7d812e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 16 Nov 2014 10:21:05 -0800 Subject: [PATCH 1/4] *: etcdserver.sender -> rafthttp.Sender --- etcdserver/sender.go | 131 +++------------------- etcdserver/sender_test.go | 188 +------------------------------ rafthttp/sender.go | 148 +++++++++++++++++++++++++ rafthttp/sender_test.go | 226 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 390 insertions(+), 303 deletions(-) create mode 100644 rafthttp/sender.go create mode 100644 rafthttp/sender_test.go diff --git a/etcdserver/sender.go b/etcdserver/sender.go index 875c48bde..ccacdc686 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -17,24 +17,19 @@ package etcdserver import ( - "bytes" - "fmt" "log" "net/http" "net/url" "path" - "sync" - "time" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" ) const ( - raftPrefix = "/raft" - connPerSender = 4 - senderBufSize = connPerSender * 4 + raftPrefix = "/raft" ) type sendHub struct { @@ -42,7 +37,7 @@ type sendHub struct { cl ClusterInfo ss *stats.ServerStats ls *stats.LeaderStats - senders map[types.ID]*sender + senders map[types.ID]rafthttp.Sender shouldstop chan struct{} } @@ -55,7 +50,7 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls * cl: cl, ss: ss, ls: ls, - senders: make(map[types.ID]*sender), + senders: make(map[types.ID]rafthttp.Sender), shouldstop: make(chan struct{}, 1), } for _, m := range cl.Members() { @@ -86,14 +81,13 @@ func (h *sendHub) Send(msgs []raftpb.Message) { h.ss.SendAppendReq(len(data)) } - // TODO (xiangli): reasonable retry logic - s.send(data) + s.Send(data) } } func (h *sendHub) Stop() { for _, s := range h.senders { - s.stop() + s.Stop() } } @@ -106,14 +100,19 @@ func (h *sendHub) Add(m *Member) { return } // TODO: considering how to switch between all available peer urls - u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix) + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) fs := h.ls.Follower(m.ID.String()) - s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop) + s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop) h.senders[m.ID] = s } func (h *sendHub) Remove(id types.ID) { - h.senders[id].stop() + h.senders[id].Stop() delete(h.senders, id) } @@ -128,105 +127,5 @@ func (h *sendHub) Update(m *Member) { log.Panicf("unexpect peer url %s", peerURL) } u.Path = path.Join(u.Path, raftPrefix) - s := h.senders[m.ID] - s.mu.Lock() - defer s.mu.Unlock() - s.u = u.String() -} - -type sender struct { - tr http.RoundTripper - u string - cid types.ID - fs *stats.FollowerStats - q chan []byte - mu sync.RWMutex - wg sync.WaitGroup - shouldstop chan struct{} -} - -func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { - s := &sender{ - tr: tr, - u: u, - cid: cid, - fs: fs, - q: make(chan []byte, senderBufSize), - shouldstop: shouldstop, - } - s.wg.Add(connPerSender) - for i := 0; i < connPerSender; i++ { - go s.handle() - } - return s -} - -func (s *sender) send(data []byte) error { - select { - case s.q <- data: - return nil - default: - log.Printf("sender: reach the maximal serving to %s", s.u) - return fmt.Errorf("reach maximal serving") - } -} - -func (s *sender) stop() { - close(s.q) - s.wg.Wait() -} - -func (s *sender) handle() { - defer s.wg.Done() - for d := range s.q { - start := time.Now() - err := s.post(d) - end := time.Now() - if err != nil { - s.fs.Fail() - log.Printf("sender: %v", err) - continue - } - s.fs.Succ(end.Sub(start)) - } -} - -// post POSTs a data payload to a url. Returns nil if the POST succeeds, -// error on any failure. -func (s *sender) post(data []byte) error { - s.mu.RLock() - req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) - s.mu.RUnlock() - if err != nil { - return fmt.Errorf("new request to %s error: %v", s.u, err) - } - req.Header.Set("Content-Type", "application/protobuf") - req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) - resp, err := s.tr.RoundTrip(req) - if err != nil { - return fmt.Errorf("error posting to %q: %v", req.URL.String(), err) - } - resp.Body.Close() - - switch resp.StatusCode { - case http.StatusPreconditionFailed: - select { - case s.shouldstop <- struct{}{}: - default: - } - log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) - return nil - case http.StatusForbidden: - select { - case s.shouldstop <- struct{}{}: - default: - } - log.Println("etcdserver: this member has been permanently removed from the cluster") - log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") - return nil - case http.StatusNoContent: - return nil - default: - return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode)) - } + h.senders[m.ID].Update(u.String()) } diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index e24637093..5d5f6017d 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -17,10 +17,7 @@ package etcdserver import ( - "errors" - "io/ioutil" "net/http" - "sync" "testing" "time" @@ -64,9 +61,6 @@ func TestSendHubAdd(t *testing.T) { if !ok { t.Fatalf("senders[1] is nil, want exists") } - if s.u != "http://a/raft" { - t.Errorf("url = %s, want %s", s.u, "http://a/raft") - } h.Add(m) ns := h.senders[types.ID(1)] @@ -104,7 +98,7 @@ func TestSendHubShouldStop(t *testing.T) { t.Fatalf("received unexpected shouldstop notification") case <-time.After(10 * time.Millisecond): } - h.senders[1].send([]byte("somedata")) + h.senders[1].Send([]byte("somedata")) testutil.ForceGosched() select { @@ -114,169 +108,6 @@ func TestSendHubShouldStop(t *testing.T) { } } -// TestSenderSend tests that send func could post data using roundtripper -// and increase success count in stats. -func TestSenderSend(t *testing.T) { - tr := &roundTripperRecorder{} - fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - - if err := s.send([]byte("some data")); err != nil { - t.Fatalf("unexpect send error: %v", err) - } - s.stop() - - if tr.Request() == nil { - t.Errorf("sender fails to post the data") - } - fs.Lock() - defer fs.Unlock() - if fs.Counts.Success != 1 { - t.Errorf("success = %d, want 1", fs.Counts.Success) - } -} - -func TestSenderExceedMaximalServing(t *testing.T) { - tr := newRoundTripperBlocker() - fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - - // keep the sender busy and make the buffer full - // nothing can go out as we block the sender - for i := 0; i < connPerSender+senderBufSize; i++ { - if err := s.send([]byte("some data")); err != nil { - t.Errorf("send err = %v, want nil", err) - } - // force the sender to grab data - testutil.ForceGosched() - } - - // try to send a data when we are sure the buffer is full - if err := s.send([]byte("some data")); err == nil { - t.Errorf("unexpect send success") - } - - // unblock the senders and force them to send out the data - tr.unblock() - testutil.ForceGosched() - - // It could send new data after previous ones succeed - if err := s.send([]byte("some data")); err != nil { - t.Errorf("send err = %v, want nil", err) - } - s.stop() -} - -// TestSenderSendFailed tests that when send func meets the post error, -// it increases fail count in stats. -func TestSenderSendFailed(t *testing.T) { - fs := &stats.FollowerStats{} - s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) - - if err := s.send([]byte("some data")); err != nil { - t.Fatalf("unexpect send error: %v", err) - } - s.stop() - - fs.Lock() - defer fs.Unlock() - if fs.Counts.Fail != 1 { - t.Errorf("fail = %d, want 1", fs.Counts.Fail) - } -} - -func TestSenderPost(t *testing.T) { - tr := &roundTripperRecorder{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) - if err := s.post([]byte("some data")); err != nil { - t.Fatalf("unexpect post error: %v", err) - } - s.stop() - - if g := tr.Request().Method; g != "POST" { - t.Errorf("method = %s, want %s", g, "POST") - } - if g := tr.Request().URL.String(); g != "http://10.0.0.1" { - t.Errorf("url = %s, want %s", g, "http://10.0.0.1") - } - if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" { - t.Errorf("content type = %s, want %s", g, "application/protobuf") - } - if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" { - t.Errorf("cluster id = %s, want %s", g, "1") - } - b, err := ioutil.ReadAll(tr.Request().Body) - if err != nil { - t.Fatalf("unexpected ReadAll error: %v", err) - } - if string(b) != "some data" { - t.Errorf("body = %s, want %s", b, "some data") - } -} - -func TestSenderPostBad(t *testing.T) { - tests := []struct { - u string - code int - err error - }{ - // bad url - {":bad url", http.StatusNoContent, nil}, - // RoundTrip returns error - {"http://10.0.0.1", 0, errors.New("blah")}, - // unexpected response status code - {"http://10.0.0.1", http.StatusOK, nil}, - {"http://10.0.0.1", http.StatusCreated, nil}, - } - for i, tt := range tests { - shouldstop := make(chan struct{}) - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) - err := s.post([]byte("some data")) - s.stop() - - if err == nil { - t.Errorf("#%d: err = nil, want not nil", i) - } - } -} - -func TestSenderPostShouldStop(t *testing.T) { - tests := []struct { - u string - code int - err error - }{ - {"http://10.0.0.1", http.StatusForbidden, nil}, - {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, - } - for i, tt := range tests { - shouldstop := make(chan struct{}, 1) - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) - s.post([]byte("some data")) - s.stop() - select { - case <-shouldstop: - default: - t.Fatalf("#%d: cannot receive shouldstop notification", i) - } - } -} - -type roundTripperBlocker struct { - c chan struct{} -} - -func newRoundTripperBlocker() *roundTripperBlocker { - return &roundTripperBlocker{c: make(chan struct{})} -} -func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) { - <-t.c - return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil -} -func (t *roundTripperBlocker) unblock() { - close(t.c) -} - type respRoundTripper struct { code int err error @@ -289,23 +120,6 @@ func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err } -type roundTripperRecorder struct { - req *http.Request - sync.Mutex -} - -func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) { - t.Lock() - defer t.Unlock() - t.req = req - return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil -} -func (t *roundTripperRecorder) Request() *http.Request { - t.Lock() - defer t.Unlock() - return t.req -} - type nopReadCloser struct{} func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil } diff --git a/rafthttp/sender.go b/rafthttp/sender.go new file mode 100644 index 000000000..203a6c5aa --- /dev/null +++ b/rafthttp/sender.go @@ -0,0 +1,148 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "bytes" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" +) + +const ( + connPerSender = 4 + senderBufSize = connPerSender * 4 +) + +type Sender interface { + Update(u string) + // Send sends the data to the remote node. It is always non-blocking. + // It may be fail to send data if it returns nil error. + Send(data []byte) error + // Stop performs any necessary finalization and terminates the Sender + // elegantly. + Stop() +} + +func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { + s := &sender{ + tr: tr, + u: u, + cid: cid, + fs: fs, + q: make(chan []byte, senderBufSize), + shouldstop: shouldstop, + } + s.wg.Add(connPerSender) + for i := 0; i < connPerSender; i++ { + go s.handle() + } + return s +} + +type sender struct { + tr http.RoundTripper + u string + cid types.ID + fs *stats.FollowerStats + q chan []byte + mu sync.RWMutex + wg sync.WaitGroup + shouldstop chan struct{} +} + +func (s *sender) Update(u string) { + s.mu.Lock() + defer s.mu.Unlock() + s.u = u +} + +// TODO (xiangli): reasonable retry logic +func (s *sender) Send(data []byte) error { + select { + case s.q <- data: + return nil + default: + log.Printf("sender: reach the maximal serving to %s", s.u) + return fmt.Errorf("reach maximal serving") + } +} + +func (s *sender) Stop() { + close(s.q) + s.wg.Wait() +} + +func (s *sender) handle() { + defer s.wg.Done() + for d := range s.q { + start := time.Now() + err := s.post(d) + end := time.Now() + if err != nil { + s.fs.Fail() + log.Printf("sender: %v", err) + continue + } + s.fs.Succ(end.Sub(start)) + } +} + +// post POSTs a data payload to a url. Returns nil if the POST succeeds, +// error on any failure. +func (s *sender) post(data []byte) error { + s.mu.RLock() + req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) + s.mu.RUnlock() + if err != nil { + return fmt.Errorf("new request to %s error: %v", s.u, err) + } + req.Header.Set("Content-Type", "application/protobuf") + req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) + resp, err := s.tr.RoundTrip(req) + if err != nil { + return fmt.Errorf("error posting to %q: %v", req.URL.String(), err) + } + resp.Body.Close() + + switch resp.StatusCode { + case http.StatusPreconditionFailed: + select { + case s.shouldstop <- struct{}{}: + default: + } + log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) + return nil + case http.StatusForbidden: + select { + case s.shouldstop <- struct{}{}: + default: + } + log.Println("etcdserver: this member has been permanently removed from the cluster") + log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") + return nil + case http.StatusNoContent: + return nil + default: + return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode)) + } +} diff --git a/rafthttp/sender_test.go b/rafthttp/sender_test.go new file mode 100644 index 000000000..6e86a4f0c --- /dev/null +++ b/rafthttp/sender_test.go @@ -0,0 +1,226 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "errors" + "io/ioutil" + "net/http" + "sync" + "testing" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/pkg/types" +) + +// TestSenderSend tests that send func could post data using roundtripper +// and increase success count in stats. +func TestSenderSend(t *testing.T) { + tr := &roundTripperRecorder{} + fs := &stats.FollowerStats{} + s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) + + if err := s.Send([]byte("some data")); err != nil { + t.Fatalf("unexpect send error: %v", err) + } + s.Stop() + + if tr.Request() == nil { + t.Errorf("sender fails to post the data") + } + fs.Lock() + defer fs.Unlock() + if fs.Counts.Success != 1 { + t.Errorf("success = %d, want 1", fs.Counts.Success) + } +} + +func TestSenderExceedMaximalServing(t *testing.T) { + tr := newRoundTripperBlocker() + fs := &stats.FollowerStats{} + s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) + + // keep the sender busy and make the buffer full + // nothing can go out as we block the sender + for i := 0; i < connPerSender+senderBufSize; i++ { + if err := s.Send([]byte("some data")); err != nil { + t.Errorf("send err = %v, want nil", err) + } + // force the sender to grab data + testutil.ForceGosched() + } + + // try to send a data when we are sure the buffer is full + if err := s.Send([]byte("some data")); err == nil { + t.Errorf("unexpect send success") + } + + // unblock the senders and force them to send out the data + tr.unblock() + testutil.ForceGosched() + + // It could send new data after previous ones succeed + if err := s.Send([]byte("some data")); err != nil { + t.Errorf("send err = %v, want nil", err) + } + s.Stop() +} + +// TestSenderSendFailed tests that when send func meets the post error, +// it increases fail count in stats. +func TestSenderSendFailed(t *testing.T) { + fs := &stats.FollowerStats{} + s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) + + if err := s.Send([]byte("some data")); err != nil { + t.Fatalf("unexpect Send error: %v", err) + } + s.Stop() + + fs.Lock() + defer fs.Unlock() + if fs.Counts.Fail != 1 { + t.Errorf("fail = %d, want 1", fs.Counts.Fail) + } +} + +func TestSenderPost(t *testing.T) { + tr := &roundTripperRecorder{} + s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) + if err := s.post([]byte("some data")); err != nil { + t.Fatalf("unexpect post error: %v", err) + } + s.Stop() + + if g := tr.Request().Method; g != "POST" { + t.Errorf("method = %s, want %s", g, "POST") + } + if g := tr.Request().URL.String(); g != "http://10.0.0.1" { + t.Errorf("url = %s, want %s", g, "http://10.0.0.1") + } + if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" { + t.Errorf("content type = %s, want %s", g, "application/protobuf") + } + if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" { + t.Errorf("cluster id = %s, want %s", g, "1") + } + b, err := ioutil.ReadAll(tr.Request().Body) + if err != nil { + t.Fatalf("unexpected ReadAll error: %v", err) + } + if string(b) != "some data" { + t.Errorf("body = %s, want %s", b, "some data") + } +} + +func TestSenderPostBad(t *testing.T) { + tests := []struct { + u string + code int + err error + }{ + // bad url + {":bad url", http.StatusNoContent, nil}, + // RoundTrip returns error + {"http://10.0.0.1", 0, errors.New("blah")}, + // unexpected response status code + {"http://10.0.0.1", http.StatusOK, nil}, + {"http://10.0.0.1", http.StatusCreated, nil}, + } + for i, tt := range tests { + shouldstop := make(chan struct{}) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + err := s.post([]byte("some data")) + s.Stop() + + if err == nil { + t.Errorf("#%d: err = nil, want not nil", i) + } + } +} + +func TestSenderPostShouldStop(t *testing.T) { + tests := []struct { + u string + code int + err error + }{ + {"http://10.0.0.1", http.StatusForbidden, nil}, + {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, + } + for i, tt := range tests { + shouldstop := make(chan struct{}, 1) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + s.post([]byte("some data")) + s.Stop() + select { + case <-shouldstop: + default: + t.Fatalf("#%d: cannot receive shouldstop notification", i) + } + } +} + +type roundTripperBlocker struct { + c chan struct{} +} + +func newRoundTripperBlocker() *roundTripperBlocker { + return &roundTripperBlocker{c: make(chan struct{})} +} +func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) { + <-t.c + return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil +} +func (t *roundTripperBlocker) unblock() { + close(t.c) +} + +type respRoundTripper struct { + code int + err error +} + +func newRespRoundTripper(code int, err error) *respRoundTripper { + return &respRoundTripper{code: code, err: err} +} +func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err +} + +type roundTripperRecorder struct { + req *http.Request + sync.Mutex +} + +func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) { + t.Lock() + defer t.Unlock() + t.req = req + return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil +} +func (t *roundTripperRecorder) Request() *http.Request { + t.Lock() + defer t.Unlock() + return t.req +} + +type nopReadCloser struct{} + +func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil } +func (n *nopReadCloser) Close() error { return nil } From 3fcc0117170021c316b7ad2162a6ecce47446b86 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 16 Nov 2014 12:12:57 -0800 Subject: [PATCH 2/4] etcdserver: rename sender.go -> sendhub.go --- etcdserver/{sender.go => sendhub.go} | 0 etcdserver/{sender_test.go => sendhub_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename etcdserver/{sender.go => sendhub.go} (100%) rename etcdserver/{sender_test.go => sendhub_test.go} (100%) diff --git a/etcdserver/sender.go b/etcdserver/sendhub.go similarity index 100% rename from etcdserver/sender.go rename to etcdserver/sendhub.go diff --git a/etcdserver/sender_test.go b/etcdserver/sendhub_test.go similarity index 100% rename from etcdserver/sender_test.go rename to etcdserver/sendhub_test.go From 5dc5f8145cddd8f71c87644666394e93104a4208 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 16 Nov 2014 16:23:47 -0800 Subject: [PATCH 3/4] *: etcdhttp.raftHandler -> rafthttp.RaftHandler --- etcdserver/etcdhttp/peer.go | 61 +--------- etcdserver/etcdhttp/peer_test.go | 150 ------------------------- etcdserver/server.go | 5 +- rafthttp/http.go | 99 +++++++++++++++++ rafthttp/http_test.go | 184 +++++++++++++++++++++++++++++++ 5 files changed, 288 insertions(+), 211 deletions(-) create mode 100644 rafthttp/http.go create mode 100644 rafthttp/http_test.go diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index ec3aa430b..483362dc7 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -18,14 +18,11 @@ package etcdhttp import ( "encoding/json" - "io/ioutil" "log" "net/http" - "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" ) const ( @@ -35,12 +32,7 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - rh := &raftHandler{ - stats: server, - server: server, - clusterInfo: server.Cluster, - } - + rh := rafthttp.NewHandler(server, server.Cluster.ID(), server) mh := &peerMembersHandler{ clusterInfo: server.Cluster, } @@ -52,55 +44,6 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { return mux } -type raftHandler struct { - stats etcdserver.Stats - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo -} - -func (h *raftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "POST") { - return - } - - wcid := h.clusterInfo.ID().String() - w.Header().Set("X-Etcd-Cluster-ID", wcid) - - gcid := r.Header.Get("X-Etcd-Cluster-ID") - if gcid != wcid { - log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) - http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) - return - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("etcdhttp: error reading raft message:", err) - http.Error(w, "error reading raft message", http.StatusBadRequest) - return - } - var m raftpb.Message - if err := m.Unmarshal(b); err != nil { - log.Println("etcdhttp: error unmarshaling raft message:", err) - http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) - return - } - if err := h.server.Process(context.TODO(), m); err != nil { - switch err { - case etcdserver.ErrRemoved: - log.Printf("etcdhttp: reject message from removed member %s", types.ID(m.From).String()) - http.Error(w, "cannot process message from removed member", http.StatusForbidden) - default: - writeError(w, err) - } - return - } - if m.Type == raftpb.MsgApp { - h.stats.UpdateRecvApp(types.ID(m.From), r.ContentLength) - } - w.WriteHeader(http.StatusNoContent) -} - type peerMembersHandler struct { clusterInfo etcdserver.ClusterInfo } diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index 495d9eb4a..29e8b0dc1 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -17,165 +17,15 @@ package etcdhttp import ( - "bytes" "encoding/json" - "errors" - "io" "net/http" "net/http/httptest" "path" - "strings" "testing" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/raft/raftpb" ) -func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte { - json, err := m.Marshal() - if err != nil { - t.Fatalf("error marshalling raft Message: %#v", err) - } - return json -} - -// errReader implements io.Reader to facilitate a broken request. -type errReader struct{} - -func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } - -func TestServeRaft(t *testing.T) { - testCases := []struct { - method string - body io.Reader - serverErr error - clusterID string - - wcode int - }{ - { - // bad method - "GET", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "PUT", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "DELETE", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad request body - "POST", - &errReader{}, - nil, - "0", - http.StatusBadRequest, - }, - { - // bad request protobuf - "POST", - strings.NewReader("malformed garbage"), - nil, - "0", - http.StatusBadRequest, - }, - { - // good request, etcdserver.Server internal error - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - errors.New("some error"), - "0", - http.StatusInternalServerError, - }, - { - // good request from removed member - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - etcdserver.ErrRemoved, - "0", - http.StatusForbidden, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "1", - http.StatusPreconditionFailed, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusNoContent, - }, - } - for i, tt := range testCases { - req, err := http.NewRequest(tt.method, "foo", tt.body) - if err != nil { - t.Fatalf("#%d: could not create request: %#v", i, err) - } - req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) - rw := httptest.NewRecorder() - h := &raftHandler{stats: nil, server: &errServer{tt.serverErr}, clusterInfo: &fakeCluster{id: 0}} - h.ServeHTTP(rw, req) - if rw.Code != tt.wcode { - t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) - } - } -} - func TestServeMembersFails(t *testing.T) { tests := []struct { method string diff --git a/etcdserver/server.go b/etcdserver/server.go index 48c8c480f..14d59c934 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -33,6 +33,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/discovery" + "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/pbutil" @@ -61,7 +62,6 @@ const ( var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") - ErrRemoved = errors.New("etcdserver: server removed") ErrIDRemoved = errors.New("etcdserver: ID removed") ErrIDExists = errors.New("etcdserver: ID exists") ErrIDNotFound = errors.New("etcdserver: ID not found") @@ -318,7 +318,8 @@ func (s *EtcdServer) ID() types.ID { return s.id } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.Cluster.IsIDRemoved(types.ID(m.From)) { - return ErrRemoved + log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) + return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } return s.node.Step(ctx, m) } diff --git a/rafthttp/http.go b/rafthttp/http.go new file mode 100644 index 000000000..096cce34f --- /dev/null +++ b/rafthttp/http.go @@ -0,0 +1,99 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "io/ioutil" + "log" + "net/http" + + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +type Processor interface { + Process(ctx context.Context, m raftpb.Message) error +} + +type Stats interface { + UpdateRecvApp(from types.ID, length int64) +} + +func NewHandler(p Processor, cid types.ID, ss Stats) http.Handler { + return &handler{ + p: p, + cid: cid, + ss: ss, + } +} + +type handler struct { + p Processor + cid types.ID + ss Stats +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.Header().Set("Allow", "POST") + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + wcid := h.cid.String() + w.Header().Set("X-Etcd-Cluster-ID", wcid) + + gcid := r.Header.Get("X-Etcd-Cluster-ID") + if gcid != wcid { + log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) + http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) + return + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("rafthttp: error reading raft message:", err) + http.Error(w, "error reading raft message", http.StatusBadRequest) + return + } + var m raftpb.Message + if err := m.Unmarshal(b); err != nil { + log.Println("rafthttp: error unmarshaling raft message:", err) + http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) + return + } + if err := h.p.Process(context.TODO(), m); err != nil { + switch v := err.(type) { + case writerToResponse: + v.WriteTo(w) + default: + log.Printf("rafthttp: error processing raft message: %v", err) + http.Error(w, "error processing raft message", http.StatusInternalServerError) + } + return + } + if m.Type == raftpb.MsgApp { + h.ss.UpdateRecvApp(types.ID(m.From), r.ContentLength) + } + w.WriteHeader(http.StatusNoContent) +} + +type writerToResponse interface { + WriteTo(w http.ResponseWriter) +} diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go new file mode 100644 index 000000000..1718a9709 --- /dev/null +++ b/rafthttp/http_test.go @@ -0,0 +1,184 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "bytes" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func TestServeRaft(t *testing.T) { + testCases := []struct { + method string + body io.Reader + p Processor + clusterID string + + wcode int + }{ + { + // bad method + "GET", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "PUT", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "DELETE", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad request body + "POST", + &errReader{}, + &nopProcessor{}, + "0", + http.StatusBadRequest, + }, + { + // bad request protobuf + "POST", + strings.NewReader("malformed garbage"), + &nopProcessor{}, + "0", + http.StatusBadRequest, + }, + { + // good request, wrong cluster ID + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "1", + http.StatusPreconditionFailed, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{ + err: &resWriterToError{code: http.StatusForbidden}, + }, + "0", + http.StatusForbidden, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{ + err: &resWriterToError{code: http.StatusInternalServerError}, + }, + "0", + http.StatusInternalServerError, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{err: errors.New("blah")}, + "0", + http.StatusInternalServerError, + }, + { + // good request + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusNoContent, + }, + } + for i, tt := range testCases { + req, err := http.NewRequest(tt.method, "foo", tt.body) + if err != nil { + t.Fatalf("#%d: could not create request: %#v", i, err) + } + req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) + rw := httptest.NewRecorder() + h := NewHandler(tt.p, types.ID(0), &nopStats{}) + h.ServeHTTP(rw, req) + if rw.Code != tt.wcode { + t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +// errReader implements io.Reader to facilitate a broken request. +type errReader struct{} + +func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } + +type nopProcessor struct{} + +func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil } + +type errProcessor struct { + err error +} + +func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err } + +type nopStats struct{} + +func (s *nopStats) UpdateRecvApp(from types.ID, length int64) {} + +type resWriterToError struct { + code int +} + +func (e *resWriterToError) Error() string { return "" } +func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) } From f24e214ee5239f3e7f77cf0986dd750be62e59be Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 17 Nov 2014 14:40:21 -0800 Subject: [PATCH 4/4] rafthttp: move server stats in raftHandler to etcdserver --- etcdserver/etcdhttp/peer.go | 2 +- etcdserver/server.go | 9 +++------ rafthttp/http.go | 11 +---------- 3 files changed, 5 insertions(+), 17 deletions(-) diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index 483362dc7..9f13976ca 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -32,7 +32,7 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - rh := rafthttp.NewHandler(server, server.Cluster.ID(), server) + rh := rafthttp.NewHandler(server, server.Cluster.ID()) mh := &peerMembersHandler{ clusterInfo: server.Cluster, } diff --git a/etcdserver/server.go b/etcdserver/server.go index 14d59c934..5b2eae515 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -145,8 +145,6 @@ type Stats interface { LeaderStats() []byte // StoreStats returns statistics of the store backing this EtcdServer StoreStats() []byte - // UpdateRecvApp updates the underlying statistics in response to a receiving an Append request - UpdateRecvApp(from types.ID, length int64) } type RaftTimer interface { @@ -321,6 +319,9 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } + if m.Type == raftpb.MsgApp { + s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size()) + } return s.node.Step(ctx, m) } @@ -486,10 +487,6 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } -func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) { - s.stats.RecvAppendReq(from.String(), int(length)) -} - func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { // TODO: move Member to protobuf type b, err := json.Marshal(memb) diff --git a/rafthttp/http.go b/rafthttp/http.go index 096cce34f..87ff9f924 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -31,22 +31,16 @@ type Processor interface { Process(ctx context.Context, m raftpb.Message) error } -type Stats interface { - UpdateRecvApp(from types.ID, length int64) -} - -func NewHandler(p Processor, cid types.ID, ss Stats) http.Handler { +func NewHandler(p Processor, cid types.ID) http.Handler { return &handler{ p: p, cid: cid, - ss: ss, } } type handler struct { p Processor cid types.ID - ss Stats } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -88,9 +82,6 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } return } - if m.Type == raftpb.MsgApp { - h.ss.UpdateRecvApp(types.ID(m.From), r.ContentLength) - } w.WriteHeader(http.StatusNoContent) }