From 15be030aaaaa3dab1b8b2ab9968783b88e5a0091 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 2 Jan 2015 20:00:29 -0800 Subject: [PATCH] etcdserver: collect error from errorc --- etcdserver/server.go | 29 +++++++++++++---------------- etcdserver/server_test.go | 1 - rafthttp/peer.go | 21 ++++++++++----------- rafthttp/peer_test.go | 13 ++++++------- rafthttp/transport.go | 18 ++++++------------ rafthttp/transport_test.go | 14 +++++++------- 6 files changed, 42 insertions(+), 54 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 0983101e9..57abc0326 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -121,8 +121,9 @@ type RaftTimer interface { type EtcdServer struct { cfg *ServerConfig w wait.Wait - done chan struct{} stop chan struct{} + done chan struct{} + errorc chan error id types.ID attributes Attributes @@ -253,6 +254,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { srv := &EtcdServer{ cfg: cfg, + errorc: make(chan error, 1), store: st, node: n, raftStorage: s, @@ -268,7 +270,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), } - tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, sstats, lstats) + tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats) // add all the remote members into sendhub for _, m := range cfg.Cluster.Members() { if m.Name != cfg.Name { @@ -341,7 +343,6 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { func (s *EtcdServer) run() { var syncC <-chan time.Time var shouldstop bool - shouldstopC := s.transport.ShouldStopNotify() // load initial state from raft storage snap, err := s.raftStorage.Snapshot() @@ -420,9 +421,7 @@ func (s *EtcdServer) run() { } if len(ents) > 0 { if appliedi, shouldstop = s.apply(ents, &confState); shouldstop { - m1 := fmt.Sprintf("etcdserver: removed local member %s from cluster %s", s.ID(), s.Cluster.ID()) - m2 := fmt.Sprint("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") - go s.stopWithDelay(10*100*time.Millisecond, m1, m2) + go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } } @@ -436,7 +435,9 @@ func (s *EtcdServer) run() { } case <-syncC: s.sync(defaultSyncTimeout) - case <-shouldstopC: + case err := <-s.errorc: + log.Printf("etcdserver: %s", err) + log.Printf("etcdserver: the data-dir used by this member must be removed.") return case <-s.stop: return @@ -447,24 +448,20 @@ func (s *EtcdServer) run() { // Stop stops the server gracefully, and shuts down the running goroutine. // Stop should be called after a Start(s), otherwise it will block forever. func (s *EtcdServer) Stop() { - s.stopWithMessages() -} - -func (s *EtcdServer) stopWithMessages(msgs ...string) { select { case s.stop <- struct{}{}: - for _, msg := range msgs { - log.Println(msg) - } case <-s.done: return } <-s.done } -func (s *EtcdServer) stopWithDelay(d time.Duration, msgs ...string) { +func (s *EtcdServer) stopWithDelay(d time.Duration, err error) { time.Sleep(d) - s.stopWithMessages(msgs...) + select { + case s.errorc <- err: + default: + } } // StopNotify returns a channel that receives a empty struct diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index fe86ecdad..84309882e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1370,6 +1370,5 @@ func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} func (s *nopTransporter) Stop() {} -func (s *nopTransporter) ShouldStopNotify() <-chan struct{} { return nil } func (s *nopTransporter) Pause() {} func (s *nopTransporter) Resume() {} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5493371f9..6f0c8a829 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -49,10 +49,10 @@ type peer struct { id types.ID cid types.ID - tr http.RoundTripper - r Raft - fs *stats.FollowerStats - shouldstop chan struct{} + tr http.RoundTripper + r Raft + fs *stats.FollowerStats + errorc chan error batcher *Batcher propBatcher *ProposalBatcher @@ -72,7 +72,7 @@ type peer struct { paused bool } -func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer { +func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { p := &peer{ id: id, active: true, @@ -82,7 +82,7 @@ func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, r: r, fs: fs, stream: &stream{}, - shouldstop: shouldstop, + errorc: errorc, batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), q: make(chan *raftpb.Message, senderBufSize), @@ -224,19 +224,18 @@ func (p *peer) post(data []byte) error { switch resp.StatusCode { case http.StatusPreconditionFailed: + err := fmt.Errorf("conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid) select { - case p.shouldstop <- struct{}{}: + case p.errorc <- err: default: } - log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid) return nil case http.StatusForbidden: + err := fmt.Errorf("the member has been permanently removed from the cluster") select { - case p.shouldstop <- struct{}{}: + case p.errorc <- err: default: } - log.Println("rafthttp: this member has been permanently removed from the cluster") - log.Println("rafthttp: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") return nil case http.StatusNoContent: return nil diff --git a/rafthttp/peer_test.go b/rafthttp/peer_test.go index 4edafbd0b..6d15ba6f1 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/peer_test.go @@ -144,8 +144,7 @@ func TestSenderPostBad(t *testing.T) { {"http://10.0.0.1", http.StatusCreated, nil}, } for i, tt := range tests { - shouldstop := make(chan struct{}) - p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) + p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, make(chan error)) err := p.post([]byte("some data")) p.Stop() @@ -155,7 +154,7 @@ func TestSenderPostBad(t *testing.T) { } } -func TestSenderPostShouldStop(t *testing.T) { +func TestPeerPostErrorc(t *testing.T) { tests := []struct { u string code int @@ -165,14 +164,14 @@ func TestSenderPostShouldStop(t *testing.T) { {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, } for i, tt := range tests { - shouldstop := make(chan struct{}, 1) - p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) + errorc := make(chan error, 1) + p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, errorc) p.post([]byte("some data")) p.Stop() select { - case <-shouldstop: + case <-errorc: default: - t.Fatalf("#%d: cannot receive shouldstop notification", i) + t.Fatalf("#%d: cannot receive from errorc", i) } } } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index e208bc1e0..e5d7a100f 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -25,7 +25,6 @@ type Transporter interface { RemovePeer(id types.ID) UpdatePeer(id types.ID, urls []string) Stop() - ShouldStopNotify() <-chan struct{} } type transport struct { @@ -36,12 +35,12 @@ type transport struct { serverStats *stats.ServerStats leaderStats *stats.LeaderStats - mu sync.RWMutex // protect the peer map - peers map[types.ID]*peer // remote peers - shouldstop chan struct{} + mu sync.RWMutex // protect the peer map + peers map[types.ID]*peer // remote peers + errorc chan error } -func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter { +func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter { return &transport{ roundTripper: rt, id: id, @@ -50,7 +49,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.Se serverStats: ss, leaderStats: ls, peers: make(map[types.ID]*peer), - shouldstop: make(chan struct{}, 1), + errorc: errorc, } } @@ -99,10 +98,6 @@ func (t *transport) Stop() { } } -func (t *transport) ShouldStopNotify() <-chan struct{} { - return t.shouldstop -} - func (t *transport) AddPeer(id types.ID, urls []string) { t.mu.Lock() defer t.mu.Unlock() @@ -117,8 +112,7 @@ func (t *transport) AddPeer(id types.ID, urls []string) { } u.Path = path.Join(u.Path, RaftPrefix) fs := t.leaderStats.Follower(id.String()) - t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID, - t.raft, fs, t.shouldstop) + t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID, t.raft, fs, t.errorc) } func (t *transport) RemovePeer(id types.ID) { diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index 35fc87033..b3b451c3a 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -64,27 +64,27 @@ func TestTransportRemove(t *testing.T) { } } -func TestTransportShouldStop(t *testing.T) { +func TestTransportErrorc(t *testing.T) { + errorc := make(chan error, 1) tr := &transport{ roundTripper: newRespRoundTripper(http.StatusForbidden, nil), leaderStats: stats.NewLeaderStats(""), peers: make(map[types.ID]*peer), - shouldstop: make(chan struct{}, 1), + errorc: errorc, } tr.AddPeer(1, []string{"http://a"}) - shouldstop := tr.ShouldStopNotify() select { - case <-shouldstop: - t.Fatalf("received unexpected shouldstop notification") + case <-errorc: + t.Fatalf("received unexpected from errorc") case <-time.After(10 * time.Millisecond): } tr.peers[1].Send(raftpb.Message{}) testutil.ForceGosched() select { - case <-shouldstop: + case <-errorc: default: - t.Fatalf("cannot receive stop notification") + t.Fatalf("cannot receive error from errorc") } }