From 0fe98611972d39447fe12325b1d9109b45a64e33 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 27 Feb 2015 07:54:06 -0800 Subject: [PATCH] rafthttp: support multiple peer urls --- rafthttp/http_test.go | 4 +-- rafthttp/peer.go | 25 +++++++------ rafthttp/pipeline.go | 20 +++++------ rafthttp/pipeline_test.go | 34 ++++++++++-------- rafthttp/stream.go | 39 ++++++++------------ rafthttp/stream_test.go | 17 ++++----- rafthttp/transport.go | 23 +++++------- rafthttp/transport_test.go | 13 +++---- rafthttp/urlpick.go | 57 +++++++++++++++++++++++++++++ rafthttp/urlpick_test.go | 73 ++++++++++++++++++++++++++++++++++++++ 10 files changed, 212 insertions(+), 93 deletions(-) create mode 100644 rafthttp/urlpick.go create mode 100644 rafthttp/urlpick_test.go diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index bb4112a64..b152431ef 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -333,7 +333,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { msgs []raftpb.Message - u string + urls types.URLs connc chan *outgoingConn } @@ -344,6 +344,6 @@ func newFakePeer() *fakePeer { } func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } -func (pr *fakePeer) Update(u string) { pr.u = u } +func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) Stop() {} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5da67d491..a0304c3fc 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -59,7 +59,7 @@ type Peer interface { // raft. Send(m raftpb.Message) // Update updates the urls of remote peer. - Update(u string) + Update(urls types.URLs) // attachOutgoingConn attachs the outgoing connection to the peer for // stream usage. After the call, the ownership of the outgoing // connection hands over to the peer. The peer will close the connection @@ -91,7 +91,7 @@ type peer struct { sendc chan raftpb.Message recvc chan raftpb.Message - newURLc chan string + newURLc chan types.URLs // for testing pausec chan struct{} @@ -101,15 +101,16 @@ type peer struct { done chan struct{} } -func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { +func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { + picker := newURLPicker(urls) p := &peer{ id: to, msgAppWriter: startStreamWriter(fs, r), writer: startStreamWriter(fs, r), - pipeline: newPipeline(tr, u, to, cid, fs, r, errorc), + pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), - newURLc: make(chan string), + newURLc: make(chan types.URLs), pausec: make(chan struct{}), resumec: make(chan struct{}), stopc: make(chan struct{}), @@ -117,8 +118,8 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, } go func() { var paused bool - msgAppReader := startStreamReader(tr, u, streamTypeMsgApp, local, to, cid, p.recvc) - reader := startStreamReader(tr, u, streamTypeMessage, local, to, cid, p.recvc) + msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc) + reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc) for { select { case m := <-p.sendc: @@ -139,10 +140,8 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, if err := r.Process(context.TODO(), mm); err != nil { log.Printf("peer: process raft message error: %v", err) } - case u := <-p.newURLc: - msgAppReader.update(u) - reader.update(u) - p.pipeline.update(u) + case urls := <-p.newURLc: + picker.update(urls) case <-p.pausec: paused = true case <-p.resumec: @@ -170,9 +169,9 @@ func (p *peer) Send(m raftpb.Message) { } } -func (p *peer) Update(u string) { +func (p *peer) Update(urls types.URLs) { select { - case p.newURLc <- u: + case p.newURLc <- urls: case <-p.done: log.Panicf("peer: unexpected stopped") } diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index ce3253122..78a6de7ba 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -42,9 +42,8 @@ type pipeline struct { id types.ID cid types.ID - tr http.RoundTripper - // the url this pipeline sends to - u string + tr http.RoundTripper + picker *urlPicker fs *stats.FollowerStats r Raft errorc chan error @@ -59,12 +58,12 @@ type pipeline struct { errored error } -func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { +func newPipeline(tr http.RoundTripper, picker *urlPicker, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { p := &pipeline{ id: id, cid: cid, tr: tr, - u: u, + picker: picker, fs: fs, r: r, errorc: errorc, @@ -78,8 +77,6 @@ func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.Fol return p } -func (p *pipeline) update(u string) { p.u = u } - func (p *pipeline) stop() { close(p.msgc) p.wg.Wait() @@ -130,16 +127,19 @@ func (p *pipeline) handle() { // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. func (p *pipeline) post(data []byte) error { - p.Lock() - req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data)) - p.Unlock() + u := p.picker.pick() + uu := u + uu.Path = RaftPrefix + req, err := http.NewRequest("POST", uu.String(), bytes.NewBuffer(data)) if err != nil { + p.picker.unreachable(u) return err } req.Header.Set("Content-Type", "application/protobuf") req.Header.Set("X-Etcd-Cluster-ID", p.cid.String()) resp, err := p.tr.RoundTrip(req) if err != nil { + p.picker.unreachable(u) return err } resp.Body.Close() diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index b99c98094..5e87f2e0d 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -31,8 +31,9 @@ import ( // and increase success count in stats. func TestPipelineSend(t *testing.T) { tr := &roundTripperRecorder{} + picker := mustNewURLPicker(t, []string{"http://localhost:7001"}) fs := &stats.FollowerStats{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) + p := newPipeline(tr, picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} p.stop() @@ -49,8 +50,9 @@ func TestPipelineSend(t *testing.T) { func TestPipelineExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() + picker := mustNewURLPicker(t, []string{"http://localhost:7001"}) fs := &stats.FollowerStats{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) + p := newPipeline(tr, picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender @@ -88,8 +90,9 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // TestPipelineSendFailed tests that when send func meets the post error, // it increases fail count in stats. func TestPipelineSendFailed(t *testing.T) { + picker := mustNewURLPicker(t, []string{"http://localhost:7001"}) fs := &stats.FollowerStats{} - p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) + p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} p.stop() @@ -103,7 +106,8 @@ func TestPipelineSendFailed(t *testing.T) { func TestPipelinePost(t *testing.T) { tr := &roundTripperRecorder{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &fakeRaft{}, nil) + picker := mustNewURLPicker(t, []string{"http://localhost:7001"}) + p := newPipeline(tr, picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, nil) if err := p.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } @@ -112,8 +116,8 @@ func TestPipelinePost(t *testing.T) { if g := tr.Request().Method; g != "POST" { t.Errorf("method = %s, want %s", g, "POST") } - if g := tr.Request().URL.String(); g != "http://10.0.0.1" { - t.Errorf("url = %s, want %s", g, "http://10.0.0.1") + if g := tr.Request().URL.String(); g != "http://localhost:7001/raft" { + t.Errorf("url = %s, want %s", g, "http://localhost:7001/raft") } if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" { t.Errorf("content type = %s, want %s", g, "application/protobuf") @@ -136,16 +140,15 @@ func TestPipelinePostBad(t *testing.T) { code int err error }{ - // bad url - {":bad url", http.StatusNoContent, nil}, // RoundTrip returns error - {"http://10.0.0.1", 0, errors.New("blah")}, + {"http://localhost:7001", 0, errors.New("blah")}, // unexpected response status code - {"http://10.0.0.1", http.StatusOK, nil}, - {"http://10.0.0.1", http.StatusCreated, nil}, + {"http://localhost:7001", http.StatusOK, nil}, + {"http://localhost:7001", http.StatusCreated, nil}, } for i, tt := range tests { - p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error)) + picker := mustNewURLPicker(t, []string{tt.u}) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error)) err := p.post([]byte("some data")) p.stop() @@ -161,12 +164,13 @@ func TestPipelinePostErrorc(t *testing.T) { code int err error }{ - {"http://10.0.0.1", http.StatusForbidden, nil}, - {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, + {"http://localhost:7001", http.StatusForbidden, nil}, + {"http://localhost:7001", http.StatusPreconditionFailed, nil}, } for i, tt := range tests { + picker := mustNewURLPicker(t, []string{tt.u}) errorc := make(chan error, 1) - p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc) p.post([]byte("some data")) p.stop() select { diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 9d03d0564..16df554b4 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -20,7 +20,6 @@ import ( "log" "net" "net/http" - "net/url" "path" "strconv" "sync" @@ -191,7 +190,7 @@ func (cw *streamWriter) stop() { // endponit and reads messages from the response body returned. type streamReader struct { tr http.RoundTripper - u string + picker *urlPicker t streamType from, to types.ID cid types.ID @@ -205,17 +204,17 @@ type streamReader struct { done chan struct{} } -func startStreamReader(tr http.RoundTripper, u string, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader { +func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader { r := &streamReader{ - tr: tr, - u: u, - t: t, - from: from, - to: to, - cid: cid, - recvc: recvc, - stopc: make(chan struct{}), - done: make(chan struct{}), + tr: tr, + picker: picker, + t: t, + from: from, + to: to, + cid: cid, + recvc: recvc, + stopc: make(chan struct{}), + done: make(chan struct{}), } go r.run() return r @@ -278,13 +277,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { } } -func (cr *streamReader) update(u string) { - cr.mu.Lock() - defer cr.mu.Unlock() - cr.u = u - cr.resetCloser() -} - func (cr *streamReader) updateMsgAppTerm(term uint64) { cr.mu.Lock() defer cr.mu.Unlock() @@ -312,15 +304,12 @@ func (cr *streamReader) isWorking() bool { } func (cr *streamReader) dial() (io.ReadCloser, error) { + u := cr.picker.pick() cr.mu.Lock() - u := cr.u term := cr.msgAppTerm cr.mu.Unlock() - uu, err := url.Parse(u) - if err != nil { - return nil, fmt.Errorf("parse url %s error: %v", u, err) - } + uu := u switch cr.t { case streamTypeMsgApp: // for backward compatibility of v2.0 @@ -332,6 +321,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { } req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { + cr.picker.unreachable(u) return nil, fmt.Errorf("new request to %s error: %v", u, err) } req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) @@ -344,6 +334,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { cr.mu.Unlock() resp, err := cr.tr.RoundTrip(req) if err != nil { + cr.picker.unreachable(u) return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err) } if resp.StatusCode != http.StatusOK { diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index d0ebb204d..a8c284226 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -84,7 +84,7 @@ func TestStreamReaderDialRequest(t *testing.T) { tr := &roundTripperRecorder{} sr := &streamReader{ tr: tr, - u: "http://localhost:7001", + picker: mustNewURLPicker(t, []string{"http://localhost:7001"}), t: tt, from: types.ID(1), to: types.ID(2), @@ -136,12 +136,12 @@ func TestStreamReaderDialResult(t *testing.T) { for i, tt := range tests { tr := newRespRoundTripper(tt.code, tt.err) sr := &streamReader{ - tr: tr, - u: "http://localhost:7001", - t: streamTypeMessage, - from: types.ID(1), - to: types.ID(2), - cid: types.ID(1), + tr: tr, + picker: mustNewURLPicker(t, []string{"http://localhost:7001"}), + t: streamTypeMessage, + from: types.ID(1), + to: types.ID(2), + cid: types.ID(1), } _, err := sr.dial() @@ -188,7 +188,8 @@ func TestStream(t *testing.T) { h.sw = sw recvc := make(chan raftpb.Message) - sr := startStreamReader(&http.Transport{}, srv.URL, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc) + picker := mustNewURLPicker(t, []string{srv.URL}) + sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc) defer sr.stop() if tt.t == streamTypeMsgApp { sr.updateMsgAppTerm(tt.term) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index a97cf31c4..aaff28c22 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -17,8 +17,6 @@ package rafthttp import ( "log" "net/http" - "net/url" - "path" "sync" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" @@ -135,21 +133,18 @@ func (t *transport) Stop() { } } -func (t *transport) AddPeer(id types.ID, urls []string) { +func (t *transport) AddPeer(id types.ID, us []string) { t.mu.Lock() defer t.mu.Unlock() if _, ok := t.peers[id]; ok { return } - // TODO: considering how to switch between all available peer urls - peerURL := urls[0] - u, err := url.Parse(peerURL) + urls, err := types.NewURLs(us) if err != nil { - log.Panicf("unexpect peer url %s", peerURL) + log.Panicf("newURLs %+v should never fail: %+v", us, err) } - u.Path = path.Join(u.Path, RaftPrefix) fs := t.leaderStats.Follower(id.String()) - t.peers[id] = startPeer(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, fs, t.errorc) + t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc) } func (t *transport) RemovePeer(id types.ID) { @@ -177,20 +172,18 @@ func (t *transport) removePeer(id types.ID) { delete(t.leaderStats.Followers, id.String()) } -func (t *transport) UpdatePeer(id types.ID, urls []string) { +func (t *transport) UpdatePeer(id types.ID, us []string) { t.mu.Lock() defer t.mu.Unlock() // TODO: return error or just panic? if _, ok := t.peers[id]; !ok { return } - peerURL := urls[0] - u, err := url.Parse(peerURL) + urls, err := types.NewURLs(us) if err != nil { - log.Panicf("unexpect peer url %s", peerURL) + log.Panicf("newURLs %+v should never fail: %+v", us, err) } - u.Path = path.Join(u.Path, RaftPrefix) - t.peers[id].Update(u.String()) + t.peers[id].Update(urls) } type Pausable interface { diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index 87132f4ea..34f896357 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -72,7 +72,7 @@ func TestTransportAdd(t *testing.T) { leaderStats: ls, peers: make(map[types.ID]Peer), } - tr.AddPeer(1, []string{"http://a"}) + tr.AddPeer(1, []string{"http://localhost:7001"}) defer tr.Stop() if _, ok := ls.Followers["1"]; !ok { @@ -84,7 +84,7 @@ func TestTransportAdd(t *testing.T) { } // duplicate AddPeer is ignored - tr.AddPeer(1, []string{"http://a"}) + tr.AddPeer(1, []string{"http://localhost:7001"}) ns := tr.peers[types.ID(1)] if s != ns { t.Errorf("sender = %v, want %v", ns, s) @@ -97,7 +97,7 @@ func TestTransportRemove(t *testing.T) { leaderStats: stats.NewLeaderStats(""), peers: make(map[types.ID]Peer), } - tr.AddPeer(1, []string{"http://a"}) + tr.AddPeer(1, []string{"http://localhost:7001"}) tr.RemovePeer(types.ID(1)) defer tr.Stop() @@ -113,8 +113,9 @@ func TestTransportUpdate(t *testing.T) { } u := "http://localhost:7001" tr.UpdatePeer(types.ID(1), []string{u}) - if w := "http://localhost:7001/raft"; peer.u != w { - t.Errorf("url = %s, want %s", peer.u, w) + wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:7001"})) + if !reflect.DeepEqual(peer.urls, wurls) { + t.Errorf("urls = %+v, want %+v", peer.urls, wurls) } } @@ -126,7 +127,7 @@ func TestTransportErrorc(t *testing.T) { peers: make(map[types.ID]Peer), errorc: errorc, } - tr.AddPeer(1, []string{"http://a"}) + tr.AddPeer(1, []string{"http://localhost:7001"}) defer tr.Stop() select { diff --git a/rafthttp/urlpick.go b/rafthttp/urlpick.go new file mode 100644 index 000000000..f03037bf8 --- /dev/null +++ b/rafthttp/urlpick.go @@ -0,0 +1,57 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "net/url" + "sync" + + "github.com/coreos/etcd/pkg/types" +) + +type urlPicker struct { + urls types.URLs + mu sync.Mutex + picked int +} + +func newURLPicker(urls types.URLs) *urlPicker { + return &urlPicker{ + urls: urls, + } +} + +func (p *urlPicker) update(urls types.URLs) { + p.mu.Lock() + defer p.mu.Unlock() + p.urls = urls + p.picked = 0 +} + +func (p *urlPicker) pick() url.URL { + p.mu.Lock() + defer p.mu.Unlock() + return p.urls[p.picked] +} + +// unreachable notices the picker that the given url is unreachable, +// and it should use other possible urls. +func (p *urlPicker) unreachable(u url.URL) { + p.mu.Lock() + defer p.mu.Unlock() + if u == p.urls[p.picked] { + p.picked = (p.picked + 1) % len(p.urls) + } +} diff --git a/rafthttp/urlpick_test.go b/rafthttp/urlpick_test.go new file mode 100644 index 000000000..b8b390855 --- /dev/null +++ b/rafthttp/urlpick_test.go @@ -0,0 +1,73 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "net/url" + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +// TestURLPickerPickTwice tests that pick returns a possible url, +// and always returns the same one. +func TestURLPickerPick(t *testing.T) { + picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"}) + + u := picker.pick() + urlmap := map[url.URL]bool{ + url.URL{Scheme: "http", Host: "127.0.0.1:2380"}: true, + url.URL{Scheme: "http", Host: "127.0.0.1:7001"}: true, + } + if !urlmap[u] { + t.Errorf("url picked = %+v, want a possible url in %+v", u, urlmap) + } + + // pick out the same url when calling pick again + uu := picker.pick() + if u != uu { + t.Errorf("url picked = %+v, want %+v", uu, u) + } +} + +func TestURLPickerUpdate(t *testing.T) { + picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"}) + picker.update(testutil.MustNewURLs(t, []string{"http://localhost:2380", "http://localhost:7001"})) + + u := picker.pick() + urlmap := map[url.URL]bool{ + url.URL{Scheme: "http", Host: "localhost:2380"}: true, + url.URL{Scheme: "http", Host: "localhost:7001"}: true, + } + if !urlmap[u] { + t.Errorf("url picked = %+v, want a possible url in %+v", u, urlmap) + } +} + +func TestURLPickerUnreachable(t *testing.T) { + picker := mustNewURLPicker(t, []string{"http://127.0.0.1:2380", "http://127.0.0.1:7001"}) + u := picker.pick() + picker.unreachable(u) + + uu := picker.pick() + if u == uu { + t.Errorf("url picked = %+v, want other possible urls", uu) + } +} + +func mustNewURLPicker(t *testing.T, us []string) *urlPicker { + urls := testutil.MustNewURLs(t, us) + return newURLPicker(urls) +}