etcd/pkg/proxy/server.go
Chun-Hung Tseng fd967e08d0 Remove LatencyAccept of the reverse proxy from the e2e test
Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737

During the development of https://github.com/etcd-io/etcd/pull/17938,
we agreed that during the transition to L7 forward proxy, unused
features and features targeting L4 reverse proxy will be dropped.

This feature falls under the unused feature. Also, the initial
implementation has a bug: if connections are not created continuously,
the latency accept will not work. Consider the following case:
a) set latency accept
b) put latency accept into effect
c) latency accept will start idling the goroutine
d) block-wait at accept() - waiting for new connections
e) new connection comes in - establish it
f) go to c -> as we can see, if requests arrive every x seconds, where
x is larger than the latency accept time we set, the latency accept
has no effect.

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
2024-09-26 22:40:51 +02:00

868 lines
18 KiB
Go

// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"errors"
"fmt"
"io"
mrand "math/rand"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
humanize "github.com/dustin/go-humanize"
"go.uber.org/zap"
"go.etcd.io/etcd/client/pkg/v3/transport"
)
// Fallback values applied by NewServer when the corresponding ServerConfig
// field is left at its zero value.
var (
	// defaultDialTimeout replaces a zero ServerConfig.DialTimeout.
	defaultDialTimeout = 3 * time.Second
	// defaultBufferSize replaces a zero ServerConfig.BufferSize; it is the
	// length in bytes of the read buffer each copy loop allocates.
	defaultBufferSize = 48 * 1024
	// defaultRetryInterval replaces a zero ServerConfig.RetryInterval; the
	// accept loop waits this long before re-creating a closed listener.
	defaultRetryInterval = 10 * time.Millisecond
)
// Server defines proxy server layer that simulates common network faults:
// latency spikes and packet drop or corruption. The proxy overhead is very
// small (<500μs per request). Please run tests to compute the actual
// overhead.
type Server interface {
	// From returns proxy source address in "scheme://host:port" format.
	From() string
	// To returns proxy destination address in "scheme://host:port" format.
	To() string

	// Ready returns a channel that is closed when the proxy is ready to serve.
	Ready() <-chan struct{}
	// Done returns a channel that is closed when the proxy has been closed.
	Done() <-chan struct{}
	// Error returns the channel on which errors encountered while serving
	// are delivered.
	Error() <-chan error
	// Close closes listener and transport.
	Close() error

	// DelayTx adds latency ± random variable for "outgoing" traffic
	// in "sending" layer. A non-positive latency is ignored.
	DelayTx(latency, rv time.Duration)
	// UndelayTx removes sending latencies.
	UndelayTx()
	// LatencyTx returns current send latency.
	LatencyTx() time.Duration

	// DelayRx adds latency ± random variable for "incoming" traffic
	// in "receiving" layer. A non-positive latency is ignored.
	DelayRx(latency, rv time.Duration)
	// UndelayRx removes "receiving" latencies.
	UndelayRx()
	// LatencyRx returns current receive latency.
	LatencyRx() time.Duration

	// ModifyTx alters/corrupts/drops "outgoing" packets from the listener
	// with the given edit function.
	ModifyTx(f func(data []byte) []byte)
	// UnmodifyTx removes modify operation on "forwarding".
	UnmodifyTx()
	// ModifyRx alters/corrupts/drops "incoming" packets to client
	// with the given edit function.
	ModifyRx(f func(data []byte) []byte)
	// UnmodifyRx removes modify operation on "receiving".
	UnmodifyRx()

	// BlackholeTx drops all "outgoing" packets before "forwarding".
	// "BlackholeTx" operation is a wrapper around "ModifyTx" with
	// a function that returns empty bytes.
	BlackholeTx()
	// UnblackholeTx removes blackhole operation on "sending".
	UnblackholeTx()
	// BlackholeRx drops all "incoming" packets to client.
	// "BlackholeRx" operation is a wrapper around "ModifyRx" with
	// a function that returns empty bytes.
	BlackholeRx()
	// UnblackholeRx removes blackhole operation on "receiving".
	UnblackholeRx()

	// ResetListener closes and restarts listener.
	ResetListener() error
}
// ServerConfig defines proxy server configuration.
type ServerConfig struct {
	// Logger receives all proxy log output.
	Logger *zap.Logger
	// From is the address the proxy listens on. A scheme starting with
	// "http" is rewritten to "tcp" by NewServer.
	From url.URL
	// To is the address dialed for every accepted connection. A scheme
	// starting with "http" is rewritten to "tcp" by NewServer.
	To url.URL
	// TLSInfo, when not empty, is used both for the listener and for the
	// connection to the destination.
	TLSInfo transport.TLSInfo
	// DialTimeout is the timeout for dialing To. Zero selects
	// defaultDialTimeout.
	DialTimeout time.Duration
	// BufferSize is the size in bytes of the read buffer used by each copy
	// loop. Zero selects defaultBufferSize.
	BufferSize int
	// RetryInterval is how long the accept loop waits before re-creating a
	// closed listener. Zero selects defaultRetryInterval.
	RetryInterval time.Duration
}
// server is the Server implementation returned by NewServer.
type server struct {
	lg *zap.Logger

	// from/to are the listen and destination addresses; fromPort/toPort are
	// the numeric ports parsed from them and stay 0 when the host carries no
	// parsable port (NewServer treats fromPort == 0 as a unix socket).
	from url.URL
	fromPort int
	to url.URL
	toPort int

	tlsInfo transport.TLSInfo
	dialTimeout time.Duration

	bufferSize int
	retryInterval time.Duration

	// readyc is closed by listenAndServe before it starts accepting.
	readyc chan struct{}
	// donec is closed by Close; every blocking wait in the proxy also
	// selects on it.
	donec chan struct{}
	// errc carries serving errors to the consumer of Error().
	errc chan error

	// closeOnce makes Close idempotent; closeWg tracks the accept loop and
	// the two copy goroutines started per connection.
	closeOnce sync.Once
	closeWg sync.WaitGroup

	// listenerMu guards listener, which ResetListener may replace.
	listenerMu sync.RWMutex
	listener net.Listener

	// modifyTx/modifyRx are optional hooks applied to every chunk read in
	// the respective direction; nil means "forward unchanged".
	modifyTxMu sync.RWMutex
	modifyTx func(data []byte) []byte

	modifyRxMu sync.RWMutex
	modifyRx func(data []byte) []byte

	// pauseTxc/pauseRxc gate forwarding: a closed channel means "not
	// paused". PauseTx/PauseRx install a fresh open channel and
	// UnpauseTx/UnpauseRx close it again.
	pauseTxMu sync.Mutex
	pauseTxc chan struct{}

	pauseRxMu sync.Mutex
	pauseRxc chan struct{}

	// latencyTx/latencyRx are the delays applied before each write in the
	// respective direction; 0 disables the delay.
	latencyTxMu sync.RWMutex
	latencyTx time.Duration

	latencyRxMu sync.RWMutex
	latencyRx time.Duration
}
// NewServer returns a proxy implementation with no iptables/tc dependencies.
// The proxy layer overhead is <1ms.
//
// Zero-valued DialTimeout, BufferSize and RetryInterval in cfg are replaced
// by the package defaults, and a nil Logger is replaced by a no-op logger so
// that logging never dereferences a nil pointer. "http*" schemes in From and
// To are rewritten to "tcp".
//
// If the listener cannot be created, the error is queued on Error() and the
// returned server is already closed: Done() is closed and Ready() never
// fires. Otherwise the accept loop is started in a background goroutine.
func NewServer(cfg ServerConfig) Server {
	s := &server{
		lg: cfg.Logger,

		from: cfg.From,
		to:   cfg.To,

		tlsInfo:     cfg.TLSInfo,
		dialTimeout: cfg.DialTimeout,

		bufferSize:    cfg.BufferSize,
		retryInterval: cfg.RetryInterval,

		readyc: make(chan struct{}),
		donec:  make(chan struct{}),
		errc:   make(chan error, 16),

		pauseTxc: make(chan struct{}),
		pauseRxc: make(chan struct{}),
	}
	if s.lg == nil {
		// Every code path logs through s.lg; a nil *zap.Logger would panic.
		s.lg = zap.NewNop()
	}

	// Parse failures are deliberately ignored: a host without a numeric port
	// leaves the port at 0, which is handled as a unix socket below.
	_, fromPort, err := net.SplitHostPort(cfg.From.Host)
	if err == nil {
		s.fromPort, _ = strconv.Atoi(fromPort)
	}
	var toPort string
	_, toPort, err = net.SplitHostPort(cfg.To.Host)
	if err == nil {
		s.toPort, _ = strconv.Atoi(toPort)
	}

	if s.dialTimeout == 0 {
		s.dialTimeout = defaultDialTimeout
	}
	if s.bufferSize == 0 {
		s.bufferSize = defaultBufferSize
	}
	if s.retryInterval == 0 {
		s.retryInterval = defaultRetryInterval
	}

	// A closed pause channel means "not paused".
	close(s.pauseTxc)
	close(s.pauseRxc)

	if strings.HasPrefix(s.from.Scheme, "http") {
		s.from.Scheme = "tcp"
	}
	if strings.HasPrefix(s.to.Scheme, "http") {
		s.to.Scheme = "tcp"
	}

	addr := fmt.Sprintf(":%d", s.fromPort)
	if s.fromPort == 0 { // unix
		addr = s.from.Host
	}

	var ln net.Listener
	if !s.tlsInfo.Empty() {
		ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
	} else {
		ln, err = net.Listen(s.from.Scheme, addr)
	}
	if err != nil {
		// errc is buffered, so this send cannot block on a fresh server.
		s.errc <- err
		s.Close()
		return s
	}
	s.listener = ln

	s.closeWg.Add(1)
	go s.listenAndServe()

	s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To()))
	return s
}
// From returns the proxy source address as "scheme://host".
func (s *server) From() string {
	return s.from.Scheme + "://" + s.from.Host
}
// To returns the proxy destination address as "scheme://host".
func (s *server) To() string {
	return s.to.Scheme + "://" + s.to.Host
}
// listenAndServe is the accept loop of the proxy. It signals readiness by
// closing s.readyc, then for every accepted connection dials the destination
// and starts two goroutines that copy bytes in each direction. It returns
// once s.donec is closed.
//
// Errors are published on s.errc. When Accept fails because the listener was
// closed, the loop waits s.retryInterval and re-creates the listener via
// ResetListener. When the destination cannot be reached, the accepted
// connection is closed so that it is not leaked.
//
// TODO: implement packet reordering from multiple TCP connections
// buffer packets per connection for awhile, reorder before transmit
// - https://github.com/etcd-io/etcd/issues/5614
// - https://github.com/etcd-io/etcd/pull/6918#issuecomment-264093034
func (s *server) listenAndServe() {
	defer s.closeWg.Done()

	// reportErr publishes err on s.errc, blocking until it is accepted or the
	// proxy is closed. It reports whether the proxy has been closed, in which
	// case the accept loop must exit.
	reportErr := func(err error) (closed bool) {
		select {
		case s.errc <- err:
			select {
			case <-s.donec:
				return true
			default:
				return false
			}
		case <-s.donec:
			return true
		}
	}

	ctx := context.Background()
	s.lg.Info("proxy is listening on", zap.String("from", s.From()))
	close(s.readyc)

	for {
		// Re-read the listener on every iteration; ResetListener may have
		// replaced it.
		s.listenerMu.RLock()
		ln := s.listener
		s.listenerMu.RUnlock()

		in, err := ln.Accept()
		if err != nil {
			if reportErr(err) {
				return
			}
			s.lg.Debug("listener accept error", zap.Error(err))

			if strings.HasSuffix(err.Error(), "use of closed network connection") {
				select {
				case <-time.After(s.retryInterval):
				case <-s.donec:
					return
				}
				s.lg.Debug("listener is closed; retry listening on", zap.String("from", s.From()))
				if err = s.ResetListener(); err != nil {
					if reportErr(err) {
						return
					}
					s.lg.Warn("failed to reset listener", zap.Error(err))
				}
			}
			continue
		}

		var out net.Conn
		if !s.tlsInfo.Empty() {
			var tp *http.Transport
			tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout)
			if err != nil {
				// Nothing will ever serve this connection; release it.
				in.Close()
				if reportErr(err) {
					return
				}
				continue
			}
			out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
		} else {
			// Bound the dial like the TLS path does, so an unreachable
			// destination cannot stall the accept loop (and Close) forever.
			out, err = net.DialTimeout(s.to.Scheme, s.to.Host, s.dialTimeout)
		}
		if err != nil {
			// Nothing will ever serve this connection; release it.
			in.Close()
			if reportErr(err) {
				return
			}
			s.lg.Debug("failed to dial", zap.Error(err))
			continue
		}

		s.closeWg.Add(2)
		go func() {
			defer s.closeWg.Done()
			// read incoming bytes from listener, dispatch to outgoing connection
			s.transmit(out, in)
			out.Close()
			in.Close()
		}()
		go func() {
			defer s.closeWg.Done()
			// read response from outgoing connection, write back to listener
			s.receive(in, out)
			in.Close()
			out.Close()
		}()
	}
}
// transmit copies src to dst as "outgoing" (tx) traffic.
func (s *server) transmit(dst io.Writer, src io.Reader) {
	const direction = proxyTx
	s.ioCopy(dst, src, direction)
}
// receive copies src to dst as "incoming" (rx) traffic.
func (s *server) receive(dst io.Writer, src io.Reader) {
	const direction = proxyRx
	s.ioCopy(dst, src, direction)
}
// proxyType identifies the direction a copy loop is serving, which selects
// the modify hook, pause channel and latency that ioCopy applies.
type proxyType uint8

