From db0b505de515f9f64bb782000c802bf097552737 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 2 Feb 2016 21:00:15 -0800 Subject: [PATCH] rafthttp: add requester to transport if peer does not exist cluster integration now supports adding members with stopped nodes, too Fixes #3699 --- integration/cluster.go | 59 +++++++++++++++++++++++++++---------- integration/cluster_test.go | 47 +++++++++++++++++++++++++++++ rafthttp/http.go | 14 +++++++-- rafthttp/http_test.go | 16 ++++++---- rafthttp/peer.go | 21 +++++++++---- rafthttp/stream.go | 34 +++++++++++++-------- rafthttp/stream_test.go | 39 +++++++++++++----------- rafthttp/transport.go | 4 +-- rafthttp/transport_test.go | 2 +- 9 files changed, 175 insertions(+), 61 deletions(-) diff --git a/integration/cluster.go b/integration/cluster.go index c556a0b30..f4b1b626c 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -153,9 +153,15 @@ func (c *cluster) URL(i int) string { return c.Members[i].ClientURLs[0].String() } +// URLs returns a list of all active client URLs in the cluster func (c *cluster) URLs() []string { urls := make([]string, 0) for _, m := range c.Members { + select { + case <-m.s.StopNotify(): + continue + default: + } for _, u := range m.ClientURLs { urls = append(urls, u.String()) } @@ -163,9 +169,10 @@ func (c *cluster) URLs() []string { return urls } +// HTTPMembers returns a list of all active members as client.Members func (c *cluster) HTTPMembers() []client.Member { - ms := make([]client.Member, len(c.Members)) - for i, m := range c.Members { + ms := []client.Member{} + for _, m := range c.Members { pScheme, cScheme := "http", "http" if m.PeerTLSInfo != nil { pScheme = "https" @@ -173,13 +180,14 @@ func (c *cluster) HTTPMembers() []client.Member { if m.ClientTLSInfo != nil { cScheme = "https" } - ms[i].Name = m.Name + cm := client.Member{Name: m.Name} for _, ln := range m.PeerListeners { - ms[i].PeerURLs = append(ms[i].PeerURLs, pScheme+"://"+ln.Addr().String()) + cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String()) } for _, ln := range m.ClientListeners { - ms[i].ClientURLs = append(ms[i].ClientURLs, cScheme+"://"+ln.Addr().String()) + cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String()) } + ms = append(ms, cm) } return ms } @@ -206,18 +214,17 @@ func (c *cluster) addMember(t *testing.T) { } // send add request to the cluster - cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) - ma := client.NewMembersAPI(cc) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() - if _, err := ma.Add(ctx, peerURL); err != nil { - t.Fatalf("add member on %s error: %v", c.URL(0), err) + var err error + for i := 0; i < len(c.Members); i++ { + clientURL := c.URL(i) + peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() + if err = c.addMemberByURL(t, clientURL, peerURL); err == nil { + break + } + } + if err != nil { + t.Fatalf("add member failed on all members error: %v", err) } - cancel() - - // wait for the add node entry applied in the cluster - members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) - c.waitMembersMatch(t, members) m.InitialPeerURLsMap = types.URLsMap{} for _, mm := range c.Members { @@ -233,6 +240,21 @@ func (c *cluster) addMember(t *testing.T) { c.waitMembersMatch(t, c.HTTPMembers()) } +func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { + cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err := ma.Add(ctx, peerURL); err != nil { + return err + } + cancel() + + // wait for the add node entry applied in the cluster + members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) + c.waitMembersMatch(t, members) + return nil +} + func (c *cluster) AddMember(t *testing.T) { c.addMember(t) } @@ -299,6 +321,11 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) int { for lead == 0 || !possibleLead[lead] { lead = 0 for _, m := range membs { + select { + case <-m.s.StopNotify(): + continue + default: + } if lead != 0 && lead != m.s.Lead() { lead = 0 break diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 06069ce53..1492ba4a2 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -290,6 +290,53 @@ func TestIssue2904(t *testing.T) { c.waitMembersMatch(t, c.HTTPMembers()) } +// TestIssue3699 tests minority failure during cluster configuration; it was +// deadlocking. +func TestIssue3699(t *testing.T) { + // start a cluster of 3 nodes a, b, c + defer testutil.AfterTest(t) + c := NewCluster(t, 3) + c.Launch(t) + defer c.Terminate(t) + + // make node a unavailable + c.Members[0].Stop(t) + <-c.Members[0].s.StopNotify() + + // add node d + c.AddMember(t) + + // electing node d as leader makes node a unable to participate + leaderID := c.waitLeader(t, c.Members) + for leaderID != 3 { + c.Members[leaderID].Stop(t) + <-c.Members[leaderID].s.StopNotify() + c.Members[leaderID].Restart(t) + leaderID = c.waitLeader(t, c.Members) + } + + // bring back node a + // node a will remain useless as long as d is the leader. + err := c.Members[0].Restart(t) + select { + case <-c.Members[0].s.StopNotify(): + t.Fatalf("should not be stopped") + default: + } + // must waitLeader so goroutines don't leak on terminate + leaderID = c.waitLeader(t, c.Members) + + // try to participate in cluster + cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) + kapi := client.NewKeysAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + _, err = kapi.Set(ctx, "/foo", "bar", nil) + cancel() + if err != nil { + t.Fatalf("unexpected error on Set (%v)", err) + } +} + // clusterMustProgress ensures that cluster can make progress. It creates // a random key first, and check the new key could be got from all client urls // of the cluster. diff --git a/rafthttp/http.go b/rafthttp/http.go index 516de20d5..2493d4a3f 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "net/http" "path" + "strings" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" pioutil "github.com/coreos/etcd/pkg/ioutil" @@ -198,15 +199,17 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type streamHandler struct { + tr *Transport peerGetter peerGetter r Raft id types.ID cid types.ID } -func newStreamHandler(peerGetter peerGetter, r Raft, id, cid types.ID) http.Handler { +func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { return &streamHandler{ - peerGetter: peerGetter, + tr: tr, + peerGetter: pg, r: r, id: id, cid: cid, @@ -253,6 +256,13 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } p := h.peerGetter.Get(from) + if p == nil { + if urls := r.Header.Get("X-Server-Peers"); urls != "" { + h.tr.AddPeer(from, strings.Split(urls, ",")) + } + p = h.peerGetter.Get(from) + } + if p == nil { // This may happen in following cases: // 1. user starts a remote peer that belongs to a different cluster diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index eafa65357..5d293afe2 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -21,6 +21,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -183,7 +184,8 @@ func TestServeRaftStreamPrefix(t *testing.T) { peer := newFakePeer() peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}} - h := newStreamHandler(peerGetter, &fakeRaft{}, types.ID(2), types.ID(1)) + tr := &Transport{} + h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1)) rw := httptest.NewRecorder() go h.ServeHTTP(rw, req) @@ -296,9 +298,10 @@ func TestServeRaftStreamPrefixBad(t *testing.T) { req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Raft-To", tt.remote) rw := httptest.NewRecorder() + tr := &Transport{} peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}} r := &fakeRaft{removedID: removedID} - h := newStreamHandler(peerGetter, r, types.ID(1), types.ID(1)) + h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1)) h.ServeHTTP(rw, req) if rw.Code != tt.wcode { @@ -343,19 +346,22 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { msgs []raftpb.Message snapMsgs []snap.Message - urls types.URLs + peerURLs types.URLs connc chan *outgoingConn } func newFakePeer() *fakePeer { + fakeURL, _ := url.Parse("http://localhost") return &fakePeer{ - connc: make(chan *outgoingConn, 1), + connc: make(chan *outgoingConn, 1), + peerURLs: types.URLs{*fakeURL}, } } func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } func (pr *fakePeer) sendSnap(m snap.Message) { pr.snapMsgs = append(pr.snapMsgs, m) } -func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls } +func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) activeSince() time.Time { return time.Time{} } func (pr *fakePeer) stop() {} +func (pr *fakePeer) urls() types.URLs { return pr.peerURLs } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5b45edbc9..0213594c3 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -15,7 +15,6 @@ package rafthttp import ( - "net/http" "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" @@ -65,6 +64,10 @@ type Peer interface { // update updates the urls of remote peer. update(urls types.URLs) + + // urls retrieves the urls of the remote peer + urls() types.URLs + // attachOutgoingConn attaches the outgoing connection to the peer for // stream usage. After the call, the ownership of the outgoing // connection hands over to the peer. The peer will close the connection @@ -97,6 +100,8 @@ type peer struct { status *peerStatus + picker *urlPicker + msgAppV2Writer *streamWriter writer *streamWriter pipeline *pipeline @@ -116,14 +121,16 @@ type peer struct { done chan struct{} } -func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { - picker := newURLPicker(urls) +func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { status := newPeerStatus(to) + picker := newURLPicker(urls) + pipelineRt := transport.pipelineRt p := &peer{ id: to, r: r, v3demo: v3demo, status: status, + picker: picker, msgAppV2Writer: startStreamWriter(to, status, fs, r), writer: startStreamWriter(to, status, fs, r), pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), @@ -154,8 +161,8 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t } }() - p.msgAppV2Reader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) - reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) + p.msgAppV2Reader = startStreamReader(p, transport.streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) + reader := startStreamReader(p, transport.streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) go func() { var paused bool for { @@ -222,6 +229,10 @@ func (p *peer) update(urls types.URLs) { } } +func (p *peer) urls() types.URLs { + return p.picker.urls +} + func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { diff --git a/rafthttp/stream.go b/rafthttp/stream.go index c50afe748..870bcdfc0 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -226,6 +226,7 @@ func (cw *streamWriter) stop() { // streamReader is a long-running go-routine that dials to the remote stream // endpoint and reads messages from the response body returned. type streamReader struct { + localPeer Peer tr http.RoundTripper picker *urlPicker t streamType @@ -243,20 +244,21 @@ type streamReader struct { done chan struct{} } -func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { +func startStreamReader(p Peer, tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { r := &streamReader{ - tr: tr, - picker: picker, - t: t, - local: local, - remote: remote, - cid: cid, - status: status, - recvc: recvc, - propc: propc, - errorc: errorc, - stopc: make(chan struct{}), - done: make(chan struct{}), + localPeer: p, + tr: tr, + picker: picker, + t: t, + local: local, + remote: remote, + cid: cid, + status: status, + recvc: recvc, + propc: propc, + errorc: errorc, + stopc: make(chan struct{}), + done: make(chan struct{}), } go r.run() return r @@ -372,6 +374,12 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) req.Header.Set("X-Raft-To", cr.remote.String()) + var peerURLs []string + for _, url := range cr.localPeer.urls() { + peerURLs = append(peerURLs, url.String()) + } + req.Header.Set("X-Server-Peers", strings.Join(peerURLs, ",")) + cr.mu.Lock() select { case <-cr.stopc: diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index ccdcb0a1a..4aece9987 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -116,11 +116,12 @@ func TestStreamReaderDialRequest(t *testing.T) { for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} { tr := &roundTripperRecorder{} sr := &streamReader{ - tr: tr, - picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - local: types.ID(1), - remote: types.ID(2), - cid: types.ID(1), + tr: tr, + localPeer: newFakePeer(), + picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + local: types.ID(1), + remote: types.ID(2), + cid: types.ID(1), } sr.dial(tt) @@ -166,12 +167,13 @@ func TestStreamReaderDialResult(t *testing.T) { err: tt.err, } sr := &streamReader{ - tr: tr, - picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - local: types.ID(1), - remote: types.ID(2), - cid: types.ID(1), - errorc: make(chan error, 1), + tr: tr, + localPeer: newFakePeer(), + picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + local: types.ID(1), + remote: types.ID(2), + cid: types.ID(1), + errorc: make(chan error, 1), } _, err := sr.dial(streamTypeMessage) @@ -194,11 +196,12 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { header: http.Header{}, } sr := &streamReader{ - tr: tr, - picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - local: types.ID(1), - remote: types.ID(2), - cid: types.ID(1), + tr: tr, + localPeer: newFakePeer(), + picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + local: types.ID(1), + remote: types.ID(2), + cid: types.ID(1), } _, err := sr.dial(typ) @@ -254,7 +257,9 @@ func TestStream(t *testing.T) { h.sw = sw picker := mustNewURLPicker(t, []string{srv.URL}) - sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil) + tr := &http.Transport{} + peer := newFakePeer() + sr := startStreamReader(peer, tr, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil) defer sr.stop() // wait for stream to work var writec chan<- raftpb.Message diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 25a7cf5ed..289e5ba93 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -140,7 +140,7 @@ func (t *Transport) Start() error { func (t *Transport) Handler() http.Handler { pipelineHandler := newPipelineHandler(t.Raft, t.ClusterID) - streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID) + streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) snapHandler := newSnapshotHandler(t.Raft, t.Snapshotter, t.ClusterID) mux := http.NewServeMux() mux.Handle(RaftPrefix, pipelineHandler) @@ -226,7 +226,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.LeaderStats.Follower(id.String()) - t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo) + t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo) addPeerToProber(t.prober, id.String(), us) } diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index 64a64ee00..ca465f83f 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -120,7 +120,7 @@ func TestTransportUpdate(t *testing.T) { u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:2380"})) - if !reflect.DeepEqual(peer.urls, wurls) { + if !reflect.DeepEqual(peer.peerURLs, wurls) { t.Errorf("urls = %+v, want %+v", peer.urls, wurls) } }