mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge 5fb0352b97e5c62edcd4b4ae3ba78b7efbaa6fea into c86c93ca2951338115159dcdd20711603044e1f1
This commit is contained in:
commit
345bdd46c7
@ -59,20 +59,6 @@ type Server interface {
|
|||||||
// Close closes listener and transport.
|
// Close closes listener and transport.
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
// PauseAccept stops accepting new connections.
|
|
||||||
PauseAccept()
|
|
||||||
// UnpauseAccept removes pause operation on accepting new connections.
|
|
||||||
UnpauseAccept()
|
|
||||||
|
|
||||||
// DelayAccept adds latency ± random variable to accepting
|
|
||||||
// new incoming connections.
|
|
||||||
DelayAccept(latency, rv time.Duration)
|
|
||||||
// UndelayAccept removes sending latencies.
|
|
||||||
UndelayAccept()
|
|
||||||
// LatencyAccept returns current latency on accepting
|
|
||||||
// new incoming connections.
|
|
||||||
LatencyAccept() time.Duration
|
|
||||||
|
|
||||||
// DelayTx adds latency ± random variable for "outgoing" traffic
|
// DelayTx adds latency ± random variable for "outgoing" traffic
|
||||||
// in "sending" layer.
|
// in "sending" layer.
|
||||||
DelayTx(latency, rv time.Duration)
|
DelayTx(latency, rv time.Duration)
|
||||||
@ -115,16 +101,6 @@ type Server interface {
|
|||||||
// UnblackholeRx removes blackhole operation on "receiving".
|
// UnblackholeRx removes blackhole operation on "receiving".
|
||||||
UnblackholeRx()
|
UnblackholeRx()
|
||||||
|
|
||||||
// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
|
|
||||||
PauseTx()
|
|
||||||
// UnpauseTx removes "forwarding" pause operation.
|
|
||||||
UnpauseTx()
|
|
||||||
|
|
||||||
// PauseRx stops "receiving" packets; "incoming" traffic blocks.
|
|
||||||
PauseRx()
|
|
||||||
// UnpauseRx removes "receiving" pause operation.
|
|
||||||
UnpauseRx()
|
|
||||||
|
|
||||||
// ResetListener closes and restarts listener.
|
// ResetListener closes and restarts listener.
|
||||||
ResetListener() error
|
ResetListener() error
|
||||||
}
|
}
|
||||||
@ -164,24 +140,12 @@ type server struct {
|
|||||||
listenerMu sync.RWMutex
|
listenerMu sync.RWMutex
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
|
|
||||||
pauseAcceptMu sync.Mutex
|
|
||||||
pauseAcceptc chan struct{}
|
|
||||||
|
|
||||||
latencyAcceptMu sync.RWMutex
|
|
||||||
latencyAccept time.Duration
|
|
||||||
|
|
||||||
modifyTxMu sync.RWMutex
|
modifyTxMu sync.RWMutex
|
||||||
modifyTx func(data []byte) []byte
|
modifyTx func(data []byte) []byte
|
||||||
|
|
||||||
modifyRxMu sync.RWMutex
|
modifyRxMu sync.RWMutex
|
||||||
modifyRx func(data []byte) []byte
|
modifyRx func(data []byte) []byte
|
||||||
|
|
||||||
pauseTxMu sync.Mutex
|
|
||||||
pauseTxc chan struct{}
|
|
||||||
|
|
||||||
pauseRxMu sync.Mutex
|
|
||||||
pauseRxc chan struct{}
|
|
||||||
|
|
||||||
latencyTxMu sync.RWMutex
|
latencyTxMu sync.RWMutex
|
||||||
latencyTx time.Duration
|
latencyTx time.Duration
|
||||||
|
|
||||||
@ -207,10 +171,6 @@ func NewServer(cfg ServerConfig) Server {
|
|||||||
readyc: make(chan struct{}),
|
readyc: make(chan struct{}),
|
||||||
donec: make(chan struct{}),
|
donec: make(chan struct{}),
|
||||||
errc: make(chan error, 16),
|
errc: make(chan error, 16),
|
||||||
|
|
||||||
pauseAcceptc: make(chan struct{}),
|
|
||||||
pauseTxc: make(chan struct{}),
|
|
||||||
pauseRxc: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, fromPort, err := net.SplitHostPort(cfg.From.Host)
|
_, fromPort, err := net.SplitHostPort(cfg.From.Host)
|
||||||
@ -233,10 +193,6 @@ func NewServer(cfg ServerConfig) Server {
|
|||||||
s.retryInterval = defaultRetryInterval
|
s.retryInterval = defaultRetryInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
close(s.pauseAcceptc)
|
|
||||||
close(s.pauseTxc)
|
|
||||||
close(s.pauseRxc)
|
|
||||||
|
|
||||||
if strings.HasPrefix(s.from.Scheme, "http") {
|
if strings.HasPrefix(s.from.Scheme, "http") {
|
||||||
s.from.Scheme = "tcp"
|
s.from.Scheme = "tcp"
|
||||||
}
|
}
|
||||||
@ -290,26 +246,6 @@ func (s *server) listenAndServe() {
|
|||||||
close(s.readyc)
|
close(s.readyc)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
s.pauseAcceptMu.Lock()
|
|
||||||
pausec := s.pauseAcceptc
|
|
||||||
s.pauseAcceptMu.Unlock()
|
|
||||||
select {
|
|
||||||
case <-pausec:
|
|
||||||
case <-s.donec:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.latencyAcceptMu.RLock()
|
|
||||||
lat := s.latencyAccept
|
|
||||||
s.latencyAcceptMu.RUnlock()
|
|
||||||
if lat > 0 {
|
|
||||||
select {
|
|
||||||
case <-time.After(lat):
|
|
||||||
case <-s.donec:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.listenerMu.RLock()
|
s.listenerMu.RLock()
|
||||||
ln := s.listener
|
ln := s.listener
|
||||||
s.listenerMu.RUnlock()
|
s.listenerMu.RUnlock()
|
||||||
@ -495,27 +431,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
|
|||||||
panic("unknown proxy type")
|
panic("unknown proxy type")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pause before packet dropping, blocking, and forwarding
|
|
||||||
var pausec chan struct{}
|
|
||||||
switch ptype {
|
|
||||||
case proxyTx:
|
|
||||||
s.pauseTxMu.Lock()
|
|
||||||
pausec = s.pauseTxc
|
|
||||||
s.pauseTxMu.Unlock()
|
|
||||||
case proxyRx:
|
|
||||||
s.pauseRxMu.Lock()
|
|
||||||
pausec = s.pauseRxc
|
|
||||||
s.pauseRxMu.Unlock()
|
|
||||||
default:
|
|
||||||
panic("unknown proxy type")
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-pausec:
|
|
||||||
case <-s.donec:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// pause first, and then drop packets
|
|
||||||
if nr2 == 0 {
|
if nr2 == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -645,77 +560,6 @@ func (s *server) Close() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) PauseAccept() {
|
|
||||||
s.pauseAcceptMu.Lock()
|
|
||||||
s.pauseAcceptc = make(chan struct{})
|
|
||||||
s.pauseAcceptMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"paused accept",
|
|
||||||
zap.String("from", s.From()),
|
|
||||||
zap.String("to", s.To()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) UnpauseAccept() {
|
|
||||||
s.pauseAcceptMu.Lock()
|
|
||||||
select {
|
|
||||||
case <-s.pauseAcceptc: // already unpaused
|
|
||||||
case <-s.donec:
|
|
||||||
s.pauseAcceptMu.Unlock()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
close(s.pauseAcceptc)
|
|
||||||
}
|
|
||||||
s.pauseAcceptMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"unpaused accept",
|
|
||||||
zap.String("from", s.From()),
|
|
||||||
zap.String("to", s.To()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) DelayAccept(latency, rv time.Duration) {
|
|
||||||
if latency <= 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
d := computeLatency(latency, rv)
|
|
||||||
s.latencyAcceptMu.Lock()
|
|
||||||
s.latencyAccept = d
|
|
||||||
s.latencyAcceptMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"set accept latency",
|
|
||||||
zap.Duration("latency", d),
|
|
||||||
zap.Duration("given-latency", latency),
|
|
||||||
zap.Duration("given-latency-random-variable", rv),
|
|
||||||
zap.String("from", s.From()),
|
|
||||||
zap.String("to", s.To()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) UndelayAccept() {
|
|
||||||
s.latencyAcceptMu.Lock()
|
|
||||||
d := s.latencyAccept
|
|
||||||
s.latencyAccept = 0
|
|
||||||
s.latencyAcceptMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"removed accept latency",
|
|
||||||
zap.Duration("latency", d),
|
|
||||||
zap.String("from", s.From()),
|
|
||||||
zap.String("to", s.To()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) LatencyAccept() time.Duration {
|
|
||||||
s.latencyAcceptMu.RLock()
|
|
||||||
d := s.latencyAccept
|
|
||||||
s.latencyAcceptMu.RUnlock()
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) DelayTx(latency, rv time.Duration) {
|
func (s *server) DelayTx(latency, rv time.Duration) {
|
||||||
if latency <= 0 {
|
if latency <= 0 {
|
||||||
return
|
return
|
||||||
@ -897,68 +741,6 @@ func (s *server) UnblackholeRx() {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) PauseTx() {
|
|
||||||
s.pauseTxMu.Lock()
|
|
||||||
s.pauseTxc = make(chan struct{})
|
|
||||||
s.pauseTxMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"paused tx",
|
|
||||||
zap.String("from", s.From()),
|
|
||||||
zap.String("to", s.To()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) UnpauseTx() {
|
|
||||||
s.pauseTxMu.Lock()
|
|
||||||
select {
|
|
||||||
case <-s.pauseTxc: // already unpaused
|
|
||||||
case <-s.donec:
|
|
||||||
s.pauseTxMu.Unlock()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
close(s.pauseTxc)
|
|
||||||
}
|
|
||||||
s.pauseTxMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"unpaused tx",
|
|
||||||
zap.String("from", s.From()),
|
|
||||||
zap.String("to", s.To()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) PauseRx() {
|
|
||||||
s.pauseRxMu.Lock()
|
|
||||||
s.pauseRxc = make(chan struct{})
|
|
||||||
s.pauseRxMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"paused rx",
|
|
||||||
zap.String("from", s.To()),
|
|
||||||
zap.String("to", s.From()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) UnpauseRx() {
|
|
||||||
s.pauseRxMu.Lock()
|
|
||||||
select {
|
|
||||||
case <-s.pauseRxc: // already unpaused
|
|
||||||
case <-s.donec:
|
|
||||||
s.pauseRxMu.Unlock()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
close(s.pauseRxc)
|
|
||||||
}
|
|
||||||
s.pauseRxMu.Unlock()
|
|
||||||
|
|
||||||
s.lg.Info(
|
|
||||||
"unpaused rx",
|
|
||||||
zap.String("from", s.To()),
|
|
||||||
zap.String("to", s.From()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) ResetListener() error {
|
func (s *server) ResetListener() error {
|
||||||
s.listenerMu.Lock()
|
s.listenerMu.Lock()
|
||||||
defer s.listenerMu.Unlock()
|
defer s.listenerMu.Unlock()
|
||||||
|
|||||||
@ -175,114 +175,6 @@ func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo {
|
|||||||
return transport.TLSInfo{Logger: lg}
|
return transport.TLSInfo{Logger: lg}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
|
|
||||||
func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
|
|
||||||
func testServerDelayAccept(t *testing.T, secure bool) {
|
|
||||||
lg := zaptest.NewLogger(t)
|
|
||||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
|
||||||
defer func() {
|
|
||||||
os.RemoveAll(srcAddr)
|
|
||||||
os.RemoveAll(dstAddr)
|
|
||||||
}()
|
|
||||||
tlsInfo := createTLSInfo(lg, secure)
|
|
||||||
scheme := "unix"
|
|
||||||
ln := listen(t, scheme, dstAddr, tlsInfo)
|
|
||||||
defer ln.Close()
|
|
||||||
|
|
||||||
cfg := ServerConfig{
|
|
||||||
Logger: lg,
|
|
||||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
|
||||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
|
||||||
}
|
|
||||||
if secure {
|
|
||||||
cfg.TLSInfo = tlsInfo
|
|
||||||
}
|
|
||||||
p := NewServer(cfg)
|
|
||||||
|
|
||||||
waitForServer(t, p)
|
|
||||||
|
|
||||||
defer p.Close()
|
|
||||||
|
|
||||||
data := []byte("Hello World!")
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
send(t, data, scheme, srcAddr, tlsInfo)
|
|
||||||
if d := receive(t, ln); !bytes.Equal(data, d) {
|
|
||||||
t.Fatalf("expected %q, got %q", string(data), string(d))
|
|
||||||
}
|
|
||||||
took1 := time.Since(now)
|
|
||||||
t.Logf("took %v with no latency", took1)
|
|
||||||
|
|
||||||
lat, rv := 700*time.Millisecond, 10*time.Millisecond
|
|
||||||
p.DelayAccept(lat, rv)
|
|
||||||
defer p.UndelayAccept()
|
|
||||||
if err := p.ResetListener(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
|
|
||||||
now = time.Now()
|
|
||||||
send(t, data, scheme, srcAddr, tlsInfo)
|
|
||||||
if d := receive(t, ln); !bytes.Equal(data, d) {
|
|
||||||
t.Fatalf("expected %q, got %q", string(data), string(d))
|
|
||||||
}
|
|
||||||
took2 := time.Since(now)
|
|
||||||
t.Logf("took %v with latency %v±%v", took2, lat, rv)
|
|
||||||
|
|
||||||
if took1 >= took2 {
|
|
||||||
t.Fatalf("expected took1 %v < took2 %v", took1, took2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestServer_PauseTx(t *testing.T) {
|
|
||||||
lg := zaptest.NewLogger(t)
|
|
||||||
scheme := "unix"
|
|
||||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
|
||||||
defer func() {
|
|
||||||
os.RemoveAll(srcAddr)
|
|
||||||
os.RemoveAll(dstAddr)
|
|
||||||
}()
|
|
||||||
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
|
|
||||||
defer ln.Close()
|
|
||||||
|
|
||||||
p := NewServer(ServerConfig{
|
|
||||||
Logger: lg,
|
|
||||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
|
||||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
|
||||||
})
|
|
||||||
|
|
||||||
waitForServer(t, p)
|
|
||||||
|
|
||||||
defer p.Close()
|
|
||||||
|
|
||||||
p.PauseTx()
|
|
||||||
|
|
||||||
data := []byte("Hello World!")
|
|
||||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
|
||||||
|
|
||||||
recvc := make(chan []byte, 1)
|
|
||||||
go func() {
|
|
||||||
recvc <- receive(t, ln)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case d := <-recvc:
|
|
||||||
t.Fatalf("received unexpected data %q during pause", string(d))
|
|
||||||
case <-time.After(200 * time.Millisecond):
|
|
||||||
}
|
|
||||||
|
|
||||||
p.UnpauseTx()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case d := <-recvc:
|
|
||||||
if !bytes.Equal(data, d) {
|
|
||||||
t.Fatalf("expected %q, got %q", string(data), string(d))
|
|
||||||
}
|
|
||||||
case <-time.After(2 * time.Second):
|
|
||||||
t.Fatal("took too long to receive after unpause")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestServer_ModifyTx_corrupt(t *testing.T) {
|
func TestServer_ModifyTx_corrupt(t *testing.T) {
|
||||||
lg := zaptest.NewLogger(t)
|
lg := zaptest.NewLogger(t)
|
||||||
scheme := "unix"
|
scheme := "unix"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user