From 462d24af9c056a71e12ea9827ee87556d7efcae7 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Wed, 25 Sep 2024 23:02:13 +0200 Subject: [PATCH 1/4] Remove PauseAccept of the reverse proxy from the e2e test Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737 During the development of https://github.com/etcd-io/etcd/pull/17938, we agreed that during the transition to L7 forward proxy, unused features and features targeting L4 reverse proxy will be dropped. This feature falls under the unused feature. Signed-off-by: Chun-Hung Tseng --- pkg/proxy/server.go | 64 ++-------------------------------------- pkg/proxy/server_test.go | 49 ------------------------------ 2 files changed, 2 insertions(+), 111 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index bc71c3a16..8166d3197 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -59,11 +59,6 @@ type Server interface { // Close closes listener and transport. Close() error - // PauseAccept stops accepting new connections. - PauseAccept() - // UnpauseAccept removes pause operation on accepting new connections. - UnpauseAccept() - // DelayAccept adds latency ± random variable to accepting // new incoming connections. DelayAccept(latency, rv time.Duration) @@ -115,16 +110,6 @@ type Server interface { // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() - // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. - PauseTx() - // UnpauseTx removes "forwarding" pause operation. - UnpauseTx() - - // PauseRx stops "receiving" packets; "incoming" traffic blocks. - PauseRx() - // UnpauseRx removes "receiving" pause operation. - UnpauseRx() - // ResetListener closes and restarts listener. ResetListener() error } @@ -164,9 +149,6 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - pauseAcceptMu sync.Mutex - pauseAcceptc chan struct{} - latencyAcceptMu sync.RWMutex latencyAccept time.Duration @@ -208,9 +190,8 @@ func NewServer(cfg ServerConfig) Server { donec: make(chan struct{}), errc: make(chan error, 16), - pauseAcceptc: make(chan struct{}), - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), + pauseTxc: make(chan struct{}), + pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -233,7 +214,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseAcceptc) close(s.pauseTxc) close(s.pauseRxc) @@ -290,15 +270,6 @@ func (s *server) listenAndServe() { close(s.readyc) for { - s.pauseAcceptMu.Lock() - pausec := s.pauseAcceptc - s.pauseAcceptMu.Unlock() - select { - case <-pausec: - case <-s.donec: - return - } - s.latencyAcceptMu.RLock() lat := s.latencyAccept s.latencyAcceptMu.RUnlock() @@ -645,37 +616,6 @@ func (s *server) Close() (err error) { return err } -func (s *server) PauseAccept() { - s.pauseAcceptMu.Lock() - s.pauseAcceptc = make(chan struct{}) - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "paused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseAccept() { - s.pauseAcceptMu.Lock() - select { - case <-s.pauseAcceptc: // already unpaused - case <-s.donec: - s.pauseAcceptMu.Unlock() - return - default: - close(s.pauseAcceptc) - } - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "unpaused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - func (s *server) DelayAccept(latency, rv time.Duration) { if latency <= 0 { return diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index d19c947c6..90fd61305 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -234,55 +234,6 @@ func testServerDelayAccept(t *testing.T, secure bool) { } } -func TestServer_PauseTx(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.PauseTx() - - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - - recvc := make(chan []byte, 1) - go func() { - recvc <- receive(t, ln) - }() - - select { - case d := <-recvc: - t.Fatalf("received unexpected data %q during pause", string(d)) - case <-time.After(200 * time.Millisecond): - } - - p.UnpauseTx() - - select { - case d := <-recvc: - if !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - case <-time.After(2 * time.Second): - t.Fatal("took too long to receive after unpause") - } -} - func TestServer_ModifyTx_corrupt(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix" From 925181a17f3c051df8f8c244dee428fb8cb28f02 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Wed, 25 Sep 2024 23:07:51 +0200 Subject: [PATCH 2/4] Remove DelayAccept of the reverse proxy from the e2e test Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737 During the development of https://github.com/etcd-io/etcd/pull/17938, we agreed that during the transition to L7 forward proxy, unused features and features targeting L4 reverse proxy will be dropped. This feature falls under the unused feature. Signed-off-by: Chun-Hung Tseng --- pkg/proxy/server.go | 40 --------------------------- pkg/proxy/server_test.go | 59 ---------------------------------------- 2 files changed, 99 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 8166d3197..4e15df3d5 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -59,13 +59,6 @@ type Server interface { // Close closes listener and transport. Close() error - // DelayAccept adds latency ± random variable to accepting - // new incoming connections. - DelayAccept(latency, rv time.Duration) - // UndelayAccept removes sending latencies. - UndelayAccept() - // LatencyAccept returns current latency on accepting - // new incoming connections. LatencyAccept() time.Duration // DelayTx adds latency ± random variable for "outgoing" traffic @@ -616,39 +609,6 @@ func (s *server) Close() (err error) { return err } -func (s *server) DelayAccept(latency, rv time.Duration) { - if latency <= 0 { - return - } - d := computeLatency(latency, rv) - s.latencyAcceptMu.Lock() - s.latencyAccept = d - s.latencyAcceptMu.Unlock() - - s.lg.Info( - "set accept latency", - zap.Duration("latency", d), - zap.Duration("given-latency", latency), - zap.Duration("given-latency-random-variable", rv), - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UndelayAccept() { - s.latencyAcceptMu.Lock() - d := s.latencyAccept - s.latencyAccept = 0 - s.latencyAcceptMu.Unlock() - - s.lg.Info( - "removed accept latency", - zap.Duration("latency", d), - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - func (s *server) LatencyAccept() time.Duration { s.latencyAcceptMu.RLock() d := s.latencyAccept diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index 90fd61305..baabfebe4 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -175,65 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo { return transport.TLSInfo{Logger: lg} } -func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) } -func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) } -func testServerDelayAccept(t *testing.T, secure bool) { - lg := zaptest.NewLogger(t) - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - tlsInfo := createTLSInfo(lg, secure) - scheme := "unix" - ln := listen(t, scheme, dstAddr, tlsInfo) - defer ln.Close() - - cfg := ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - } - if secure { - cfg.TLSInfo = tlsInfo - } - p := NewServer(cfg) - - waitForServer(t, p) - - defer p.Close() - - data := []byte("Hello World!") - - now := time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - took1 := time.Since(now) - t.Logf("took %v with no latency", took1) - - lat, rv := 700*time.Millisecond, 10*time.Millisecond - p.DelayAccept(lat, rv) - defer p.UndelayAccept() - if err := p.ResetListener(); err != nil { - t.Fatal(err) - } - time.Sleep(200 * time.Millisecond) - - now = time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - took2 := time.Since(now) - t.Logf("took %v with latency %v±%v", took2, lat, rv) - - if took1 >= took2 { - t.Fatalf("expected took1 %v < took2 %v", took1, took2) - } -} - func TestServer_ModifyTx_corrupt(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix" From fd967e08d08e7daf1d251ae4380bbeb2c23eaccf Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Wed, 25 Sep 2024 23:09:43 +0200 Subject: [PATCH 3/4] Remove LatencyAccept of the reverse proxy from the e2e test Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737 During the development of https://github.com/etcd-io/etcd/pull/17938, we agreed that during the transition to L7 forward proxy, unused features and features targeting L4 reverse proxy will be dropped. This feature falls under the unused feature. Also, the initial implementation has a bug: if connections are not created continuously, the latency accept will not work. Consider the following case: a) set latency accept b) put latency accept into effect c) latency accept will start idling the goroutine d) block-wait at accept() - waiting for new connections e) new connection comes in - establish it f) go to c -> as we can see, if the request come every x seconds, where x is larger than the latency accept time we set, we can see that the latency accept has no effect. Signed-off-by: Chun-Hung Tseng --- pkg/proxy/server.go | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 4e15df3d5..de4127639 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -59,8 +59,6 @@ type Server interface { // Close closes listener and transport. Close() error - LatencyAccept() time.Duration - // DelayTx adds latency ± random variable for "outgoing" traffic // in "sending" layer. DelayTx(latency, rv time.Duration) @@ -142,9 +140,6 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - latencyAcceptMu sync.RWMutex - latencyAccept time.Duration - modifyTxMu sync.RWMutex modifyTx func(data []byte) []byte @@ -263,17 +258,6 @@ func (s *server) listenAndServe() { close(s.readyc) for { - s.latencyAcceptMu.RLock() - lat := s.latencyAccept - s.latencyAcceptMu.RUnlock() - if lat > 0 { - select { - case <-time.After(lat): - case <-s.donec: - return - } - } - s.listenerMu.RLock() ln := s.listener s.listenerMu.RUnlock() @@ -609,13 +593,6 @@ func (s *server) Close() (err error) { return err } -func (s *server) LatencyAccept() time.Duration { - s.latencyAcceptMu.RLock() - d := s.latencyAccept - s.latencyAcceptMu.RUnlock() - return d -} - func (s *server) DelayTx(latency, rv time.Duration) { if latency <= 0 { return From 5fb0352b97e5c62edcd4b4ae3ba78b7efbaa6fea Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Thu, 26 Sep 2024 00:06:18 +0200 Subject: [PATCH 4/4] Remove PauseTx and PauseRx of the reverse proxy from the e2e test Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737 During the development of https://github.com/etcd-io/etcd/pull/17938, we agreed that during the transition to L7 forward proxy, unused features and features targeting L4 reverse proxy will be dropped. This feature falls under the unused feature. Signed-off-by: Chun-Hung Tseng --- pkg/proxy/server.go | 95 --------------------------------------------- 1 file changed, 95 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index de4127639..cd84e4e64 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -146,12 +146,6 @@ type server struct { modifyRxMu sync.RWMutex modifyRx func(data []byte) []byte - pauseTxMu sync.Mutex - pauseTxc chan struct{} - - pauseRxMu sync.Mutex - pauseRxc chan struct{} - latencyTxMu sync.RWMutex latencyTx time.Duration @@ -177,9 +171,6 @@ func NewServer(cfg ServerConfig) Server { readyc: make(chan struct{}), donec: make(chan struct{}), errc: make(chan error, 16), - - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -202,9 +193,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseTxc) - close(s.pauseRxc) - if strings.HasPrefix(s.from.Scheme, "http") { s.from.Scheme = "tcp" } @@ -443,27 +431,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { panic("unknown proxy type") } - // pause before packet dropping, blocking, and forwarding - var pausec chan struct{} - switch ptype { - case proxyTx: - s.pauseTxMu.Lock() - pausec = s.pauseTxc - s.pauseTxMu.Unlock() - case proxyRx: - s.pauseRxMu.Lock() - pausec = s.pauseRxc - s.pauseRxMu.Unlock() - default: - panic("unknown proxy type") - } - select { - case <-pausec: - case <-s.donec: - return - } - - // pause first, and then drop packets if nr2 == 0 { continue } @@ -774,68 +741,6 @@ func (s *server) UnblackholeRx() { ) } -func (s *server) PauseTx() { - s.pauseTxMu.Lock() - s.pauseTxc = make(chan struct{}) - s.pauseTxMu.Unlock() - - s.lg.Info( - "paused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseTx() { - s.pauseTxMu.Lock() - select { - case <-s.pauseTxc: // already unpaused - case <-s.donec: - s.pauseTxMu.Unlock() - return - default: - close(s.pauseTxc) - } - s.pauseTxMu.Unlock() - - s.lg.Info( - "unpaused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) PauseRx() { - s.pauseRxMu.Lock() - s.pauseRxc = make(chan struct{}) - s.pauseRxMu.Unlock() - - s.lg.Info( - "paused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - -func (s *server) UnpauseRx() { - s.pauseRxMu.Lock() - select { - case <-s.pauseRxc: // already unpaused - case <-s.donec: - s.pauseRxMu.Unlock() - return - default: - close(s.pauseRxc) - } - s.pauseRxMu.Unlock() - - s.lg.Info( - "unpaused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - func (s *server) ResetListener() error { s.listenerMu.Lock() defer s.listenerMu.Unlock()