From 49078c683bb52e0440c6fdfc9671022f8cc6ef39 Mon Sep 17 00:00:00 2001 From: Sam Batschelet Date: Fri, 19 Feb 2021 08:19:57 -0500 Subject: [PATCH] *: add support for socket options Signed-off-by: Sam Batschelet --- pkg/transport/listener.go | 34 +++++++++++-- pkg/transport/listener_test.go | 70 ++++++++++++++++++++++++++ pkg/transport/sockopt.go | 45 +++++++++++++++++ pkg/transport/sockopt_unix.go | 20 ++++++++ pkg/transport/sockopt_windows.go | 18 +++++++ pkg/transport/timeout_listener.go | 25 ++++++--- server/embed/config.go | 5 ++ server/embed/etcd.go | 14 ++++-- server/etcdmain/config.go | 2 + server/etcdmain/help.go | 4 ++ server/etcdserver/api/rafthttp/util.go | 4 ++ server/etcdserver/config.go | 3 ++ 12 files changed, 232 insertions(+), 12 deletions(-) create mode 100644 pkg/transport/sockopt.go create mode 100644 pkg/transport/sockopt_unix.go create mode 100644 pkg/transport/sockopt_windows.go diff --git a/pkg/transport/listener.go b/pkg/transport/listener.go index df9a895bb..31ba4876f 100644 --- a/pkg/transport/listener.go +++ b/pkg/transport/listener.go @@ -15,6 +15,7 @@ package transport import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/rand" @@ -39,18 +40,34 @@ import ( // NewListener creates a new listner. func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) { - if l, err = newListener(addr, scheme); err != nil { + if l, err = newListener(addr, scheme, nil); err != nil { return nil, err } return wrapTLS(scheme, tlsinfo, l) } -func newListener(addr string, scheme string) (net.Listener, error) { +// NewListenerWithSocketOpts creates new listener with support for socket options. +func NewListenerWithSocketOpts(addr, scheme string, tlsinfo *TLSInfo, sopts *SocketOpts) (net.Listener, error) { + ln, err := newListener(addr, scheme, sopts) + if err != nil { + return nil, err + } + if tlsinfo != nil { + wrapTLS(scheme, tlsinfo, ln) + } + return ln, nil +} + +func newListener(addr string, scheme string, sopts *SocketOpts) (net.Listener, error) { if scheme == "unix" || scheme == "unixs" { // unix sockets via unix://laddr return NewUnixListener(addr) } - return net.Listen("tcp", addr) + config, err := newListenConfig(sopts) + if err != nil { + return nil, err + } + return config.Listen(context.TODO(), "tcp", addr) } func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) { @@ -63,6 +80,17 @@ func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, err return newTLSListener(l, tlsinfo, checkSAN) } +func newListenConfig(sopts *SocketOpts) (net.ListenConfig, error) { + lc := net.ListenConfig{} + if sopts != nil { + ctls := getControls(sopts) + if len(ctls) > 0 { + lc.Control = ctls.Control + } + } + return lc, nil +} + type TLSInfo struct { CertFile string KeyFile string diff --git a/pkg/transport/listener_test.go b/pkg/transport/listener_test.go index a34d97055..b79f34a31 100644 --- a/pkg/transport/listener_test.go +++ b/pkg/transport/listener_test.go @@ -61,6 +61,58 @@ func TestNewListenerTLSInfo(t *testing.T) { testNewListenerTLSInfoAccept(t, *tlsInfo) } +func TestNewListenerWithSocketOpts(t *testing.T) { + tlsInfo, del, err := createSelfCert() + if err != nil { + t.Fatalf("unable to create cert: %v", err) + } + defer del() + tests := map[string]struct { + socketOpts *SocketOpts + expectedErr bool + }{ + "nil": { + socketOpts: nil, + expectedErr: true, + }, + "empty": { + socketOpts: &SocketOpts{}, + expectedErr: true, + }, + "reuse address": { + socketOpts: &SocketOpts{ReuseAddress: true}, + expectedErr: true, + }, + "reuse address and reuse port": { + socketOpts: &SocketOpts{ReuseAddress: true, ReusePort: true}, + expectedErr: false, + }, + "reuse port": { + socketOpts: &SocketOpts{ReusePort: true}, + expectedErr: false, + }, + } + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + ln, err := NewListenerWithSocketOpts("127.0.0.1:0", "https", tlsInfo, test.socketOpts) + if err != nil { + t.Fatalf("unexpected NewListenerWithSocketOpts error: %v", err) + } + defer ln.Close() + ln2, err := NewListenerWithSocketOpts(ln.Addr().String(), "https", tlsInfo, test.socketOpts) + if test.expectedErr && err == nil { + t.Fatalf("expected error") + } + if !test.expectedErr && err != nil { + t.Fatalf("unexpected NewListenerWithSocketOpts error: %v", err) + } + if ln2 != nil { + ln2.Close() + } + }) + } +} + func testNewListenerTLSInfoAccept(t *testing.T, tlsInfo TLSInfo) { ln, err := NewListener("127.0.0.1:0", "https", &tlsInfo) if err != nil { @@ -401,3 +453,21 @@ func TestIsClosedConnError(t *testing.T) { t.Fatalf("expect true, got false (%v)", err) } } + +func TestSocktOptsEmpty(t *testing.T) { + tests := []struct { + sopts SocketOpts + want bool + }{ + {SocketOpts{}, true}, + {SocketOpts{ReuseAddress: true, ReusePort: false}, false}, + {SocketOpts{ReusePort: true}, false}, + } + + for i, tt := range tests { + got := tt.sopts.Empty() + if tt.want != got { + t.Errorf("#%d: result of Empty() incorrect: want=%t got=%t", i, tt.want, got) + } + } +} diff --git a/pkg/transport/sockopt.go b/pkg/transport/sockopt.go new file mode 100644 index 000000000..9941048a8 --- /dev/null +++ b/pkg/transport/sockopt.go @@ -0,0 +1,45 @@ +package transport + +import ( + "syscall" +) + +type Controls []func(network, addr string, conn syscall.RawConn) error + +func (ctls Controls) Control(network, addr string, conn syscall.RawConn) error { + for _, s := range ctls { + if err := s(network, addr, conn); err != nil { + return err + } + } + return nil +} + +type SocketOpts struct { + // ReusePort enables socket option SO_REUSEPORT [1] which allows rebind of + // a port already in use. User should keep in mind that flock can fail + // in which case lock on data file could result in unexpected + // condition. User should take caution to protect against lock race. + // [1] https://man7.org/linux/man-pages/man7/socket.7.html + ReusePort bool + // ReuseAddress enables a socket option SO_REUSEADDR which allows + // binding to an address in `TIME_WAIT` state. Useful to improve MTTR + // in cases where etcd slow to restart due to excessive `TIME_WAIT`. + // [1] https://man7.org/linux/man-pages/man7/socket.7.html + ReuseAddress bool +} + +func getControls(sopts *SocketOpts) Controls { + ctls := Controls{} + if sopts.ReuseAddress { + ctls = append(ctls, setReuseAddress) + } + if sopts.ReusePort { + ctls = append(ctls, setReusePort) + } + return ctls +} + +func (sopts *SocketOpts) Empty() bool { + return sopts.ReuseAddress == false && sopts.ReusePort == false +} diff --git a/pkg/transport/sockopt_unix.go b/pkg/transport/sockopt_unix.go new file mode 100644 index 000000000..a3322fded --- /dev/null +++ b/pkg/transport/sockopt_unix.go @@ -0,0 +1,20 @@ +// +build !windows + +package transport + +import ( + "golang.org/x/sys/unix" + "syscall" +) + +func setReusePort(network, address string, conn syscall.RawConn) error { + return conn.Control(func(fd uintptr) { + syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1) + }) +} + +func setReuseAddress(network, address string, conn syscall.RawConn) error { + return conn.Control(func(fd uintptr) { + syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1) + }) +} diff --git a/pkg/transport/sockopt_windows.go b/pkg/transport/sockopt_windows.go new file mode 100644 index 000000000..000077991 --- /dev/null +++ b/pkg/transport/sockopt_windows.go @@ -0,0 +1,18 @@ +// +build windows + +package transport + +import ( + "fmt" + "syscall" +) + +func setReusePort(network, address string, c syscall.RawConn) error { + return fmt.Errorf("port reuse is not supported on Windows") +} + +// Windows supports SO_REUSEADDR, but it may cause undefined behavior, as +// there is no protection against port hijacking. +func setReuseAddress(network, addr string, conn syscall.RawConn) error { + return fmt.Errorf("address reuse is not supported on Windows") +} diff --git a/pkg/transport/timeout_listener.go b/pkg/transport/timeout_listener.go index 273e99fe0..29a62d997 100644 --- a/pkg/transport/timeout_listener.go +++ b/pkg/transport/timeout_listener.go @@ -23,19 +23,32 @@ import ( // If read/write on the accepted connection blocks longer than its time limit, // it will return timeout error. func NewTimeoutListener(addr string, scheme string, tlsinfo *TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (net.Listener, error) { - ln, err := newListener(addr, scheme) + ln, err := newListener(addr, scheme, nil) if err != nil { return nil, err } - ln = &rwTimeoutListener{ + return newTimeoutListener(ln, scheme, rdtimeoutd, wtimeoutd, tlsinfo) +} + +// NewTimeoutListerWithSocketOpts returns a listener that listens on the given address. +// If read/write on the accepted connection blocks longer than its time limit, +// it will return timeout error. Socket options can be passed and will be applied to the +// ListenConfig. +func NewTimeoutListerWithSocketOpts(addr string, scheme string, tlsinfo *TLSInfo, rdtimeoutd, wtimeoutd time.Duration, sopts *SocketOpts) (net.Listener, error) { + ln, err := newListener(addr, scheme, sopts) + if err != nil { + return nil, err + } + return newTimeoutListener(ln, scheme, rdtimeoutd, wtimeoutd, tlsinfo) +} + +func newTimeoutListener(ln net.Listener, scheme string, rdtimeoutd, wtimeoutd time.Duration, tlsinfo *TLSInfo) (net.Listener, error) { + timeoutListener := &rwTimeoutListener{ Listener: ln, rdtimeoutd: rdtimeoutd, wtimeoutd: wtimeoutd, } - if ln, err = wrapTLS(scheme, tlsinfo, ln); err != nil { - return nil, err - } - return ln, nil + return wrapTLS(scheme, tlsinfo, timeoutListener) } type rwTimeoutListener struct { diff --git a/server/embed/config.go b/server/embed/config.go index e91518cbc..5558254c1 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -232,6 +232,9 @@ type Config struct { // before closing a non-responsive connection. 0 to disable. GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"` + // SocketOpts are socket options passed to listener config. + SocketOpts transport.SocketOpts + // PreVote is true to enable Raft Pre-Vote. // If enabled, Raft runs an additional election phase // to check whether it would get enough votes to win @@ -398,6 +401,8 @@ func NewConfig() *Config { GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval, GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout, + SocketOpts: transport.SocketOpts{}, + TickMs: 100, ElectionMs: 1000, InitialElectionTickAdvance: true, diff --git a/server/embed/etcd.go b/server/embed/etcd.go index c5b99a9ff..3d75bf1b2 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -110,6 +110,13 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e = nil }() + if !cfg.SocketOpts.Empty() { + cfg.logger.Info( + "configuring socket options", + zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress), + zap.Bool("reuse-port", cfg.SocketOpts.ReusePort), + ) + } e.cfg.logger.Info( "configuring peer listeners", zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), @@ -181,6 +188,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { BackendBatchInterval: cfg.BackendBatchInterval, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, + SocketOpts: cfg.SocketOpts, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, AuthToken: cfg.AuthToken, @@ -458,7 +466,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { } } peers[i] = &peerListener{close: func(context.Context) error { return nil }} - peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo) + peers[i].Listener, err = rafthttp.NewListenerWithSocketOpts(u, &cfg.PeerTLSInfo, &cfg.SocketOpts) if err != nil { return nil, err } @@ -565,7 +573,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro continue } - if sctx.l, err = net.Listen(network, addr); err != nil { + if sctx.l, err = transport.NewListenerWithSocketOpts(addr, u.Scheme, nil, &cfg.SocketOpts); err != nil { return nil, err } // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking @@ -678,7 +686,7 @@ func (e *Etcd) serveMetrics() (err error) { if murl.Scheme == "http" { tlsInfo = nil } - ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo) + ml, err := transport.NewListenerWithSocketOpts(murl.Host, murl.Scheme, tlsInfo, &e.cfg.SocketOpts) if err != nil { return err } diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index ef7dcc283..ce62ae69e 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -160,6 +160,8 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.ec.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.") fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).") fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).") + fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.") + fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.") // clustering fs.Var( diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 0834881ab..1579f5558 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -85,6 +85,10 @@ Member: Frequency duration of server-to-client ping to check if a connection is alive (0 to disable). --grpc-keepalive-timeout '20s' Additional duration of wait before closing a non-responsive connection (0 to disable). + --socket-reuse-port 'false' + Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use. + --socket-reuse-address 'false' + Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in TIME_WAIT state. Clustering: --initial-advertise-peer-urls 'http://localhost:2380' diff --git a/server/etcdserver/api/rafthttp/util.go b/server/etcdserver/api/rafthttp/util.go index 37bdac8e6..905974451 100644 --- a/server/etcdserver/api/rafthttp/util.go +++ b/server/etcdserver/api/rafthttp/util.go @@ -42,6 +42,10 @@ func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) { return transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout) } +func NewListenerWithSocketOpts(u url.URL, tlsinfo *transport.TLSInfo, sopts *transport.SocketOpts) (net.Listener, error) { + return transport.NewTimeoutListerWithSocketOpts(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout, sopts) +} + // NewRoundTripper returns a roundTripper used to send requests // to rafthttp listener of remote peers. func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { diff --git a/server/etcdserver/config.go b/server/etcdserver/config.go index 49fe04005..4a6716509 100644 --- a/server/etcdserver/config.go +++ b/server/etcdserver/config.go @@ -138,6 +138,9 @@ type ServerConfig struct { // PreVote is true to enable Raft Pre-Vote. PreVote bool + // SocketOpts are socket options passed to listener config. + SocketOpts transport.SocketOpts + // Logger logs server-side operations. // If not nil, it disables "capnslog" and uses the given logger. Logger *zap.Logger