From 8bf71d796e23cb19a31f6e2a85121b4ad7dcf378 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 13 Nov 2014 23:03:34 -0800 Subject: [PATCH] *: gracefully stop etcdserver --- etcdmain/etcd.go | 32 ++++++++--------- etcdserver/sender.go | 74 +++++++++++++++++++++++---------------- etcdserver/sender_test.go | 61 +++++++++++++++++++++++++++++--- etcdserver/server.go | 36 +++++++++++++------ etcdserver/server_test.go | 58 ++++++++++++++++++++++++------ 5 files changed, 190 insertions(+), 71 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index b938252ba..da26390b8 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -170,8 +170,9 @@ func Main() { } shouldProxy := proxyFlag.String() != proxyFlagOff + var stopped <-chan struct{} if !shouldProxy { - err = startEtcd() + stopped, err = startEtcd() if err == discovery.ErrFullCluster && fallbackFlag.String() == fallbackFlagProxy { log.Printf("etcd: discovery cluster full, falling back to %s", fallbackFlagProxy) shouldProxy = true @@ -183,19 +184,18 @@ func Main() { if err != nil { log.Fatalf("etcd: %v", err) } - // Block indefinitely - <-make(chan struct{}) + <-stopped } // startEtcd launches the etcd server and HTTP handlers for client/server communication. -func startEtcd() error { +func startEtcd() (<-chan struct{}, error) { apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) if err != nil { - return err + return nil, err } cls, err := setupCluster(apurls) if err != nil { - return fmt.Errorf("error setting up initial cluster: %v", err) + return nil, fmt.Errorf("error setting up initial cluster: %v", err) } if *dir == "" { @@ -203,25 +203,25 @@ func startEtcd() error { log.Printf("no data-dir provided, using default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, privateDirMode); err != nil { - return fmt.Errorf("cannot create data directory: %v", err) + return nil, fmt.Errorf("cannot create data directory: %v", err) } if err := fileutil.IsDirWriteable(*dir); err != nil { - return fmt.Errorf("cannot write to data directory: %v", err) + return nil, fmt.Errorf("cannot write to data directory: %v", err) } pt, err := transport.NewTransport(peerTLSInfo) if err != nil { - return err + return nil, err } acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo) if err != nil { - return err + return nil, err } lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo) if err != nil { - return err + return nil, err } if !peerTLSInfo.Empty() { @@ -232,7 +232,7 @@ func startEtcd() error { var l net.Listener l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo) if err != nil { - return err + return nil, err } urlStr := u.String() @@ -248,7 +248,7 @@ func startEtcd() error { lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo) if err != nil { - return err + return nil, err } if !clientTLSInfo.Empty() { @@ -259,7 +259,7 @@ func startEtcd() error { var l net.Listener l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo) if err != nil { - return err + return nil, err } urlStr := u.String() @@ -289,7 +289,7 @@ func startEtcd() error { var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(cfg) if err != nil { - return err + return nil, err } s.Start() @@ -313,7 +313,7 @@ func startEtcd() error { log.Fatal(http.Serve(l, ch)) }(l) } - return nil + return s.StopNotify(), nil } // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. diff --git a/etcdserver/sender.go b/etcdserver/sender.go index 4feeaae81..527e8f9c3 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -37,23 +37,25 @@ const ( ) type sendHub struct { - tr *http.Transport - cl ClusterInfo - ss *stats.ServerStats - ls *stats.LeaderStats - senders map[types.ID]*sender + tr http.RoundTripper + cl ClusterInfo + ss *stats.ServerStats + ls *stats.LeaderStats + senders map[types.ID]*sender + shouldstop chan struct{} } // newSendHub creates the default send hub used to transport raft messages // to other members. The returned sendHub will update the given ServerStats and // LeaderStats appropriately. -func newSendHub(t *http.Transport, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { +func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { h := &sendHub{ - tr: t, - cl: cl, - ss: ss, - ls: ls, - senders: make(map[types.ID]*sender), + tr: t, + cl: cl, + ss: ss, + ls: ls, + senders: make(map[types.ID]*sender), + shouldstop: make(chan struct{}, 1), } for _, m := range cl.Members() { h.Add(m) @@ -94,6 +96,10 @@ func (h *sendHub) Stop() { } } +func (h *sendHub) ShouldStopNotify() <-chan struct{} { + return h.shouldstop +} + func (h *sendHub) Add(m *Member) { if _, ok := h.senders[m.ID]; ok { return @@ -101,7 +107,7 @@ func (h *sendHub) Add(m *Member) { // TODO: considering how to switch between all available peer urls u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix) fs := h.ls.Follower(m.ID.String()) - s := newSender(h.tr, u, h.cl.ID(), fs) + s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop) h.senders[m.ID] = s } @@ -128,22 +134,24 @@ func (h *sendHub) Update(m *Member) { } type sender struct { - tr http.RoundTripper - u string - cid types.ID - fs *stats.FollowerStats - q chan []byte - mu sync.RWMutex - wg sync.WaitGroup + tr http.RoundTripper + u string + cid types.ID + fs *stats.FollowerStats + q chan []byte + mu sync.RWMutex + wg sync.WaitGroup + shouldstop chan struct{} } -func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats) *sender { +func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { s := &sender{ - tr: tr, - u: u, - cid: cid, - fs: fs, - q: make(chan []byte), + tr: tr, + u: u, + cid: cid, + fs: fs, + q: make(chan []byte), + shouldstop: shouldstop, } s.wg.Add(connPerSender) for i := 0; i < connPerSender; i++ { @@ -201,13 +209,19 @@ func (s *sender) post(data []byte) error { switch resp.StatusCode { case http.StatusPreconditionFailed: - // TODO: shutdown the etcdserver gracefully? - log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) + select { + case s.shouldstop <- struct{}{}: + default: + } + log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) return nil case http.StatusForbidden: - // TODO: stop the server - log.Println("etcd: this member has been permanently removed from the cluster") - log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") + select { + case s.shouldstop <- struct{}{}: + default: + } + log.Println("etcdserver: this member has been permanently removed from the cluster") + log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") return nil case http.StatusNoContent: return nil diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index 2be6fdfdb..7c070f723 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -89,12 +89,40 @@ func TestSendHubRemove(t *testing.T) { } } +func TestSendHubShouldStop(t *testing.T) { + membs := []*Member{ + newTestMember(1, []string{"http://a"}, "", nil), + } + tr := newRespRoundTripper(http.StatusForbidden, nil) + cl := newTestCluster(membs) + ls := stats.NewLeaderStats("") + h := newSendHub(tr, cl, nil, ls) + // wait for handle goroutines start + // TODO: wait for goroutines ready before return newSender + time.Sleep(10 * time.Millisecond) + + shouldstop := h.ShouldStopNotify() + select { + case <-shouldstop: + t.Fatalf("received unexpected shouldstop notification") + case <-time.After(10 * time.Millisecond): + } + h.senders[1].send([]byte("somedata")) + + testutil.ForceGosched() + select { + case <-shouldstop: + default: + t.Fatalf("cannot receive stop notification") + } +} + // TestSenderSend tests that send func could post data using roundtripper // and increase success count in stats. func TestSenderSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs) + s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) // wait for handle goroutines start // TODO: wait for goroutines ready before return newSender time.Sleep(10 * time.Millisecond) @@ -116,7 +144,7 @@ func TestSenderSend(t *testing.T) { func TestSenderExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs) + s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) // wait for handle goroutines start // TODO: wait for goroutines ready before return newSender time.Sleep(10 * time.Millisecond) @@ -144,7 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { // it increases fail count in stats. func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs) + s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) // wait for handle goroutines start // TODO: wait for goroutines ready before return newSender time.Sleep(10 * time.Millisecond) @@ -162,7 +190,7 @@ func TestSenderSendFailed(t *testing.T) { func TestSenderPost(t *testing.T) { tr := &roundTripperRecorder{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), nil) + s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) if err := s.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } @@ -204,7 +232,8 @@ func TestSenderPostBad(t *testing.T) { {"http://10.0.0.1", http.StatusCreated, nil}, } for i, tt := range tests { - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil) + shouldstop := make(chan struct{}) + s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) err := s.post([]byte("some data")) s.stop() @@ -214,6 +243,28 @@ func TestSenderPostBad(t *testing.T) { } } +func TestSenderPostShouldStop(t *testing.T) { + tests := []struct { + u string + code int + err error + }{ + {"http://10.0.0.1", http.StatusForbidden, nil}, + {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, + } + for i, tt := range tests { + shouldstop := make(chan struct{}, 1) + s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + s.post([]byte("some data")) + s.stop() + select { + case <-shouldstop: + default: + t.Fatalf("#%d: cannot receive shouldstop notification", i) + } + } +} + type roundTripperBlocker struct { c chan struct{} } diff --git a/etcdserver/server.go b/etcdserver/server.go index 9b82967ef..48c8c480f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -91,6 +91,7 @@ type Sender interface { Remove(id types.ID) Update(m *Member) Stop() + ShouldStopNotify() <-chan struct{} } type Storage interface { @@ -327,6 +328,14 @@ func (s *EtcdServer) run() { // snapi indicates the index of the last submitted snapshot request var snapi, appliedi uint64 var nodes []uint64 + var shouldstop bool + shouldstopC := s.sender.ShouldStopNotify() + + defer func() { + s.node.Stop() + s.sender.Stop() + close(s.done) + }() for { select { case <-s.Ticker: @@ -372,7 +381,9 @@ func (s *EtcdServer) run() { if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) { ents = rd.CommittedEntries[appliedi+1-firsti:] } - appliedi = s.apply(ents) + if appliedi, shouldstop = s.apply(ents); shouldstop { + return + } } s.node.Advance() @@ -386,10 +397,9 @@ func (s *EtcdServer) run() { } case <-syncC: s.sync(defaultSyncTimeout) + case <-shouldstopC: + return case <-s.stop: - s.node.Stop() - s.sender.Stop() - close(s.done) return } } @@ -612,7 +622,7 @@ func getExpirationTime(r *pb.Request) time.Time { // apply takes an Entry received from Raft (after it has been committed) and // applies it to the current state of the EtcdServer -func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { +func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) { var applied uint64 for i := range es { e := es[i] @@ -624,7 +634,11 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - s.w.Trigger(cc.ID, s.applyConfChange(cc)) + shouldstop, err := s.applyConfChange(cc) + s.w.Trigger(cc.ID, err) + if shouldstop { + return applied, true + } default: log.Panicf("entry type should be either EntryNormal or EntryConfChange") } @@ -632,7 +646,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { atomic.StoreUint64(&s.raftTerm, e.Term) applied = e.Index } - return applied + return applied, false } // applyRequest interprets r as a call to store.X and returns a Response interpreted @@ -686,11 +700,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.node.ApplyConfChange(cc) - return err + return false, err } s.node.ApplyConfChange(cc) switch cc.Type { @@ -714,6 +728,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { s.Cluster.RemoveMember(id) if id == s.id { log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID()) + log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") + return true, nil } else { s.sender.Remove(id) log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) @@ -734,7 +750,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } } - return nil + return false, nil } // TODO: non-blocking snapshot diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1ed391698..863e92f4e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -474,7 +474,7 @@ func TestApplyConfChangeError(t *testing.T) { node: n, Cluster: cl, } - err := srv.applyConfChange(tt.cc) + _, err := srv.applyConfChange(tt.cc) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -491,6 +491,42 @@ func TestApplyConfChangeError(t *testing.T) { } } +func TestApplyConfChangeShouldStop(t *testing.T) { + cl := newCluster("") + cl.SetStore(store.New()) + for i := 1; i <= 3; i++ { + cl.AddMember(&Member{ID: types.ID(i)}) + } + srv := &EtcdServer{ + id: 1, + node: &nodeRecorder{}, + Cluster: cl, + sender: &nopSender{}, + } + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeRemoveNode, + NodeID: 2, + } + // remove non-local member + shouldStop, err := srv.applyConfChange(cc) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if shouldStop != false { + t.Errorf("shouldStop = %t, want %t", shouldStop, false) + } + + // remove local member + cc.NodeID = 1 + shouldStop, err = srv.applyConfChange(cc) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if shouldStop != true { + t.Errorf("shouldStop = %t, want %t", shouldStop, true) + } +} + func TestClusterOf1(t *testing.T) { testServer(t, 1) } func TestClusterOf3(t *testing.T) { testServer(t, 3) } @@ -503,10 +539,11 @@ func (s *fakeSender) Send(msgs []raftpb.Message) { s.ss[m.To-1].node.Step(context.TODO(), m) } } -func (s *fakeSender) Add(m *Member) {} -func (s *fakeSender) Update(m *Member) {} -func (s *fakeSender) Remove(id types.ID) {} -func (s *fakeSender) Stop() {} +func (s *fakeSender) Add(m *Member) {} +func (s *fakeSender) Update(m *Member) {} +func (s *fakeSender) Remove(id types.ID) {} +func (s *fakeSender) Stop() {} +func (s *fakeSender) ShouldStopNotify() <-chan struct{} { return nil } func testServer(t *testing.T, ns uint64) { ctx, cancel := context.WithCancel(context.Background()) @@ -1556,11 +1593,12 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} type nopSender struct{} -func (s *nopSender) Send(m []raftpb.Message) {} -func (s *nopSender) Add(m *Member) {} -func (s *nopSender) Remove(id types.ID) {} -func (s *nopSender) Update(m *Member) {} -func (s *nopSender) Stop() {} +func (s *nopSender) Send(m []raftpb.Message) {} +func (s *nopSender) Add(m *Member) {} +func (s *nopSender) Remove(id types.ID) {} +func (s *nopSender) Update(m *Member) {} +func (s *nopSender) Stop() {} +func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil } func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { peers := make([]raft.Peer, len(ids))