diff --git a/etcd/etcd.go b/etcd/etcd.go index 213bfec54..414fa8c0b 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -201,6 +201,14 @@ func (e *Etcd) Run() { // the cluster could be out of work as long as the two nodes cannot transfer messages. e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) + go func() { + select { + case <-e.PeerServer.StopNotify(): + case <-e.PeerServer.RemoveNotify(): + log.Fatal("peer server is removed") + } + }() + log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) diff --git a/server/peer_server.go b/server/peer_server.go index fe0891fbd..547a8328d 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -62,6 +62,8 @@ type PeerServer struct { store store.Store snapConf *snapshotConf + stopNotify chan bool + removeNotify chan bool closeChan chan bool routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} @@ -261,6 +263,8 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.findCluster(discoverURL, peers) + s.stopNotify = make(chan bool) + s.removeNotify = make(chan bool) s.closeChan = make(chan bool) s.startRoutine(s.monitorSync) @@ -279,13 +283,33 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er func (s *PeerServer) Stop() { s.Lock() defer s.Unlock() - - if s.closeChan != nil { - close(s.closeChan) - } + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. s.raftServer.Stop() s.routineGroup.Wait() - s.closeChan = nil + close(s.stopNotify) +} + +func (s *PeerServer) asyncRemove() { + s.Lock() + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. + go func() { + defer s.Unlock() + s.raftServer.Stop() + s.routineGroup.Wait() + close(s.removeNotify) + }() +} + +func (s *PeerServer) StopNotify() <-chan bool { + return s.stopNotify +} + +func (s *PeerServer) RemoveNotify() <-chan bool { + return s.removeNotify } func (s *PeerServer) HTTPHandler() http.Handler { diff --git a/server/remove_command.go b/server/remove_command.go index 2663fbf48..b6edca8d2 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "os" "github.com/coreos/etcd/log" "github.com/coreos/etcd/third_party/github.com/goraft/raft" @@ -65,7 +64,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { // command and need to be removed if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { log.Debugf("server [%s] is removed", context.Server().Name()) - os.Exit(0) + ps.asyncRemove() } else { // else ignore remove log.Debugf("ignore previous remove command.")