From ba36a16bc5b0fa5fe09dc1bac2836a7d975dde55 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 30 Apr 2014 16:04:58 -0700 Subject: [PATCH] feat(peer_server): stop service when removed It doesn't modify the exit logic, but makes external code know when removal happens and be able to determine what it should do. --- etcd/etcd.go | 8 ++++++++ server/peer_server.go | 34 +++++++++++++++++++++++++++++----- server/remove_command.go | 3 +-- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 213bfec54..414fa8c0b 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -201,6 +201,14 @@ func (e *Etcd) Run() { // the cluster could be out of work as long as the two nodes cannot transfer messages. e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) + go func() { + select { + case <-e.PeerServer.StopNotify(): + case <-e.PeerServer.RemoveNotify(): + log.Fatal("peer server is removed") + } + }() + log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) diff --git a/server/peer_server.go b/server/peer_server.go index fe0891fbd..547a8328d 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -62,6 +62,8 @@ type PeerServer struct { store store.Store snapConf *snapshotConf + stopNotify chan bool + removeNotify chan bool closeChan chan bool routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} @@ -261,6 +263,8 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.findCluster(discoverURL, peers) + s.stopNotify = make(chan bool) + s.removeNotify = make(chan bool) s.closeChan = make(chan bool) s.startRoutine(s.monitorSync) @@ -279,13 +283,33 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er func (s *PeerServer) Stop() { s.Lock() defer s.Unlock() - - if s.closeChan != nil { - close(s.closeChan) - } + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. s.raftServer.Stop() s.routineGroup.Wait() - s.closeChan = nil + close(s.stopNotify) +} + +func (s *PeerServer) asyncRemove() { + s.Lock() + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. + go func() { + defer s.Unlock() + s.raftServer.Stop() + s.routineGroup.Wait() + close(s.removeNotify) + }() +} + +func (s *PeerServer) StopNotify() <-chan bool { + return s.stopNotify +} + +func (s *PeerServer) RemoveNotify() <-chan bool { + return s.removeNotify } func (s *PeerServer) HTTPHandler() http.Handler { diff --git a/server/remove_command.go b/server/remove_command.go index 2663fbf48..b6edca8d2 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "os" "github.com/coreos/etcd/log" "github.com/coreos/etcd/third_party/github.com/goraft/raft" @@ -65,7 +64,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { // command and need to be removed if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { log.Debugf("server [%s] is removed", context.Server().Name()) - os.Exit(0) + ps.asyncRemove() } else { // else ignore remove log.Debugf("ignore previous remove command.")