rafthttp: various clean up

This commit is contained in:
Xiang Li 2015-10-17 23:30:30 -07:00
parent 7d3af5e15f
commit 427a154aae
9 changed files with 105 additions and 77 deletions

View File

@ -31,7 +31,7 @@ import (
const (
// connReadLimitByte limits the number of bytes
// a single read can read out.
//
//
// 64KB should be large enough for not causing
// throughput bottleneck as well as small enough
// for not causing a read timeout.
@ -61,7 +61,7 @@ type pipelineHandler struct {
cid types.ID
}
// newPipelineHandler returns a handler for handling raft messages
// newPipelineHandler returns a handler for handling raft messages
// from pipeline for RaftPrefix.
//
// The handler reads out the raft message from request body,

View File

@ -362,9 +362,9 @@ func newFakePeer() *fakePeer {
}
}
func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls }
func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls }
func (pr *fakePeer) setTerm(term uint64) { pr.term = term }
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
func (pr *fakePeer) Stop() {}
func (pr *fakePeer) stop() {}

View File

@ -53,13 +53,13 @@ const (
)
type Peer interface {
// Send sends the message to the remote peer. The function is non-blocking
// send sends the message to the remote peer. The function is non-blocking
// and has no promise that the message will be received by the remote.
// When it fails to send message out, it will report the status to underlying
// raft.
Send(m raftpb.Message)
// Update updates the urls of remote peer.
Update(urls types.URLs)
send(m raftpb.Message)
// update updates the urls of remote peer.
update(urls types.URLs)
// setTerm sets the term of ongoing communication.
setTerm(term uint64)
// attachOutgoingConn attachs the outgoing connection to the peer for
@ -70,9 +70,9 @@ type Peer interface {
// activeSince returns the time that the connection with the
// peer becomes active.
activeSince() time.Time
// Stop performs any necessary finalization and terminates the peer
// stop performs any necessary finalization and terminates the peer
// elegantly.
Stop()
stop()
}
// peer is the representative of a remote raft node. Local raft node sends
@ -208,14 +208,14 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
return p
}
func (p *peer) Send(m raftpb.Message) {
func (p *peer) send(m raftpb.Message) {
select {
case p.sendc <- m:
case <-p.done:
}
}
func (p *peer) Update(urls types.URLs) {
func (p *peer) update(urls types.URLs) {
select {
case p.newURLsC <- urls:
case <-p.done:
@ -258,7 +258,7 @@ func (p *peer) Resume() {
}
}
func (p *peer) Stop() {
func (p *peer) stop() {
close(p.stopc)
<-p.done
}

View File

@ -96,8 +96,9 @@ func (p *pipeline) handle() {
end := time.Now()
if err != nil {
reportSentFailure(pipelineMsg, m)
p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
reportSentFailure(pipelineMsg, m)
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Fail()
}
@ -105,16 +106,17 @@ func (p *pipeline) handle() {
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
} else {
p.status.activate()
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
}
reportSentDuration(pipelineMsg, m, time.Since(start))
return
}
p.status.activate()
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
}
reportSentDuration(pipelineMsg, m, time.Since(start))
}
}

View File

@ -37,7 +37,7 @@ func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID,
}
}
func (g *remote) Send(m raftpb.Message) {
func (g *remote) send(m raftpb.Message) {
select {
case g.pipeline.msgc <- m:
default:
@ -49,6 +49,6 @@ func (g *remote) Send(m raftpb.Message) {
}
}
func (g *remote) Stop() {
func (g *remote) stop() {
g.pipeline.stop()
}

View File

@ -0,0 +1,45 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"io"
)
// snapshotStore is the store of snapshot. Caller could put one
// snapshot into the store, and get it later.
// snapshotStore stores at most one snapshot at a time, or it panics.
type snapshotStore struct {
rc io.ReadCloser
// index of the stored snapshot
// index is 0 if and only if there is no snapshot stored.
index uint64
}
func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
if s.index != 0 {
plog.Panicf("unexpected put when there is one snapshot stored")
}
s.rc, s.index = rc, index
}
func (s *snapshotStore) get(index uint64) io.ReadCloser {
if s.index == index {
// set index to 0 to indicate no snapshot stored
s.index = 0
return s.rc
}
return nil
}

View File

@ -346,27 +346,32 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
for {
m, err := dec.decode()
switch {
case err != nil:
if err != nil {
cr.mu.Lock()
cr.close()
cr.mu.Unlock()
return err
case isLinkHeartbeatMessage(m):
// do nothing for linkHeartbeatMessage
}
if isLinkHeartbeatMessage(m) {
// raft is not interested in link layer
// heartbeat message, so we should ignore
// it.
continue
}
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
default:
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
default:
if cr.status.isActive() {
plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
} else {
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
}
if cr.status.isActive() {
plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
} else {
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
}
}
}

View File

@ -183,13 +183,15 @@ func (t *Transport) maybeUpdatePeersTerm(term uint64) {
func (t *Transport) Send(msgs []raftpb.Message) {
for _, m := range msgs {
// intentionally dropped message
if m.To == 0 {
// ignore intentionally dropped message
continue
}
to := types.ID(m.To)
if m.Type != raftpb.MsgProp { // proposal message does not have a valid term
// update terms for all the peers
// ignore MsgProp since it does not have a valid term
if m.Type != raftpb.MsgProp {
t.maybeUpdatePeersTerm(m.Term)
}
@ -198,13 +200,13 @@ func (t *Transport) Send(msgs []raftpb.Message) {
if m.Type == raftpb.MsgApp {
t.ServerStats.SendAppendReq(m.Size())
}
p.Send(m)
p.send(m)
continue
}
g, ok := t.remotes[to]
if ok {
g.Send(m)
g.send(m)
continue
}
@ -214,10 +216,10 @@ func (t *Transport) Send(msgs []raftpb.Message) {
func (t *Transport) Stop() {
for _, r := range t.remotes {
r.Stop()
r.stop()
}
for _, p := range t.peers {
p.Stop()
p.stop()
}
t.prober.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
@ -273,7 +275,7 @@ func (t *Transport) RemoveAllPeers() {
// the caller of this function must have the peers mutex.
func (t *Transport) removePeer(id types.ID) {
if peer, ok := t.peers[id]; ok {
peer.Stop()
peer.stop()
} else {
plog.Panicf("unexpected removal of unknown peer '%d'", id)
}
@ -293,7 +295,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
if err != nil {
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
t.peers[id].Update(urls)
t.peers[id].update(urls)
t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
@ -329,29 +331,3 @@ func (t *Transport) Resume() {
p.(Pausable).Resume()
}
}
// snapshotStore is the store of snapshot. Caller could put one
// snapshot into the store, and get it later.
// snapshotStore stores at most one snapshot at a time, or it panics.
type snapshotStore struct {
rc io.ReadCloser
// index of the stored snapshot
// index is 0 if and only if there is no snapshot stored.
index uint64
}
func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
if s.index != 0 {
plog.Panicf("unexpected put when there is one snapshot stored")
}
s.rc, s.index = rc, index
}
func (s *snapshotStore) get(index uint64) io.ReadCloser {
if s.index == index {
// set index to 0 to indicate no snapshot stored
s.index = 0
return s.rc
}
return nil
}

View File

@ -149,7 +149,7 @@ func TestTransportErrorc(t *testing.T) {
t.Fatalf("received unexpected from errorc")
case <-time.After(10 * time.Millisecond):
}
tr.peers[1].Send(raftpb.Message{})
tr.peers[1].send(raftpb.Message{})
testutil.WaitSchedule()
select {