From ba36a16bc5b0fa5fe09dc1bac2836a7d975dde55 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 30 Apr 2014 16:04:58 -0700 Subject: [PATCH 1/6] feat(peer_server): stop service when removed It doesn't modify the exit logic, but makes external code know when removal happens and be able to determine what it should do. --- etcd/etcd.go | 8 ++++++++ server/peer_server.go | 34 +++++++++++++++++++++++++++++----- server/remove_command.go | 3 +-- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 213bfec54..414fa8c0b 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -201,6 +201,14 @@ func (e *Etcd) Run() { // the cluster could be out of work as long as the two nodes cannot transfer messages. e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) + go func() { + select { + case <-e.PeerServer.StopNotify(): + case <-e.PeerServer.RemoveNotify(): + log.Fatal("peer server is removed") + } + }() + log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) diff --git a/server/peer_server.go b/server/peer_server.go index fe0891fbd..547a8328d 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -62,6 +62,8 @@ type PeerServer struct { store store.Store snapConf *snapshotConf + stopNotify chan bool + removeNotify chan bool closeChan chan bool routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} @@ -261,6 +263,8 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.findCluster(discoverURL, peers) + s.stopNotify = make(chan bool) + s.removeNotify = make(chan bool) s.closeChan = make(chan bool) s.startRoutine(s.monitorSync) @@ -279,13 +283,33 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er func (s *PeerServer) Stop() { s.Lock() defer s.Unlock() - - if s.closeChan != nil { - close(s.closeChan) - } + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. s.raftServer.Stop() s.routineGroup.Wait() - s.closeChan = nil + close(s.stopNotify) +} + +func (s *PeerServer) asyncRemove() { + s.Lock() + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. + go func() { + defer s.Unlock() + s.raftServer.Stop() + s.routineGroup.Wait() + close(s.removeNotify) + }() +} + +func (s *PeerServer) StopNotify() <-chan bool { + return s.stopNotify +} + +func (s *PeerServer) RemoveNotify() <-chan bool { + return s.removeNotify } func (s *PeerServer) HTTPHandler() http.Handler { diff --git a/server/remove_command.go b/server/remove_command.go index 2663fbf48..b6edca8d2 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "os" "github.com/coreos/etcd/log" "github.com/coreos/etcd/third_party/github.com/goraft/raft" @@ -65,7 +64,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { // command and need to be removed if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { log.Debugf("server [%s] is removed", context.Server().Name()) - os.Exit(0) + ps.asyncRemove() } else { // else ignore remove log.Debugf("ignore previous remove command.") From cf25650b3c9d27543e7dc1501bdeb8f92b1c6cd9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 12:39:07 -0700 Subject: [PATCH 2/6] fix(etcd): exit 0 when removed --- etcd/etcd.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 414fa8c0b..05748f26f 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -205,7 +205,8 @@ func (e *Etcd) Run() { select { case <-e.PeerServer.StopNotify(): case <-e.PeerServer.RemoveNotify(): - log.Fatal("peer server is removed") + log.Infof("peer server is removed") + os.Exit(0) } }() From 206881bfec1e65a9076a4cc72ae8b73dbe57194e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 12:44:48 -0700 Subject: [PATCH 3/6] fix(peer_server): check running status before start/stop This makes peer server more robust. --- server/peer_server.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/server/peer_server.go b/server/peer_server.go index 547a8328d..618024176 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -64,6 +64,7 @@ type PeerServer struct { stopNotify chan bool removeNotify chan bool + started bool closeChan chan bool routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} @@ -240,6 +241,10 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) { func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { s.Lock() defer s.Unlock() + if s.started { + return nil + } + s.started = true // LoadSnapshot if snapshot { @@ -283,6 +288,11 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er func (s *PeerServer) Stop() { s.Lock() defer s.Unlock() + if !s.started { + return + } + s.started = false + close(s.closeChan) // TODO(yichengq): it should also call async stop for raft server, // but this functionality has not been implemented. @@ -293,6 +303,12 @@ func (s *PeerServer) Stop() { func (s *PeerServer) asyncRemove() { s.Lock() + if !s.started { + s.Unlock() + return + } + s.started = false + close(s.closeChan) // TODO(yichengq): it should also call async stop for raft server, // but this functionality has not been implemented. From bed20b78370c0c7da1d02d7de9208eb51a1cf083 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 12:51:41 -0700 Subject: [PATCH 4/6] chore(peer_server): add more function description --- server/peer_server.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/peer_server.go b/server/peer_server.go index 618024176..372604d40 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -237,7 +237,8 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) { return } -// Start the raft server +// Start starts the raft server. +// The function assumes that join has been accepted successfully. func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { s.Lock() defer s.Unlock() @@ -285,6 +286,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er return nil } +// Stop stops the server gracefully. func (s *PeerServer) Stop() { s.Lock() defer s.Unlock() @@ -301,6 +303,9 @@ func (s *PeerServer) Stop() { close(s.stopNotify) } +// asyncRemove stops the server in peer mode. +// It is called to stop the server because it has been removed +// from the cluster. func (s *PeerServer) asyncRemove() { s.Lock() if !s.started { @@ -320,10 +325,13 @@ func (s *PeerServer) asyncRemove() { }() } +// StopNotify notifies the server is stopped. func (s *PeerServer) StopNotify() <-chan bool { return s.stopNotify } +// RemoveNotify notifies the server is removed from peer mode due to +// removal from the cluster. func (s *PeerServer) RemoveNotify() <-chan bool { return s.removeNotify } From 54652012927e47c36c0cec04f9c0765a6cb0ebb3 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 16:31:17 -0700 Subject: [PATCH 5/6] chore(peer_server): more explanation for asyncRemove --- server/peer_server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/peer_server.go b/server/peer_server.go index 372604d40..2ce614724 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -304,8 +304,11 @@ func (s *PeerServer) Stop() { } // asyncRemove stops the server in peer mode. -// It is called to stop the server because it has been removed +// It is called to stop the server internally when it has been removed // from the cluster. +// The function triggers the stop action first to notice server that it +// should not continue, and wait for its stop in separate goroutine because +// the caller should also exit. func (s *PeerServer) asyncRemove() { s.Lock() if !s.started { From 5c7a963cf015194c486efbe46dfab619355eaae0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 8 May 2014 13:20:46 -0700 Subject: [PATCH 6/6] chore(peer_server): adjust code to make it more clear --- server/peer_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/peer_server.go b/server/peer_server.go index 2ce614724..17e71453d 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -321,10 +321,10 @@ func (s *PeerServer) asyncRemove() { // TODO(yichengq): it should also call async stop for raft server, // but this functionality has not been implemented. go func() { - defer s.Unlock() s.raftServer.Stop() s.routineGroup.Wait() close(s.removeNotify) + s.Unlock() }() }