mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Remove pause
Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
This commit is contained in:
parent
ac592a2f97
commit
e7b77bb914
@ -53,7 +53,8 @@ var (
|
||||
//
|
||||
// Also, because we are forced to use TLS to communicate with the proxy server
|
||||
// and using well-formed header to talk to the destination server,
|
||||
// we can't do random modification on the data on-the-fly anymore.
|
||||
// so in the L7 forward proxy design we drop features such as random packet
|
||||
// modification, etc.
|
||||
type Server interface {
|
||||
// Listen returns proxy listen address in "scheme://host:port" format.
|
||||
Listen() string
|
||||
@ -67,11 +68,6 @@ type Server interface {
|
||||
// Close closes listener and transport.
|
||||
Close() error
|
||||
|
||||
// PauseAccept stops accepting new connections.
|
||||
PauseAccept()
|
||||
// UnpauseAccept removes pause operation on accepting new connections.
|
||||
UnpauseAccept()
|
||||
|
||||
// DelayAccept adds latency ± random variable to accepting
|
||||
// new incoming connections.
|
||||
DelayAccept(latency, rv time.Duration)
|
||||
@ -132,16 +128,6 @@ type Server interface {
|
||||
BlackholePeerRx(peer url.URL)
|
||||
// UnblackholePeerRx removes blackhole operation on "receiving".
|
||||
UnblackholePeerRx(peer url.URL)
|
||||
|
||||
// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
|
||||
PauseTx()
|
||||
// UnpauseTx removes "forwarding" pause operation.
|
||||
UnpauseTx()
|
||||
|
||||
// PauseRx stops "receiving" packets; "incoming" traffic blocks.
|
||||
PauseRx()
|
||||
// UnpauseRx removes "receiving" pause operation.
|
||||
UnpauseRx()
|
||||
}
|
||||
|
||||
// ServerConfig defines proxy server configuration.
|
||||
@ -183,9 +169,6 @@ type server struct {
|
||||
listenerMu sync.RWMutex
|
||||
listener *customListener
|
||||
|
||||
pauseAcceptMu sync.Mutex
|
||||
pauseAcceptc chan struct{}
|
||||
|
||||
latencyAcceptMu sync.RWMutex
|
||||
latencyAccept time.Duration
|
||||
|
||||
@ -195,12 +178,6 @@ type server struct {
|
||||
modifyRxMu sync.RWMutex
|
||||
modifyRx func(data []byte) []byte
|
||||
|
||||
pauseTxMu sync.Mutex
|
||||
pauseTxc chan struct{}
|
||||
|
||||
pauseRxMu sync.Mutex
|
||||
pauseRxc chan struct{}
|
||||
|
||||
latencyTxMu sync.RWMutex
|
||||
latencyTx time.Duration
|
||||
|
||||
@ -231,10 +208,6 @@ func NewServer(cfg ServerConfig) Server {
|
||||
donec: make(chan struct{}),
|
||||
errc: make(chan error, 16),
|
||||
|
||||
pauseAcceptc: make(chan struct{}),
|
||||
pauseTxc: make(chan struct{}),
|
||||
pauseRxc: make(chan struct{}),
|
||||
|
||||
blackholePeerMap: make(map[int]uint8),
|
||||
}
|
||||
|
||||
@ -251,10 +224,6 @@ func NewServer(cfg ServerConfig) Server {
|
||||
s.retryInterval = defaultRetryInterval
|
||||
}
|
||||
|
||||
close(s.pauseAcceptc)
|
||||
close(s.pauseTxc)
|
||||
close(s.pauseRxc)
|
||||
|
||||
// L7 is http (scheme), L4 is tcp (network listener)
|
||||
addr := ""
|
||||
if strings.HasPrefix(s.listen.Scheme, "http") {
|
||||
@ -321,16 +290,7 @@ type customListener struct {
|
||||
}
|
||||
|
||||
func (c *customListener) Accept() (net.Conn, error) {
|
||||
// we implement the L4 features here (pause / latency accept)
|
||||
c.s.pauseAcceptMu.Lock()
|
||||
pausec := c.s.pauseAcceptc
|
||||
c.s.pauseAcceptMu.Unlock()
|
||||
select {
|
||||
case <-pausec:
|
||||
case <-c.s.donec:
|
||||
return nil, fmt.Errorf("listener is closed")
|
||||
}
|
||||
|
||||
// we implement the L4 features here
|
||||
c.s.latencyAcceptMu.RLock()
|
||||
lat := c.s.latencyAccept
|
||||
c.s.lg.Info(
|
||||
@ -624,27 +584,6 @@ func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) {
|
||||
panic("unknown proxy type")
|
||||
}
|
||||
|
||||
// pause before packet dropping, blocking, and forwarding
|
||||
var pausec chan struct{}
|
||||
switch ptype {
|
||||
case proxyTx:
|
||||
s.pauseTxMu.Lock()
|
||||
pausec = s.pauseTxc
|
||||
s.pauseTxMu.Unlock()
|
||||
case proxyRx:
|
||||
s.pauseRxMu.Lock()
|
||||
pausec = s.pauseRxc
|
||||
s.pauseRxMu.Unlock()
|
||||
default:
|
||||
panic("unknown proxy type")
|
||||
}
|
||||
select {
|
||||
case <-pausec:
|
||||
case <-s.donec:
|
||||
return
|
||||
}
|
||||
|
||||
// pause first, and then drop packets
|
||||
if nr2 == 0 {
|
||||
continue
|
||||
}
|
||||
@ -783,35 +722,6 @@ func (s *server) Close() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *server) PauseAccept() {
|
||||
s.pauseAcceptMu.Lock()
|
||||
s.pauseAcceptc = make(chan struct{})
|
||||
s.pauseAcceptMu.Unlock()
|
||||
|
||||
s.lg.Info(
|
||||
"paused accept",
|
||||
zap.String("proxy listen on", s.Listen()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *server) UnpauseAccept() {
|
||||
s.pauseAcceptMu.Lock()
|
||||
select {
|
||||
case <-s.pauseAcceptc: // already unpaused
|
||||
case <-s.donec:
|
||||
s.pauseAcceptMu.Unlock()
|
||||
return
|
||||
default:
|
||||
close(s.pauseAcceptc)
|
||||
}
|
||||
s.pauseAcceptMu.Unlock()
|
||||
|
||||
s.lg.Info(
|
||||
"unpaused accept",
|
||||
zap.String("proxy listen on", s.Listen()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *server) DelayAccept(latency, rv time.Duration) {
|
||||
if latency <= 0 {
|
||||
return
|
||||
@ -1079,61 +989,3 @@ func (s *server) UnblackholePeerRx(peer url.URL) {
|
||||
s.blackholePeerMap[port] = val
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) PauseTx() {
|
||||
s.pauseTxMu.Lock()
|
||||
s.pauseTxc = make(chan struct{})
|
||||
s.pauseTxMu.Unlock()
|
||||
|
||||
s.lg.Info(
|
||||
"paused tx",
|
||||
zap.String("proxy listen on", s.Listen()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *server) UnpauseTx() {
|
||||
s.pauseTxMu.Lock()
|
||||
select {
|
||||
case <-s.pauseTxc: // already unpaused
|
||||
case <-s.donec:
|
||||
s.pauseTxMu.Unlock()
|
||||
return
|
||||
default:
|
||||
close(s.pauseTxc)
|
||||
}
|
||||
s.pauseTxMu.Unlock()
|
||||
|
||||
s.lg.Info(
|
||||
"unpaused tx",
|
||||
zap.String("proxy listen on", s.Listen()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *server) PauseRx() {
|
||||
s.pauseRxMu.Lock()
|
||||
s.pauseRxc = make(chan struct{})
|
||||
s.pauseRxMu.Unlock()
|
||||
|
||||
s.lg.Info(
|
||||
"paused rx",
|
||||
zap.String("proxy listen on", s.Listen()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *server) UnpauseRx() {
|
||||
s.pauseRxMu.Lock()
|
||||
select {
|
||||
case <-s.pauseRxc: // already unpaused
|
||||
case <-s.donec:
|
||||
s.pauseRxMu.Unlock()
|
||||
return
|
||||
default:
|
||||
close(s.pauseRxc)
|
||||
}
|
||||
s.pauseRxMu.Unlock()
|
||||
|
||||
s.lg.Info(
|
||||
"unpaused rx",
|
||||
zap.String("proxy listen on", s.Listen()),
|
||||
)
|
||||
}
|
||||
|
@ -334,41 +334,6 @@ func TestServer_DelayAccept(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_PauseTx(t *testing.T) {
|
||||
recvc, donec, writec, p, httpServer, sendData := prepare(t, false)
|
||||
defer destroy(t, writec, donec, p, false, httpServer)
|
||||
// the sendData function must be in a goroutine
|
||||
// otherwise, the pauseTx will cause the sendData to block
|
||||
go func() {
|
||||
defer close(donec)
|
||||
for data := range writec {
|
||||
sendData(data)
|
||||
}
|
||||
}()
|
||||
|
||||
data := []byte("Hello World!")
|
||||
|
||||
p.PauseTx()
|
||||
|
||||
writec <- data
|
||||
select {
|
||||
case d := <-recvc:
|
||||
t.Fatalf("received unexpected data %q during pause", string(d))
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
}
|
||||
|
||||
p.UnpauseTx()
|
||||
|
||||
select {
|
||||
case d := <-recvc:
|
||||
if !bytes.Equal(data, d) {
|
||||
t.Fatalf("expected %q, got %q", string(data), string(d))
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("took too long to receive after unpause")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_BlackholeTx(t *testing.T) {
|
||||
recvc, donec, writec, p, httpServer, sendData := prepare(t, false)
|
||||
defer destroy(t, writec, donec, p, false, httpServer)
|
||||
|
Loading…
x
Reference in New Issue
Block a user