rafthttp: add AddRemote

Add remotes to rafthttp, who help newly joined members catch up the
progress of the cluster. It supports basic message sending to remote, and
has no stream connection for simplicity. remotes will not be used
after the latest peers have been added into rafthttp.
This commit is contained in:
Yicheng Qin 2015-04-20 23:48:36 -07:00
parent 1811701427
commit 9f19b5660f
5 changed files with 98 additions and 11 deletions

View File

@ -168,6 +168,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())
var remotes []*Member
switch {
case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil {
@ -180,6 +181,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
remotes = existingCluster.Members()
cfg.Cluster.SetID(existingCluster.id)
cfg.Cluster.SetStore(st)
cfg.Print()
@ -269,8 +271,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
}
// TODO: move transport initialization near the definition of remote
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
// add all the remote members into sendhub
// add all remotes into transport
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cfg.Cluster.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)

View File

@ -1366,6 +1366,7 @@ type nopTransporter struct{}
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}

View File

@ -101,7 +101,7 @@ func (p *pipeline) handle() {
log.Printf("pipeline: the connection with %s became inactive", p.id)
p.active = false
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Fail()
}
p.r.ReportUnreachable(m.To)
@ -114,7 +114,7 @@ func (p *pipeline) handle() {
p.active = true
p.errored = nil
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
if isMsgSnap(m) {

48
rafthttp/remote.go Normal file
View File

@ -0,0 +1,48 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"log"
"net/http"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
type remote struct {
id types.ID
pipeline *pipeline
}
func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
picker := newURLPicker(urls)
return &remote{
id: to,
pipeline: newPipeline(tr, picker, to, cid, nil, r, errorc),
}
}
func (g *remote) Send(m raftpb.Message) {
select {
case g.pipeline.msgc <- m:
default:
log.Printf("remote: dropping %s to %s since pipeline with %d-size buffer is blocked", m.Type, g.id, pipelineBufSize)
}
}
func (g *remote) Stop() {
g.pipeline.stop()
}

View File

@ -45,6 +45,12 @@ type Transporter interface {
// If the id cannot be found in the transport, the message
// will be ignored.
Send(m []raftpb.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps newly joined member to catch up the progress of cluster,
// and will not be used after that.
// It is the caller's responsibility to ensure the urls are all vaild,
// or it panics.
AddRemote(id types.ID, urls []string)
// AddPeer adds a peer with given peer urls into the transport.
// It is the caller's responsibility to ensure the urls are all vaild,
// or it panics.
@ -70,9 +76,10 @@ type transport struct {
serverStats *stats.ServerStats
leaderStats *stats.LeaderStats
mu sync.RWMutex // protect the peer map
peers map[types.ID]Peer // remote peers
errorc chan error
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
errorc chan error
}
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@ -83,6 +90,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
raft: r,
serverStats: ss,
leaderStats: ls,
remotes: make(map[types.ID]*remote),
peers: make(map[types.ID]Peer),
errorc: errorc,
}
@ -110,21 +118,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
continue
}
to := types.ID(m.To)
p, ok := t.peers[to]
if !ok {
log.Printf("etcdserver: send message to unknown receiver %s", to)
if ok {
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
}
p.Send(m)
continue
}
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
g, ok := t.remotes[to]
if ok {
g.Send(m)
continue
}
p.Send(m)
log.Printf("etcdserver: send message to unknown receiver %s", to)
}
}
func (t *transport) Stop() {
for _, r := range t.remotes {
r.Stop()
}
for _, p := range t.peers {
p.Stop()
}
@ -133,6 +150,19 @@ func (t *transport) Stop() {
}
}
func (t *transport) AddRemote(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.remotes[id]; ok {
return
}
urls, err := types.NewURLs(us)
if err != nil {
log.Panicf("newURLs %+v should never fail: %+v", us, err)
}
t.remotes[id] = startRemote(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, t.errorc)
}
func (t *transport) AddPeer(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()