mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
parent
5015480e0c
commit
f292a4c953
@ -20,6 +20,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
@ -54,8 +55,11 @@ type Etcd struct {
|
||||
Server *etcdserver.EtcdServer
|
||||
|
||||
cfg Config
|
||||
stopc chan struct{}
|
||||
errc chan error
|
||||
sctxs map[string]*serveCtx
|
||||
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||
@ -65,7 +69,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
if err = inCfg.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e = &Etcd{cfg: *inCfg}
|
||||
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
|
||||
cfg := &e.cfg
|
||||
defer func() {
|
||||
if e != nil && err != nil {
|
||||
@ -141,6 +145,8 @@ func (e *Etcd) Config() Config {
|
||||
}
|
||||
|
||||
func (e *Etcd) Close() {
|
||||
e.closeOnce.Do(func() { close(e.stopc) })
|
||||
|
||||
for _, sctx := range e.sctxs {
|
||||
sctx.cancel()
|
||||
}
|
||||
@ -319,7 +325,7 @@ func (e *Etcd) serve() (err error) {
|
||||
ph := v2http.NewPeerHandler(e.Server)
|
||||
for _, l := range e.Peers {
|
||||
go func(l net.Listener) {
|
||||
e.errc <- servePeerHTTP(l, ph)
|
||||
e.errHandler(servePeerHTTP(l, ph))
|
||||
}(l)
|
||||
}
|
||||
|
||||
@ -335,8 +341,20 @@ func (e *Etcd) serve() (err error) {
|
||||
// read timeout does not work with http close notify
|
||||
// TODO: https://github.com/golang/go/issues/9524
|
||||
go func(s *serveCtx) {
|
||||
e.errc <- s.serve(e.Server, ctlscfg, v2h, e.errc)
|
||||
e.errHandler(s.serve(e.Server, ctlscfg, v2h, e.errHandler))
|
||||
}(sctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Etcd) errHandler(err error) {
|
||||
select {
|
||||
case <-e.stopc:
|
||||
return
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-e.stopc:
|
||||
case e.errc <- err:
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ func newServeCtx() *serveCtx {
|
||||
// serve accepts incoming connections on the listener l,
|
||||
// creating a new service goroutine for each. The service goroutines
|
||||
// read requests and then call handler to reply to them.
|
||||
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errc chan<- error) error {
|
||||
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errHandler func(error)) error {
|
||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
||||
<-s.ReadyNotify()
|
||||
plog.Info("ready to serve client requests")
|
||||
@ -76,7 +76,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
|
||||
sctx.serviceRegister(gs)
|
||||
}
|
||||
grpcl := m.Match(cmux.HTTP2())
|
||||
go func() { errc <- gs.Serve(grpcl) }()
|
||||
go func() { errHandler(gs.Serve(grpcl)) }()
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithInsecure(),
|
||||
@ -93,7 +93,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
httpl := m.Match(cmux.HTTP1())
|
||||
go func() { errc <- srvhttp.Serve(httpl) }()
|
||||
go func() { errHandler(srvhttp.Serve(httpl)) }()
|
||||
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
|
||||
}
|
||||
|
||||
@ -124,7 +124,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
|
||||
TLSConfig: tlscfg,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
go func() { errc <- srv.Serve(tlsl) }()
|
||||
go func() { errHandler(srv.Serve(tlsl)) }()
|
||||
|
||||
plog.Infof("serving client requests on %s", sctx.l.Addr().String())
|
||||
}
|
||||
|
@ -94,6 +94,11 @@ func TestEmbedEtcd(t *testing.T) {
|
||||
t.Errorf("%d: expected %d clients, got %d", i, tt.wclients, len(e.Clients))
|
||||
}
|
||||
e.Close()
|
||||
select {
|
||||
case err := <-e.Err():
|
||||
t.Errorf("#%d: unexpected error on close (%v)", i, err)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user