etcdserver/api/rafthttp: rename to "pipelineProber"

Preliminary work to add prober to "streamRt"

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee
2018-08-17 16:17:17 -07:00
parent 6976819792
commit 47cff4dfe5
2 changed files with 24 additions and 24 deletions

View File

@@ -130,7 +130,7 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
prober probing.Prober
pipelineProber probing.Prober
}
func (t *Transport) Start() error {
@@ -145,7 +145,7 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
// If client didn't provide dial retry frequency, use the default
// (100ms backoff between attempts to create a new stream),
@@ -221,7 +221,7 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
@@ -317,7 +317,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.Logger, t.prober, id.String(), us)
addPeerToProber(t.Logger, t.pipelineProber, id.String(), us)
if t.Logger != nil {
t.Logger.Info(
@@ -358,7 +358,7 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
if t.Logger != nil {
t.Logger.Info(
@@ -388,8 +388,8 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)
t.prober.Remove(id.String())
addPeerToProber(t.Logger, t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.Logger, t.pipelineProber, id.String(), us)
if t.Logger != nil {
t.Logger.Info(

View File

@@ -97,10 +97,10 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
@@ -125,10 +125,10 @@ func TestTransportAdd(t *testing.T) {
func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))
@@ -142,8 +142,8 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})
@@ -156,13 +156,13 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()