From 5db4df762b9bf9e55c5ded1b83e2267dfdef47a6 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Sat, 14 Apr 2018 05:56:52 -0700 Subject: [PATCH] pkg/proxy: make/simplify interface more extensible Extend proxy for more advanced corrupt and packet drop testing. Signed-off-by: Gyuho Lee --- pkg/proxy/server.go | 1157 +++++++++++++++++++------------------- pkg/proxy/server_test.go | 110 ++-- 2 files changed, 666 insertions(+), 601 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 311af966f..10b00f4d4 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -31,129 +31,6 @@ import ( "go.uber.org/zap" ) -// Server defines proxy server layer that simulates common network faults, -// such as latency spikes, packet drop/corruption, etc.. -type Server interface { - // From returns proxy source address in "scheme://host:port" format. - From() string - // To returns proxy destination address in "scheme://host:port" format. - To() string - - // Ready returns when proxy is ready to serve. - Ready() <-chan struct{} - // Done returns when proxy has been closed. - Done() <-chan struct{} - // Error sends errors while serving proxy. - Error() <-chan error - // Close closes listener and transport. - Close() error - - // DelayAccept adds latency ± random variable to accepting new incoming connections. - DelayAccept(latency, rv time.Duration) - // UndelayAccept removes sending latencies. - UndelayAccept() - // LatencyAccept returns current latency on accepting new incoming connections. - LatencyAccept() time.Duration - // DelayTx adds latency ± random variable to "sending" layer. - DelayTx(latency, rv time.Duration) - // UndelayTx removes sending latencies. - UndelayTx() - // LatencyTx returns current send latency. - LatencyTx() time.Duration - // DelayRx adds latency ± random variable to "receiving" layer. - DelayRx(latency, rv time.Duration) - // UndelayRx removes "receiving" latencies. - UndelayRx() - // LatencyRx returns current receive latency. - LatencyRx() time.Duration - - // PauseAccept stops accepting new connections. - PauseAccept() - // UnpauseAccept removes pause operation on accepting new connections. - UnpauseAccept() - // PauseTx stops "forwarding" packets. - PauseTx() - // UnpauseTx removes "forwarding" pause operation. - UnpauseTx() - // PauseRx stops "receiving" packets to client. - PauseRx() - // UnpauseRx removes "receiving" pause operation. - UnpauseRx() - - // BlackholeTx drops all incoming packets before "forwarding". - BlackholeTx() - // UnblackholeTx removes blackhole operation on "sending". - UnblackholeTx() - // BlackholeRx drops all incoming packets to client. - BlackholeRx() - // UnblackholeRx removes blackhole operation on "receiving". - UnblackholeRx() - - // CorruptTx corrupts incoming packets from the listener. - CorruptTx(f func(data []byte) []byte) - // UncorruptTx removes corrupt operation on "forwarding". - UncorruptTx() - // CorruptRx corrupts incoming packets to client. - CorruptRx(f func(data []byte) []byte) - // UncorruptRx removes corrupt operation on "receiving". - UncorruptRx() - - // ResetListener closes and restarts listener. - ResetListener() error -} - -type proxyServer struct { - lg *zap.Logger - - from, to url.URL - tlsInfo transport.TLSInfo - dialTimeout time.Duration - bufferSize int - retryInterval time.Duration - - readyc chan struct{} - donec chan struct{} - errc chan error - - closeOnce sync.Once - closeWg sync.WaitGroup - - listenerMu sync.RWMutex - listener net.Listener - - latencyAcceptMu sync.RWMutex - latencyAccept time.Duration - latencyTxMu sync.RWMutex - latencyTx time.Duration - latencyRxMu sync.RWMutex - latencyRx time.Duration - - corruptTxMu sync.RWMutex - corruptTx func(data []byte) []byte - corruptRxMu sync.RWMutex - corruptRx func(data []byte) []byte - - acceptMu sync.Mutex - pauseAcceptc chan struct{} - txMu sync.Mutex - pauseTxc chan struct{} - blackholeTxc chan struct{} - rxMu sync.Mutex - pauseRxc chan struct{} - blackholeRxc chan struct{} -} - -// ServerConfig defines proxy server configuration. -type ServerConfig struct { - Logger *zap.Logger - From url.URL - To url.URL - TLSInfo transport.TLSInfo - DialTimeout time.Duration - BufferSize int - RetryInterval time.Duration -} - var ( defaultDialTimeout = 3 * time.Second defaultBufferSize = 48 * 1024 @@ -169,16 +46,163 @@ func init() { } } +// Server defines proxy server layer that simulates common network faults: +// latency spikes and packet drop or corruption. The proxy overhead is very +// small overhead (<500μs per request). Please run tests to compute actual +// overhead. +type Server interface { + // From returns proxy source address in "scheme://host:port" format. + From() string + // To returns proxy destination address in "scheme://host:port" format. + To() string + + // Ready returns when proxy is ready to serve. + Ready() <-chan struct{} + // Done returns when proxy has been closed. + Done() <-chan struct{} + // Error sends errors while serving proxy. + Error() <-chan error + // Close closes listener and transport. + Close() error + + // PauseAccept stops accepting new connections. + PauseAccept() + // UnpauseAccept removes pause operation on accepting new connections. + UnpauseAccept() + + // DelayAccept adds latency ± random variable to accepting + // new incoming connections. + DelayAccept(latency, rv time.Duration) + // UndelayAccept removes sending latencies. + UndelayAccept() + // LatencyAccept returns current latency on accepting + // new incoming connections. + LatencyAccept() time.Duration + + // DelayTx adds latency ± random variable for "outgoing" traffic + // in "sending" layer. + DelayTx(latency, rv time.Duration) + // UndelayTx removes sending latencies. + UndelayTx() + // LatencyTx returns current send latency. + LatencyTx() time.Duration + + // DelayRx adds latency ± random variable for "incoming" traffic + // in "receiving" layer. + DelayRx(latency, rv time.Duration) + // UndelayRx removes "receiving" latencies. + UndelayRx() + // LatencyRx returns current receive latency. + LatencyRx() time.Duration + + // ModifyTx alters/corrupts/drops "outgoing" packets from the listener + // with the given edit function. + ModifyTx(f func(data []byte) []byte) + // UnmodifyTx removes modify operation on "forwarding". + UnmodifyTx() + + // ModifyRx alters/corrupts/drops "incoming" packets to client + // with the given edit function. + ModifyRx(f func(data []byte) []byte) + // UnmodifyRx removes modify operation on "receiving". + UnmodifyRx() + + // BlackholeTx drops all "outgoing" packets before "forwarding". + // "BlackholeTx" operation is a wrapper around "ModifyTx" with + // a function that returns empty bytes. + BlackholeTx() + // UnblackholeTx removes blackhole operation on "sending". + UnblackholeTx() + + // BlackholeRx drops all "incoming" packets to client. + // "BlackholeRx" operation is a wrapper around "ModifyRx" with + // a function that returns empty bytes. + BlackholeRx() + // UnblackholeRx removes blackhole operation on "receiving". + UnblackholeRx() + + // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. + PauseTx() + // UnpauseTx removes "forwarding" pause operation. + UnpauseTx() + + // PauseRx stops "receiving" packets; "incoming" traffic blocks. + PauseRx() + // UnpauseRx removes "receiving" pause operation. + UnpauseRx() + + // ResetListener closes and restarts listener. + ResetListener() error +} + +// ServerConfig defines proxy server configuration. +type ServerConfig struct { + Logger *zap.Logger + From url.URL + To url.URL + TLSInfo transport.TLSInfo + DialTimeout time.Duration + BufferSize int + RetryInterval time.Duration +} + +type server struct { + lg *zap.Logger + + from url.URL + to url.URL + tlsInfo transport.TLSInfo + dialTimeout time.Duration + + bufferSize int + retryInterval time.Duration + + readyc chan struct{} + donec chan struct{} + errc chan error + + closeOnce sync.Once + closeWg sync.WaitGroup + + listenerMu sync.RWMutex + listener net.Listener + + acceptMu sync.Mutex + pauseAcceptc chan struct{} + + latencyAcceptMu sync.RWMutex + latencyAccept time.Duration + + modifyTxMu sync.RWMutex + modifyTx func(data []byte) []byte + + modifyRxMu sync.RWMutex + modifyRx func(data []byte) []byte + + pauseTxMu sync.Mutex + pauseTxc chan struct{} + + pauseRxMu sync.Mutex + pauseRxc chan struct{} + + latencyTxMu sync.RWMutex + latencyTx time.Duration + + latencyRxMu sync.RWMutex + latencyRx time.Duration +} + // NewServer returns a proxy implementation with no iptables/tc dependencies. // The proxy layer overhead is <1ms. func NewServer(cfg ServerConfig) Server { - p := &proxyServer{ + s := &server{ lg: cfg.Logger, - from: cfg.From, - to: cfg.To, - tlsInfo: cfg.TLSInfo, - dialTimeout: cfg.DialTimeout, + from: cfg.From, + to: cfg.To, + tlsInfo: cfg.TLSInfo, + dialTimeout: cfg.DialTimeout, + bufferSize: cfg.BufferSize, retryInterval: cfg.RetryInterval, @@ -188,60 +212,60 @@ func NewServer(cfg ServerConfig) Server { pauseAcceptc: make(chan struct{}), pauseTxc: make(chan struct{}), - blackholeTxc: make(chan struct{}), pauseRxc: make(chan struct{}), - blackholeRxc: make(chan struct{}), } - if p.dialTimeout == 0 { - p.dialTimeout = defaultDialTimeout - } - if p.bufferSize == 0 { - p.bufferSize = defaultBufferSize - } - if p.retryInterval == 0 { - p.retryInterval = defaultRetryInterval - } - if p.lg == nil { - p.lg = defaultLogger - } - close(p.pauseAcceptc) - close(p.pauseTxc) - close(p.pauseRxc) - if strings.HasPrefix(p.from.Scheme, "http") { - p.from.Scheme = "tcp" + if s.dialTimeout == 0 { + s.dialTimeout = defaultDialTimeout } - if strings.HasPrefix(p.to.Scheme, "http") { - p.to.Scheme = "tcp" + if s.bufferSize == 0 { + s.bufferSize = defaultBufferSize + } + if s.retryInterval == 0 { + s.retryInterval = defaultRetryInterval + } + if s.lg == nil { + s.lg = defaultLogger + } + + close(s.pauseAcceptc) + close(s.pauseTxc) + close(s.pauseRxc) + + if strings.HasPrefix(s.from.Scheme, "http") { + s.from.Scheme = "tcp" + } + if strings.HasPrefix(s.to.Scheme, "http") { + s.to.Scheme = "tcp" } var ln net.Listener var err error - if !p.tlsInfo.Empty() { - ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) + if !s.tlsInfo.Empty() { + ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo) } else { - ln, err = net.Listen(p.from.Scheme, p.from.Host) + ln, err = net.Listen(s.from.Scheme, s.from.Host) } if err != nil { - p.errc <- err - p.Close() - return p + s.errc <- err + s.Close() + return s } - p.listener = ln + s.listener = ln - p.closeWg.Add(1) - go p.listenAndServe() + s.closeWg.Add(1) + go s.listenAndServe() - p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To())) - return p + s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To())) + return s } -func (p *proxyServer) From() string { - return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host) +func (s *server) From() string { + return fmt.Sprintf("%s://%s", s.from.Scheme, s.from.Host) } -func (p *proxyServer) To() string { - return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host) +func (s *server) To() string { + return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host) } // TODO: implement packet reordering from multiple TCP connections @@ -249,71 +273,71 @@ func (p *proxyServer) To() string { // - https://github.com/coreos/etcd/issues/5614 // - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034 -func (p *proxyServer) listenAndServe() { - defer p.closeWg.Done() +func (s *server) listenAndServe() { + defer s.closeWg.Done() - p.lg.Info("proxy is listening on", zap.String("from", p.From())) - close(p.readyc) + s.lg.Info("proxy is listening on", zap.String("from", s.From())) + close(s.readyc) for { - p.acceptMu.Lock() - pausec := p.pauseAcceptc - p.acceptMu.Unlock() + s.acceptMu.Lock() + pausec := s.pauseAcceptc + s.acceptMu.Unlock() select { case <-pausec: - case <-p.donec: + case <-s.donec: return } - p.latencyAcceptMu.RLock() - lat := p.latencyAccept - p.latencyAcceptMu.RUnlock() + s.latencyAcceptMu.RLock() + lat := s.latencyAccept + s.latencyAcceptMu.RUnlock() if lat > 0 { select { case <-time.After(lat): - case <-p.donec: + case <-s.donec: return } } - p.listenerMu.RLock() - ln := p.listener - p.listenerMu.RUnlock() + s.listenerMu.RLock() + ln := s.listener + s.listenerMu.RUnlock() in, err := ln.Accept() if err != nil { select { - case p.errc <- err: + case s.errc <- err: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } - p.lg.Debug("listener accept error", zap.Error(err)) + s.lg.Debug("listener accept error", zap.Error(err)) if strings.HasSuffix(err.Error(), "use of closed network connection") { select { - case <-time.After(p.retryInterval): - case <-p.donec: + case <-time.After(s.retryInterval): + case <-s.donec: return } - p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From())) + s.lg.Debug("listener is closed; retry listening on", zap.String("from", s.From())) - if err = p.ResetListener(); err != nil { + if err = s.ResetListener(); err != nil { select { - case p.errc <- err: + case s.errc <- err: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } - p.lg.Warn("failed to reset listener", zap.Error(err)) + s.lg.Warn("failed to reset listener", zap.Error(err)) } } @@ -321,62 +345,75 @@ func (p *proxyServer) listenAndServe() { } var out net.Conn - if !p.tlsInfo.Empty() { + if !s.tlsInfo.Empty() { var tp *http.Transport - tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout) + tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout) if err != nil { select { - case p.errc <- err: + case s.errc <- err: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } continue } - out, err = tp.Dial(p.to.Scheme, p.to.Host) + out, err = tp.Dial(s.to.Scheme, s.to.Host) } else { - out, err = net.Dial(p.to.Scheme, p.to.Host) + out, err = net.Dial(s.to.Scheme, s.to.Host) } if err != nil { select { - case p.errc <- err: + case s.errc <- err: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } - p.lg.Debug("failed to dial", zap.Error(err)) + s.lg.Debug("failed to dial", zap.Error(err)) continue } go func() { // read incoming bytes from listener, dispatch to outgoing connection - p.transmit(out, in) + s.transmit(out, in) out.Close() in.Close() }() go func() { // read response from outgoing connection, write back to listener - p.receive(in, out) + s.receive(in, out) in.Close() out.Close() }() } } -func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) } -func (p *proxyServer) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) } -func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { - buf := make([]byte, p.bufferSize) +func (s *server) transmit(dst io.Writer, src io.Reader) { + s.ioCopy(dst, src, proxyTx) +} + +func (s *server) receive(dst io.Writer, src io.Reader) { + s.ioCopy(dst, src, proxyRx) +} + +type proxyType uint8 + +const ( + proxyTx proxyType = iota + proxyRx +) + +func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { + buf := make([]byte, s.bufferSize) for { - nr, err := src.Read(buf) + nr1, err := src.Read(buf) if err != nil { if err == io.EOF { return @@ -389,100 +426,110 @@ func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { return } select { - case p.errc <- err: + case s.errc <- err: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } - p.lg.Debug("failed to read", zap.Error(err)) + s.lg.Debug("failed to read", zap.Error(err)) return } - if nr == 0 { + if nr1 == 0 { return } - data := buf[:nr] + data := buf[:nr1] + // alters/corrupts/drops data + switch ptype { + case proxyTx: + s.modifyTxMu.RLock() + if s.modifyTx != nil { + data = s.modifyTx(data) + } + s.modifyTxMu.RUnlock() + case proxyRx: + s.modifyRxMu.RLock() + if s.modifyRx != nil { + data = s.modifyRx(data) + } + s.modifyRxMu.RUnlock() + default: + panic("unknown proxy type") + } + nr2 := len(data) + switch ptype { + case proxyTx: + s.lg.Debug( + "modified tx", + zap.String("data-received", humanize.Bytes(uint64(nr1))), + zap.String("data-modified", humanize.Bytes(uint64(nr2))), + zap.String("from", s.From()), + zap.String("to", s.To()), + ) + case proxyRx: + s.lg.Debug( + "modified rx", + zap.String("data-received", humanize.Bytes(uint64(nr1))), + zap.String("data-modified", humanize.Bytes(uint64(nr2))), + zap.String("from", s.To()), + zap.String("to", s.From()), + ) + default: + panic("unknown proxy type") + } + + // pause before packet dropping, blocking, and forwarding var pausec chan struct{} - var blackholec chan struct{} - if proxySend { - p.txMu.Lock() - pausec = p.pauseTxc - blackholec = p.blackholeTxc - p.txMu.Unlock() - } else { - p.rxMu.Lock() - pausec = p.pauseRxc - blackholec = p.blackholeRxc - p.rxMu.Unlock() + switch ptype { + case proxyTx: + s.pauseTxMu.Lock() + pausec = s.pauseTxc + s.pauseTxMu.Unlock() + case proxyRx: + s.pauseRxMu.Lock() + pausec = s.pauseRxc + s.pauseRxMu.Unlock() + default: + panic("unknown proxy type") } select { case <-pausec: - case <-p.donec: + case <-s.donec: return } - blackholed := false - select { - case <-blackholec: - blackholed = true - case <-p.donec: - return - default: - } - if blackholed { - if proxySend { - p.lg.Debug( - "dropped", - zap.String("data-size", humanize.Bytes(uint64(nr))), - zap.String("from", p.From()), - zap.String("to", p.To()), - ) - } else { - p.lg.Debug( - "dropped", - zap.String("data-size", humanize.Bytes(uint64(nr))), - zap.String("from", p.To()), - zap.String("to", p.From()), - ) - } + + // pause first, and then drop packets + if nr2 == 0 { continue } + // block before forwarding var lat time.Duration - if proxySend { - p.latencyTxMu.RLock() - lat = p.latencyTx - p.latencyTxMu.RUnlock() - } else { - p.latencyRxMu.RLock() - lat = p.latencyRx - p.latencyRxMu.RUnlock() + switch ptype { + case proxyTx: + s.latencyTxMu.RLock() + lat = s.latencyTx + s.latencyTxMu.RUnlock() + case proxyRx: + s.latencyRxMu.RLock() + lat = s.latencyRx + s.latencyRxMu.RUnlock() + default: + panic("unknown proxy type") } if lat > 0 { select { case <-time.After(lat): - case <-p.donec: + case <-s.donec: return } } - if proxySend { - p.corruptTxMu.RLock() - if p.corruptTx != nil { - data = p.corruptTx(data) - } - p.corruptTxMu.RUnlock() - } else { - p.corruptRxMu.RLock() - if p.corruptRx != nil { - data = p.corruptRx(data) - } - p.corruptRxMu.RUnlock() - } - + // now forward packets to target var nw int nw, err = dst.Write(data) if err != nil { @@ -490,210 +537,249 @@ func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { return } select { - case p.errc <- err: + case s.errc <- err: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } - if proxySend { - p.lg.Debug("failed to write while sending", zap.Error(err)) - } else { - p.lg.Debug("failed to write while receiving", zap.Error(err)) + switch ptype { + case proxyTx: + s.lg.Debug("write fail on tx", zap.Error(err)) + case proxyRx: + s.lg.Debug("write fail on rx", zap.Error(err)) + default: + panic("unknown proxy type") } return } - if nr != nw { + if nr2 != nw { select { - case p.errc <- io.ErrShortWrite: + case s.errc <- io.ErrShortWrite: select { - case <-p.donec: + case <-s.donec: return default: } - case <-p.donec: + case <-s.donec: return } - if proxySend { - p.lg.Debug( - "failed to write while sending; read/write bytes are different", - zap.Int("read-bytes", nr), + switch ptype { + case proxyTx: + s.lg.Debug( + "write fail on tx; read/write bytes are different", + zap.Int("read-bytes", nr1), zap.Int("write-bytes", nw), zap.Error(io.ErrShortWrite), ) - } else { - p.lg.Debug( - "failed to write while receiving; read/write bytes are different", - zap.Int("read-bytes", nr), + case proxyRx: + s.lg.Debug( + "write fail on rx; read/write bytes are different", + zap.Int("read-bytes", nr1), zap.Int("write-bytes", nw), zap.Error(io.ErrShortWrite), ) + default: + panic("unknown proxy type") } return } - if proxySend { - p.lg.Debug( + switch ptype { + case proxyTx: + s.lg.Debug( "transmitted", - zap.String("data-size", humanize.Bytes(uint64(nr))), - zap.String("from", p.From()), - zap.String("to", p.To()), + zap.String("data-size", humanize.Bytes(uint64(nr1))), + zap.String("from", s.From()), + zap.String("to", s.To()), ) - } else { - p.lg.Debug( + case proxyRx: + s.lg.Debug( "received", - zap.String("data-size", humanize.Bytes(uint64(nr))), - zap.String("from", p.To()), - zap.String("to", p.From()), + zap.String("data-size", humanize.Bytes(uint64(nr1))), + zap.String("from", s.To()), + zap.String("to", s.From()), ) + default: + panic("unknown proxy type") } - } } -func (p *proxyServer) Ready() <-chan struct{} { return p.readyc } -func (p *proxyServer) Done() <-chan struct{} { return p.donec } -func (p *proxyServer) Error() <-chan error { return p.errc } -func (p *proxyServer) Close() (err error) { - p.closeOnce.Do(func() { - close(p.donec) - p.listenerMu.Lock() - if p.listener != nil { - err = p.listener.Close() - p.lg.Info( +func (s *server) Ready() <-chan struct{} { return s.readyc } +func (s *server) Done() <-chan struct{} { return s.donec } +func (s *server) Error() <-chan error { return s.errc } +func (s *server) Close() (err error) { + s.closeOnce.Do(func() { + close(s.donec) + s.listenerMu.Lock() + if s.listener != nil { + err = s.listener.Close() + s.lg.Info( "closed proxy listener", - zap.String("from", p.From()), - zap.String("to", p.To()), + zap.String("from", s.From()), + zap.String("to", s.To()), ) } - p.lg.Sync() - p.listenerMu.Unlock() + s.lg.Sync() + s.listenerMu.Unlock() }) - p.closeWg.Wait() + s.closeWg.Wait() return err } -func (p *proxyServer) DelayAccept(latency, rv time.Duration) { +func (s *server) PauseAccept() { + s.acceptMu.Lock() + s.pauseAcceptc = make(chan struct{}) + s.acceptMu.Unlock() + + s.lg.Info( + "paused accept", + zap.String("from", s.From()), + zap.String("to", s.To()), + ) +} + +func (s *server) UnpauseAccept() { + s.acceptMu.Lock() + select { + case <-s.pauseAcceptc: // already unpaused + case <-s.donec: + s.acceptMu.Unlock() + return + default: + close(s.pauseAcceptc) + } + s.acceptMu.Unlock() + + s.lg.Info( + "unpaused accept", + zap.String("from", s.From()), + zap.String("to", s.To()), + ) +} + +func (s *server) DelayAccept(latency, rv time.Duration) { if latency <= 0 { return } d := computeLatency(latency, rv) - p.latencyAcceptMu.Lock() - p.latencyAccept = d - p.latencyAcceptMu.Unlock() + s.latencyAcceptMu.Lock() + s.latencyAccept = d + s.latencyAcceptMu.Unlock() - p.lg.Info( + s.lg.Info( "set accept latency", zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), - zap.String("from", p.From()), - zap.String("to", p.To()), + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (p *proxyServer) UndelayAccept() { - p.latencyAcceptMu.Lock() - d := p.latencyAccept - p.latencyAccept = 0 - p.latencyAcceptMu.Unlock() +func (s *server) UndelayAccept() { + s.latencyAcceptMu.Lock() + d := s.latencyAccept + s.latencyAccept = 0 + s.latencyAcceptMu.Unlock() - p.lg.Info( + s.lg.Info( "removed accept latency", zap.Duration("latency", d), - zap.String("from", p.From()), - zap.String("to", p.To()), + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (p *proxyServer) LatencyAccept() time.Duration { - p.latencyAcceptMu.RLock() - d := p.latencyAccept - p.latencyAcceptMu.RUnlock() +func (s *server) LatencyAccept() time.Duration { + s.latencyAcceptMu.RLock() + d := s.latencyAccept + s.latencyAcceptMu.RUnlock() return d } -func (p *proxyServer) DelayTx(latency, rv time.Duration) { +func (s *server) DelayTx(latency, rv time.Duration) { if latency <= 0 { return } d := computeLatency(latency, rv) - p.latencyTxMu.Lock() - p.latencyTx = d - p.latencyTxMu.Unlock() + s.latencyTxMu.Lock() + s.latencyTx = d + s.latencyTxMu.Unlock() - p.lg.Info( + s.lg.Info( "set transmit latency", zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), - zap.String("from", p.From()), - zap.String("to", p.To()), + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (p *proxyServer) UndelayTx() { - p.latencyTxMu.Lock() - d := p.latencyTx - p.latencyTx = 0 - p.latencyTxMu.Unlock() +func (s *server) UndelayTx() { + s.latencyTxMu.Lock() + d := s.latencyTx + s.latencyTx = 0 + s.latencyTxMu.Unlock() - p.lg.Info( + s.lg.Info( "removed transmit latency", zap.Duration("latency", d), - zap.String("from", p.From()), - zap.String("to", p.To()), + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (p *proxyServer) LatencyTx() time.Duration { - p.latencyTxMu.RLock() - d := p.latencyTx - p.latencyTxMu.RUnlock() +func (s *server) LatencyTx() time.Duration { + s.latencyTxMu.RLock() + d := s.latencyTx + s.latencyTxMu.RUnlock() return d } -func (p *proxyServer) DelayRx(latency, rv time.Duration) { +func (s *server) DelayRx(latency, rv time.Duration) { if latency <= 0 { return } d := computeLatency(latency, rv) - p.latencyRxMu.Lock() - p.latencyRx = d - p.latencyRxMu.Unlock() + s.latencyRxMu.Lock() + s.latencyRx = d + s.latencyRxMu.Unlock() - p.lg.Info( + s.lg.Info( "set receive latency", zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), - zap.String("from", p.To()), - zap.String("to", p.From()), + zap.String("from", s.To()), + zap.String("to", s.From()), ) } -func (p *proxyServer) UndelayRx() { - p.latencyRxMu.Lock() - d := p.latencyRx - p.latencyRx = 0 - p.latencyRxMu.Unlock() +func (s *server) UndelayRx() { + s.latencyRxMu.Lock() + d := s.latencyRx + s.latencyRx = 0 + s.latencyRxMu.Unlock() - p.lg.Info( + s.lg.Info( "removed receive latency", zap.Duration("latency", d), - zap.String("from", p.To()), - zap.String("to", p.From()), + zap.String("from", s.To()), + zap.String("to", s.From()), ) } -func (p *proxyServer) LatencyRx() time.Duration { - p.latencyRxMu.RLock() - d := p.latencyRx - p.latencyRxMu.RUnlock() +func (s *server) LatencyRx() time.Duration { + s.latencyRxMu.RLock() + d := s.latencyRx + s.latencyRxMu.RUnlock() return d } @@ -716,213 +802,156 @@ func computeLatency(lat, rv time.Duration) time.Duration { return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds())) } -func (p *proxyServer) PauseAccept() { - p.acceptMu.Lock() - p.pauseAcceptc = make(chan struct{}) - p.acceptMu.Unlock() +func (s *server) ModifyTx(f func([]byte) []byte) { + s.modifyTxMu.Lock() + s.modifyTx = f + s.modifyTxMu.Unlock() - p.lg.Info( - "paused accepting new connections", - zap.String("from", p.From()), - zap.String("to", p.To()), + s.lg.Info( + "modifying tx", + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (p *proxyServer) UnpauseAccept() { - p.acceptMu.Lock() +func (s *server) UnmodifyTx() { + s.modifyTxMu.Lock() + s.modifyTx = nil + s.modifyTxMu.Unlock() + + s.lg.Info( + "unmodifyed tx", + zap.String("from", s.From()), + zap.String("to", s.To()), + ) +} + +func (s *server) ModifyRx(f func([]byte) []byte) { + s.modifyRxMu.Lock() + s.modifyRx = f + s.modifyRxMu.Unlock() + s.lg.Info( + "modifying rx", + zap.String("from", s.To()), + zap.String("to", s.From()), + ) +} + +func (s *server) UnmodifyRx() { + s.modifyRxMu.Lock() + s.modifyRx = nil + s.modifyRxMu.Unlock() + + s.lg.Info( + "unmodifyed rx", + zap.String("from", s.To()), + zap.String("to", s.From()), + ) +} + +func (s *server) BlackholeTx() { + s.ModifyTx(func([]byte) []byte { return nil }) + s.lg.Info( + "blackholed tx", + zap.String("from", s.From()), + zap.String("to", s.To()), + ) +} + +func (s *server) UnblackholeTx() { + s.UnmodifyTx() + s.lg.Info( + "unblackholed tx", + zap.String("from", s.From()), + zap.String("to", s.To()), + ) +} + +func (s *server) BlackholeRx() { + s.ModifyRx(func([]byte) []byte { return nil }) + s.lg.Info( + "blackholed rx", + zap.String("from", s.To()), + zap.String("to", s.From()), + ) +} + +func (s *server) UnblackholeRx() { + s.UnmodifyRx() + s.lg.Info( + "unblackholed rx", + zap.String("from", s.To()), + zap.String("to", s.From()), + ) +} + +func (s *server) PauseTx() { + s.pauseTxMu.Lock() + s.pauseTxc = make(chan struct{}) + s.pauseTxMu.Unlock() + + s.lg.Info( + "paused tx", + zap.String("from", s.From()), + zap.String("to", s.To()), + ) +} + +func (s *server) UnpauseTx() { + s.pauseTxMu.Lock() select { - case <-p.pauseAcceptc: // already unpaused - case <-p.donec: - p.acceptMu.Unlock() + case <-s.pauseTxc: // already unpaused + case <-s.donec: + s.pauseTxMu.Unlock() return default: - close(p.pauseAcceptc) + close(s.pauseTxc) } - p.acceptMu.Unlock() + s.pauseTxMu.Unlock() - p.lg.Info( - "unpaused accepting new connections", - zap.String("from", p.From()), - zap.String("to", p.To()), + s.lg.Info( + "unpaused tx", + zap.String("from", s.From()), + zap.String("to", s.To()), ) } -func (p *proxyServer) PauseTx() { - p.txMu.Lock() - p.pauseTxc = make(chan struct{}) - p.txMu.Unlock() +func (s *server) PauseRx() { + s.pauseRxMu.Lock() + s.pauseRxc = make(chan struct{}) + s.pauseRxMu.Unlock() - p.lg.Info( - "paused transmit listen", - zap.String("from", p.From()), - zap.String("to", p.To()), + s.lg.Info( + "paused rx", + zap.String("from", s.To()), + zap.String("to", s.From()), ) } -func (p *proxyServer) UnpauseTx() { - p.txMu.Lock() +func (s *server) UnpauseRx() { + s.pauseRxMu.Lock() select { - case <-p.pauseTxc: // already unpaused - case <-p.donec: - p.txMu.Unlock() + case <-s.pauseRxc: // already unpaused + case <-s.donec: + s.pauseRxMu.Unlock() return default: - close(p.pauseTxc) + close(s.pauseRxc) } - p.txMu.Unlock() + s.pauseRxMu.Unlock() - p.lg.Info( - "unpaused transmit listen", - zap.String("from", p.From()), - zap.String("to", p.To()), + s.lg.Info( + "unpaused rx", + zap.String("from", s.To()), + zap.String("to", s.From()), ) } -func (p *proxyServer) PauseRx() { - p.rxMu.Lock() - p.pauseRxc = make(chan struct{}) - p.rxMu.Unlock() +func (s *server) ResetListener() error { + s.listenerMu.Lock() + defer s.listenerMu.Unlock() - p.lg.Info( - "paused receive listen", - zap.String("from", p.To()), - zap.String("to", p.From()), - ) -} - -func (p *proxyServer) UnpauseRx() { - p.rxMu.Lock() - select { - case <-p.pauseRxc: // already unpaused - case <-p.donec: - p.rxMu.Unlock() - return - default: - close(p.pauseRxc) - } - p.rxMu.Unlock() - - p.lg.Info( - "unpaused receive listen", - zap.String("from", p.To()), - zap.String("to", p.From()), - ) -} - -func (p *proxyServer) BlackholeTx() { - p.txMu.Lock() - select { - case <-p.blackholeTxc: // already blackholed - case <-p.donec: - p.txMu.Unlock() - return - default: - close(p.blackholeTxc) - } - p.txMu.Unlock() - - p.lg.Info( - "blackholed transmit", - zap.String("from", p.From()), - zap.String("to", p.To()), - ) -} - -func (p *proxyServer) UnblackholeTx() { - p.txMu.Lock() - p.blackholeTxc = make(chan struct{}) - p.txMu.Unlock() - - p.lg.Info( - "unblackholed transmit", - zap.String("from", p.From()), - zap.String("to", p.To()), - ) -} - -func (p *proxyServer) BlackholeRx() { - p.rxMu.Lock() - select { - case <-p.blackholeRxc: // already blackholed - case <-p.donec: - p.rxMu.Unlock() - return - default: - close(p.blackholeRxc) - } - p.rxMu.Unlock() - - p.lg.Info( - "blackholed receive", - zap.String("from", p.To()), - zap.String("to", p.From()), - ) -} - -func (p *proxyServer) UnblackholeRx() { - p.rxMu.Lock() - p.blackholeRxc = make(chan struct{}) - p.rxMu.Unlock() - - p.lg.Info( - "unblackholed receive", - zap.String("from", p.To()), - zap.String("to", p.From()), - ) -} - -func (p *proxyServer) CorruptTx(f func([]byte) []byte) { - p.corruptTxMu.Lock() - p.corruptTx = f - p.corruptTxMu.Unlock() - - p.lg.Info( - "corrupting transmit", - zap.String("from", p.From()), - zap.String("to", p.To()), - ) -} - -func (p *proxyServer) UncorruptTx() { - p.corruptTxMu.Lock() - p.corruptTx = nil - p.corruptTxMu.Unlock() - - p.lg.Info( - "stopped corrupting transmit", - zap.String("from", p.From()), - zap.String("to", p.To()), - ) -} - -func (p *proxyServer) CorruptRx(f func([]byte) []byte) { - p.corruptRxMu.Lock() - p.corruptRx = f - p.corruptRxMu.Unlock() - p.lg.Info( - "corrupting receive", - zap.String("from", p.To()), - zap.String("to", p.From()), - ) -} - -func (p *proxyServer) UncorruptRx() { - p.corruptRxMu.Lock() - p.corruptRx = nil - p.corruptRxMu.Unlock() - - p.lg.Info( - "stopped corrupting receive", - zap.String("from", p.To()), - zap.String("to", p.From()), - ) -} - -func (p *proxyServer) ResetListener() error { - p.listenerMu.Lock() - defer p.listenerMu.Unlock() - - if err := p.listener.Close(); err != nil { + if err := s.listener.Close(); err != nil { // already closed if !strings.HasSuffix(err.Error(), "use of closed network connection") { return err @@ -931,19 +960,19 @@ func (p *proxyServer) ResetListener() error { var ln net.Listener var err error - if !p.tlsInfo.Empty() { - ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) + if !s.tlsInfo.Empty() { + ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo) } else { - ln, err = net.Listen(p.from.Scheme, p.from.Host) + ln, err = net.Listen(s.from.Scheme, s.from.Host) } if err != nil { return err } - p.listener = ln + s.listener = ln - p.lg.Info( + s.lg.Info( "reset listener on", - zap.String("from", p.From()), + zap.String("from", s.From()), ) return nil } diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index 27e2784af..ace1a7eeb 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -269,6 +269,77 @@ func TestServer_PauseTx(t *testing.T) { } } +func TestServer_ModifyTx_corrupt(t *testing.T) { + scheme := "unix" + srcAddr, dstAddr := newUnixAddr(), newUnixAddr() + defer func() { + os.RemoveAll(srcAddr) + os.RemoveAll(dstAddr) + }() + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) + defer ln.Close() + + p := NewServer(ServerConfig{ + Logger: testLogger, + From: url.URL{Scheme: scheme, Host: srcAddr}, + To: url.URL{Scheme: scheme, Host: dstAddr}, + }) + <-p.Ready() + defer p.Close() + + p.ModifyTx(func(d []byte) []byte { + d[len(d)/2]++ + return d + }) + data := []byte("Hello World!") + send(t, data, scheme, srcAddr, transport.TLSInfo{}) + if d := receive(t, ln); bytes.Equal(d, data) { + t.Fatalf("expected corrupted data, got %q", string(d)) + } + + p.UnmodifyTx() + send(t, data, scheme, srcAddr, transport.TLSInfo{}) + if d := receive(t, ln); !bytes.Equal(d, data) { + t.Fatalf("expected uncorrupted data, got %q", string(d)) + } +} + +func TestServer_ModifyTx_packet_loss(t *testing.T) { + scheme := "unix" + srcAddr, dstAddr := newUnixAddr(), newUnixAddr() + defer func() { + os.RemoveAll(srcAddr) + os.RemoveAll(dstAddr) + }() + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) + defer ln.Close() + + p := NewServer(ServerConfig{ + Logger: testLogger, + From: url.URL{Scheme: scheme, Host: srcAddr}, + To: url.URL{Scheme: scheme, Host: dstAddr}, + }) + <-p.Ready() + defer p.Close() + + // 50% packet loss + p.ModifyTx(func(d []byte) []byte { + half := len(d) / 2 + return d[:half:half] + }) + data := []byte("Hello World!") + send(t, data, scheme, srcAddr, transport.TLSInfo{}) + if d := receive(t, ln); bytes.Equal(d, data) { + t.Fatalf("expected corrupted data, got %q", string(d)) + } + + p.UnmodifyTx() + send(t, data, scheme, srcAddr, transport.TLSInfo{}) + if d := receive(t, ln); !bytes.Equal(d, data) { + t.Fatalf("expected uncorrupted data, got %q", string(d)) + } +} + func TestServer_BlackholeTx(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() @@ -319,41 +390,6 @@ func TestServer_BlackholeTx(t *testing.T) { } } -func TestServer_CorruptTx(t *testing.T) { - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: testLogger, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - <-p.Ready() - defer p.Close() - - p.CorruptTx(func(d []byte) []byte { - d[len(d)/2]++ - return d - }) - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UncorruptTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) - } -} - func TestServer_Shutdown(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() @@ -372,8 +408,8 @@ func TestServer_Shutdown(t *testing.T) { <-p.Ready() defer p.Close() - px, _ := p.(*proxyServer) - px.listener.Close() + s, _ := p.(*server) + s.listener.Close() time.Sleep(200 * time.Millisecond) data := []byte("Hello World!")