From 1e05cd75c778db5eaf7f9b9b41b4d096944e2da2 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 4 Nov 2014 15:57:25 -0800 Subject: [PATCH] etcdserver: refactor sender 1. restrict the number of inflight connections to remote member 2. support stop --- etcdserver/cluster.go | 1 + etcdserver/etcdhttp/http_test.go | 1 + etcdserver/sender.go | 169 +++++++++++++++++++++---------- etcdserver/server.go | 17 ++-- etcdserver/server_test.go | 42 +++++--- 5 files changed, 152 insertions(+), 78 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 058e19a0d..a176c4d8f 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -46,6 +46,7 @@ type ClusterInfo interface { // Members returns a slice of members sorted by their ID Members() []*Member Member(id types.ID) *Member + IsIDRemoved(id types.ID) bool } // Cluster is a list of Members that belong to the same raft cluster diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 439fea9eb..476a10725 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -57,6 +57,7 @@ func (c *fakeCluster) Members() []*etcdserver.Member { return []*etcdserver.Member(sms) } func (c *fakeCluster) Member(id types.ID) *etcdserver.Member { return c.members[uint64(id)] } +func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false } // errServer implements the etcd.Server interface for testing. // It returns the given error from any Do/Process/AddMember/RemoveMember calls. diff --git a/etcdserver/sender.go b/etcdserver/sender.go index 29020a94b..fb5744a44 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -28,42 +28,38 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -const raftPrefix = "/raft" +const ( + raftPrefix = "/raft" + maxConnsPerSender = 4 +) -// Sender creates the default production sender used to transport raft messages -// in the cluster. The returned sender will update the given ServerStats and +type sendHub struct { + tr *http.Transport + cl ClusterInfo + ss *stats.ServerStats + ls *stats.LeaderStats + senders map[types.ID]*sender +} + +// newSendHub creates the default send hub used to transport raft messages +// to other members. The returned sendHub will update the given ServerStats and // LeaderStats appropriately. -func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) { - c := &http.Client{Transport: t} - - return func(msgs []raftpb.Message) { - for _, m := range msgs { - // TODO: reuse go routines - // limit the number of outgoing connections for the same receiver - go send(c, cl, m, ss, ls) - } +func newSendHub(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { + return &sendHub{ + tr: t, + cl: cl, + ss: ss, + ls: ls, + senders: make(map[types.ID]*sender), } } -// send uses the given client to send a message to a member in the given -// ClusterStore, retrying up to 3 times for each message. The given -// ServerStats and LeaderStats are updated appropriately -func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) { - to := types.ID(m.To) - cid := cl.ID() - // TODO (xiangli): reasonable retry logic - for i := 0; i < 3; i++ { - memb := cl.Member(to) - if memb == nil { - if !cl.IsIDRemoved(to) { - // TODO: unknown peer id.. what do we do? I - // don't think his should ever happen, need to - // look into this further. - log.Printf("etcdserver: error sending message to unknown receiver %s", to.String()) - } - return +func (h *sendHub) Send(msgs []raftpb.Message) { + for _, m := range msgs { + s := h.sender(types.ID(m.To)) + if s == nil { + continue } - u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix) // TODO: don't block. we should be able to have 1000s // of messages out at a time. @@ -73,52 +69,113 @@ func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, return // drop bad message } if m.Type == raftpb.MsgApp { - ss.SendAppendReq(len(data)) + h.ss.SendAppendReq(len(data)) } - fs := ls.Follower(to.String()) - start := time.Now() - sent := httpPost(c, u, cid, data) - end := time.Now() - if sent { - fs.Succ(end.Sub(start)) - return - } - fs.Fail() - // TODO: backoff + // TODO (xiangli): reasonable retry logic + s.send(data) } } -// httpPost POSTs a data payload to a url using the given client. Returns true -// if the POST succeeds, false on any failure. -func httpPost(c *http.Client, url string, cid types.ID, data []byte) bool { - req, err := http.NewRequest("POST", url, bytes.NewBuffer(data)) +func (h *sendHub) Stop() { + for _, s := range h.senders { + s.stop() + } +} + +func (h *sendHub) sender(id types.ID) *sender { + if s, ok := h.senders[id]; ok { + return s + } + return h.add(id) +} + +func (h *sendHub) add(id types.ID) *sender { + memb := h.cl.Member(id) + if memb == nil { + if !h.cl.IsIDRemoved(id) { + log.Printf("etcdserver: error sending message to unknown receiver %s", id) + } + return nil + } + // TODO: considering how to switch between all available peer urls + u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix) + c := &http.Client{Transport: h.tr} + fs := h.ls.Follower(id.String()) + s := newSender(u, h.cl.ID(), c, fs) + // TODO: recycle sender during long running + h.senders[id] = s + return s +} + +type sender struct { + u string + cid types.ID + c *http.Client + fs *stats.FollowerStats + q chan []byte +} + +func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender { + s := &sender{u: u, cid: cid, c: c, fs: fs, q: make(chan []byte)} + for i := 0; i < maxConnsPerSender; i++ { + go s.handle() + } + return s +} + +func (s *sender) send(data []byte) { + // TODO: we cannot afford the miss of MsgProp, so we wait for some handler + // to take the data + s.q <- data +} + +func (s *sender) stop() { + close(s.q) +} + +func (s *sender) handle() { + for d := range s.q { + start := time.Now() + err := s.post(d) + end := time.Now() + if err != nil { + s.fs.Fail() + log.Printf("sender: %v", err) + continue + } + s.fs.Succ(end.Sub(start)) + } +} + +// post POSTs a data payload to a url. Returns nil if the POST succeeds, +// error on any failure. +func (s *sender) post(data []byte) error { + req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) if err != nil { - // TODO: log the error? - return false + return fmt.Errorf("new request to %s error: %v", s.u, err) } req.Header.Set("Content-Type", "application/protobuf") - req.Header.Set("X-Etcd-Cluster-ID", cid.String()) - resp, err := c.Do(req) + req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) + resp, err := s.c.Do(req) if err != nil { - // TODO: log the error? - return false + return fmt.Errorf("do request %+v error: %v", req, err) } resp.Body.Close() switch resp.StatusCode { case http.StatusPreconditionFailed: // TODO: shutdown the etcdserver gracefully? - log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), cid.String()) - return false + log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) + return nil case http.StatusForbidden: // TODO: stop the server log.Println("etcd: this member has been permanently removed from the cluster") log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") - return false + return nil case http.StatusNoContent: - return true + return nil default: - return false + return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode)) } } diff --git a/etcdserver/server.go b/etcdserver/server.go index 0c4d28c45..483fa3215 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -78,14 +78,17 @@ func init() { rand.Seed(time.Now().UnixNano()) } -type sendFunc func(m []raftpb.Message) - type Response struct { Event *store.Event Watcher store.Watcher err error } +type Sender interface { + Send(m []raftpb.Message) + Stop() +} + type Storage interface { // Save function saves ents and state to the underlying stable storage. // Save MUST block until st and ents are on stable storage. @@ -156,11 +159,11 @@ type EtcdServer struct { stats *stats.ServerStats lstats *stats.LeaderStats - // send specifies the send function for sending msgs to members. send + // sender specifies the sender to send msgs to members. sending msgs // MUST NOT block. It is okay to drop messages, since clients should // timeout and reissue their messages. If send is nil, server will // panic. - send sendFunc + sender Sender storage Storage @@ -241,6 +244,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } lstats := stats.NewLeaderStats(id.String()) + shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats) s := &EtcdServer{ store: st, node: n, @@ -253,7 +257,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { }{w, ss}, stats: sstats, lstats: lstats, - send: Sender(cfg.Transport, cfg.Cluster, sstats, lstats), + sender: shub, Ticker: time.Tick(100 * time.Millisecond), SyncTicker: time.Tick(500 * time.Millisecond), snapCount: cfg.SnapCount, @@ -318,7 +322,7 @@ func (s *EtcdServer) run() { if err := s.storage.SaveSnap(rd.Snapshot); err != nil { log.Fatalf("etcdserver: create snapshot error: %v", err) } - s.send(rd.Messages) + s.sender.Send(rd.Messages) // TODO(bmizerany): do this in the background, but take // care to apply entries in a single goroutine, and not @@ -361,6 +365,7 @@ func (s *EtcdServer) Stop() { s.node.Stop() close(s.done) <-s.stopped + s.sender.Stop() } // Do interprets r and performs an operation on s.store according to r.Method diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 995bce519..b5a9486fc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -487,19 +487,23 @@ func TestApplyConfChangeError(t *testing.T) { func TestClusterOf1(t *testing.T) { testServer(t, 1) } func TestClusterOf3(t *testing.T) { testServer(t, 3) } +type fakeSender struct { + ss []*EtcdServer +} + +func (s *fakeSender) Send(msgs []raftpb.Message) { + for _, m := range msgs { + s.ss[m.To-1].node.Step(context.TODO(), m) + } +} +func (s *fakeSender) Stop() {} + func testServer(t *testing.T, ns uint64) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ss := make([]*EtcdServer, ns) - send := func(msgs []raftpb.Message) { - for _, m := range msgs { - t.Logf("m = %+v\n", m) - ss[m.To-1].node.Step(ctx, m) - } - } - ids := make([]uint64, ns) for i := uint64(0); i < ns; i++ { ids[i] = i + 1 @@ -516,7 +520,7 @@ func testServer(t *testing.T, ns uint64) { srv := &EtcdServer{ node: n, store: st, - send: send, + sender: &fakeSender{ss}, storage: &storageRecorder{}, Ticker: tk.C, Cluster: cl, @@ -585,7 +589,7 @@ func TestDoProposal(t *testing.T) { srv := &EtcdServer{ node: n, store: st, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: &storageRecorder{}, Ticker: tk, Cluster: cl, @@ -668,7 +672,7 @@ func TestDoProposalStopped(t *testing.T) { // TODO: use fake node for better testability node: n, store: st, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: &storageRecorder{}, Ticker: tk, } @@ -768,7 +772,7 @@ func TestSyncTrigger(t *testing.T) { srv := &EtcdServer{ node: n, store: &storeRecorder{}, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: &storageRecorder{}, SyncTicker: st, } @@ -842,7 +846,7 @@ func TestTriggerSnap(t *testing.T) { cl.SetStore(store.New()) s := &EtcdServer{ store: st, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: p, node: n, snapCount: 10, @@ -876,7 +880,7 @@ func TestRecvSnapshot(t *testing.T) { p := &storageRecorder{} s := &EtcdServer{ store: st, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: p, node: n, } @@ -904,7 +908,7 @@ func TestRecvSlowSnapshot(t *testing.T) { st := &storeRecorder{} s := &EtcdServer{ store: st, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: &storageRecorder{}, node: n, } @@ -939,7 +943,7 @@ func TestAddMember(t *testing.T) { s := &EtcdServer{ node: n, store: &storeRecorder{}, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: &storageRecorder{}, Cluster: cl, } @@ -974,7 +978,7 @@ func TestRemoveMember(t *testing.T) { s := &EtcdServer{ node: n, store: &storeRecorder{}, - send: func(_ []raftpb.Message) {}, + sender: &nopSender{}, storage: &storageRecorder{}, Cluster: cl, } @@ -1042,6 +1046,7 @@ func TestPublish(t *testing.T) { func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ node: &nodeRecorder{}, + sender: &nopSender{}, Cluster: &Cluster{}, w: &waitRecorder{}, done: make(chan struct{}), @@ -1402,6 +1407,11 @@ func (cs *removedClusterStore) Get() Cluster { return Cluster{} } func (cs *removedClusterStore) Remove(id uint64) {} func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] } +type nopSender struct{} + +func (s *nopSender) Send(m []raftpb.Message) {} +func (s *nopSender) Stop() {} + func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { peers := make([]raft.Peer, len(ids)) for i, id := range ids {