rafthttp: support multiple peer urls

This commit is contained in:
Yicheng Qin 2015-02-27 07:54:06 -08:00
parent c3f32504ec
commit 0fe9861197
10 changed files with 212 additions and 93 deletions

View File

@ -333,7 +333,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
type fakePeer struct {
msgs []raftpb.Message
u string
urls types.URLs
connc chan *outgoingConn
}
@ -344,6 +344,6 @@ func newFakePeer() *fakePeer {
}
func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
func (pr *fakePeer) Update(u string) { pr.u = u }
func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls }
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
func (pr *fakePeer) Stop() {}

View File

@ -59,7 +59,7 @@ type Peer interface {
// raft.
Send(m raftpb.Message)
// Update updates the urls of remote peer.
Update(u string)
Update(urls types.URLs)
// attachOutgoingConn attachs the outgoing connection to the peer for
// stream usage. After the call, the ownership of the outgoing
// connection hands over to the peer. The peer will close the connection
@ -91,7 +91,7 @@ type peer struct {
sendc chan raftpb.Message
recvc chan raftpb.Message
newURLc chan string
newURLc chan types.URLs
// for testing
pausec chan struct{}
@ -101,15 +101,16 @@ type peer struct {
done chan struct{}
}
func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
picker := newURLPicker(urls)
p := &peer{
id: to,
msgAppWriter: startStreamWriter(fs, r),
writer: startStreamWriter(fs, r),
pipeline: newPipeline(tr, u, to, cid, fs, r, errorc),
pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc),
sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize),
newURLc: make(chan string),
newURLc: make(chan types.URLs),
pausec: make(chan struct{}),
resumec: make(chan struct{}),
stopc: make(chan struct{}),
@ -117,8 +118,8 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft,
}
go func() {
var paused bool
msgAppReader := startStreamReader(tr, u, streamTypeMsgApp, local, to, cid, p.recvc)
reader := startStreamReader(tr, u, streamTypeMessage, local, to, cid, p.recvc)
msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc)
reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc)
for {
select {
case m := <-p.sendc:
@ -139,10 +140,8 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft,
if err := r.Process(context.TODO(), mm); err != nil {
log.Printf("peer: process raft message error: %v", err)
}
case u := <-p.newURLc:
msgAppReader.update(u)
reader.update(u)
p.pipeline.update(u)
case urls := <-p.newURLc:
picker.update(urls)
case <-p.pausec:
paused = true
case <-p.resumec:
@ -170,9 +169,9 @@ func (p *peer) Send(m raftpb.Message) {
}
}
func (p *peer) Update(u string) {
func (p *peer) Update(urls types.URLs) {
select {
case p.newURLc <- u:
case p.newURLc <- urls:
case <-p.done:
log.Panicf("peer: unexpected stopped")
}

View File

@ -42,9 +42,8 @@ type pipeline struct {
id types.ID
cid types.ID
tr http.RoundTripper
// the url this pipeline sends to
u string
tr http.RoundTripper
picker *urlPicker
fs *stats.FollowerStats
r Raft
errorc chan error
@ -59,12 +58,12 @@ type pipeline struct {
errored error
}
func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
func newPipeline(tr http.RoundTripper, picker *urlPicker, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
p := &pipeline{
id: id,
cid: cid,
tr: tr,
u: u,
picker: picker,
fs: fs,
r: r,
errorc: errorc,
@ -78,8 +77,6 @@ func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.Fol
return p
}
func (p *pipeline) update(u string) { p.u = u }
func (p *pipeline) stop() {
close(p.msgc)
p.wg.Wait()
@ -130,16 +127,19 @@ func (p *pipeline) handle() {
// post POSTs a data payload to a url. Returns nil if the POST succeeds,
// error on any failure.
func (p *pipeline) post(data []byte) error {
p.Lock()
req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data))
p.Unlock()
u := p.picker.pick()
uu := u
uu.Path = RaftPrefix
req, err := http.NewRequest("POST", uu.String(), bytes.NewBuffer(data))
if err != nil {
p.picker.unreachable(u)
return err
}
req.Header.Set("Content-Type", "application/protobuf")
req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
resp, err := p.tr.RoundTrip(req)
if err != nil {
p.picker.unreachable(u)
return err
}
resp.Body.Close()

View File

@ -31,8 +31,9 @@ import (
// and increase success count in stats.
func TestPipelineSend(t *testing.T) {
tr := &roundTripperRecorder{}
picker := mustNewURLPicker(t, []string{"http://localhost:7001"})
fs := &stats.FollowerStats{}
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p := newPipeline(tr, picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
p.stop()
@ -49,8 +50,9 @@ func TestPipelineSend(t *testing.T) {
func TestPipelineExceedMaximalServing(t *testing.T) {
tr := newRoundTripperBlocker()
picker := mustNewURLPicker(t, []string{"http://localhost:7001"})
fs := &stats.FollowerStats{}
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p := newPipeline(tr, picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
// keep the sender busy and make the buffer full
// nothing can go out as we block the sender
@ -88,8 +90,9 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
// TestPipelineSendFailed tests that when send func meets the post error,
// it increases fail count in stats.
func TestPipelineSendFailed(t *testing.T) {
picker := mustNewURLPicker(t, []string{"http://localhost:7001"})
fs := &stats.FollowerStats{}
p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
p.stop()
@ -103,7 +106,8 @@ func TestPipelineSendFailed(t *testing.T) {
func TestPipelinePost(t *testing.T) {
tr := &roundTripperRecorder{}
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
picker := mustNewURLPicker(t, []string{"http://localhost:7001"})
p := newPipeline(tr, picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
if err := p.post([]byte("some data")); err != nil {
t.Fatalf("unexpect post error: %v", err)
}
@ -112,8 +116,8 @@ func TestPipelinePost(t *testing.T) {
if g := tr.Request().Method; g != "POST" {
t.Errorf("method = %s, want %s", g, "POST")
}
if g := tr.Request().URL.String(); g != "http://10.0.0.1" {
t.Errorf("url = %s, want %s", g, "http://10.0.0.1")
if g := tr.Request().URL.String(); g != "http://localhost:7001/raft" {
t.Errorf("url = %s, want %s", g, "http://localhost:7001/raft")
}
if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
t.Errorf("content type = %s, want %s", g, "application/protobuf")
@ -136,16 +140,15 @@ func TestPipelinePostBad(t *testing.T) {
code int
err error
}{
// bad url
{":bad url", http.StatusNoContent, nil},
// RoundTrip returns error
{"http://10.0.0.1", 0, errors.New("blah")},
{"http://localhost:7001", 0, errors.New("blah")},
// unexpected response status code
{"http://10.0.0.1", http.StatusOK, nil},
{"http://10.0.0.1", http.StatusCreated, nil},
{"http://localhost:7001", http.StatusOK, nil},
{"http://localhost:7001", http.StatusCreated, nil},
}
for i, tt := range tests {
p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
picker := mustNewURLPicker(t, []string{tt.u})
p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
err := p.post([]byte("some data"))
p.stop()
@ -161,12 +164,13 @@ func TestPipelinePostErrorc(t *testing.T) {
code int
err error
}{
{"http://10.0.0.1", http.StatusForbidden, nil},
{"http://10.0.0.1", http.StatusPreconditionFailed, nil},
{"http://localhost:7001", http.StatusForbidden, nil},
{"http://localhost:7001", http.StatusPreconditionFailed, nil},
}
for i, tt := range tests {
picker := mustNewURLPicker(t, []string{tt.u})
errorc := make(chan error, 1)
p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
p.post([]byte("some data"))
p.stop()
select {

View File

@ -20,7 +20,6 @@ import (
"log"
"net"
"net/http"
"net/url"
"path"
"strconv"
"sync"
@ -191,7 +190,7 @@ func (cw *streamWriter) stop() {
// endponit and reads messages from the response body returned.
type streamReader struct {
tr http.RoundTripper
u string
picker *urlPicker
t streamType
from, to types.ID
cid types.ID
@ -205,17 +204,17 @@ type streamReader struct {
done chan struct{}
}
func startStreamReader(tr http.RoundTripper, u string, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader {
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader {
r := &streamReader{
tr: tr,
u: u,
t: t,
from: from,
to: to,
cid: cid,
recvc: recvc,
stopc: make(chan struct{}),
done: make(chan struct{}),
tr: tr,
picker: picker,
t: t,
from: from,
to: to,
cid: cid,
recvc: recvc,
stopc: make(chan struct{}),
done: make(chan struct{}),
}
go r.run()
return r
@ -278,13 +277,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
}
}
func (cr *streamReader) update(u string) {
cr.mu.Lock()
defer cr.mu.Unlock()
cr.u = u
cr.resetCloser()
}
func (cr *streamReader) updateMsgAppTerm(term uint64) {
cr.mu.Lock()
defer cr.mu.Unlock()
@ -312,15 +304,12 @@ func (cr *streamReader) isWorking() bool {
}
func (cr *streamReader) dial() (io.ReadCloser, error) {
u := cr.picker.pick()
cr.mu.Lock()
u := cr.u
term := cr.msgAppTerm
cr.mu.Unlock()
uu, err := url.Parse(u)
if err != nil {
return nil, fmt.Errorf("parse url %s error: %v", u, err)
}
uu := u
switch cr.t {
case streamTypeMsgApp:
// for backward compatibility of v2.0
@ -332,6 +321,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
}
req, err := http.NewRequest("GET", uu.String(), nil)
if err != nil {
cr.picker.unreachable(u)
return nil, fmt.Errorf("new request to %s error: %v", u, err)
}
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
@ -344,6 +334,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
cr.mu.Unlock()
resp, err := cr.tr.RoundTrip(req)
if err != nil {
cr.picker.unreachable(u)
return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err)
}
if resp.StatusCode != http.StatusOK {

View File

@ -84,7 +84,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
tr := &roundTripperRecorder{}
sr := &streamReader{
tr: tr,
u: "http://localhost:7001",
picker: mustNewURLPicker(t, []string{"http://localhost:7001"}),
t: tt,
from: types.ID(1),
to: types.ID(2),
@ -136,12 +136,12 @@ func TestStreamReaderDialResult(t *testing.T) {
for i, tt := range tests {
tr := newRespRoundTripper(tt.code, tt.err)
sr := &streamReader{
tr: tr,
u: "http://localhost:7001",
t: streamTypeMessage,
from: types.ID(1),
to: types.ID(2),
cid: types.ID(1),
tr: tr,
picker: mustNewURLPicker(t, []string{"http://localhost:7001"}),
t: streamTypeMessage,
from: types.ID(1),
to: types.ID(2),
cid: types.ID(1),
}
_, err := sr.dial()
@ -188,7 +188,8 @@ func TestStream(t *testing.T) {
h.sw = sw
recvc := make(chan raftpb.Message)
sr := startStreamReader(&http.Transport{}, srv.URL, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc)
picker := mustNewURLPicker(t, []string{srv.URL})
sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc)
defer sr.stop()
if tt.t == streamTypeMsgApp {
sr.updateMsgAppTerm(tt.term)

View File

@ -17,8 +17,6 @@ package rafthttp
import (
"log"
"net/http"
"net/url"
"path"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@ -135,21 +133,18 @@ func (t *transport) Stop() {
}
}
func (t *transport) AddPeer(id types.ID, urls []string) {
func (t *transport) AddPeer(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.peers[id]; ok {
return
}
// TODO: considering how to switch between all available peer urls
peerURL := urls[0]
u, err := url.Parse(peerURL)
urls, err := types.NewURLs(us)
if err != nil {
log.Panicf("unexpect peer url %s", peerURL)
log.Panicf("newURLs %+v should never fail: %+v", us, err)
}
u.Path = path.Join(u.Path, RaftPrefix)
fs := t.leaderStats.Follower(id.String())
t.peers[id] = startPeer(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, fs, t.errorc)
t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc)
}
func (t *transport) RemovePeer(id types.ID) {
@ -177,20 +172,18 @@ func (t *transport) removePeer(id types.ID) {
delete(t.leaderStats.Followers, id.String())
}
func (t *transport) UpdatePeer(id types.ID, urls []string) {
func (t *transport) UpdatePeer(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
// TODO: return error or just panic?
if _, ok := t.peers[id]; !ok {
return
}
peerURL := urls[0]
u, err := url.Parse(peerURL)
urls, err := types.NewURLs(us)
if err != nil {
log.Panicf("unexpect peer url %s", peerURL)
log.Panicf("newURLs %+v should never fail: %+v", us, err)
}
u.Path = path.Join(u.Path, RaftPrefix)
t.peers[id].Update(u.String())
t.peers[id].Update(urls)
}
type Pausable interface {

View File

@ -72,7 +72,7 @@ func TestTransportAdd(t *testing.T) {
leaderStats: ls,
peers: make(map[types.ID]Peer),
}
tr.AddPeer(1, []string{"http://a"})
tr.AddPeer(1, []string{"http://localhost:7001"})
defer tr.Stop()
if _, ok := ls.Followers["1"]; !ok {
@ -84,7 +84,7 @@ func TestTransportAdd(t *testing.T) {
}
// duplicate AddPeer is ignored
tr.AddPeer(1, []string{"http://a"})
tr.AddPeer(1, []string{"http://localhost:7001"})
ns := tr.peers[types.ID(1)]
if s != ns {
t.Errorf("sender = %v, want %v", ns, s)
@ -97,7 +97,7 @@ func TestTransportRemove(t *testing.T) {
leaderStats: stats.NewLeaderStats(""),
peers: make(map[types.ID]Peer),
}
tr.AddPeer(1, []string{"http://a"})
tr.AddPeer(1, []string{"http://localhost:7001"})
tr.RemovePeer(types.ID(1))
defer tr.Stop()
@ -113,8 +113,9 @@ func TestTransportUpdate(t *testing.T) {
}
u := "http://localhost:7001"
tr.UpdatePeer(types.ID(1), []string{u})
if w := "http://localhost:7001/raft"; peer.u != w {
t.Errorf("url = %s, want %s", peer.u, w)
wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:7001"}))
if !reflect.DeepEqual(peer.urls, wurls) {
t.Errorf("urls = %+v, want %+v", peer.urls, wurls)
}
}
@ -126,7 +127,7 @@ func TestTransportErrorc(t *testing.T) {
peers: make(map[types.ID]Peer),
errorc: errorc,
}
tr.AddPeer(1, []string{"http://a"})
tr.AddPeer(1, []string{"http://localhost:7001"})
defer tr.Stop()
select {

57
rafthttp/urlpick.go Normal file
View File

@ -0,0 +1,57 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/url"
"sync"
"github.com/coreos/etcd/pkg/types"
)
type urlPicker struct {
urls types.URLs
mu sync.Mutex
picked int
}
func newURLPicker(urls types.URLs) *urlPicker {
return &urlPicker{
urls: urls,
}
}
func (p *urlPicker) update(urls types.URLs) {
p.mu.Lock()
defer p.mu.Unlock()
p.urls = urls
p.picked = 0
}
func (p *urlPicker) pick() url.URL {
p.mu.Lock()
defer p.mu.Unlock()
return p.urls[p.picked]
}
// unreachable notices the picker that the given url is unreachable,
// and it should use other possible urls.
func (p *urlPicker) unreachable(u url.URL) {
p.mu.Lock()
defer p.mu.Unlock()
if u == p.urls[p.picked] {
p.picked = (p.picked + 1) % len(p.urls)
}
}

73
rafthttp/urlpick_test.go Normal file
View File

@ -0,0 +1,73 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/url"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
// TestURLPickerPickTwice tests that pick returns a possible url,
// and always returns the same one.
func TestURLPickerPick(t *testing.T) {
picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"})
u := picker.pick()
urlmap := map[url.URL]bool{
url.URL{Scheme: "http", Host: "127.0.0.1:2380"}: true,
url.URL{Scheme: "http", Host: "127.0.0.1:7001"}: true,
}
if !urlmap[u] {
t.Errorf("url picked = %+v, want a possible url in %+v", u, urlmap)
}
// pick out the same url when calling pick again
uu := picker.pick()
if u != uu {
t.Errorf("url picked = %+v, want %+v", uu, u)
}
}
func TestURLPickerUpdate(t *testing.T) {
picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"})
picker.update(testutil.MustNewURLs(t, []string{"http://localhost:2380", "http://localhost:7001"}))
u := picker.pick()
urlmap := map[url.URL]bool{
url.URL{Scheme: "http", Host: "localhost:2380"}: true,
url.URL{Scheme: "http", Host: "localhost:7001"}: true,
}
if !urlmap[u] {
t.Errorf("url picked = %+v, want a possible url in %+v", u, urlmap)
}
}
func TestURLPickerUnreachable(t *testing.T) {
picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"})
u := picker.pick()
picker.unreachable(u)
uu := picker.pick()
if u == uu {
t.Errorf("url picked = %+v, want other possible urls", uu)
}
}
func mustNewURLPicker(t *testing.T, us []string) *urlPicker {
urls := testutil.MustNewURLs(t, us)
return newURLPicker(urls)
}