mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: transport cleanup
This commit is contained in:
parent
e1ee335c3a
commit
5bb8eeb5cf
@ -275,7 +275,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
RoundTripper: cfg.Transport,
|
RoundTripper: cfg.Transport,
|
||||||
ID: id,
|
ID: id,
|
||||||
ClusterID: cfg.Cluster.ID(),
|
ClusterID: cfg.Cluster.ID(),
|
||||||
Processor: srv,
|
Raft: srv,
|
||||||
ServerStats: sstats,
|
ServerStats: sstats,
|
||||||
LeaderStats: lstats,
|
LeaderStats: lstats,
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,6 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/rafthttp"
|
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -536,8 +535,7 @@ type fakeTransporter struct {
|
|||||||
ss []*EtcdServer
|
ss []*EtcdServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeTransporter) Handler() http.Handler { return nil }
|
func (s *fakeTransporter) Handler() http.Handler { return nil }
|
||||||
func (s *fakeTransporter) Sender(id types.ID) rafthttp.Sender { return nil }
|
|
||||||
func (s *fakeTransporter) Send(msgs []raftpb.Message) {
|
func (s *fakeTransporter) Send(msgs []raftpb.Message) {
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
s.ss[m.To-1].node.Step(context.TODO(), m)
|
s.ss[m.To-1].node.Step(context.TODO(), m)
|
||||||
@ -1632,7 +1630,6 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
|
|||||||
type nopTransporter struct{}
|
type nopTransporter struct{}
|
||||||
|
|
||||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||||
func (s *nopTransporter) Sender(id types.ID) rafthttp.Sender { return nil }
|
|
||||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||||
|
@ -40,30 +40,25 @@ var (
|
|||||||
RaftStreamPrefix = path.Join(RaftPrefix, "stream")
|
RaftStreamPrefix = path.Join(RaftPrefix, "stream")
|
||||||
)
|
)
|
||||||
|
|
||||||
type SenderFinder interface {
|
func NewHandler(r Raft, cid types.ID) http.Handler {
|
||||||
// Sender returns the sender of the given id.
|
|
||||||
Sender(id types.ID) Sender
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHandler(p Processor, cid types.ID) http.Handler {
|
|
||||||
return &handler{
|
return &handler{
|
||||||
p: p,
|
r: r,
|
||||||
cid: cid,
|
cid: cid,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStreamHandler returns a handler which initiates streamer when receiving
|
// NewStreamHandler returns a handler which initiates streamer when receiving
|
||||||
// stream request from follower.
|
// stream request from follower.
|
||||||
func NewStreamHandler(finder SenderFinder, id, cid types.ID) http.Handler {
|
func NewStreamHandler(tr *Transport, id, cid types.ID) http.Handler {
|
||||||
return &streamHandler{
|
return &streamHandler{
|
||||||
finder: finder,
|
tr: tr,
|
||||||
id: id,
|
id: id,
|
||||||
cid: cid,
|
cid: cid,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type handler struct {
|
type handler struct {
|
||||||
p Processor
|
r Raft
|
||||||
cid types.ID
|
cid types.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +94,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
|
http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := h.p.Process(context.TODO(), m); err != nil {
|
if err := h.r.Process(context.TODO(), m); err != nil {
|
||||||
switch v := err.(type) {
|
switch v := err.(type) {
|
||||||
case writerToResponse:
|
case writerToResponse:
|
||||||
v.WriteTo(w)
|
v.WriteTo(w)
|
||||||
@ -113,9 +108,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type streamHandler struct {
|
type streamHandler struct {
|
||||||
finder SenderFinder
|
tr *Transport
|
||||||
id types.ID
|
id types.ID
|
||||||
cid types.ID
|
cid types.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -132,8 +127,8 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, "invalid path", http.StatusNotFound)
|
http.Error(w, "invalid path", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s := h.finder.Sender(from)
|
p := h.tr.Peer(from)
|
||||||
if s == nil {
|
if p == nil {
|
||||||
log.Printf("rafthttp: fail to find sender %s", from)
|
log.Printf("rafthttp: fail to find sender %s", from)
|
||||||
http.Error(w, "error sender not found", http.StatusNotFound)
|
http.Error(w, "error sender not found", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
@ -164,7 +159,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
|
|
||||||
done, err := s.StartStreaming(w.(WriteFlusher), from, term)
|
done, err := p.StartStreaming(w.(WriteFlusher), from, term)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err)
|
log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err)
|
||||||
// TODO: consider http status and info here
|
// TODO: consider http status and info here
|
||||||
|
@ -36,7 +36,7 @@ func TestServeRaft(t *testing.T) {
|
|||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
method string
|
method string
|
||||||
body io.Reader
|
body io.Reader
|
||||||
p Processor
|
p Raft
|
||||||
clusterID string
|
clusterID string
|
||||||
|
|
||||||
wcode int
|
wcode int
|
||||||
|
@ -45,53 +45,33 @@ const (
|
|||||||
ConnWriteTimeout = 5 * time.Second
|
ConnWriteTimeout = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
type Sender interface {
|
func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer {
|
||||||
// StartStreaming enables streaming in the sender using the given writer,
|
p := &peer{
|
||||||
// which provides a fast and efficient way to send appendEntry messages.
|
|
||||||
StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error)
|
|
||||||
Update(u string)
|
|
||||||
// Send sends the data to the remote node. It is always non-blocking.
|
|
||||||
// It may be fail to send data if it returns nil error.
|
|
||||||
Send(m raftpb.Message) error
|
|
||||||
// Stop performs any necessary finalization and terminates the Sender
|
|
||||||
// elegantly.
|
|
||||||
Stop()
|
|
||||||
|
|
||||||
// Pause pauses the sender. The sender will simply drops all incoming
|
|
||||||
// messages without retruning an error.
|
|
||||||
Pause()
|
|
||||||
|
|
||||||
// Resume resumes a paused sender.
|
|
||||||
Resume()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
|
|
||||||
s := &sender{
|
|
||||||
id: id,
|
id: id,
|
||||||
active: true,
|
active: true,
|
||||||
tr: tr,
|
tr: tr,
|
||||||
u: u,
|
u: u,
|
||||||
cid: cid,
|
cid: cid,
|
||||||
p: p,
|
r: r,
|
||||||
fs: fs,
|
fs: fs,
|
||||||
shouldstop: shouldstop,
|
shouldstop: shouldstop,
|
||||||
batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
|
batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
|
||||||
propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
|
propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
|
||||||
q: make(chan *raftpb.Message, senderBufSize),
|
q: make(chan *raftpb.Message, senderBufSize),
|
||||||
}
|
}
|
||||||
s.wg.Add(connPerSender)
|
p.wg.Add(connPerSender)
|
||||||
for i := 0; i < connPerSender; i++ {
|
for i := 0; i < connPerSender; i++ {
|
||||||
go s.handle()
|
go p.handle()
|
||||||
}
|
}
|
||||||
return s
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
type sender struct {
|
type peer struct {
|
||||||
id types.ID
|
id types.ID
|
||||||
cid types.ID
|
cid types.ID
|
||||||
|
|
||||||
tr http.RoundTripper
|
tr http.RoundTripper
|
||||||
p Processor
|
r Raft
|
||||||
fs *stats.FollowerStats
|
fs *stats.FollowerStats
|
||||||
shouldstop chan struct{}
|
shouldstop chan struct{}
|
||||||
|
|
||||||
@ -115,201 +95,210 @@ type sender struct {
|
|||||||
paused bool
|
paused bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
|
// StartStreaming enables streaming in the peer using the given writer,
|
||||||
s.strmSrvMu.Lock()
|
// which provides a fast and efficient way to send appendEntry messages.
|
||||||
defer s.strmSrvMu.Unlock()
|
func (p *peer) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
|
||||||
if s.strmSrv != nil {
|
p.strmSrvMu.Lock()
|
||||||
|
defer p.strmSrvMu.Unlock()
|
||||||
|
if p.strmSrv != nil {
|
||||||
// ignore lower-term streaming request
|
// ignore lower-term streaming request
|
||||||
if term < s.strmSrv.term {
|
if term < p.strmSrv.term {
|
||||||
return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term)
|
return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, p.strmSrv.term)
|
||||||
}
|
}
|
||||||
// stop the existing one
|
// stop the existing one
|
||||||
s.strmSrv.stop()
|
p.strmSrv.stop()
|
||||||
s.strmSrv = nil
|
p.strmSrv = nil
|
||||||
}
|
}
|
||||||
s.strmSrv = startStreamServer(w, to, term, s.fs)
|
p.strmSrv = startStreamServer(w, to, term, p.fs)
|
||||||
return s.strmSrv.stopNotify(), nil
|
return p.strmSrv.stopNotify(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) Update(u string) {
|
func (p *peer) Update(u string) {
|
||||||
s.mu.Lock()
|
p.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
s.u = u
|
p.u = u
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send sends the data to the remote node. It is always non-blocking.
|
||||||
|
// It may be fail to send data if it returns nil error.
|
||||||
// TODO (xiangli): reasonable retry logic
|
// TODO (xiangli): reasonable retry logic
|
||||||
func (s *sender) Send(m raftpb.Message) error {
|
func (p *peer) Send(m raftpb.Message) error {
|
||||||
s.mu.RLock()
|
p.mu.RLock()
|
||||||
pause := s.paused
|
pause := p.paused
|
||||||
s.mu.RUnlock()
|
p.mu.RUnlock()
|
||||||
if pause {
|
if pause {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.maybeStopStream(m.Term)
|
p.maybeStopStream(m.Term)
|
||||||
if shouldInitStream(m) && !s.hasStreamClient() {
|
if shouldInitStream(m) && !p.hasStreamClient() {
|
||||||
s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
|
p.initStream(types.ID(m.From), types.ID(m.To), m.Term)
|
||||||
s.batcher.Reset(time.Now())
|
p.batcher.Reset(time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
switch {
|
switch {
|
||||||
case isProposal(m):
|
case isProposal(m):
|
||||||
s.propBatcher.Batch(m)
|
p.propBatcher.Batch(m)
|
||||||
case canBatch(m) && s.hasStreamClient():
|
case canBatch(m) && p.hasStreamClient():
|
||||||
if !s.batcher.ShouldBatch(time.Now()) {
|
if !p.batcher.ShouldBatch(time.Now()) {
|
||||||
err = s.send(m)
|
err = p.send(m)
|
||||||
}
|
}
|
||||||
case canUseStream(m):
|
case canUseStream(m):
|
||||||
if ok := s.tryStream(m); !ok {
|
if ok := p.tryStream(m); !ok {
|
||||||
err = s.send(m)
|
err = p.send(m)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
err = s.send(m)
|
err = p.send(m)
|
||||||
}
|
}
|
||||||
// send out batched MsgProp if needed
|
// send out batched MsgProp if needed
|
||||||
// TODO: it is triggered by all outcoming send now, and it needs
|
// TODO: it is triggered by all outcoming send now, and it needs
|
||||||
// more clear solution. Either use separate goroutine to trigger it
|
// more clear solution. Either use separate goroutine to trigger it
|
||||||
// or use streaming.
|
// or use streaming.
|
||||||
if !s.propBatcher.IsEmpty() {
|
if !p.propBatcher.IsEmpty() {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
if !s.propBatcher.ShouldBatch(t) {
|
if !p.propBatcher.ShouldBatch(t) {
|
||||||
s.send(s.propBatcher.Message)
|
p.send(p.propBatcher.Message)
|
||||||
s.propBatcher.Reset(t)
|
p.propBatcher.Reset(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) send(m raftpb.Message) error {
|
func (p *peer) send(m raftpb.Message) error {
|
||||||
// TODO: don't block. we should be able to have 1000s
|
// TODO: don't block. we should be able to have 1000s
|
||||||
// of messages out at a time.
|
// of messages out at a time.
|
||||||
select {
|
select {
|
||||||
case s.q <- &m:
|
case p.q <- &m:
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
|
log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
|
||||||
m.Type, senderBufSize, s.u)
|
m.Type, senderBufSize, p.u)
|
||||||
return fmt.Errorf("reach maximal serving")
|
return fmt.Errorf("reach maximal serving")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) Stop() {
|
// Stop performs any necessary finalization and terminates the peer
|
||||||
close(s.q)
|
// elegantly.
|
||||||
s.wg.Wait()
|
func (p *peer) Stop() {
|
||||||
s.strmSrvMu.Lock()
|
close(p.q)
|
||||||
if s.strmSrv != nil {
|
p.wg.Wait()
|
||||||
s.strmSrv.stop()
|
p.strmSrvMu.Lock()
|
||||||
s.strmSrv = nil
|
if p.strmSrv != nil {
|
||||||
|
p.strmSrv.stop()
|
||||||
|
p.strmSrv = nil
|
||||||
}
|
}
|
||||||
s.strmSrvMu.Unlock()
|
p.strmSrvMu.Unlock()
|
||||||
if s.strmCln != nil {
|
if p.strmCln != nil {
|
||||||
s.strmCln.stop()
|
p.strmCln.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) Pause() {
|
// Pause pauses the peer. The peer will simply drops all incoming
|
||||||
s.mu.Lock()
|
// messages without retruning an error.
|
||||||
defer s.mu.Unlock()
|
func (p *peer) Pause() {
|
||||||
s.paused = true
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
p.paused = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) Resume() {
|
// Resume resumes a paused peer.
|
||||||
s.mu.Lock()
|
func (p *peer) Resume() {
|
||||||
defer s.mu.Unlock()
|
p.mu.Lock()
|
||||||
s.paused = false
|
defer p.mu.Unlock()
|
||||||
|
p.paused = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) maybeStopStream(term uint64) {
|
func (p *peer) maybeStopStream(term uint64) {
|
||||||
if s.strmCln != nil && term > s.strmCln.term {
|
if p.strmCln != nil && term > p.strmCln.term {
|
||||||
s.strmCln.stop()
|
p.strmCln.stop()
|
||||||
s.strmCln = nil
|
p.strmCln = nil
|
||||||
}
|
}
|
||||||
s.strmSrvMu.Lock()
|
p.strmSrvMu.Lock()
|
||||||
defer s.strmSrvMu.Unlock()
|
defer p.strmSrvMu.Unlock()
|
||||||
if s.strmSrv != nil && term > s.strmSrv.term {
|
if p.strmSrv != nil && term > p.strmSrv.term {
|
||||||
s.strmSrv.stop()
|
p.strmSrv.stop()
|
||||||
s.strmSrv = nil
|
p.strmSrv = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) hasStreamClient() bool {
|
func (p *peer) hasStreamClient() bool {
|
||||||
return s.strmCln != nil && !s.strmCln.isStopped()
|
return p.strmCln != nil && !p.strmCln.isStopped()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) initStream(from, to types.ID, term uint64) {
|
func (p *peer) initStream(from, to types.ID, term uint64) {
|
||||||
strmCln := newStreamClient(from, to, term, s.p)
|
strmCln := newStreamClient(from, to, term, p.r)
|
||||||
s.mu.Lock()
|
p.mu.Lock()
|
||||||
u := s.u
|
u := p.u
|
||||||
s.mu.Unlock()
|
p.mu.Unlock()
|
||||||
if err := strmCln.start(s.tr, u, s.cid); err != nil {
|
if err := strmCln.start(p.tr, u, p.cid); err != nil {
|
||||||
log.Printf("rafthttp: start stream client error: %v", err)
|
log.Printf("rafthttp: start stream client error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.strmCln = strmCln
|
p.strmCln = strmCln
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) tryStream(m raftpb.Message) bool {
|
func (p *peer) tryStream(m raftpb.Message) bool {
|
||||||
s.strmSrvMu.Lock()
|
p.strmSrvMu.Lock()
|
||||||
defer s.strmSrvMu.Unlock()
|
defer p.strmSrvMu.Unlock()
|
||||||
if s.strmSrv == nil || m.Term != s.strmSrv.term {
|
if p.strmSrv == nil || m.Term != p.strmSrv.term {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if err := s.strmSrv.send(m.Entries); err != nil {
|
if err := p.strmSrv.send(m.Entries); err != nil {
|
||||||
log.Printf("rafthttp: send stream message error: %v", err)
|
log.Printf("rafthttp: send stream message error: %v", err)
|
||||||
s.strmSrv.stop()
|
p.strmSrv.stop()
|
||||||
s.strmSrv = nil
|
p.strmSrv = nil
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sender) handle() {
|
func (p *peer) handle() {
|
||||||
defer s.wg.Done()
|
defer p.wg.Done()
|
||||||
for m := range s.q {
|
for m := range p.q {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := s.post(pbutil.MustMarshal(m))
|
err := p.post(pbutil.MustMarshal(m))
|
||||||
end := time.Now()
|
end := time.Now()
|
||||||
|
|
||||||
s.mu.Lock()
|
p.mu.Lock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s.errored == nil || s.errored.Error() != err.Error() {
|
if p.errored == nil || p.errored.Error() != err.Error() {
|
||||||
log.Printf("sender: error posting to %s: %v", s.id, err)
|
log.Printf("sender: error posting to %s: %v", p.id, err)
|
||||||
s.errored = err
|
p.errored = err
|
||||||
}
|
}
|
||||||
if s.active {
|
if p.active {
|
||||||
log.Printf("sender: the connection with %s becomes inactive", s.id)
|
log.Printf("sender: the connection with %s becomes inactive", p.id)
|
||||||
s.active = false
|
p.active = false
|
||||||
}
|
}
|
||||||
if m.Type == raftpb.MsgApp {
|
if m.Type == raftpb.MsgApp {
|
||||||
s.fs.Fail()
|
p.fs.Fail()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if !s.active {
|
if !p.active {
|
||||||
log.Printf("sender: the connection with %s becomes active", s.id)
|
log.Printf("sender: the connection with %s becomes active", p.id)
|
||||||
s.active = true
|
p.active = true
|
||||||
s.errored = nil
|
p.errored = nil
|
||||||
}
|
}
|
||||||
if m.Type == raftpb.MsgApp {
|
if m.Type == raftpb.MsgApp {
|
||||||
s.fs.Succ(end.Sub(start))
|
p.fs.Succ(end.Sub(start))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
p.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// post POSTs a data payload to a url. Returns nil if the POST succeeds,
|
// post POSTs a data payload to a url. Returns nil if the POST succeeds,
|
||||||
// error on any failure.
|
// error on any failure.
|
||||||
func (s *sender) post(data []byte) error {
|
func (p *peer) post(data []byte) error {
|
||||||
s.mu.RLock()
|
p.mu.RLock()
|
||||||
req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
|
req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data))
|
||||||
s.mu.RUnlock()
|
p.mu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/protobuf")
|
req.Header.Set("Content-Type", "application/protobuf")
|
||||||
req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
|
req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
|
||||||
resp, err := s.tr.RoundTrip(req)
|
resp, err := p.tr.RoundTrip(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -318,14 +307,14 @@ func (s *sender) post(data []byte) error {
|
|||||||
switch resp.StatusCode {
|
switch resp.StatusCode {
|
||||||
case http.StatusPreconditionFailed:
|
case http.StatusPreconditionFailed:
|
||||||
select {
|
select {
|
||||||
case s.shouldstop <- struct{}{}:
|
case p.shouldstop <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
|
log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
|
||||||
return nil
|
return nil
|
||||||
case http.StatusForbidden:
|
case http.StatusForbidden:
|
||||||
select {
|
select {
|
||||||
case s.shouldstop <- struct{}{}:
|
case p.shouldstop <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
log.Println("rafthttp: this member has been permanently removed from the cluster")
|
log.Println("rafthttp: this member has been permanently removed from the cluster")
|
@ -34,12 +34,12 @@ import (
|
|||||||
func TestSenderSend(t *testing.T) {
|
func TestSenderSend(t *testing.T) {
|
||||||
tr := &roundTripperRecorder{}
|
tr := &roundTripperRecorder{}
|
||||||
fs := &stats.FollowerStats{}
|
fs := &stats.FollowerStats{}
|
||||||
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
|
p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
|
||||||
|
|
||||||
if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
|
if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
|
||||||
t.Fatalf("unexpect send error: %v", err)
|
t.Fatalf("unexpect send error: %v", err)
|
||||||
}
|
}
|
||||||
s.Stop()
|
p.Stop()
|
||||||
|
|
||||||
if tr.Request() == nil {
|
if tr.Request() == nil {
|
||||||
t.Errorf("sender fails to post the data")
|
t.Errorf("sender fails to post the data")
|
||||||
@ -54,12 +54,12 @@ func TestSenderSend(t *testing.T) {
|
|||||||
func TestSenderExceedMaximalServing(t *testing.T) {
|
func TestSenderExceedMaximalServing(t *testing.T) {
|
||||||
tr := newRoundTripperBlocker()
|
tr := newRoundTripperBlocker()
|
||||||
fs := &stats.FollowerStats{}
|
fs := &stats.FollowerStats{}
|
||||||
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
|
p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
|
||||||
|
|
||||||
// keep the sender busy and make the buffer full
|
// keep the sender busy and make the buffer full
|
||||||
// nothing can go out as we block the sender
|
// nothing can go out as we block the sender
|
||||||
for i := 0; i < connPerSender+senderBufSize; i++ {
|
for i := 0; i < connPerSender+senderBufSize; i++ {
|
||||||
if err := s.Send(raftpb.Message{}); err != nil {
|
if err := p.Send(raftpb.Message{}); err != nil {
|
||||||
t.Errorf("send err = %v, want nil", err)
|
t.Errorf("send err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
// force the sender to grab data
|
// force the sender to grab data
|
||||||
@ -67,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// try to send a data when we are sure the buffer is full
|
// try to send a data when we are sure the buffer is full
|
||||||
if err := s.Send(raftpb.Message{}); err == nil {
|
if err := p.Send(raftpb.Message{}); err == nil {
|
||||||
t.Errorf("unexpect send success")
|
t.Errorf("unexpect send success")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,22 +76,22 @@ func TestSenderExceedMaximalServing(t *testing.T) {
|
|||||||
testutil.ForceGosched()
|
testutil.ForceGosched()
|
||||||
|
|
||||||
// It could send new data after previous ones succeed
|
// It could send new data after previous ones succeed
|
||||||
if err := s.Send(raftpb.Message{}); err != nil {
|
if err := p.Send(raftpb.Message{}); err != nil {
|
||||||
t.Errorf("send err = %v, want nil", err)
|
t.Errorf("send err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
s.Stop()
|
p.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSenderSendFailed tests that when send func meets the post error,
|
// TestSenderSendFailed tests that when send func meets the post error,
|
||||||
// it increases fail count in stats.
|
// it increases fail count in stats.
|
||||||
func TestSenderSendFailed(t *testing.T) {
|
func TestSenderSendFailed(t *testing.T) {
|
||||||
fs := &stats.FollowerStats{}
|
fs := &stats.FollowerStats{}
|
||||||
s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
|
p := NewPeer(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
|
||||||
|
|
||||||
if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
|
if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
|
||||||
t.Fatalf("unexpect Send error: %v", err)
|
t.Fatalf("unexpect Send error: %v", err)
|
||||||
}
|
}
|
||||||
s.Stop()
|
p.Stop()
|
||||||
|
|
||||||
fs.Lock()
|
fs.Lock()
|
||||||
defer fs.Unlock()
|
defer fs.Unlock()
|
||||||
@ -102,11 +102,11 @@ func TestSenderSendFailed(t *testing.T) {
|
|||||||
|
|
||||||
func TestSenderPost(t *testing.T) {
|
func TestSenderPost(t *testing.T) {
|
||||||
tr := &roundTripperRecorder{}
|
tr := &roundTripperRecorder{}
|
||||||
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
|
p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
|
||||||
if err := s.post([]byte("some data")); err != nil {
|
if err := p.post([]byte("some data")); err != nil {
|
||||||
t.Fatalf("unexpect post error: %v", err)
|
t.Fatalf("unexpect post error: %v", err)
|
||||||
}
|
}
|
||||||
s.Stop()
|
p.Stop()
|
||||||
|
|
||||||
if g := tr.Request().Method; g != "POST" {
|
if g := tr.Request().Method; g != "POST" {
|
||||||
t.Errorf("method = %s, want %s", g, "POST")
|
t.Errorf("method = %s, want %s", g, "POST")
|
||||||
@ -145,9 +145,9 @@ func TestSenderPostBad(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
shouldstop := make(chan struct{})
|
shouldstop := make(chan struct{})
|
||||||
s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
|
p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
|
||||||
err := s.post([]byte("some data"))
|
err := p.post([]byte("some data"))
|
||||||
s.Stop()
|
p.Stop()
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("#%d: err = nil, want not nil", i)
|
t.Errorf("#%d: err = nil, want not nil", i)
|
||||||
@ -166,9 +166,9 @@ func TestSenderPostShouldStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
shouldstop := make(chan struct{}, 1)
|
shouldstop := make(chan struct{}, 1)
|
||||||
s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
|
p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
|
||||||
s.post([]byte("some data"))
|
p.post([]byte("some data"))
|
||||||
s.Stop()
|
p.Stop()
|
||||||
select {
|
select {
|
||||||
case <-shouldstop:
|
case <-shouldstop:
|
||||||
default:
|
default:
|
@ -1,153 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 CoreOS, Inc.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package rafthttp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"path"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/stats"
|
|
||||||
"github.com/coreos/etcd/pkg/types"
|
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
raftPrefix = "/raft"
|
|
||||||
)
|
|
||||||
|
|
||||||
type sendHub struct {
|
|
||||||
tr http.RoundTripper
|
|
||||||
cid types.ID
|
|
||||||
p Processor
|
|
||||||
ss *stats.ServerStats
|
|
||||||
ls *stats.LeaderStats
|
|
||||||
mu sync.RWMutex // protect the sender map
|
|
||||||
senders map[types.ID]Sender
|
|
||||||
shouldstop chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// newSendHub creates the default send hub used to transport raft messages
|
|
||||||
// to other members. The returned sendHub will update the given ServerStats and
|
|
||||||
// LeaderStats appropriately.
|
|
||||||
func newSendHub(t http.RoundTripper, cid types.ID, p Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
|
|
||||||
return &sendHub{
|
|
||||||
tr: t,
|
|
||||||
cid: cid,
|
|
||||||
p: p,
|
|
||||||
ss: ss,
|
|
||||||
ls: ls,
|
|
||||||
senders: make(map[types.ID]Sender),
|
|
||||||
shouldstop: make(chan struct{}, 1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) Sender(id types.ID) Sender {
|
|
||||||
h.mu.RLock()
|
|
||||||
defer h.mu.RUnlock()
|
|
||||||
return h.senders[id]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) Send(msgs []raftpb.Message) {
|
|
||||||
for _, m := range msgs {
|
|
||||||
// intentionally dropped message
|
|
||||||
if m.To == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
to := types.ID(m.To)
|
|
||||||
s, ok := h.senders[to]
|
|
||||||
if !ok {
|
|
||||||
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.Type == raftpb.MsgApp {
|
|
||||||
h.ss.SendAppendReq(m.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Send(m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) Stop() {
|
|
||||||
for _, s := range h.senders {
|
|
||||||
s.Stop()
|
|
||||||
}
|
|
||||||
if tr, ok := h.tr.(*http.Transport); ok {
|
|
||||||
tr.CloseIdleConnections()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) ShouldStopNotify() <-chan struct{} {
|
|
||||||
return h.shouldstop
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) AddPeer(id types.ID, urls []string) {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
if _, ok := h.senders[id]; ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// TODO: considering how to switch between all available peer urls
|
|
||||||
peerURL := urls[0]
|
|
||||||
u, err := url.Parse(peerURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("unexpect peer url %s", peerURL)
|
|
||||||
}
|
|
||||||
u.Path = path.Join(u.Path, raftPrefix)
|
|
||||||
fs := h.ls.Follower(id.String())
|
|
||||||
s := NewSender(h.tr, u.String(), id, h.cid, h.p, fs, h.shouldstop)
|
|
||||||
h.senders[id] = s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) RemovePeer(id types.ID) {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
h.senders[id].Stop()
|
|
||||||
delete(h.senders, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) UpdatePeer(id types.ID, urls []string) {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
// TODO: return error or just panic?
|
|
||||||
if _, ok := h.senders[id]; !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
peerURL := urls[0]
|
|
||||||
u, err := url.Parse(peerURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("unexpect peer url %s", peerURL)
|
|
||||||
}
|
|
||||||
u.Path = path.Join(u.Path, raftPrefix)
|
|
||||||
h.senders[id].Update(u.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// for testing
|
|
||||||
func (h *sendHub) Pause() {
|
|
||||||
for _, s := range h.senders {
|
|
||||||
s.Pause()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *sendHub) Resume() {
|
|
||||||
for _, s := range h.senders {
|
|
||||||
s.Resume()
|
|
||||||
}
|
|
||||||
}
|
|
@ -107,18 +107,18 @@ type streamClient struct {
|
|||||||
id types.ID
|
id types.ID
|
||||||
to types.ID
|
to types.ID
|
||||||
term uint64
|
term uint64
|
||||||
p Processor
|
r Raft
|
||||||
|
|
||||||
closer io.Closer
|
closer io.Closer
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
|
func newStreamClient(id, to types.ID, term uint64, r Raft) *streamClient {
|
||||||
return &streamClient{
|
return &streamClient{
|
||||||
id: id,
|
id: id,
|
||||||
to: to,
|
to: to,
|
||||||
term: term,
|
term: term,
|
||||||
p: p,
|
r: r,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -199,7 +199,7 @@ func (s *streamClient) handle(r io.Reader) {
|
|||||||
Index: ents[0].Index - 1,
|
Index: ents[0].Index - 1,
|
||||||
Entries: ents,
|
Entries: ents,
|
||||||
}
|
}
|
||||||
if err := s.p.Process(context.TODO(), msg); err != nil {
|
if err := s.r.Process(context.TODO(), msg); err != nil {
|
||||||
log.Printf("rafthttp: process raft message error: %v", err)
|
log.Printf("rafthttp: process raft message error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,11 @@
|
|||||||
package rafthttp
|
package rafthttp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/stats"
|
"github.com/coreos/etcd/etcdserver/stats"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
@ -10,7 +14,11 @@ import (
|
|||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Processor interface {
|
const (
|
||||||
|
raftPrefix = "/raft"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Raft interface {
|
||||||
Process(ctx context.Context, m raftpb.Message) error
|
Process(ctx context.Context, m raftpb.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,22 +36,119 @@ type Transport struct {
|
|||||||
RoundTripper http.RoundTripper
|
RoundTripper http.RoundTripper
|
||||||
ID types.ID
|
ID types.ID
|
||||||
ClusterID types.ID
|
ClusterID types.ID
|
||||||
Processor Processor
|
Raft Raft
|
||||||
ServerStats *stats.ServerStats
|
ServerStats *stats.ServerStats
|
||||||
LeaderStats *stats.LeaderStats
|
LeaderStats *stats.LeaderStats
|
||||||
|
|
||||||
*sendHub
|
mu sync.RWMutex // protect the peer map
|
||||||
handler http.Handler
|
peers map[types.ID]*peer // remote peers
|
||||||
|
shouldstop chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) Start() {
|
func (t *Transport) Start() {
|
||||||
t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats)
|
t.peers = make(map[types.ID]*peer)
|
||||||
h := NewHandler(t.Processor, t.ClusterID)
|
t.shouldstop = make(chan struct{}, 1)
|
||||||
sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID)
|
}
|
||||||
|
|
||||||
|
func (t *Transport) Handler() http.Handler {
|
||||||
|
h := NewHandler(t.Raft, t.ClusterID)
|
||||||
|
sh := NewStreamHandler(t, t.ID, t.ClusterID)
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle(RaftPrefix, h)
|
mux.Handle(RaftPrefix, h)
|
||||||
mux.Handle(RaftStreamPrefix+"/", sh)
|
mux.Handle(RaftStreamPrefix+"/", sh)
|
||||||
t.handler = mux
|
return mux
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) Handler() http.Handler { return t.handler }
|
func (t *Transport) Peer(id types.ID) *peer {
|
||||||
|
t.mu.RLock()
|
||||||
|
defer t.mu.RUnlock()
|
||||||
|
return t.peers[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) Send(msgs []raftpb.Message) {
|
||||||
|
for _, m := range msgs {
|
||||||
|
// intentionally dropped message
|
||||||
|
if m.To == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
to := types.ID(m.To)
|
||||||
|
p, ok := t.peers[to]
|
||||||
|
if !ok {
|
||||||
|
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Type == raftpb.MsgApp {
|
||||||
|
t.ServerStats.SendAppendReq(m.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
p.Send(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) Stop() {
|
||||||
|
for _, p := range t.peers {
|
||||||
|
p.Stop()
|
||||||
|
}
|
||||||
|
if tr, ok := t.RoundTripper.(*http.Transport); ok {
|
||||||
|
tr.CloseIdleConnections()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) ShouldStopNotify() <-chan struct{} {
|
||||||
|
return t.shouldstop
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) AddPeer(id types.ID, urls []string) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
if _, ok := t.peers[id]; ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// TODO: considering how to switch between all available peer urls
|
||||||
|
peerURL := urls[0]
|
||||||
|
u, err := url.Parse(peerURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("unexpect peer url %s", peerURL)
|
||||||
|
}
|
||||||
|
u.Path = path.Join(u.Path, raftPrefix)
|
||||||
|
fs := t.LeaderStats.Follower(id.String())
|
||||||
|
t.peers[id] = NewPeer(t.RoundTripper, u.String(), id, t.ClusterID,
|
||||||
|
t.Raft, fs, t.shouldstop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) RemovePeer(id types.ID) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
t.peers[id].Stop()
|
||||||
|
delete(t.peers, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) UpdatePeer(id types.ID, urls []string) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
// TODO: return error or just panic?
|
||||||
|
if _, ok := t.peers[id]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
peerURL := urls[0]
|
||||||
|
u, err := url.Parse(peerURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("unexpect peer url %s", peerURL)
|
||||||
|
}
|
||||||
|
u.Path = path.Join(u.Path, raftPrefix)
|
||||||
|
t.peers[id].Update(u.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// for testing
|
||||||
|
func (t *Transport) Pause() {
|
||||||
|
for _, p := range t.peers {
|
||||||
|
p.Pause()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transport) Resume() {
|
||||||
|
for _, p := range t.peers {
|
||||||
|
p.Resume()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -27,50 +27,58 @@ import (
|
|||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSendHubAdd(t *testing.T) {
|
func TestTransportAdd(t *testing.T) {
|
||||||
ls := stats.NewLeaderStats("")
|
ls := stats.NewLeaderStats("")
|
||||||
h := newSendHub(nil, 0, nil, nil, ls)
|
tr := &Transport{
|
||||||
h.AddPeer(1, []string{"http://a"})
|
LeaderStats: ls,
|
||||||
|
}
|
||||||
|
tr.Start()
|
||||||
|
tr.AddPeer(1, []string{"http://a"})
|
||||||
|
|
||||||
if _, ok := ls.Followers["1"]; !ok {
|
if _, ok := ls.Followers["1"]; !ok {
|
||||||
t.Errorf("FollowerStats[1] is nil, want exists")
|
t.Errorf("FollowerStats[1] is nil, want exists")
|
||||||
}
|
}
|
||||||
s, ok := h.senders[types.ID(1)]
|
s, ok := tr.peers[types.ID(1)]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("senders[1] is nil, want exists")
|
t.Fatalf("senders[1] is nil, want exists")
|
||||||
}
|
}
|
||||||
|
|
||||||
h.AddPeer(1, []string{"http://a"})
|
// duplicate AddPeer is ignored
|
||||||
ns := h.senders[types.ID(1)]
|
tr.AddPeer(1, []string{"http://a"})
|
||||||
|
ns := tr.peers[types.ID(1)]
|
||||||
if s != ns {
|
if s != ns {
|
||||||
t.Errorf("sender = %v, want %v", ns, s)
|
t.Errorf("sender = %v, want %v", ns, s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendHubRemove(t *testing.T) {
|
func TestTransportRemove(t *testing.T) {
|
||||||
ls := stats.NewLeaderStats("")
|
tr := &Transport{
|
||||||
h := newSendHub(nil, 0, nil, nil, ls)
|
LeaderStats: stats.NewLeaderStats(""),
|
||||||
h.AddPeer(1, []string{"http://a"})
|
}
|
||||||
h.RemovePeer(types.ID(1))
|
tr.Start()
|
||||||
|
tr.AddPeer(1, []string{"http://a"})
|
||||||
|
tr.RemovePeer(types.ID(1))
|
||||||
|
|
||||||
if _, ok := h.senders[types.ID(1)]; ok {
|
if _, ok := tr.peers[types.ID(1)]; ok {
|
||||||
t.Fatalf("senders[1] exists, want removed")
|
t.Fatalf("senders[1] exists, want removed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendHubShouldStop(t *testing.T) {
|
func TestTransportShouldStop(t *testing.T) {
|
||||||
tr := newRespRoundTripper(http.StatusForbidden, nil)
|
tr := &Transport{
|
||||||
ls := stats.NewLeaderStats("")
|
RoundTripper: newRespRoundTripper(http.StatusForbidden, nil),
|
||||||
h := newSendHub(tr, 0, nil, nil, ls)
|
LeaderStats: stats.NewLeaderStats(""),
|
||||||
h.AddPeer(1, []string{"http://a"})
|
}
|
||||||
|
tr.Start()
|
||||||
|
tr.AddPeer(1, []string{"http://a"})
|
||||||
|
|
||||||
shouldstop := h.ShouldStopNotify()
|
shouldstop := tr.ShouldStopNotify()
|
||||||
select {
|
select {
|
||||||
case <-shouldstop:
|
case <-shouldstop:
|
||||||
t.Fatalf("received unexpected shouldstop notification")
|
t.Fatalf("received unexpected shouldstop notification")
|
||||||
case <-time.After(10 * time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
}
|
}
|
||||||
h.senders[1].Send(raftpb.Message{})
|
tr.peers[1].Send(raftpb.Message{})
|
||||||
|
|
||||||
testutil.ForceGosched()
|
testutil.ForceGosched()
|
||||||
select {
|
select {
|
Loading…
x
Reference in New Issue
Block a user