Remove PauseTx and PauseRx of the reverse proxy from the e2e test

Part of the patches to fix https://github.com/etcd-io/etcd/issues/17737

During the development of https://github.com/etcd-io/etcd/pull/17938,
we agreed that during the transition to L7 forward proxy, unused
features and features targeting L4 reverse proxy will be dropped.

This feature falls under the unused feature.

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
This commit is contained in:
Chun-Hung Tseng 2024-09-26 00:06:18 +02:00
parent fd967e08d0
commit 5fb0352b97

View File

@ -146,12 +146,6 @@ type server struct {
modifyRxMu sync.RWMutex
modifyRx func(data []byte) []byte
pauseTxMu sync.Mutex
pauseTxc chan struct{}
pauseRxMu sync.Mutex
pauseRxc chan struct{}
latencyTxMu sync.RWMutex
latencyTx time.Duration
@ -177,9 +171,6 @@ func NewServer(cfg ServerConfig) Server {
readyc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 16),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
}
_, fromPort, err := net.SplitHostPort(cfg.From.Host)
@ -202,9 +193,6 @@ func NewServer(cfg ServerConfig) Server {
s.retryInterval = defaultRetryInterval
}
close(s.pauseTxc)
close(s.pauseRxc)
if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
@ -443,27 +431,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
panic("unknown proxy type")
}
// pause before packet dropping, blocking, and forwarding
var pausec chan struct{}
switch ptype {
case proxyTx:
s.pauseTxMu.Lock()
pausec = s.pauseTxc
s.pauseTxMu.Unlock()
case proxyRx:
s.pauseRxMu.Lock()
pausec = s.pauseRxc
s.pauseRxMu.Unlock()
default:
panic("unknown proxy type")
}
select {
case <-pausec:
case <-s.donec:
return
}
// pause first, and then drop packets
if nr2 == 0 {
continue
}
@ -774,68 +741,6 @@ func (s *server) UnblackholeRx() {
)
}
func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
s.pauseTxMu.Unlock()
s.lg.Info(
"paused tx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}
func (s *server) UnpauseTx() {
s.pauseTxMu.Lock()
select {
case <-s.pauseTxc: // already unpaused
case <-s.donec:
s.pauseTxMu.Unlock()
return
default:
close(s.pauseTxc)
}
s.pauseTxMu.Unlock()
s.lg.Info(
"unpaused tx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}
func (s *server) PauseRx() {
s.pauseRxMu.Lock()
s.pauseRxc = make(chan struct{})
s.pauseRxMu.Unlock()
s.lg.Info(
"paused rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
)
}
func (s *server) UnpauseRx() {
s.pauseRxMu.Lock()
select {
case <-s.pauseRxc: // already unpaused
case <-s.donec:
s.pauseRxMu.Unlock()
return
default:
close(s.pauseRxc)
}
s.pauseRxMu.Unlock()
s.lg.Info(
"unpaused rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
)
}
func (s *server) ResetListener() error {
s.listenerMu.Lock()
defer s.listenerMu.Unlock()