Merge pull request #772 from unihorn/81

feat(peer_server): stop service when removed
This commit is contained in:
Yicheng Qin
2014-05-08 14:02:09 -07:00
3 changed files with 66 additions and 7 deletions

View File

@@ -211,6 +211,15 @@ func (e *Etcd) Run() {
// the cluster could be out of work as long as the two nodes cannot transfer messages. // the cluster could be out of work as long as the two nodes cannot transfer messages.
e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers)
// Watch for the peer server going away in the background.
// A regular stop only lets this goroutine return, while removal from the
// cluster terminates the whole process.
go func() {
select {
case <-e.PeerServer.StopNotify():
// Stopped through Stop: nothing to do, fall out of the select.
case <-e.PeerServer.RemoveNotify():
log.Infof("peer server is removed")
// Removal is an expected outcome, so exit with status 0.
os.Exit(0)
}
}()
log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL)
e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig)

View File

@@ -62,6 +62,9 @@ type PeerServer struct {
store store.Store store store.Store
snapConf *snapshotConf snapConf *snapshotConf
stopNotify chan bool
removeNotify chan bool
started bool
closeChan chan bool closeChan chan bool
routineGroup sync.WaitGroup routineGroup sync.WaitGroup
timeoutThresholdChan chan interface{} timeoutThresholdChan chan interface{}
@@ -234,10 +237,15 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) {
return return
} }
// Start the raft server // Start starts the raft server.
// The function assumes that join has been accepted successfully.
func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if s.started {
return nil
}
s.started = true
// LoadSnapshot // LoadSnapshot
if snapshot { if snapshot {
@@ -261,6 +269,8 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
s.findCluster(discoverURL, peers) s.findCluster(discoverURL, peers)
s.stopNotify = make(chan bool)
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool) s.closeChan = make(chan bool)
s.startRoutine(s.monitorSync) s.startRoutine(s.monitorSync)
@@ -276,16 +286,57 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
return nil return nil
} }
// Stop stops the server gracefully.
func (s *PeerServer) Stop() { func (s *PeerServer) Stop() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if !s.started {
if s.closeChan != nil { return
close(s.closeChan)
} }
s.started = false
close(s.closeChan)
// TODO(yichengq): it should also call async stop for raft server,
// but this functionality has not been implemented.
s.raftServer.Stop() s.raftServer.Stop()
s.routineGroup.Wait() s.routineGroup.Wait()
s.closeChan = nil close(s.stopNotify)
}
// asyncRemove stops the server in peer mode.
// It is called internally (from applyRemove) when the server has been
// removed from the cluster.
//
// If the server is not started it returns immediately. Otherwise it
// triggers the stop action first, marking the server as not started and
// closing closeChan to notify the server that it should not continue, and
// then waits for the actual stop in a separate goroutine because the
// caller should also exit.
//
// The mutex taken here is released by that goroutine, only after the raft
// server has stopped, the routine group has finished and removeNotify has
// been closed. Start and Stop take the same lock, so they block until the
// removal has completed.
func (s *PeerServer) asyncRemove() {
s.Lock()
if !s.started {
s.Unlock()
return
}
s.started = false
close(s.closeChan)
// TODO(yichengq): it should also call async stop for raft server,
// but this functionality has not been implemented.
go func() {
s.raftServer.Stop()
s.routineGroup.Wait()
// Closing removeNotify wakes up everyone waiting on RemoveNotify().
close(s.removeNotify)
// Release the lock acquired by asyncRemove before this goroutine started.
s.Unlock()
}()
}
// StopNotify exposes, as a receive-only channel, the channel that Stop
// closes once the server has been stopped.
func (s *PeerServer) StopNotify() <-chan bool {
	stopped := s.stopNotify
	return stopped
}
// RemoveNotify returns the channel that signals the server has left peer
// mode because it was removed from the cluster. The channel is closed by
// asyncRemove once the shutdown has finished.
func (s *PeerServer) RemoveNotify() <-chan bool {
return s.removeNotify
} }
func (s *PeerServer) HTTPHandler() http.Handler { func (s *PeerServer) HTTPHandler() http.Handler {

View File

@@ -2,7 +2,6 @@ package server
import ( import (
"encoding/binary" "encoding/binary"
"os"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft" "github.com/coreos/etcd/third_party/github.com/goraft/raft"
@@ -65,7 +64,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) {
// command and need to be removed // command and need to be removed
if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
log.Debugf("server [%s] is removed", context.Server().Name()) log.Debugf("server [%s] is removed", context.Server().Name())
os.Exit(0) ps.asyncRemove()
} else { } else {
// else ignore remove // else ignore remove
log.Debugf("ignore previous remove command.") log.Debugf("ignore previous remove command.")