diff --git a/etcd/etcd.go b/etcd/etcd.go index 2c2334cd9..8abb4c92c 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -211,6 +211,15 @@ func (e *Etcd) Run() { // the cluster could be out of work as long as the two nodes cannot transfer messages. e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) + go func() { + select { + case <-e.PeerServer.StopNotify(): + case <-e.PeerServer.RemoveNotify(): + log.Infof("peer server is removed") + os.Exit(0) + } + }() + log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) diff --git a/server/peer_server.go b/server/peer_server.go index 5d03e082c..34c90c0c6 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -62,6 +62,9 @@ type PeerServer struct { store store.Store snapConf *snapshotConf + stopNotify chan bool + removeNotify chan bool + started bool closeChan chan bool routineGroup sync.WaitGroup timeoutThresholdChan chan interface{} @@ -234,10 +237,15 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) { return } -// Start the raft server +// Start starts the raft server. +// The function assumes that join has been accepted successfully. func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { s.Lock() defer s.Unlock() + if s.started { + return nil + } + s.started = true // LoadSnapshot if snapshot { @@ -261,6 +269,8 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er s.findCluster(discoverURL, peers) + s.stopNotify = make(chan bool) + s.removeNotify = make(chan bool) s.closeChan = make(chan bool) s.startRoutine(s.monitorSync) @@ -276,16 +286,57 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er return nil } +// Stop stops the server gracefully. func (s *PeerServer) Stop() { s.Lock() defer s.Unlock() - - if s.closeChan != nil { - close(s.closeChan) + if !s.started { + return } + s.started = false + + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. s.raftServer.Stop() s.routineGroup.Wait() - s.closeChan = nil + close(s.stopNotify) +} + +// asyncRemove stops the server in peer mode. +// It is called to stop the server internally when it has been removed +// from the cluster. +// The function triggers the stop action first to notice server that it +// should not continue, and wait for its stop in separate goroutine because +// the caller should also exit. +func (s *PeerServer) asyncRemove() { + s.Lock() + if !s.started { + s.Unlock() + return + } + s.started = false + + close(s.closeChan) + // TODO(yichengq): it should also call async stop for raft server, + // but this functionality has not been implemented. + go func() { + s.raftServer.Stop() + s.routineGroup.Wait() + close(s.removeNotify) + s.Unlock() + }() +} + +// StopNotify notifies the server is stopped. +func (s *PeerServer) StopNotify() <-chan bool { + return s.stopNotify +} + +// RemoveNotify notifies the server is removed from peer mode due to +// removal from the cluster. +func (s *PeerServer) RemoveNotify() <-chan bool { + return s.removeNotify } func (s *PeerServer) HTTPHandler() http.Handler { diff --git a/server/remove_command.go b/server/remove_command.go index 2663fbf48..b6edca8d2 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "os" "github.com/coreos/etcd/log" "github.com/coreos/etcd/third_party/github.com/goraft/raft" @@ -65,7 +64,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { // command and need to be removed if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { log.Debugf("server [%s] is removed", context.Server().Name()) - os.Exit(0) + ps.asyncRemove() } else { // else ignore remove log.Debugf("ignore previous remove command.")