embed: split peer/client/metrics serve methods

Priliminary commit to start client server later.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyu-Ho Lee
2017-11-20 09:35:39 -08:00
parent 08434d0665
commit 75ababa61f

View File

@@ -39,8 +39,8 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"
"github.com/coreos/pkg/capnslog"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@@ -64,15 +64,17 @@ const (
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
Peers []*peerListener
Clients []net.Listener
// a map of contexts for the servers that serves client requests.
sctxs map[string]*serveCtx
metricsListeners []net.Listener
Server *etcdserver.EtcdServer
Server *etcdserver.EtcdServer
cfg Config
stopc chan struct{}
errc chan error
sctxs map[string]*serveCtx
closeOnce sync.Once
}
@@ -185,37 +187,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e.Server.Start()
// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return e, err
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}
if err = e.serve(); err != nil {
if err = e.servePeers(); err != nil {
return e, err
}
if err = e.serveClients(); err != nil {
return e, err
}
if err = e.serveMetrics(); err != nil {
return e, err
}
serving = true
return e, nil
}
@@ -331,6 +312,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
return peers, nil
}
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !e.cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
return err
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}
// start peer servers in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
return nil
}
func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = cfg.ClientSelfCert(); err != nil {
plog.Fatalf("could not get certs (%v)", err)
@@ -412,7 +431,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
return sctxs, nil
}
func (e *Etcd) serve() (err error) {
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
}
@@ -421,13 +440,6 @@ func (e *Etcd) serve() (err error) {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}
// Start the peer server in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
@@ -458,12 +470,17 @@ func (e *Etcd) serve() (err error) {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}
// start client servers in a goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}
return nil
}
func (e *Etcd) serveMetrics() (err error) {
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
@@ -484,7 +501,6 @@ func (e *Etcd) serve() (err error) {
}(murl, ml)
}
}
return nil
}