diff --git a/etcdserver/server.go b/etcdserver/server.go index 6ef4a7167..bf6aa3f30 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -168,6 +168,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) + var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: if err := cfg.VerifyJoinExisting(); err != nil { @@ -180,6 +181,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } + remotes = existingCluster.Members() cfg.Cluster.SetID(existingCluster.id) cfg.Cluster.SetStore(st) cfg.Print() @@ -269,8 +271,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), } + // TODO: move transport initialization near the definition of remote tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats) - // add all the remote members into sendhub + // add all remotes into transport + for _, m := range remotes { + if m.ID != id { + tr.AddRemote(m.ID, m.PeerURLs) + } + } for _, m := range cfg.Cluster.Members() { if m.ID != id { tr.AddPeer(m.ID, m.PeerURLs) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 315719545..8da0a36f0 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1366,6 +1366,7 @@ type nopTransporter struct{} func (s *nopTransporter) Handler() http.Handler { return nil } func (s *nopTransporter) Send(m []raftpb.Message) {} +func (s *nopTransporter) AddRemote(id types.ID, us []string) {} func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} func (s *nopTransporter) RemoveAllPeers() {} diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 5e8ebb1c6..4443d3e25 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -101,7 +101,7 @@ func (p *pipeline) handle() { log.Printf("pipeline: the connection with %s became inactive", p.id) p.active = false } - if m.Type == raftpb.MsgApp { + if m.Type == raftpb.MsgApp && p.fs != nil { p.fs.Fail() } p.r.ReportUnreachable(m.To) @@ -114,7 +114,7 @@ func (p *pipeline) handle() { p.active = true p.errored = nil } - if m.Type == raftpb.MsgApp { + if m.Type == raftpb.MsgApp && p.fs != nil { p.fs.Succ(end.Sub(start)) } if isMsgSnap(m) { diff --git a/rafthttp/remote.go b/rafthttp/remote.go new file mode 100644 index 000000000..396e6b03f --- /dev/null +++ b/rafthttp/remote.go @@ -0,0 +1,48 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "log" + "net/http" + + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +type remote struct { + id types.ID + pipeline *pipeline +} + +func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote { + picker := newURLPicker(urls) + return &remote{ + id: to, + pipeline: newPipeline(tr, picker, to, cid, nil, r, errorc), + } +} + +func (g *remote) Send(m raftpb.Message) { + select { + case g.pipeline.msgc <- m: + default: + log.Printf("remote: dropping %s to %s since pipeline with %d-size buffer is blocked", m.Type, g.id, pipelineBufSize) + } +} + +func (g *remote) Stop() { + g.pipeline.stop() +} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index aaff28c22..163028531 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -45,6 +45,12 @@ type Transporter interface { // If the id cannot be found in the transport, the message // will be ignored. Send(m []raftpb.Message) + // AddRemote adds a remote with given peer urls into the transport. + // A remote helps newly joined member to catch up the progress of cluster, + // and will not be used after that. + // It is the caller's responsibility to ensure the urls are all vaild, + // or it panics. + AddRemote(id types.ID, urls []string) // AddPeer adds a peer with given peer urls into the transport. // It is the caller's responsibility to ensure the urls are all vaild, // or it panics. @@ -70,9 +76,10 @@ type transport struct { serverStats *stats.ServerStats leaderStats *stats.LeaderStats - mu sync.RWMutex // protect the peer map - peers map[types.ID]Peer // remote peers - errorc chan error + mu sync.RWMutex // protect the remote and peer map + remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up + peers map[types.ID]Peer // peers map + errorc chan error } func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter { @@ -83,6 +90,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan raft: r, serverStats: ss, leaderStats: ls, + remotes: make(map[types.ID]*remote), peers: make(map[types.ID]Peer), errorc: errorc, } @@ -110,21 +118,30 @@ func (t *transport) Send(msgs []raftpb.Message) { continue } to := types.ID(m.To) + p, ok := t.peers[to] - if !ok { - log.Printf("etcdserver: send message to unknown receiver %s", to) + if ok { + if m.Type == raftpb.MsgApp { + t.serverStats.SendAppendReq(m.Size()) + } + p.Send(m) continue } - if m.Type == raftpb.MsgApp { - t.serverStats.SendAppendReq(m.Size()) + g, ok := t.remotes[to] + if ok { + g.Send(m) + continue } - p.Send(m) + log.Printf("etcdserver: send message to unknown receiver %s", to) } } func (t *transport) Stop() { + for _, r := range t.remotes { + r.Stop() + } for _, p := range t.peers { p.Stop() } @@ -133,6 +150,19 @@ func (t *transport) Stop() { } } +func (t *transport) AddRemote(id types.ID, us []string) { + t.mu.Lock() + defer t.mu.Unlock() + if _, ok := t.remotes[id]; ok { + return + } + urls, err := types.NewURLs(us) + if err != nil { + log.Panicf("newURLs %+v should never fail: %+v", us, err) + } + t.remotes[id] = startRemote(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, t.errorc) +} + func (t *transport) AddPeer(id types.ID, us []string) { t.mu.Lock() defer t.mu.Unlock()