From d44332daa9c95333b77233a5eef4e2f27dc04e0e Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 29 Mar 2018 20:16:06 -0700 Subject: [PATCH] proxy: clean up logging Signed-off-by: Gyuho Lee --- pkg/transport/proxy.go | 273 +++++++++++++++++++++++++++--------- pkg/transport/proxy_test.go | 21 +-- 2 files changed, 222 insertions(+), 72 deletions(-) diff --git a/pkg/transport/proxy.go b/pkg/transport/proxy.go index 8af76d46b..f1d7adf3a 100644 --- a/pkg/transport/proxy.go +++ b/pkg/transport/proxy.go @@ -21,13 +21,13 @@ import ( "net" "net/http" "net/url" - "os" "strings" "sync" "time" + "go.uber.org/zap" + humanize "github.com/dustin/go-humanize" - "google.golang.org/grpc/grpclog" ) // Proxy defines proxy layer that simulates common network faults, @@ -102,12 +102,13 @@ type Proxy interface { } type proxy struct { + logger *zap.Logger + from, to url.URL tlsInfo TLSInfo dialTimeout time.Duration bufferSize int retryInterval time.Duration - logger grpclog.LoggerV2 readyc chan struct{} donec chan struct{} @@ -143,33 +144,42 @@ type proxy struct { // ProxyConfig defines proxy configuration. type ProxyConfig struct { + Logger *zap.Logger From url.URL To url.URL TLSInfo TLSInfo DialTimeout time.Duration BufferSize int RetryInterval time.Duration - Logger grpclog.LoggerV2 } var ( defaultDialTimeout = 3 * time.Second defaultBufferSize = 48 * 1024 defaultRetryInterval = 10 * time.Millisecond - defaultLogger = grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 0) + defaultLogger *zap.Logger ) +func init() { + var err error + defaultLogger, err = zap.NewProduction() + if err != nil { + panic(err) + } +} + // NewProxy returns a proxy implementation with no iptables/tc dependencies. // The proxy layer overhead is <1ms. func NewProxy(cfg ProxyConfig) Proxy { p := &proxy{ + logger: cfg.Logger, + from: cfg.From, to: cfg.To, tlsInfo: cfg.TLSInfo, dialTimeout: cfg.DialTimeout, bufferSize: cfg.BufferSize, retryInterval: cfg.RetryInterval, - logger: cfg.Logger, readyc: make(chan struct{}), donec: make(chan struct{}), @@ -220,7 +230,8 @@ func NewProxy(cfg ProxyConfig) Proxy { p.closeWg.Add(1) go p.listenAndServe() - p.logger.Infof("started proxying [%s -> %s]", p.From(), p.To()) + + p.logger.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To())) return p } @@ -240,7 +251,7 @@ func (p *proxy) To() string { func (p *proxy) listenAndServe() { defer p.closeWg.Done() - p.logger.Infof("listen %q", p.From()) + p.logger.Info("proxy is listening on", zap.String("from", p.From())) close(p.readyc) for { @@ -280,9 +291,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("listener accept error %q", err.Error()) - } + p.logger.Debug("listener accept error", zap.Error(err)) if strings.HasSuffix(err.Error(), "use of closed network connection") { select { @@ -290,9 +299,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("listener is closed; retry listen %q", p.From()) - } + p.logger.Debug("listener is closed; retry listening on", zap.String("from", p.From())) if err = p.ResetListener(); err != nil { select { @@ -305,7 +312,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - p.logger.Errorf("failed to reset listener %q", err.Error()) + p.logger.Warn("failed to reset listener", zap.Error(err)) } } @@ -344,9 +351,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("dial error %q", err.Error()) - } + p.logger.Debug("failed to dial", zap.Error(err)) continue } @@ -392,9 +397,7 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("read error %q", err.Error()) - } + p.logger.Debug("failed to read", zap.Error(err)) return } if nr == 0 { @@ -429,12 +432,20 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { default: } if blackholed { - if p.logger.V(5) { - if proxySend { - p.logger.Infof("dropped %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } else { - p.logger.Infof("dropped %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } + if proxySend { + p.logger.Debug( + "dropped", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) + } else { + p.logger.Debug( + "dropped", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } continue } @@ -487,12 +498,10 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { case <-p.donec: return } - if p.logger.V(5) { - if proxySend { - p.logger.Errorf("write error while sending (%q)", err.Error()) - } else { - p.logger.Errorf("write error while receiving (%q)", err.Error()) - } + if proxySend { + p.logger.Debug("failed to write while sending", zap.Error(err)) + } else { + p.logger.Debug("failed to write while receiving", zap.Error(err)) } return } @@ -509,20 +518,39 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { return } if proxySend { - p.logger.Errorf("write error while sending (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw) + p.logger.Debug( + "failed to write while sending; read/write bytes are different", + zap.Int("read-bytes", nr), + zap.Int("write-bytes", nw), + zap.Error(io.ErrShortWrite), + ) } else { - p.logger.Errorf("write error while receiving (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw) + p.logger.Debug( + "failed to write while receiving; read/write bytes are different", + zap.Int("read-bytes", nr), + zap.Int("write-bytes", nw), + zap.Error(io.ErrShortWrite), + ) } return } - if p.logger.V(5) { - if proxySend { - p.logger.Infof("transmitted %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } else { - p.logger.Infof("received %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } + if proxySend { + p.logger.Debug( + "transmitted", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) + } else { + p.logger.Debug( + "received", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } + } } @@ -535,8 +563,13 @@ func (p *proxy) Close() (err error) { p.listenerMu.Lock() if p.listener != nil { err = p.listener.Close() - p.logger.Infof("closed proxy listener on %q", p.From()) + p.logger.Info( + "closed proxy listener", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } + p.logger.Sync() p.listenerMu.Unlock() }) p.closeWg.Wait() @@ -551,7 +584,15 @@ func (p *proxy) DelayAccept(latency, rv time.Duration) { p.latencyAcceptMu.Lock() p.latencyAccept = d p.latencyAcceptMu.Unlock() - p.logger.Infof("set accept latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To()) + + p.logger.Info( + "set accept latency", + zap.Duration("latency", d), + zap.Duration("given-latency", latency), + zap.Duration("given-latency-random-variable", rv), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) UndelayAccept() { @@ -559,7 +600,13 @@ func (p *proxy) UndelayAccept() { d := p.latencyAccept p.latencyAccept = 0 p.latencyAcceptMu.Unlock() - p.logger.Infof("removed accept latency %v [%s -> %s]", d, p.From(), p.To()) + + p.logger.Info( + "removed accept latency", + zap.Duration("latency", d), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) LatencyAccept() time.Duration { @@ -577,7 +624,15 @@ func (p *proxy) DelayTx(latency, rv time.Duration) { p.latencyTxMu.Lock() p.latencyTx = d p.latencyTxMu.Unlock() - p.logger.Infof("set transmit latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To()) + + p.logger.Info( + "set transmit latency", + zap.Duration("latency", d), + zap.Duration("given-latency", latency), + zap.Duration("given-latency-random-variable", rv), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) UndelayTx() { @@ -585,7 +640,13 @@ func (p *proxy) UndelayTx() { d := p.latencyTx p.latencyTx = 0 p.latencyTxMu.Unlock() - p.logger.Infof("removed transmit latency %v [%s -> %s]", d, p.From(), p.To()) + + p.logger.Info( + "removed transmit latency", + zap.Duration("latency", d), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) LatencyTx() time.Duration { @@ -603,7 +664,15 @@ func (p *proxy) DelayRx(latency, rv time.Duration) { p.latencyRxMu.Lock() p.latencyRx = d p.latencyRxMu.Unlock() - p.logger.Infof("set receive latency %v(%v±%v) [%s <- %s]", d, latency, rv, p.From(), p.To()) + + p.logger.Info( + "set receive latency", + zap.Duration("latency", d), + zap.Duration("given-latency", latency), + zap.Duration("given-latency-random-variable", rv), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) UndelayRx() { @@ -611,7 +680,13 @@ func (p *proxy) UndelayRx() { d := p.latencyRx p.latencyRx = 0 p.latencyRxMu.Unlock() - p.logger.Infof("removed receive latency %v [%s <- %s]", d, p.From(), p.To()) + + p.logger.Info( + "removed receive latency", + zap.Duration("latency", d), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) LatencyRx() time.Duration { @@ -644,7 +719,12 @@ func (p *proxy) PauseAccept() { p.acceptMu.Lock() p.pauseAcceptc = make(chan struct{}) p.acceptMu.Unlock() - p.logger.Infof("paused accepting new connections [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "paused accepting new connections", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) UnpauseAccept() { @@ -658,14 +738,24 @@ func (p *proxy) UnpauseAccept() { close(p.pauseAcceptc) } p.acceptMu.Unlock() - p.logger.Infof("unpaused accepting new connections [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "unpaused accepting new connections", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) PauseTx() { p.txMu.Lock() p.pauseTxc = make(chan struct{}) p.txMu.Unlock() - p.logger.Infof("paused transmit listen [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "paused transmit listen", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) UnpauseTx() { @@ -679,14 +769,24 @@ func (p *proxy) UnpauseTx() { close(p.pauseTxc) } p.txMu.Unlock() - p.logger.Infof("unpaused transmit listen [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "unpaused transmit listen", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) PauseRx() { p.rxMu.Lock() p.pauseRxc = make(chan struct{}) p.rxMu.Unlock() - p.logger.Infof("paused receive listen [%s <- %s]", p.From(), p.To()) + + p.logger.Info( + "paused receive listen", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) UnpauseRx() { @@ -700,7 +800,12 @@ func (p *proxy) UnpauseRx() { close(p.pauseRxc) } p.rxMu.Unlock() - p.logger.Infof("unpaused receive listen [%s <- %s]", p.From(), p.To()) + + p.logger.Info( + "unpaused receive listen", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) BlackholeTx() { @@ -714,14 +819,24 @@ func (p *proxy) BlackholeTx() { close(p.blackholeTxc) } p.txMu.Unlock() - p.logger.Infof("blackholed transmit [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "blackholed transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) UnblackholeTx() { p.txMu.Lock() p.blackholeTxc = make(chan struct{}) p.txMu.Unlock() - p.logger.Infof("unblackholed transmit [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "unblackholed transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) BlackholeRx() { @@ -735,42 +850,71 @@ func (p *proxy) BlackholeRx() { close(p.blackholeRxc) } p.rxMu.Unlock() - p.logger.Infof("blackholed receive [%s <- %s]", p.From(), p.To()) + + p.logger.Info( + "blackholed receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) UnblackholeRx() { p.rxMu.Lock() p.blackholeRxc = make(chan struct{}) p.rxMu.Unlock() - p.logger.Infof("unblackholed receive [%s <- %s]", p.From(), p.To()) + + p.logger.Info( + "unblackholed receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) CorruptTx(f func([]byte) []byte) { p.corruptTxMu.Lock() p.corruptTx = f p.corruptTxMu.Unlock() - p.logger.Infof("corrupting transmit [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "corrupting transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) UncorruptTx() { p.corruptTxMu.Lock() p.corruptTx = nil p.corruptTxMu.Unlock() - p.logger.Infof("stopped corrupting transmit [%s -> %s]", p.From(), p.To()) + + p.logger.Info( + "stopped corrupting transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } func (p *proxy) CorruptRx(f func([]byte) []byte) { p.corruptRxMu.Lock() p.corruptRx = f p.corruptRxMu.Unlock() - p.logger.Infof("corrupting receive [%s <- %s]", p.From(), p.To()) + p.logger.Info( + "corrupting receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) UncorruptRx() { p.corruptRxMu.Lock() p.corruptRx = nil p.corruptRxMu.Unlock() - p.logger.Infof("stopped corrupting receive [%s <- %s]", p.From(), p.To()) + + p.logger.Info( + "stopped corrupting receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } func (p *proxy) ResetListener() error { @@ -796,6 +940,9 @@ func (p *proxy) ResetListener() error { } p.listener = ln - p.logger.Infof("reset listener %q", p.From()) + p.logger.Info( + "reset listener on", + zap.String("from", p.From()), + ) return nil } diff --git a/pkg/transport/proxy_test.go b/pkg/transport/proxy_test.go index 58f9e253d..12ae0f6b5 100644 --- a/pkg/transport/proxy_test.go +++ b/pkg/transport/proxy_test.go @@ -28,9 +28,12 @@ import ( "testing" "time" - "google.golang.org/grpc/grpclog" + "go.uber.org/zap" ) +// enable DebugLevel +var testLogger = zap.NewExample() + var testTLSInfo = TLSInfo{ KeyFile: "./fixtures/server.key.insecure", CertFile: "./fixtures/server.crt", @@ -67,9 +70,9 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) { defer ln.Close() cfg := ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), } if secure { cfg.TLSInfo = testTLSInfo @@ -176,9 +179,9 @@ func testProxyDelayAccept(t *testing.T, secure bool) { defer ln.Close() cfg := ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), } if secure { cfg.TLSInfo = testTLSInfo @@ -229,9 +232,9 @@ func TestProxy_PauseTx(t *testing.T) { defer ln.Close() p := NewProxy(ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -275,9 +278,9 @@ func TestProxy_BlackholeTx(t *testing.T) { defer ln.Close() p := NewProxy(ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -325,9 +328,9 @@ func TestProxy_CorruptTx(t *testing.T) { defer ln.Close() p := NewProxy(ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -360,9 +363,9 @@ func TestProxy_Shutdown(t *testing.T) { defer ln.Close() p := NewProxy(ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -390,9 +393,9 @@ func TestProxy_ShutdownListener(t *testing.T) { defer ln.Close() p := NewProxy(ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -462,9 +465,9 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) { time.Sleep(200 * time.Millisecond) cfg := ProxyConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), } if secure { cfg.TLSInfo = testTLSInfo