const (
	// proxyTx is traffic read from the accepted connection and written to
	// the destination.
	proxyTx proxyType = iota
	// proxyRx is traffic read from the destination and written back to the
	// accepted connection.
	proxyRx
)
// ioCopy forwards data from src to dst until reading or writing fails or the
// proxy is closed, applying the fault injection configured for the direction
// given by ptype. For every chunk read (at most s.bufferSize bytes) it, in
// this order:
//
//  1. runs the modify hook of the direction, if one is set;
//  2. waits while the direction is paused;
//  3. drops the chunk if the hook returned no bytes;
//  4. sleeps for the configured latency of the direction;
//  5. writes the chunk to dst.
//
// Read errors that indicate a finished connection (EOF, "connection reset by
// peer", "use of closed network connection") end the loop silently. Any other
// read error, any write error other than EOF, and short writes are sent to
// s.errc before returning. It panics on an unknown ptype.
func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
	buf := make([]byte, s.bufferSize)
	for {
		nr1, err := src.Read(buf)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return
			}
			// connection already closed
			if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
				return
			}
			if strings.HasSuffix(err.Error(), "use of closed network connection") {
				return
			}
			// Publish the error unless the proxy is closed; after a
			// successful send still bail out quietly if it closed meanwhile.
			select {
			case s.errc <- err:
				select {
				case <-s.donec:
					return
				default:
				}
			case <-s.donec:
				return
			}
			s.lg.Debug("failed to read", zap.Error(err))
			return
		}
		if nr1 == 0 {
			return
		}
		// data aliases buf, so a modify hook sees (and may alter) the read
		// buffer itself.
		data := buf[:nr1]

		// alters/corrupts/drops data
		switch ptype {
		case proxyTx:
			s.modifyTxMu.RLock()
			if s.modifyTx != nil {
				data = s.modifyTx(data)
			}
			s.modifyTxMu.RUnlock()
		case proxyRx:
			s.modifyRxMu.RLock()
			if s.modifyRx != nil {
				data = s.modifyRx(data)
			}
			s.modifyRxMu.RUnlock()
		default:
			panic("unknown proxy type")
		}
		// nr2 is the size after modification; 0 means the chunk is dropped.
		nr2 := len(data)
		switch ptype {
		case proxyTx:
			s.lg.Debug(
				"modified tx",
				zap.String("data-received", humanize.Bytes(uint64(nr1))),
				zap.String("data-modified", humanize.Bytes(uint64(nr2))),
				zap.String("from", s.From()),
				zap.String("to", s.To()),
			)
		case proxyRx:
			s.lg.Debug(
				"modified rx",
				zap.String("data-received", humanize.Bytes(uint64(nr1))),
				zap.String("data-modified", humanize.Bytes(uint64(nr2))),
				zap.String("from", s.To()),
				zap.String("to", s.From()),
			)
		default:
			panic("unknown proxy type")
		}

		// pause before packet dropping, blocking, and forwarding
		// The channel is snapshotted under the mutex; a closed channel means
		// "not paused", so the receive below returns immediately.
		var pausec chan struct{}
		switch ptype {
		case proxyTx:
			s.pauseTxMu.Lock()
			pausec = s.pauseTxc
			s.pauseTxMu.Unlock()
		case proxyRx:
			s.pauseRxMu.Lock()
			pausec = s.pauseRxc
			s.pauseRxMu.Unlock()
		default:
			panic("unknown proxy type")
		}
		select {
		case <-pausec:
		case <-s.donec:
			return
		}

		// pause first, and then drop packets
		if nr2 == 0 {
			continue
		}

		// block before forwarding
		var lat time.Duration
		switch ptype {
		case proxyTx:
			s.latencyTxMu.RLock()
			lat = s.latencyTx
			s.latencyTxMu.RUnlock()
		case proxyRx:
			s.latencyRxMu.RLock()
			lat = s.latencyRx
			s.latencyRxMu.RUnlock()
		default:
			panic("unknown proxy type")
		}
		if lat > 0 {
			select {
			case <-time.After(lat):
			case <-s.donec:
				return
			}
		}

		// now forward packets to target
		var nw int
		nw, err = dst.Write(data)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return
			}
			select {
			case s.errc <- err:
				select {
				case <-s.donec:
					return
				default:
				}
			case <-s.donec:
				return
			}
			switch ptype {
			case proxyTx:
				s.lg.Debug("write fail on tx", zap.Error(err))
			case proxyRx:
				s.lg.Debug("write fail on rx", zap.Error(err))
			default:
				panic("unknown proxy type")
			}
			return
		}

		// A short write is detected against the modified size nr2.
		// NOTE(review): the "read-bytes" log field below reports nr1 (the
		// size before modification), not nr2 — confirm this is intended.
		if nr2 != nw {
			select {
			case s.errc <- io.ErrShortWrite:
				select {
				case <-s.donec:
					return
				default:
				}
			case <-s.donec:
				return
			}
			switch ptype {
			case proxyTx:
				s.lg.Debug(
					"write fail on tx; read/write bytes are different",
					zap.Int("read-bytes", nr1),
					zap.Int("write-bytes", nw),
					zap.Error(io.ErrShortWrite),
				)
			case proxyRx:
				s.lg.Debug(
					"write fail on rx; read/write bytes are different",
					zap.Int("read-bytes", nr1),
					zap.Int("write-bytes", nw),
					zap.Error(io.ErrShortWrite),
				)
			default:
				panic("unknown proxy type")
			}
			return
		}

		switch ptype {
		case proxyTx:
			s.lg.Debug(
				"transmitted",
				zap.String("data-size", humanize.Bytes(uint64(nr1))),
				zap.String("from", s.From()),
				zap.String("to", s.To()),
			)
		case proxyRx:
			s.lg.Debug(
				"received",
				zap.String("data-size", humanize.Bytes(uint64(nr1))),
				zap.String("from", s.To()),
				zap.String("to", s.From()),
			)
		default:
			panic("unknown proxy type")
		}
	}
}
// Ready returns the channel that listenAndServe closes right before it
// starts accepting connections.
func (s *server) Ready() <-chan struct{} {
	return s.readyc
}
// Done returns the channel that Close closes when the proxy shuts down.
func (s *server) Done() <-chan struct{} {
	return s.donec
}
// Error returns the channel on which serving errors are delivered.
func (s *server) Error() <-chan error {
	return s.errc
}
// Close shuts the proxy down: on the first call it closes s.donec and the
// listener (if one exists) and flushes the logger. Every call then waits for
// all proxy goroutines to finish. Only the first call can return the
// listener's close error; later calls return nil.
func (s *server) Close() (err error) {
	shutdown := func() {
		close(s.donec)

		s.listenerMu.Lock()
		defer s.listenerMu.Unlock()

		if s.listener != nil {
			err = s.listener.Close()
			s.lg.Info(
				"closed proxy listener",
				zap.String("from", s.From()),
				zap.String("to", s.To()),
			)
		}
		s.lg.Sync()
	}
	s.closeOnce.Do(shutdown)

	s.closeWg.Wait()
	return err
}
// DelayTx sets the latency applied before each "outgoing" write to the value
// computed by computeLatency(latency, rv). A non-positive latency is ignored.
func (s *server) DelayTx(latency, rv time.Duration) {
	if latency <= 0 {
		return
	}

	applied := computeLatency(latency, rv)

	s.latencyTxMu.Lock()
	s.latencyTx = applied
	s.latencyTxMu.Unlock()

	fields := []zap.Field{
		zap.Duration("latency", applied),
		zap.Duration("given-latency", latency),
		zap.Duration("given-latency-random-variable", rv),
		zap.String("from", s.From()),
		zap.String("to", s.To()),
	}
	s.lg.Info("set transmit latency", fields...)
}
// UndelayTx resets the "outgoing" latency to zero and logs the value that was
// in effect.
func (s *server) UndelayTx() {
	var previous time.Duration

	s.latencyTxMu.Lock()
	previous, s.latencyTx = s.latencyTx, 0
	s.latencyTxMu.Unlock()

	fields := []zap.Field{
		zap.Duration("latency", previous),
		zap.String("from", s.From()),
		zap.String("to", s.To()),
	}
	s.lg.Info("removed transmit latency", fields...)
}
// LatencyTx returns the latency currently applied to "outgoing" traffic.
func (s *server) LatencyTx() time.Duration {
	s.latencyTxMu.RLock()
	defer s.latencyTxMu.RUnlock()
	return s.latencyTx
}
// DelayRx sets the latency applied before each "incoming" write to the value
// computed by computeLatency(latency, rv). A non-positive latency is ignored.
func (s *server) DelayRx(latency, rv time.Duration) {
	if latency <= 0 {
		return
	}

	applied := computeLatency(latency, rv)

	s.latencyRxMu.Lock()
	s.latencyRx = applied
	s.latencyRxMu.Unlock()

	fields := []zap.Field{
		zap.Duration("latency", applied),
		zap.Duration("given-latency", latency),
		zap.Duration("given-latency-random-variable", rv),
		zap.String("from", s.To()),
		zap.String("to", s.From()),
	}
	s.lg.Info("set receive latency", fields...)
}
// UndelayRx resets the "incoming" latency to zero and logs the value that was
// in effect.
func (s *server) UndelayRx() {
	var previous time.Duration

	s.latencyRxMu.Lock()
	previous, s.latencyRx = s.latencyRx, 0
	s.latencyRxMu.Unlock()

	fields := []zap.Field{
		zap.Duration("latency", previous),
		zap.String("from", s.To()),
		zap.String("to", s.From()),
	}
	s.lg.Info("removed receive latency", fields...)
}
// LatencyRx returns the latency currently applied to "incoming" traffic.
func (s *server) LatencyRx() time.Duration {
	s.latencyRxMu.RLock()
	defer s.latencyRxMu.RUnlock()
	return s.latencyRx
}
// computeLatency returns lat shifted by a random offset whose magnitude is
// strictly less than rv.
//
// A zero rv returns lat unchanged. A negative rv is treated as its absolute
// value, and an rv larger than lat is replaced by lat/10. The direction of
// the offset depends on whether the current wall-clock second is even
// (subtract) or odd (add). If the effective rv is not positive after these
// adjustments (for example lat < 10ns, so lat/10 truncates to zero), there
// is no room for jitter and lat is returned unchanged.
func computeLatency(lat, rv time.Duration) time.Duration {
	if rv == 0 {
		return lat
	}
	if rv < 0 {
		rv *= -1
	}
	if rv > lat {
		rv = lat / 10
	}
	// mrand.Int63n panics when its argument is <= 0.
	if rv <= 0 {
		return lat
	}

	sign := 1
	if time.Now().Second()%2 == 0 {
		sign = -1
	}
	return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
}
// ModifyTx installs f as the hook applied to every "outgoing" chunk before
// it is forwarded.
func (s *server) ModifyTx(f func([]byte) []byte) {
	s.modifyTxMu.Lock()
	s.modifyTx = f
	s.modifyTxMu.Unlock()

	src, dst := s.From(), s.To()
	s.lg.Info("modifying tx", zap.String("from", src), zap.String("to", dst))
}
// UnmodifyTx removes the "outgoing" modify hook so chunks are forwarded
// unchanged.
func (s *server) UnmodifyTx() {
	s.modifyTxMu.Lock()
	s.modifyTx = nil
	s.modifyTxMu.Unlock()

	src, dst := s.From(), s.To()
	s.lg.Info("unmodifyed tx", zap.String("from", src), zap.String("to", dst))
}
// ModifyRx installs f as the hook applied to every "incoming" chunk before
// it is written back to the client.
func (s *server) ModifyRx(f func([]byte) []byte) {
	s.modifyRxMu.Lock()
	s.modifyRx = f
	s.modifyRxMu.Unlock()

	src, dst := s.To(), s.From()
	s.lg.Info("modifying rx", zap.String("from", src), zap.String("to", dst))
}
// UnmodifyRx removes the "incoming" modify hook so chunks are forwarded
// unchanged.
func (s *server) UnmodifyRx() {
	s.modifyRxMu.Lock()
	s.modifyRx = nil
	s.modifyRxMu.Unlock()

	src, dst := s.To(), s.From()
	s.lg.Info("unmodifyed rx", zap.String("from", src), zap.String("to", dst))
}
// BlackholeTx drops all "outgoing" traffic by installing a ModifyTx hook that
// returns no bytes for every chunk.
func (s *server) BlackholeTx() {
	dropAll := func([]byte) []byte { return nil }
	s.ModifyTx(dropAll)

	src, dst := s.From(), s.To()
	s.lg.Info("blackholed tx", zap.String("from", src), zap.String("to", dst))
}
// UnblackholeTx stops dropping "outgoing" traffic by removing the tx modify
// hook.
func (s *server) UnblackholeTx() {
	s.UnmodifyTx()

	src, dst := s.From(), s.To()
	s.lg.Info("unblackholed tx", zap.String("from", src), zap.String("to", dst))
}
// BlackholeRx drops all "incoming" traffic by installing a ModifyRx hook that
// returns no bytes for every chunk.
func (s *server) BlackholeRx() {
	dropAll := func([]byte) []byte { return nil }
	s.ModifyRx(dropAll)

	src, dst := s.To(), s.From()
	s.lg.Info("blackholed rx", zap.String("from", src), zap.String("to", dst))
}
// UnblackholeRx stops dropping "incoming" traffic by removing the rx modify
// hook.
func (s *server) UnblackholeRx() {
	s.UnmodifyRx()

	src, dst := s.To(), s.From()
	s.lg.Info("unblackholed rx", zap.String("from", src), zap.String("to", dst))
}
// PauseTx pauses "outgoing" forwarding by installing a fresh, open pause
// channel; copy loops block on it until UnpauseTx closes it or the proxy is
// closed.
func (s *server) PauseTx() {
	gate := make(chan struct{})

	s.pauseTxMu.Lock()
	s.pauseTxc = gate
	s.pauseTxMu.Unlock()

	src, dst := s.From(), s.To()
	s.lg.Info("paused tx", zap.String("from", src), zap.String("to", dst))
}
// UnpauseTx resumes "outgoing" forwarding by closing the current pause
// channel. It is a no-op on the channel when it is already closed, and it
// returns without logging when the select observes that the proxy is closed.
func (s *server) UnpauseTx() {
	proxyClosed := false

	s.pauseTxMu.Lock()
	select {
	case <-s.pauseTxc: // already unpaused
	case <-s.donec:
		proxyClosed = true
	default:
		close(s.pauseTxc)
	}
	s.pauseTxMu.Unlock()

	if proxyClosed {
		return
	}

	src, dst := s.From(), s.To()
	s.lg.Info("unpaused tx", zap.String("from", src), zap.String("to", dst))
}
// PauseRx pauses "incoming" forwarding by installing a fresh, open pause
// channel; copy loops block on it until UnpauseRx closes it or the proxy is
// closed.
func (s *server) PauseRx() {
	gate := make(chan struct{})

	s.pauseRxMu.Lock()
	s.pauseRxc = gate
	s.pauseRxMu.Unlock()

	src, dst := s.To(), s.From()
	s.lg.Info("paused rx", zap.String("from", src), zap.String("to", dst))
}
// UnpauseRx resumes "incoming" forwarding by closing the current pause
// channel. It is a no-op on the channel when it is already closed, and it
// returns without logging when the select observes that the proxy is closed.
func (s *server) UnpauseRx() {
	proxyClosed := false

	s.pauseRxMu.Lock()
	select {
	case <-s.pauseRxc: // already unpaused
	case <-s.donec:
		proxyClosed = true
	default:
		close(s.pauseRxc)
	}
	s.pauseRxMu.Unlock()

	if proxyClosed {
		return
	}

	src, dst := s.To(), s.From()
	s.lg.Info("unpaused rx", zap.String("from", src), zap.String("to", dst))
}
// ResetListener closes the current listener and opens a new one on the
// proxy's source address, using TLS when s.tlsInfo is not empty.
//
// It returns net.ErrClosed without touching anything once the proxy has been
// closed. Close closes s.donec before it takes listenerMu, so checking donec
// under the lock guarantees that a listener created here is either closed by
// a later Close or never created at all. Without the check, a reset racing
// with Close would leave an open listener that the accept loop blocks on
// forever, hanging Close. The check also covers a server whose initial
// listen failed in NewServer (s.listener is nil there, and such a server is
// already closed).
//
// An "already closed" error from closing the old listener is ignored; any
// other close or listen error is returned and the listener is left as is.
//
// NOTE(review): NewServer listens on ":<port>" for TCP addresses while this
// method listens on s.from.Host; confirm the difference is intended.
func (s *server) ResetListener() error {
	s.listenerMu.Lock()
	defer s.listenerMu.Unlock()

	select {
	case <-s.donec:
		return net.ErrClosed
	default:
	}

	if err := s.listener.Close(); err != nil {
		// already closed
		if !strings.HasSuffix(err.Error(), "use of closed network connection") {
			return err
		}
	}

	var ln net.Listener
	var err error
	if !s.tlsInfo.Empty() {
		ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo)
	} else {
		ln, err = net.Listen(s.from.Scheme, s.from.Host)
	}
	if err != nil {
		return err
	}
	s.listener = ln

	s.lg.Info(
		"reset listener on",
		zap.String("from", s.From()),
	)
	return nil
}