diff --git a/pkg/proxy/doc.go b/pkg/proxy/doc.go new file mode 100644 index 000000000..fc81aa20b --- /dev/null +++ b/pkg/proxy/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package proxy implements proxy servers for network fault testing. +package proxy diff --git a/pkg/proxy/fixtures/ca-csr.json b/pkg/proxy/fixtures/ca-csr.json new file mode 100644 index 000000000..ecafabaad --- /dev/null +++ b/pkg/proxy/fixtures/ca-csr.json @@ -0,0 +1,19 @@ +{ + "key": { + "algo": "rsa", + "size": 2048 + }, + "names": [ + { + "O": "etcd", + "OU": "etcd Security", + "L": "San Francisco", + "ST": "California", + "C": "USA" + } + ], + "CN": "ca", + "ca": { + "expiry": "87600h" + } +} diff --git a/pkg/proxy/fixtures/ca.crt b/pkg/proxy/fixtures/ca.crt new file mode 100644 index 000000000..0947aa383 --- /dev/null +++ b/pkg/proxy/fixtures/ca.crt @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDsTCCApmgAwIBAgIUZzOo4zcHY/nEXY1PD8A7povXlWUwDQYJKoZIhvcNAQEL +BQAwbzEMMAoGA1UEBhMDVVNBMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQH +Ew1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRldGNkMRYwFAYDVQQLEw1ldGNkIFNl +Y3VyaXR5MQswCQYDVQQDEwJjYTAeFw0xODAxMDIxNjQxMDBaFw0yNzEyMzExNjQx +MDBaMG8xDDAKBgNVBAYTA1VTQTETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UE +BxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMEZXRjZDEWMBQGA1UECxMNZXRjZCBT +ZWN1cml0eTELMAkGA1UEAxMCY2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQDD4Ys48LDWGyojj3Rcr6fnESY+UycaaGoTXADWLPmm+sQR3KcsJxF4054S +d2G+NBfJHZvTHhVqOeqZxNtoqgje4paY2A5TbWBdV+xoGfbakwwngiX1yeF1I54k +KH19zb8rBKAm7xixO60hE2CIYzMuw9lDkwoHpI6/PJdy7jwtytbo2Oac512JiO9Y +dHp9dr3mrCzoKEBRtL1asRKfzp6gBC5rIw5T4jrq37feerV4pDEJX7fvexxVocVm +tT4bmMq3Ap6OFFAzmE/ITI8pXvFaOd9lyebNXQmrreKJLUfEIZa6JulLCYxfkJ8z ++CcNLyn6ZXNMaIZ8G9Hm6VRdRi8/AgMBAAGjRTBDMA4GA1UdDwEB/wQEAwIBBjAS +BgNVHRMBAf8ECDAGAQH/AgECMB0GA1UdDgQWBBRDLNYEX8XI7nM53k1rUR+mpTjQ +NTANBgkqhkiG9w0BAQsFAAOCAQEACDe3Fa1KE/rvVtyCLW/IBfKV01NShFTsb6x8 +GrPEQ6NJLZQ2MzdyJgAF2a/nZ9KVgrhGXoyoZBCKP9Dd/JDzSSZcBztfNK8dRv2A +XHBBF6tZ19I+XY9c7/CfhJ2CEYJpeN9r3GKSqV+njkmg8n/On2BTlFsij88plK8H +ORyemc1nQI+ARPSu2r3rJbYa4yI2U6w4L4BTCVImg3bX50GImmXGlwvnJMFik1FX ++0hdfetRxxMZ1pm2Uy6099KkULnSKabZGwRiBUHQJYh0EeuAOQ4a6MG5DRkURWNs +dInjPOLY9/7S5DQKwz/NtqXA8EEymZosHxpiRp+zzKB4XaV9Ig== +-----END CERTIFICATE----- diff --git a/pkg/proxy/fixtures/gencert.json b/pkg/proxy/fixtures/gencert.json new file mode 100644 index 000000000..09b67267b --- /dev/null +++ b/pkg/proxy/fixtures/gencert.json @@ -0,0 +1,13 @@ +{ + "signing": { + "default": { + "usages": [ + "signing", + "key encipherment", + "server auth", + "client auth" + ], + "expiry": "87600h" + } + } +} diff --git a/pkg/proxy/fixtures/gencerts.sh b/pkg/proxy/fixtures/gencerts.sh new file mode 100755 index 000000000..fdf3a1086 --- /dev/null +++ b/pkg/proxy/fixtures/gencerts.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +if ! [[ "$0" =~ "./gencerts.sh" ]]; then + echo "must be run from 'fixtures'" + exit 255 +fi + +if ! which cfssl; then + echo "cfssl is not installed" + exit 255 +fi + +cfssl gencert --initca=true ./ca-csr.json | cfssljson --bare ./ca +mv ca.pem ca.crt +openssl x509 -in ca.crt -noout -text + +# generate DNS: localhost, IP: 127.0.0.1, CN: example.com certificates +cfssl gencert \ + --ca ./ca.crt \ + --ca-key ./ca-key.pem \ + --config ./gencert.json \ + ./server-ca-csr.json | cfssljson --bare ./server +mv server.pem server.crt +mv server-key.pem server.key.insecure + +rm -f *.csr *.pem *.stderr *.txt diff --git a/pkg/proxy/fixtures/server-ca-csr.json b/pkg/proxy/fixtures/server-ca-csr.json new file mode 100644 index 000000000..272cf841d --- /dev/null +++ b/pkg/proxy/fixtures/server-ca-csr.json @@ -0,0 +1,20 @@ +{ + "key": { + "algo": "rsa", + "size": 2048 + }, + "names": [ + { + "O": "etcd", + "OU": "etcd Security", + "L": "San Francisco", + "ST": "California", + "C": "USA" + } + ], + "CN": "example.com", + "hosts": [ + "127.0.0.1", + "localhost" + ] +} diff --git a/pkg/proxy/fixtures/server.crt b/pkg/proxy/fixtures/server.crt new file mode 100644 index 000000000..1a310b5a8 --- /dev/null +++ b/pkg/proxy/fixtures/server.crt @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIIEEjCCAvqgAwIBAgIUIYc+vmysep1pDc2ua/VQEeMFQVAwDQYJKoZIhvcNAQEL +BQAwbzEMMAoGA1UEBhMDVVNBMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQH +Ew1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRldGNkMRYwFAYDVQQLEw1ldGNkIFNl +Y3VyaXR5MQswCQYDVQQDEwJjYTAeFw0xODAxMDIxNjQxMDBaFw0yNzEyMzExNjQx +MDBaMHgxDDAKBgNVBAYTA1VTQTETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UE +BxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMEZXRjZDEWMBQGA1UECxMNZXRjZCBT +ZWN1cml0eTEUMBIGA1UEAxMLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUA +A4IBDwAwggEKAoIBAQDEq7aT2BQZfmJ2xpUm8xWJlN0c3cOLVZRH9mIrEutIHmip +BYq3ZIq3q52w+T3sMcaJNMGjCteE8Lu+G9YSmtfZMAWnkaM02KOjVMkkQcK7Z4vM +lOUjlO+dsvhfmw3CPghqSs6M1K2CTqhuEiXdOBofuEMmwKNRgkV/jT92PUs0h8kq +loc/I3/H+hx/ZJ1i0S0xkZKpaImc0oZ9ZDo07biMrsUIzjwbN69mEs+CtVkah4sy +k6UyRoU2k21lyRTK0LxNjWc9ylzDNUuf6DwduU7lPZsqTaJrFNAAPpOlI4k2EcjL +3zD8amKkJGDm+PQz97PbTA381ec4ZAtB8volxCebAgMBAAGjgZwwgZkwDgYDVR0P +AQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMB +Af8EAjAAMB0GA1UdDgQWBBTTZQnMn5tuUgVE+8c9W0hmbghGoDAfBgNVHSMEGDAW +gBRDLNYEX8XI7nM53k1rUR+mpTjQNTAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8A +AAEwDQYJKoZIhvcNAQELBQADggEBAKUQVj0YDuxg4tinlOZhp4ge7tCA+gL7vV+Q +iDrkWfOlGjDgwYqWMYDXMHWKIW9ea8LzyI/bVEcaHlnBmNOYuS7g47EWNiU7WUA5 +iTkm3CKA5zHFFPcXHW0GQeCQrX9y3SepKS3cP8TAyZFfC/FvV24Kn1oQhJbEe0ZV +In/vPHssW7jlVe0FGVUn7FutRQgiA1pTAtS6AP4LeZ9O41DTWkPqV4nBgcxlvkgD +KjEoXXSb5C0LoR5zwAo9zB3RtmqnmvkHAOv3G92YctdS2VbCmd8CNLj9H7gMmQiH +ThsStVOhb2uo6Ni4PgzUIYKGTd4ZjUXCYxFKck//ajDyCHlL8v4= +-----END CERTIFICATE----- diff --git a/pkg/proxy/fixtures/server.key.insecure b/pkg/proxy/fixtures/server.key.insecure new file mode 100644 index 000000000..0ab2896bf --- /dev/null +++ b/pkg/proxy/fixtures/server.key.insecure @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAxKu2k9gUGX5idsaVJvMViZTdHN3Di1WUR/ZiKxLrSB5oqQWK +t2SKt6udsPk97DHGiTTBowrXhPC7vhvWEprX2TAFp5GjNNijo1TJJEHCu2eLzJTl +I5TvnbL4X5sNwj4IakrOjNStgk6obhIl3TgaH7hDJsCjUYJFf40/dj1LNIfJKpaH +PyN/x/ocf2SdYtEtMZGSqWiJnNKGfWQ6NO24jK7FCM48GzevZhLPgrVZGoeLMpOl +MkaFNpNtZckUytC8TY1nPcpcwzVLn+g8HblO5T2bKk2iaxTQAD6TpSOJNhHIy98w +/GpipCRg5vj0M/ez20wN/NXnOGQLQfL6JcQnmwIDAQABAoIBAGTx1eaQk9B6BEP+ +rXOudTGGzO8SDFop9M/y8HQ3Y7hCk2mdxJNY8bJQTcIWS+g9rC+kencbC3/aqCJt +2zT1cTCy61QU9nYbc/JThGIttqvF/AVnryzSNyL0R3Oa/Dbk7CDSgK3cQ6qMgPru +Ka0gLJh3VVBAtBMUEGPltdsUntM4sHTh5FAabP0ioBJ1QLG6Aak7LOQikjBEFJoc +Tea4uRsE7IreP5Mn7UW92nkt1ey5UGzBtNNtpHbVaHmfQojwlwkLtnV35sumbvK6 +6KTMNREZv6xSIMwkYxm1zRE3Cus/1jGIc8MZF0BxgcCR+G37l+BKwL8CSymHPxhH +dvGxoPECgYEA3STp52CbI/KyVfvjxK2OIex/NV1jKh85wQsLtkaRv3/a/EEg7MV7 +54dEvo5KKOZXfeOd9r9G9h1RffjSD9MhxfPhyGwuOcqa8IE1zNwlY/v7KL7HtDIf +2mrXWF5Klafh8aXYcaRH0ZSLnl/nXUXYht4/0NRGiXnttUgqs6hvY70CgYEA46tO +J5QkgF3YVY0gx10wRCAnnKLkAaHdtxtteXOJh79xsGXQ4LLngc+mz1hLt+TNJza+ +BZhoWwY/ZgyiTH0pebGr/U0QUMoUHlGgjgj3Aa/XFpOhtyLU+IU/PYl0BUz9dqsN +TDtv6p/HQhfd98vUNsbACQda+YAo+oRdO5kLQjcCgYB3OAZNcXxRte5EgoY5KqN8 +UGYH2++w7qKRGqZWvtamGYRyB557Zr+0gu0hmc4LHJrASGyJcHcOCaI8Ol7snxMP +B7qJ9SA6kapTzCS361rQ+zBct/UrhPY9JuovPq4Q3i/luVXldf4t01otqGAvnY7s +rnZS242nYa8v0tcKgdyDNQKBgB3Z60BzQyn1pBTrkT2ysU5tbOQz03OHVrvYg80l +4gWDi5OWdgHQU1yI7pVHPX5aKLAYlGfFaQFuW0e1Jl6jFpoXOrbWsOn25RZom4Wk +FUcKWEhkiRKrJYOEbRtTd3vucVlq6i5xqKX51zWKTZddCXE5NBq69Sm7rSPT0Sms +UnaXAoGAXYAE5slvjcylJpMV4lxTBmNtA9+pw1T7I379mIyqZ0OS25nmpskHU7FR +SQDSRHw7hHuyjEHyhMoHEGLfUMIltQoi+pcrieVQelJdSuX7VInzHPAR5RppUVFl +jOZZKlIiqs+UfCoOgsIblXuw7a/ATnAnXakutSFgHU1lN1gN02U= +-----END RSA PRIVATE KEY----- diff --git a/pkg/transport/proxy.go b/pkg/proxy/server.go similarity index 64% rename from pkg/transport/proxy.go rename to pkg/proxy/server.go index 8af76d46b..311af966f 100644 --- a/pkg/transport/proxy.go +++ b/pkg/proxy/server.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package transport +package proxy import ( "fmt" @@ -21,18 +21,19 @@ import ( "net" "net/http" "net/url" - "os" "strings" "sync" "time" + "github.com/coreos/etcd/pkg/transport" + humanize "github.com/dustin/go-humanize" - "google.golang.org/grpc/grpclog" + "go.uber.org/zap" ) -// Proxy defines proxy layer that simulates common network faults, +// Server defines proxy server layer that simulates common network faults, // such as latency spikes, packet drop/corruption, etc.. -type Proxy interface { +type Server interface { // From returns proxy source address in "scheme://host:port" format. From() string // To returns proxy destination address in "scheme://host:port" format. @@ -101,13 +102,14 @@ type Proxy interface { ResetListener() error } -type proxy struct { +type proxyServer struct { + lg *zap.Logger + from, to url.URL - tlsInfo TLSInfo + tlsInfo transport.TLSInfo dialTimeout time.Duration bufferSize int retryInterval time.Duration - logger grpclog.LoggerV2 readyc chan struct{} donec chan struct{} @@ -141,35 +143,44 @@ type proxy struct { blackholeRxc chan struct{} } -// ProxyConfig defines proxy configuration. -type ProxyConfig struct { +// ServerConfig defines proxy server configuration. +type ServerConfig struct { + Logger *zap.Logger From url.URL To url.URL - TLSInfo TLSInfo + TLSInfo transport.TLSInfo DialTimeout time.Duration BufferSize int RetryInterval time.Duration - Logger grpclog.LoggerV2 } var ( defaultDialTimeout = 3 * time.Second defaultBufferSize = 48 * 1024 defaultRetryInterval = 10 * time.Millisecond - defaultLogger = grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 0) + defaultLogger *zap.Logger ) -// NewProxy returns a proxy implementation with no iptables/tc dependencies. +func init() { + var err error + defaultLogger, err = zap.NewProduction() + if err != nil { + panic(err) + } +} + +// NewServer returns a proxy implementation with no iptables/tc dependencies. // The proxy layer overhead is <1ms. -func NewProxy(cfg ProxyConfig) Proxy { - p := &proxy{ +func NewServer(cfg ServerConfig) Server { + p := &proxyServer{ + lg: cfg.Logger, + from: cfg.From, to: cfg.To, tlsInfo: cfg.TLSInfo, dialTimeout: cfg.DialTimeout, bufferSize: cfg.BufferSize, retryInterval: cfg.RetryInterval, - logger: cfg.Logger, readyc: make(chan struct{}), donec: make(chan struct{}), @@ -190,8 +201,8 @@ func NewProxy(cfg ProxyConfig) Proxy { if p.retryInterval == 0 { p.retryInterval = defaultRetryInterval } - if p.logger == nil { - p.logger = defaultLogger + if p.lg == nil { + p.lg = defaultLogger } close(p.pauseAcceptc) close(p.pauseTxc) @@ -207,7 +218,7 @@ func NewProxy(cfg ProxyConfig) Proxy { var ln net.Listener var err error if !p.tlsInfo.Empty() { - ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) + ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) } else { ln, err = net.Listen(p.from.Scheme, p.from.Host) } @@ -220,15 +231,16 @@ func NewProxy(cfg ProxyConfig) Proxy { p.closeWg.Add(1) go p.listenAndServe() - p.logger.Infof("started proxying [%s -> %s]", p.From(), p.To()) + + p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To())) return p } -func (p *proxy) From() string { +func (p *proxyServer) From() string { return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host) } -func (p *proxy) To() string { +func (p *proxyServer) To() string { return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host) } @@ -237,10 +249,10 @@ func (p *proxy) To() string { // - https://github.com/coreos/etcd/issues/5614 // - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034 -func (p *proxy) listenAndServe() { +func (p *proxyServer) listenAndServe() { defer p.closeWg.Done() - p.logger.Infof("listen %q", p.From()) + p.lg.Info("proxy is listening on", zap.String("from", p.From())) close(p.readyc) for { @@ -280,9 +292,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("listener accept error %q", err.Error()) - } + p.lg.Debug("listener accept error", zap.Error(err)) if strings.HasSuffix(err.Error(), "use of closed network connection") { select { @@ -290,9 +300,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("listener is closed; retry listen %q", p.From()) - } + p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From())) if err = p.ResetListener(); err != nil { select { @@ -305,7 +313,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - p.logger.Errorf("failed to reset listener %q", err.Error()) + p.lg.Warn("failed to reset listener", zap.Error(err)) } } @@ -315,7 +323,7 @@ func (p *proxy) listenAndServe() { var out net.Conn if !p.tlsInfo.Empty() { var tp *http.Transport - tp, err = NewTransport(p.tlsInfo, p.dialTimeout) + tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout) if err != nil { select { case p.errc <- err: @@ -344,9 +352,7 @@ func (p *proxy) listenAndServe() { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("dial error %q", err.Error()) - } + p.lg.Debug("failed to dial", zap.Error(err)) continue } @@ -365,9 +371,9 @@ func (p *proxy) listenAndServe() { } } -func (p *proxy) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) } -func (p *proxy) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) } -func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { +func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) } +func (p *proxyServer) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) } +func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { buf := make([]byte, p.bufferSize) for { nr, err := src.Read(buf) @@ -392,9 +398,7 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { case <-p.donec: return } - if p.logger.V(5) { - p.logger.Errorf("read error %q", err.Error()) - } + p.lg.Debug("failed to read", zap.Error(err)) return } if nr == 0 { @@ -429,12 +433,20 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { default: } if blackholed { - if p.logger.V(5) { - if proxySend { - p.logger.Infof("dropped %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } else { - p.logger.Infof("dropped %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } + if proxySend { + p.lg.Debug( + "dropped", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) + } else { + p.lg.Debug( + "dropped", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } continue } @@ -487,12 +499,10 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { case <-p.donec: return } - if p.logger.V(5) { - if proxySend { - p.logger.Errorf("write error while sending (%q)", err.Error()) - } else { - p.logger.Errorf("write error while receiving (%q)", err.Error()) - } + if proxySend { + p.lg.Debug("failed to write while sending", zap.Error(err)) + } else { + p.lg.Debug("failed to write while receiving", zap.Error(err)) } return } @@ -509,41 +519,65 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { return } if proxySend { - p.logger.Errorf("write error while sending (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw) + p.lg.Debug( + "failed to write while sending; read/write bytes are different", + zap.Int("read-bytes", nr), + zap.Int("write-bytes", nw), + zap.Error(io.ErrShortWrite), + ) } else { - p.logger.Errorf("write error while receiving (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw) + p.lg.Debug( + "failed to write while receiving; read/write bytes are different", + zap.Int("read-bytes", nr), + zap.Int("write-bytes", nw), + zap.Error(io.ErrShortWrite), + ) } return } - if p.logger.V(5) { - if proxySend { - p.logger.Infof("transmitted %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } else { - p.logger.Infof("received %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To()) - } + if proxySend { + p.lg.Debug( + "transmitted", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) + } else { + p.lg.Debug( + "received", + zap.String("data-size", humanize.Bytes(uint64(nr))), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } + } } -func (p *proxy) Ready() <-chan struct{} { return p.readyc } -func (p *proxy) Done() <-chan struct{} { return p.donec } -func (p *proxy) Error() <-chan error { return p.errc } -func (p *proxy) Close() (err error) { +func (p *proxyServer) Ready() <-chan struct{} { return p.readyc } +func (p *proxyServer) Done() <-chan struct{} { return p.donec } +func (p *proxyServer) Error() <-chan error { return p.errc } +func (p *proxyServer) Close() (err error) { p.closeOnce.Do(func() { close(p.donec) p.listenerMu.Lock() if p.listener != nil { err = p.listener.Close() - p.logger.Infof("closed proxy listener on %q", p.From()) + p.lg.Info( + "closed proxy listener", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } + p.lg.Sync() p.listenerMu.Unlock() }) p.closeWg.Wait() return err } -func (p *proxy) DelayAccept(latency, rv time.Duration) { +func (p *proxyServer) DelayAccept(latency, rv time.Duration) { if latency <= 0 { return } @@ -551,25 +585,39 @@ func (p *proxy) DelayAccept(latency, rv time.Duration) { p.latencyAcceptMu.Lock() p.latencyAccept = d p.latencyAcceptMu.Unlock() - p.logger.Infof("set accept latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To()) + + p.lg.Info( + "set accept latency", + zap.Duration("latency", d), + zap.Duration("given-latency", latency), + zap.Duration("given-latency-random-variable", rv), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) UndelayAccept() { +func (p *proxyServer) UndelayAccept() { p.latencyAcceptMu.Lock() d := p.latencyAccept p.latencyAccept = 0 p.latencyAcceptMu.Unlock() - p.logger.Infof("removed accept latency %v [%s -> %s]", d, p.From(), p.To()) + + p.lg.Info( + "removed accept latency", + zap.Duration("latency", d), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) LatencyAccept() time.Duration { +func (p *proxyServer) LatencyAccept() time.Duration { p.latencyAcceptMu.RLock() d := p.latencyAccept p.latencyAcceptMu.RUnlock() return d } -func (p *proxy) DelayTx(latency, rv time.Duration) { +func (p *proxyServer) DelayTx(latency, rv time.Duration) { if latency <= 0 { return } @@ -577,25 +625,39 @@ func (p *proxy) DelayTx(latency, rv time.Duration) { p.latencyTxMu.Lock() p.latencyTx = d p.latencyTxMu.Unlock() - p.logger.Infof("set transmit latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To()) + + p.lg.Info( + "set transmit latency", + zap.Duration("latency", d), + zap.Duration("given-latency", latency), + zap.Duration("given-latency-random-variable", rv), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) UndelayTx() { +func (p *proxyServer) UndelayTx() { p.latencyTxMu.Lock() d := p.latencyTx p.latencyTx = 0 p.latencyTxMu.Unlock() - p.logger.Infof("removed transmit latency %v [%s -> %s]", d, p.From(), p.To()) + + p.lg.Info( + "removed transmit latency", + zap.Duration("latency", d), + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) LatencyTx() time.Duration { +func (p *proxyServer) LatencyTx() time.Duration { p.latencyTxMu.RLock() d := p.latencyTx p.latencyTxMu.RUnlock() return d } -func (p *proxy) DelayRx(latency, rv time.Duration) { +func (p *proxyServer) DelayRx(latency, rv time.Duration) { if latency <= 0 { return } @@ -603,18 +665,32 @@ func (p *proxy) DelayRx(latency, rv time.Duration) { p.latencyRxMu.Lock() p.latencyRx = d p.latencyRxMu.Unlock() - p.logger.Infof("set receive latency %v(%v±%v) [%s <- %s]", d, latency, rv, p.From(), p.To()) + + p.lg.Info( + "set receive latency", + zap.Duration("latency", d), + zap.Duration("given-latency", latency), + zap.Duration("given-latency-random-variable", rv), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) UndelayRx() { +func (p *proxyServer) UndelayRx() { p.latencyRxMu.Lock() d := p.latencyRx p.latencyRx = 0 p.latencyRxMu.Unlock() - p.logger.Infof("removed receive latency %v [%s <- %s]", d, p.From(), p.To()) + + p.lg.Info( + "removed receive latency", + zap.Duration("latency", d), + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) LatencyRx() time.Duration { +func (p *proxyServer) LatencyRx() time.Duration { p.latencyRxMu.RLock() d := p.latencyRx p.latencyRxMu.RUnlock() @@ -640,14 +716,19 @@ func computeLatency(lat, rv time.Duration) time.Duration { return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds())) } -func (p *proxy) PauseAccept() { +func (p *proxyServer) PauseAccept() { p.acceptMu.Lock() p.pauseAcceptc = make(chan struct{}) p.acceptMu.Unlock() - p.logger.Infof("paused accepting new connections [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "paused accepting new connections", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) UnpauseAccept() { +func (p *proxyServer) UnpauseAccept() { p.acceptMu.Lock() select { case <-p.pauseAcceptc: // already unpaused @@ -658,17 +739,27 @@ func (p *proxy) UnpauseAccept() { close(p.pauseAcceptc) } p.acceptMu.Unlock() - p.logger.Infof("unpaused accepting new connections [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "unpaused accepting new connections", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) PauseTx() { +func (p *proxyServer) PauseTx() { p.txMu.Lock() p.pauseTxc = make(chan struct{}) p.txMu.Unlock() - p.logger.Infof("paused transmit listen [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "paused transmit listen", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) UnpauseTx() { +func (p *proxyServer) UnpauseTx() { p.txMu.Lock() select { case <-p.pauseTxc: // already unpaused @@ -679,17 +770,27 @@ func (p *proxy) UnpauseTx() { close(p.pauseTxc) } p.txMu.Unlock() - p.logger.Infof("unpaused transmit listen [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "unpaused transmit listen", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) PauseRx() { +func (p *proxyServer) PauseRx() { p.rxMu.Lock() p.pauseRxc = make(chan struct{}) p.rxMu.Unlock() - p.logger.Infof("paused receive listen [%s <- %s]", p.From(), p.To()) + + p.lg.Info( + "paused receive listen", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) UnpauseRx() { +func (p *proxyServer) UnpauseRx() { p.rxMu.Lock() select { case <-p.pauseRxc: // already unpaused @@ -700,10 +801,15 @@ func (p *proxy) UnpauseRx() { close(p.pauseRxc) } p.rxMu.Unlock() - p.logger.Infof("unpaused receive listen [%s <- %s]", p.From(), p.To()) + + p.lg.Info( + "unpaused receive listen", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) BlackholeTx() { +func (p *proxyServer) BlackholeTx() { p.txMu.Lock() select { case <-p.blackholeTxc: // already blackholed @@ -714,17 +820,27 @@ func (p *proxy) BlackholeTx() { close(p.blackholeTxc) } p.txMu.Unlock() - p.logger.Infof("blackholed transmit [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "blackholed transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) UnblackholeTx() { +func (p *proxyServer) UnblackholeTx() { p.txMu.Lock() p.blackholeTxc = make(chan struct{}) p.txMu.Unlock() - p.logger.Infof("unblackholed transmit [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "unblackholed transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) BlackholeRx() { +func (p *proxyServer) BlackholeRx() { p.rxMu.Lock() select { case <-p.blackholeRxc: // already blackholed @@ -735,45 +851,74 @@ func (p *proxy) BlackholeRx() { close(p.blackholeRxc) } p.rxMu.Unlock() - p.logger.Infof("blackholed receive [%s <- %s]", p.From(), p.To()) + + p.lg.Info( + "blackholed receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) UnblackholeRx() { +func (p *proxyServer) UnblackholeRx() { p.rxMu.Lock() p.blackholeRxc = make(chan struct{}) p.rxMu.Unlock() - p.logger.Infof("unblackholed receive [%s <- %s]", p.From(), p.To()) + + p.lg.Info( + "unblackholed receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) CorruptTx(f func([]byte) []byte) { +func (p *proxyServer) CorruptTx(f func([]byte) []byte) { p.corruptTxMu.Lock() p.corruptTx = f p.corruptTxMu.Unlock() - p.logger.Infof("corrupting transmit [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "corrupting transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) UncorruptTx() { +func (p *proxyServer) UncorruptTx() { p.corruptTxMu.Lock() p.corruptTx = nil p.corruptTxMu.Unlock() - p.logger.Infof("stopped corrupting transmit [%s -> %s]", p.From(), p.To()) + + p.lg.Info( + "stopped corrupting transmit", + zap.String("from", p.From()), + zap.String("to", p.To()), + ) } -func (p *proxy) CorruptRx(f func([]byte) []byte) { +func (p *proxyServer) CorruptRx(f func([]byte) []byte) { p.corruptRxMu.Lock() p.corruptRx = f p.corruptRxMu.Unlock() - p.logger.Infof("corrupting receive [%s <- %s]", p.From(), p.To()) + p.lg.Info( + "corrupting receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) UncorruptRx() { +func (p *proxyServer) UncorruptRx() { p.corruptRxMu.Lock() p.corruptRx = nil p.corruptRxMu.Unlock() - p.logger.Infof("stopped corrupting receive [%s <- %s]", p.From(), p.To()) + + p.lg.Info( + "stopped corrupting receive", + zap.String("from", p.To()), + zap.String("to", p.From()), + ) } -func (p *proxy) ResetListener() error { +func (p *proxyServer) ResetListener() error { p.listenerMu.Lock() defer p.listenerMu.Unlock() @@ -787,7 +932,7 @@ func (p *proxy) ResetListener() error { var ln net.Listener var err error if !p.tlsInfo.Empty() { - ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) + ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) } else { ln, err = net.Listen(p.from.Scheme, p.from.Host) } @@ -796,6 +941,9 @@ func (p *proxy) ResetListener() error { } p.listener = ln - p.logger.Infof("reset listener %q", p.From()) + p.lg.Info( + "reset listener on", + zap.String("from", p.From()), + ) return nil } diff --git a/pkg/transport/proxy_test.go b/pkg/proxy/server_test.go similarity index 74% rename from pkg/transport/proxy_test.go rename to pkg/proxy/server_test.go index 58f9e253d..27e2784af 100644 --- a/pkg/transport/proxy_test.go +++ b/pkg/proxy/server_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package transport +package proxy import ( "bytes" @@ -28,28 +28,33 @@ import ( "testing" "time" - "google.golang.org/grpc/grpclog" + "github.com/coreos/etcd/pkg/transport" + + "go.uber.org/zap" ) -var testTLSInfo = TLSInfo{ +// enable DebugLevel +var testLogger = zap.NewExample() + +var testTLSInfo = transport.TLSInfo{ KeyFile: "./fixtures/server.key.insecure", CertFile: "./fixtures/server.crt", TrustedCAFile: "./fixtures/ca.crt", ClientCertAuth: true, } -func TestProxy_Unix_Insecure(t *testing.T) { testProxy(t, "unix", false, false) } -func TestProxy_TCP_Insecure(t *testing.T) { testProxy(t, "tcp", false, false) } -func TestProxy_Unix_Secure(t *testing.T) { testProxy(t, "unix", true, false) } -func TestProxy_TCP_Secure(t *testing.T) { testProxy(t, "tcp", true, false) } -func TestProxy_Unix_Insecure_DelayTx(t *testing.T) { testProxy(t, "unix", false, true) } -func TestProxy_TCP_Insecure_DelayTx(t *testing.T) { testProxy(t, "tcp", false, true) } -func TestProxy_Unix_Secure_DelayTx(t *testing.T) { testProxy(t, "unix", true, true) } -func TestProxy_TCP_Secure_DelayTx(t *testing.T) { testProxy(t, "tcp", true, true) } -func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) { +func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) } +func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) } +func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) } +func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) } +func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) } +func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) } +func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) } +func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) } +func testServer(t *testing.T, scheme string, secure bool, delayTx bool) { srcAddr, dstAddr := newUnixAddr(), newUnixAddr() if scheme == "tcp" { - ln1, ln2 := listen(t, "tcp", "localhost:0", TLSInfo{}), listen(t, "tcp", "localhost:0", TLSInfo{}) + ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{}) srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String() ln1.Close() ln2.Close() @@ -61,20 +66,20 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) { } tlsInfo := testTLSInfo if !secure { - tlsInfo = TLSInfo{} + tlsInfo = transport.TLSInfo{} } ln := listen(t, scheme, dstAddr, tlsInfo) defer ln.Close() - cfg := ProxyConfig{ + cfg := ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), } if secure { cfg.TLSInfo = testTLSInfo } - p := NewProxy(cfg) + p := NewServer(cfg) <-p.Ready() defer p.Close() @@ -159,9 +164,9 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) { } } -func TestProxy_Unix_Insecure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, false) } -func TestProxy_Unix_Secure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, true) } -func testProxyDelayAccept(t *testing.T, secure bool) { +func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) } +func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) } +func testServerDelayAccept(t *testing.T, secure bool) { srcAddr, dstAddr := newUnixAddr(), newUnixAddr() defer func() { os.RemoveAll(srcAddr) @@ -169,21 +174,21 @@ func testProxyDelayAccept(t *testing.T, secure bool) { }() tlsInfo := testTLSInfo if !secure { - tlsInfo = TLSInfo{} + tlsInfo = transport.TLSInfo{} } scheme := "unix" ln := listen(t, scheme, dstAddr, tlsInfo) defer ln.Close() - cfg := ProxyConfig{ + cfg := ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), } if secure { cfg.TLSInfo = testTLSInfo } - p := NewProxy(cfg) + p := NewServer(cfg) <-p.Ready() defer p.Close() @@ -218,20 +223,20 @@ func testProxyDelayAccept(t *testing.T, secure bool) { } } -func TestProxy_PauseTx(t *testing.T) { +func TestServer_PauseTx(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() defer func() { os.RemoveAll(srcAddr) os.RemoveAll(dstAddr) }() - ln := listen(t, scheme, dstAddr, TLSInfo{}) + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) defer ln.Close() - p := NewProxy(ProxyConfig{ + p := NewServer(ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -239,7 +244,7 @@ func TestProxy_PauseTx(t *testing.T) { p.PauseTx() data := []byte("Hello World!") - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) recvc := make(chan []byte) go func() { @@ -264,20 +269,20 @@ func TestProxy_PauseTx(t *testing.T) { } } -func TestProxy_BlackholeTx(t *testing.T) { +func TestServer_BlackholeTx(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() defer func() { os.RemoveAll(srcAddr) os.RemoveAll(dstAddr) }() - ln := listen(t, scheme, dstAddr, TLSInfo{}) + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) defer ln.Close() - p := NewProxy(ProxyConfig{ + p := NewServer(ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -285,7 +290,7 @@ func TestProxy_BlackholeTx(t *testing.T) { p.BlackholeTx() data := []byte("Hello World!") - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) recvc := make(chan []byte) go func() { @@ -302,7 +307,7 @@ func TestProxy_BlackholeTx(t *testing.T) { // expect different data, old data dropped data[0]++ - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) select { case d := <-recvc: @@ -314,20 +319,20 @@ func TestProxy_BlackholeTx(t *testing.T) { } } -func TestProxy_CorruptTx(t *testing.T) { +func TestServer_CorruptTx(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() defer func() { os.RemoveAll(srcAddr) os.RemoveAll(dstAddr) }() - ln := listen(t, scheme, dstAddr, TLSInfo{}) + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) defer ln.Close() - p := NewProxy(ProxyConfig{ + p := NewServer(ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -337,48 +342,48 @@ func TestProxy_CorruptTx(t *testing.T) { return d }) data := []byte("Hello World!") - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) if d := receive(t, ln); bytes.Equal(d, data) { t.Fatalf("expected corrupted data, got %q", string(d)) } p.UncorruptTx() - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) if d := receive(t, ln); !bytes.Equal(d, data) { t.Fatalf("expected uncorrupted data, got %q", string(d)) } } -func TestProxy_Shutdown(t *testing.T) { +func TestServer_Shutdown(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() defer func() { os.RemoveAll(srcAddr) os.RemoveAll(dstAddr) }() - ln := listen(t, scheme, dstAddr, TLSInfo{}) + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) defer ln.Close() - p := NewProxy(ProxyConfig{ + p := NewServer(ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() - px, _ := p.(*proxy) + px, _ := p.(*proxyServer) px.listener.Close() time.Sleep(200 * time.Millisecond) data := []byte("Hello World!") - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) if d := receive(t, ln); !bytes.Equal(d, data) { t.Fatalf("expected %q, got %q", string(data), string(d)) } } -func TestProxy_ShutdownListener(t *testing.T) { +func TestServer_ShutdownListener(t *testing.T) { scheme := "unix" srcAddr, dstAddr := newUnixAddr(), newUnixAddr() defer func() { @@ -386,13 +391,13 @@ func TestProxy_ShutdownListener(t *testing.T) { os.RemoveAll(dstAddr) }() - ln := listen(t, scheme, dstAddr, TLSInfo{}) + ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) defer ln.Close() - p := NewProxy(ProxyConfig{ + p := NewServer(ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), }) <-p.Ready() defer p.Close() @@ -401,23 +406,23 @@ func TestProxy_ShutdownListener(t *testing.T) { ln.Close() time.Sleep(200 * time.Millisecond) - ln = listen(t, scheme, dstAddr, TLSInfo{}) + ln = listen(t, scheme, dstAddr, transport.TLSInfo{}) defer ln.Close() data := []byte("Hello World!") - send(t, data, scheme, srcAddr, TLSInfo{}) + send(t, data, scheme, srcAddr, transport.TLSInfo{}) if d := receive(t, ln); !bytes.Equal(d, data) { t.Fatalf("expected %q, got %q", string(data), string(d)) } } -func TestProxyHTTP_Insecure_DelayTx(t *testing.T) { testProxyHTTP(t, false, true) } -func TestProxyHTTP_Secure_DelayTx(t *testing.T) { testProxyHTTP(t, true, true) } -func TestProxyHTTP_Insecure_DelayRx(t *testing.T) { testProxyHTTP(t, false, false) } -func TestProxyHTTP_Secure_DelayRx(t *testing.T) { testProxyHTTP(t, true, false) } -func testProxyHTTP(t *testing.T, secure, delayTx bool) { +func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) } +func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) } +func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) } +func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) } +func testServerHTTP(t *testing.T, secure, delayTx bool) { scheme := "tcp" - ln1, ln2 := listen(t, scheme, "localhost:0", TLSInfo{}), listen(t, scheme, "localhost:0", TLSInfo{}) + ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{}) srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String() ln1.Close() ln2.Close() @@ -461,15 +466,15 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) { }() time.Sleep(200 * time.Millisecond) - cfg := ProxyConfig{ + cfg := ServerConfig{ + Logger: testLogger, From: url.URL{Scheme: scheme, Host: srcAddr}, To: url.URL{Scheme: scheme, Host: dstAddr}, - Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5), } if secure { cfg.TLSInfo = testTLSInfo } - p := NewProxy(cfg) + p := NewServer(cfg) <-p.Ready() defer p.Close() @@ -478,7 +483,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) { now := time.Now() var resp *http.Response if secure { - tp, terr := NewTransport(testTLSInfo, 3*time.Second) + tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second) if terr != nil { t.Fatal(terr) } @@ -514,7 +519,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) { now = time.Now() if secure { - tp, terr := NewTransport(testTLSInfo, 3*time.Second) + tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second) if terr != nil { t.Fatal(terr) } @@ -550,10 +555,10 @@ func newUnixAddr() string { return addr } -func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener) { +func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) { var err error if !tlsInfo.Empty() { - ln, err = NewListener(addr, scheme, &tlsInfo) + ln, err = transport.NewListener(addr, scheme, &tlsInfo) } else { ln, err = net.Listen(scheme, addr) } @@ -563,11 +568,11 @@ func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener return ln } -func send(t *testing.T, data []byte, scheme, addr string, tlsInfo TLSInfo) { +func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) { var out net.Conn var err error if !tlsInfo.Empty() { - tp, terr := NewTransport(tlsInfo, 3*time.Second) + tp, terr := transport.NewTransport(tlsInfo, 3*time.Second) if terr != nil { t.Fatal(terr) }