mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #14356 from ahrtr/functional_test_20220816
Minor refactoring on the keepAliveListener and keepAliveConn
This commit is contained in:
commit
ba0c7c31a7
@ -17,6 +17,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
|
||||
### Deprecations
|
||||
|
||||
- Deprecated [V2 discovery](https://etcd.io/docs/v3.5/dev-internal/discovery_protocol/).
|
||||
- Deprecated [SetKeepAlive and SetKeepAlivePeriod in limitListenerConn](https://github.com/etcd-io/etcd/pull/14356).
|
||||
- Removed [etcdctl defrag --data-dir](https://github.com/etcd-io/etcd/pull/13793).
|
||||
- Removed [etcdctl snapshot status](https://github.com/etcd-io/etcd/pull/13809).
|
||||
- Removed [etcdctl snapshot restore](https://github.com/etcd-io/etcd/pull/13809).
|
||||
|
@ -21,26 +21,29 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type keepAliveConn interface {
|
||||
SetKeepAlive(bool) error
|
||||
SetKeepAlivePeriod(d time.Duration) error
|
||||
}
|
||||
|
||||
// NewKeepAliveListener returns a listener that listens on the given address.
|
||||
// Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
|
||||
// Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
|
||||
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
|
||||
//
|
||||
// Note(ahrtr):
|
||||
// only `net.TCPConn` supports `SetKeepAlive` and `SetKeepAlivePeriod`
|
||||
// by default, so if you want to wrap multiple layers of net.Listener,
|
||||
// the `keepaliveListener` should be the one which is closest to the
|
||||
// original `net.Listener` implementation, namely `TCPListener`.
|
||||
func NewKeepAliveListener(l net.Listener, scheme string, tlscfg *tls.Config) (net.Listener, error) {
|
||||
kal := &keepaliveListener{
|
||||
Listener: l,
|
||||
}
|
||||
|
||||
if scheme == "https" {
|
||||
if tlscfg == nil {
|
||||
return nil, fmt.Errorf("cannot listen on TLS for given listener: KeyFile and CertFile are not presented")
|
||||
}
|
||||
return newTLSKeepaliveListener(l, tlscfg), nil
|
||||
return newTLSKeepaliveListener(kal, tlscfg), nil
|
||||
}
|
||||
|
||||
return &keepaliveListener{
|
||||
Listener: l,
|
||||
}, nil
|
||||
return kal, nil
|
||||
}
|
||||
|
||||
type keepaliveListener struct{ net.Listener }
|
||||
@ -50,13 +53,43 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kac := c.(keepAliveConn)
|
||||
|
||||
kac, err := createKeepaliveConn(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create keepalive connection failed, %w", err)
|
||||
}
|
||||
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
||||
// default on linux: 30 + 8 * 30
|
||||
// default on osx: 30 + 8 * 75
|
||||
kac.SetKeepAlive(true)
|
||||
kac.SetKeepAlivePeriod(30 * time.Second)
|
||||
return c, nil
|
||||
if err := kac.SetKeepAlive(true); err != nil {
|
||||
return nil, fmt.Errorf("SetKeepAlive failed, %w", err)
|
||||
}
|
||||
if err := kac.SetKeepAlivePeriod(30 * time.Second); err != nil {
|
||||
return nil, fmt.Errorf("SetKeepAlivePeriod failed, %w", err)
|
||||
}
|
||||
return kac, nil
|
||||
}
|
||||
|
||||
func createKeepaliveConn(c net.Conn) (*keepAliveConn, error) {
|
||||
tcpc, ok := c.(*net.TCPConn)
|
||||
if !ok {
|
||||
return nil, ErrNotTCP
|
||||
}
|
||||
return &keepAliveConn{tcpc}, nil
|
||||
}
|
||||
|
||||
type keepAliveConn struct {
|
||||
*net.TCPConn
|
||||
}
|
||||
|
||||
// SetKeepAlive sets keepalive
|
||||
func (l *keepAliveConn) SetKeepAlive(doKeepAlive bool) error {
|
||||
return l.TCPConn.SetKeepAlive(doKeepAlive)
|
||||
}
|
||||
|
||||
// SetKeepAlivePeriod sets keepalive period
|
||||
func (l *keepAliveConn) SetKeepAlivePeriod(d time.Duration) error {
|
||||
return l.TCPConn.SetKeepAlivePeriod(d)
|
||||
}
|
||||
|
||||
// A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
|
||||
@ -72,12 +105,7 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
kac := c.(keepAliveConn)
|
||||
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
||||
// default on linux: 30 + 8 * 30
|
||||
// default on osx: 30 + 8 * 75
|
||||
kac.SetKeepAlive(true)
|
||||
kac.SetKeepAlivePeriod(30 * time.Second)
|
||||
|
||||
c = tls.Server(c, l.config)
|
||||
return c, nil
|
||||
}
|
||||
|
@ -40,6 +40,9 @@ func TestNewKeepAliveListener(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected Accept error: %v", err)
|
||||
}
|
||||
if _, ok := conn.(*keepAliveConn); !ok {
|
||||
t.Fatalf("Unexpected conn type: %T, wanted *keepAliveConn", conn)
|
||||
}
|
||||
conn.Close()
|
||||
ln.Close()
|
||||
|
||||
|
@ -63,6 +63,9 @@ func (l *limitListenerConn) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// SetKeepAlive sets keepalive
|
||||
//
|
||||
// Deprecated: use (*keepAliveConn) SetKeepAlive instead.
|
||||
func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
|
||||
tcpc, ok := l.Conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
@ -71,6 +74,9 @@ func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
|
||||
return tcpc.SetKeepAlive(doKeepAlive)
|
||||
}
|
||||
|
||||
// SetKeepAlivePeriod sets keepalive period
|
||||
//
|
||||
// Deprecated: use (*keepAliveConn) SetKeepAlivePeriod instead.
|
||||
func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
|
||||
tcpc, ok := l.Conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
|
@ -68,7 +68,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
|
||||
fallthrough
|
||||
case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():
|
||||
// timeout listener with socket options.
|
||||
ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr)
|
||||
ln, err := newKeepAliveListener(&lnOpts.ListenConfig, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -78,7 +78,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
|
||||
writeTimeout: lnOpts.writeTimeout,
|
||||
}
|
||||
case lnOpts.IsTimeout():
|
||||
ln, err := net.Listen("tcp", addr)
|
||||
ln, err := newKeepAliveListener(nil, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -88,7 +88,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
|
||||
writeTimeout: lnOpts.writeTimeout,
|
||||
}
|
||||
default:
|
||||
ln, err := net.Listen("tcp", addr)
|
||||
ln, err := newKeepAliveListener(nil, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -102,6 +102,19 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
|
||||
return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
|
||||
}
|
||||
|
||||
func newKeepAliveListener(cfg *net.ListenConfig, addr string) (ln net.Listener, err error) {
|
||||
if cfg != nil {
|
||||
ln, err = cfg.Listen(context.TODO(), "tcp", addr)
|
||||
} else {
|
||||
ln, err = net.Listen("tcp", addr)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return NewKeepAliveListener(ln, "tcp", nil)
|
||||
}
|
||||
|
||||
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
|
||||
if scheme != "https" && scheme != "unixs" {
|
||||
return l, nil
|
||||
|
@ -205,6 +205,15 @@ func TestNewListenerWithSocketOpts(t *testing.T) {
|
||||
if !test.expectedErr && err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if test.scheme == "http" {
|
||||
lnOpts := newListenOpts(test.opts...)
|
||||
if !lnOpts.IsSocketOpts() && !lnOpts.IsTimeout() {
|
||||
if _, ok := ln.(*keepaliveListener); !ok {
|
||||
t.Fatalf("ln: unexpected listener type: %T, wanted *keepaliveListener", ln)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -666,12 +666,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
|
||||
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
|
||||
}
|
||||
|
||||
if network == "tcp" {
|
||||
if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
defer func(u url.URL) {
|
||||
if err == nil {
|
||||
return
|
||||
|
@ -125,7 +125,7 @@ func (srv *Server) createEtcd(fromSnapshot bool, failpoints string) error {
|
||||
func (srv *Server) runEtcd() error {
|
||||
errc := make(chan error)
|
||||
go func() {
|
||||
time.Sleep(5 * time.Second)
|
||||
time.Sleep(1 * time.Second)
|
||||
// server advertise client/peer listener had to start first
|
||||
// before setting up proxy listener
|
||||
errc <- srv.startProxy()
|
||||
@ -137,17 +137,19 @@ func (srv *Server) runEtcd() error {
|
||||
zap.String("command-path", srv.etcdCmd.Path),
|
||||
)
|
||||
err := srv.etcdCmd.Start()
|
||||
perr := <-errc
|
||||
|
||||
srv.lg.Info(
|
||||
"started etcd command",
|
||||
zap.String("command-path", srv.etcdCmd.Path),
|
||||
zap.Strings("command-args", srv.etcdCmd.Args),
|
||||
zap.Errors("errors", []error{err, perr}),
|
||||
zap.Strings("envs", srv.etcdCmd.Env),
|
||||
zap.Error(err),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return perr
|
||||
|
||||
return <-errc
|
||||
}
|
||||
|
||||
select {
|
||||
@ -218,6 +220,11 @@ func (srv *Server) startProxy() error {
|
||||
return err
|
||||
}
|
||||
|
||||
srv.lg.Info("Checking client target's connectivity", zap.String("target", listenClientURL.Host))
|
||||
if err := checkTCPConnect(srv.lg, listenClientURL.Host); err != nil {
|
||||
return fmt.Errorf("check client target failed, %w", err)
|
||||
}
|
||||
|
||||
srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
|
||||
srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
|
||||
Logger: srv.lg,
|
||||
@ -226,6 +233,7 @@ func (srv *Server) startProxy() error {
|
||||
})
|
||||
select {
|
||||
case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
|
||||
srv.lg.Info("starting client proxy failed", zap.Error(err))
|
||||
return err
|
||||
case <-time.After(2 * time.Second):
|
||||
srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
|
||||
@ -242,6 +250,11 @@ func (srv *Server) startProxy() error {
|
||||
return err
|
||||
}
|
||||
|
||||
srv.lg.Info("Checking peer target's connectivity", zap.String("target", listenPeerURL.Host))
|
||||
if err := checkTCPConnect(srv.lg, listenPeerURL.Host); err != nil {
|
||||
return fmt.Errorf("check peer target failed, %w", err)
|
||||
}
|
||||
|
||||
srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
|
||||
srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
|
||||
Logger: srv.lg,
|
||||
@ -250,6 +263,7 @@ func (srv *Server) startProxy() error {
|
||||
})
|
||||
select {
|
||||
case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
|
||||
srv.lg.Info("starting peer proxy failed", zap.Error(err))
|
||||
return err
|
||||
case <-time.After(2 * time.Second):
|
||||
srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
|
||||
|
@ -126,6 +126,23 @@ func loadFileData(filePath string) ([]byte, error) {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func checkTCPConnect(lg *zap.Logger, target string) error {
|
||||
for i := 0; i < 10; i++ {
|
||||
if conn, err := net.Dial("tcp", target); err != nil {
|
||||
lg.Error("The target isn't reachable", zap.Int("retries", i), zap.String("target", target), zap.Error(err))
|
||||
} else {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
lg.Info("The target is reachable", zap.Int("retries", i), zap.String("target", target))
|
||||
return nil
|
||||
}
|
||||
lg.Error("The target isn't reachable due to the returned conn is nil", zap.Int("retries", i), zap.String("target", target))
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return fmt.Errorf("timed out waiting for the target (%s) to be reachable", target)
|
||||
}
|
||||
|
||||
func cleanPageCache() error {
|
||||
// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
|
||||
// https://github.com/torvalds/linux/blob/master/fs/drop_caches.c
|
||||
|
Loading…
x
Reference in New Issue
Block a user