pkg/proxy: move from "pkg/transport"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-10 23:43:23 -07:00
parent 074e417770
commit 3736a126df
10 changed files with 501 additions and 181 deletions

16
pkg/proxy/doc.go Normal file
View File

@ -0,0 +1,16 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package proxy implements proxy servers for network fault testing.
package proxy

View File

@ -0,0 +1,19 @@
{
"key": {
"algo": "rsa",
"size": 2048
},
"names": [
{
"O": "etcd",
"OU": "etcd Security",
"L": "San Francisco",
"ST": "California",
"C": "USA"
}
],
"CN": "ca",
"ca": {
"expiry": "87600h"
}
}

22
pkg/proxy/fixtures/ca.crt Normal file
View File

@ -0,0 +1,22 @@
-----BEGIN CERTIFICATE-----
MIIDsTCCApmgAwIBAgIUZzOo4zcHY/nEXY1PD8A7povXlWUwDQYJKoZIhvcNAQEL
BQAwbzEMMAoGA1UEBhMDVVNBMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQH
Ew1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRldGNkMRYwFAYDVQQLEw1ldGNkIFNl
Y3VyaXR5MQswCQYDVQQDEwJjYTAeFw0xODAxMDIxNjQxMDBaFw0yNzEyMzExNjQx
MDBaMG8xDDAKBgNVBAYTA1VTQTETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UE
BxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMEZXRjZDEWMBQGA1UECxMNZXRjZCBT
ZWN1cml0eTELMAkGA1UEAxMCY2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQDD4Ys48LDWGyojj3Rcr6fnESY+UycaaGoTXADWLPmm+sQR3KcsJxF4054S
d2G+NBfJHZvTHhVqOeqZxNtoqgje4paY2A5TbWBdV+xoGfbakwwngiX1yeF1I54k
KH19zb8rBKAm7xixO60hE2CIYzMuw9lDkwoHpI6/PJdy7jwtytbo2Oac512JiO9Y
dHp9dr3mrCzoKEBRtL1asRKfzp6gBC5rIw5T4jrq37feerV4pDEJX7fvexxVocVm
tT4bmMq3Ap6OFFAzmE/ITI8pXvFaOd9lyebNXQmrreKJLUfEIZa6JulLCYxfkJ8z
+CcNLyn6ZXNMaIZ8G9Hm6VRdRi8/AgMBAAGjRTBDMA4GA1UdDwEB/wQEAwIBBjAS
BgNVHRMBAf8ECDAGAQH/AgECMB0GA1UdDgQWBBRDLNYEX8XI7nM53k1rUR+mpTjQ
NTANBgkqhkiG9w0BAQsFAAOCAQEACDe3Fa1KE/rvVtyCLW/IBfKV01NShFTsb6x8
GrPEQ6NJLZQ2MzdyJgAF2a/nZ9KVgrhGXoyoZBCKP9Dd/JDzSSZcBztfNK8dRv2A
XHBBF6tZ19I+XY9c7/CfhJ2CEYJpeN9r3GKSqV+njkmg8n/On2BTlFsij88plK8H
ORyemc1nQI+ARPSu2r3rJbYa4yI2U6w4L4BTCVImg3bX50GImmXGlwvnJMFik1FX
+0hdfetRxxMZ1pm2Uy6099KkULnSKabZGwRiBUHQJYh0EeuAOQ4a6MG5DRkURWNs
dInjPOLY9/7S5DQKwz/NtqXA8EEymZosHxpiRp+zzKB4XaV9Ig==
-----END CERTIFICATE-----

View File

@ -0,0 +1,13 @@
{
"signing": {
"default": {
"usages": [
"signing",
"key encipherment",
"server auth",
"client auth"
],
"expiry": "87600h"
}
}
}

26
pkg/proxy/fixtures/gencerts.sh Executable file
View File

@ -0,0 +1,26 @@
#!/bin/bash
if ! [[ "$0" =~ "./gencerts.sh" ]]; then
echo "must be run from 'fixtures'"
exit 255
fi
if ! which cfssl; then
echo "cfssl is not installed"
exit 255
fi
cfssl gencert --initca=true ./ca-csr.json | cfssljson --bare ./ca
mv ca.pem ca.crt
openssl x509 -in ca.crt -noout -text
# generate DNS: localhost, IP: 127.0.0.1, CN: example.com certificates
cfssl gencert \
--ca ./ca.crt \
--ca-key ./ca-key.pem \
--config ./gencert.json \
./server-ca-csr.json | cfssljson --bare ./server
mv server.pem server.crt
mv server-key.pem server.key.insecure
rm -f *.csr *.pem *.stderr *.txt

View File

@ -0,0 +1,20 @@
{
"key": {
"algo": "rsa",
"size": 2048
},
"names": [
{
"O": "etcd",
"OU": "etcd Security",
"L": "San Francisco",
"ST": "California",
"C": "USA"
}
],
"CN": "example.com",
"hosts": [
"127.0.0.1",
"localhost"
]
}

View File

@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIIEEjCCAvqgAwIBAgIUIYc+vmysep1pDc2ua/VQEeMFQVAwDQYJKoZIhvcNAQEL
BQAwbzEMMAoGA1UEBhMDVVNBMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQH
Ew1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRldGNkMRYwFAYDVQQLEw1ldGNkIFNl
Y3VyaXR5MQswCQYDVQQDEwJjYTAeFw0xODAxMDIxNjQxMDBaFw0yNzEyMzExNjQx
MDBaMHgxDDAKBgNVBAYTA1VTQTETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UE
BxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMEZXRjZDEWMBQGA1UECxMNZXRjZCBT
ZWN1cml0eTEUMBIGA1UEAxMLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQDEq7aT2BQZfmJ2xpUm8xWJlN0c3cOLVZRH9mIrEutIHmip
BYq3ZIq3q52w+T3sMcaJNMGjCteE8Lu+G9YSmtfZMAWnkaM02KOjVMkkQcK7Z4vM
lOUjlO+dsvhfmw3CPghqSs6M1K2CTqhuEiXdOBofuEMmwKNRgkV/jT92PUs0h8kq
loc/I3/H+hx/ZJ1i0S0xkZKpaImc0oZ9ZDo07biMrsUIzjwbN69mEs+CtVkah4sy
k6UyRoU2k21lyRTK0LxNjWc9ylzDNUuf6DwduU7lPZsqTaJrFNAAPpOlI4k2EcjL
3zD8amKkJGDm+PQz97PbTA381ec4ZAtB8volxCebAgMBAAGjgZwwgZkwDgYDVR0P
AQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMB
Af8EAjAAMB0GA1UdDgQWBBTTZQnMn5tuUgVE+8c9W0hmbghGoDAfBgNVHSMEGDAW
gBRDLNYEX8XI7nM53k1rUR+mpTjQNTAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8A
AAEwDQYJKoZIhvcNAQELBQADggEBAKUQVj0YDuxg4tinlOZhp4ge7tCA+gL7vV+Q
iDrkWfOlGjDgwYqWMYDXMHWKIW9ea8LzyI/bVEcaHlnBmNOYuS7g47EWNiU7WUA5
iTkm3CKA5zHFFPcXHW0GQeCQrX9y3SepKS3cP8TAyZFfC/FvV24Kn1oQhJbEe0ZV
In/vPHssW7jlVe0FGVUn7FutRQgiA1pTAtS6AP4LeZ9O41DTWkPqV4nBgcxlvkgD
KjEoXXSb5C0LoR5zwAo9zB3RtmqnmvkHAOv3G92YctdS2VbCmd8CNLj9H7gMmQiH
ThsStVOhb2uo6Ni4PgzUIYKGTd4ZjUXCYxFKck//ajDyCHlL8v4=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAxKu2k9gUGX5idsaVJvMViZTdHN3Di1WUR/ZiKxLrSB5oqQWK
t2SKt6udsPk97DHGiTTBowrXhPC7vhvWEprX2TAFp5GjNNijo1TJJEHCu2eLzJTl
I5TvnbL4X5sNwj4IakrOjNStgk6obhIl3TgaH7hDJsCjUYJFf40/dj1LNIfJKpaH
PyN/x/ocf2SdYtEtMZGSqWiJnNKGfWQ6NO24jK7FCM48GzevZhLPgrVZGoeLMpOl
MkaFNpNtZckUytC8TY1nPcpcwzVLn+g8HblO5T2bKk2iaxTQAD6TpSOJNhHIy98w
/GpipCRg5vj0M/ez20wN/NXnOGQLQfL6JcQnmwIDAQABAoIBAGTx1eaQk9B6BEP+
rXOudTGGzO8SDFop9M/y8HQ3Y7hCk2mdxJNY8bJQTcIWS+g9rC+kencbC3/aqCJt
2zT1cTCy61QU9nYbc/JThGIttqvF/AVnryzSNyL0R3Oa/Dbk7CDSgK3cQ6qMgPru
Ka0gLJh3VVBAtBMUEGPltdsUntM4sHTh5FAabP0ioBJ1QLG6Aak7LOQikjBEFJoc
Tea4uRsE7IreP5Mn7UW92nkt1ey5UGzBtNNtpHbVaHmfQojwlwkLtnV35sumbvK6
6KTMNREZv6xSIMwkYxm1zRE3Cus/1jGIc8MZF0BxgcCR+G37l+BKwL8CSymHPxhH
dvGxoPECgYEA3STp52CbI/KyVfvjxK2OIex/NV1jKh85wQsLtkaRv3/a/EEg7MV7
54dEvo5KKOZXfeOd9r9G9h1RffjSD9MhxfPhyGwuOcqa8IE1zNwlY/v7KL7HtDIf
2mrXWF5Klafh8aXYcaRH0ZSLnl/nXUXYht4/0NRGiXnttUgqs6hvY70CgYEA46tO
J5QkgF3YVY0gx10wRCAnnKLkAaHdtxtteXOJh79xsGXQ4LLngc+mz1hLt+TNJza+
BZhoWwY/ZgyiTH0pebGr/U0QUMoUHlGgjgj3Aa/XFpOhtyLU+IU/PYl0BUz9dqsN
TDtv6p/HQhfd98vUNsbACQda+YAo+oRdO5kLQjcCgYB3OAZNcXxRte5EgoY5KqN8
UGYH2++w7qKRGqZWvtamGYRyB557Zr+0gu0hmc4LHJrASGyJcHcOCaI8Ol7snxMP
B7qJ9SA6kapTzCS361rQ+zBct/UrhPY9JuovPq4Q3i/luVXldf4t01otqGAvnY7s
rnZS242nYa8v0tcKgdyDNQKBgB3Z60BzQyn1pBTrkT2ysU5tbOQz03OHVrvYg80l
4gWDi5OWdgHQU1yI7pVHPX5aKLAYlGfFaQFuW0e1Jl6jFpoXOrbWsOn25RZom4Wk
FUcKWEhkiRKrJYOEbRtTd3vucVlq6i5xqKX51zWKTZddCXE5NBq69Sm7rSPT0Sms
UnaXAoGAXYAE5slvjcylJpMV4lxTBmNtA9+pw1T7I379mIyqZ0OS25nmpskHU7FR
SQDSRHw7hHuyjEHyhMoHEGLfUMIltQoi+pcrieVQelJdSuX7VInzHPAR5RppUVFl
jOZZKlIiqs+UfCoOgsIblXuw7a/ATnAnXakutSFgHU1lN1gN02U=
-----END RSA PRIVATE KEY-----

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
package proxy
import (
"fmt"
@ -21,18 +21,19 @@ import (
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/coreos/etcd/pkg/transport"
humanize "github.com/dustin/go-humanize"
"google.golang.org/grpc/grpclog"
"go.uber.org/zap"
)
// Proxy defines proxy layer that simulates common network faults,
// Server defines proxy server layer that simulates common network faults,
// such as latency spikes, packet drop/corruption, etc..
type Proxy interface {
type Server interface {
// From returns proxy source address in "scheme://host:port" format.
From() string
// To returns proxy destination address in "scheme://host:port" format.
@ -101,13 +102,14 @@ type Proxy interface {
ResetListener() error
}
type proxy struct {
type proxyServer struct {
lg *zap.Logger
from, to url.URL
tlsInfo TLSInfo
tlsInfo transport.TLSInfo
dialTimeout time.Duration
bufferSize int
retryInterval time.Duration
logger grpclog.LoggerV2
readyc chan struct{}
donec chan struct{}
@ -141,35 +143,44 @@ type proxy struct {
blackholeRxc chan struct{}
}
// ProxyConfig defines proxy configuration.
type ProxyConfig struct {
// ServerConfig defines proxy server configuration.
type ServerConfig struct {
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo TLSInfo
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
Logger grpclog.LoggerV2
}
var (
defaultDialTimeout = 3 * time.Second
defaultBufferSize = 48 * 1024
defaultRetryInterval = 10 * time.Millisecond
defaultLogger = grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 0)
defaultLogger *zap.Logger
)
// NewProxy returns a proxy implementation with no iptables/tc dependencies.
func init() {
var err error
defaultLogger, err = zap.NewProduction()
if err != nil {
panic(err)
}
}
// NewServer returns a proxy implementation with no iptables/tc dependencies.
// The proxy layer overhead is <1ms.
func NewProxy(cfg ProxyConfig) Proxy {
p := &proxy{
func NewServer(cfg ServerConfig) Server {
p := &proxyServer{
lg: cfg.Logger,
from: cfg.From,
to: cfg.To,
tlsInfo: cfg.TLSInfo,
dialTimeout: cfg.DialTimeout,
bufferSize: cfg.BufferSize,
retryInterval: cfg.RetryInterval,
logger: cfg.Logger,
readyc: make(chan struct{}),
donec: make(chan struct{}),
@ -190,8 +201,8 @@ func NewProxy(cfg ProxyConfig) Proxy {
if p.retryInterval == 0 {
p.retryInterval = defaultRetryInterval
}
if p.logger == nil {
p.logger = defaultLogger
if p.lg == nil {
p.lg = defaultLogger
}
close(p.pauseAcceptc)
close(p.pauseTxc)
@ -207,7 +218,7 @@ func NewProxy(cfg ProxyConfig) Proxy {
var ln net.Listener
var err error
if !p.tlsInfo.Empty() {
ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
} else {
ln, err = net.Listen(p.from.Scheme, p.from.Host)
}
@ -220,15 +231,16 @@ func NewProxy(cfg ProxyConfig) Proxy {
p.closeWg.Add(1)
go p.listenAndServe()
p.logger.Infof("started proxying [%s -> %s]", p.From(), p.To())
p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To()))
return p
}
func (p *proxy) From() string {
func (p *proxyServer) From() string {
return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host)
}
func (p *proxy) To() string {
func (p *proxyServer) To() string {
return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host)
}
@ -237,10 +249,10 @@ func (p *proxy) To() string {
// - https://github.com/coreos/etcd/issues/5614
// - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034
func (p *proxy) listenAndServe() {
func (p *proxyServer) listenAndServe() {
defer p.closeWg.Done()
p.logger.Infof("listen %q", p.From())
p.lg.Info("proxy is listening on", zap.String("from", p.From()))
close(p.readyc)
for {
@ -280,9 +292,7 @@ func (p *proxy) listenAndServe() {
case <-p.donec:
return
}
if p.logger.V(5) {
p.logger.Errorf("listener accept error %q", err.Error())
}
p.lg.Debug("listener accept error", zap.Error(err))
if strings.HasSuffix(err.Error(), "use of closed network connection") {
select {
@ -290,9 +300,7 @@ func (p *proxy) listenAndServe() {
case <-p.donec:
return
}
if p.logger.V(5) {
p.logger.Errorf("listener is closed; retry listen %q", p.From())
}
p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From()))
if err = p.ResetListener(); err != nil {
select {
@ -305,7 +313,7 @@ func (p *proxy) listenAndServe() {
case <-p.donec:
return
}
p.logger.Errorf("failed to reset listener %q", err.Error())
p.lg.Warn("failed to reset listener", zap.Error(err))
}
}
@ -315,7 +323,7 @@ func (p *proxy) listenAndServe() {
var out net.Conn
if !p.tlsInfo.Empty() {
var tp *http.Transport
tp, err = NewTransport(p.tlsInfo, p.dialTimeout)
tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout)
if err != nil {
select {
case p.errc <- err:
@ -344,9 +352,7 @@ func (p *proxy) listenAndServe() {
case <-p.donec:
return
}
if p.logger.V(5) {
p.logger.Errorf("dial error %q", err.Error())
}
p.lg.Debug("failed to dial", zap.Error(err))
continue
}
@ -365,9 +371,9 @@ func (p *proxy) listenAndServe() {
}
}
func (p *proxy) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
func (p *proxy) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
func (p *proxyServer) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
buf := make([]byte, p.bufferSize)
for {
nr, err := src.Read(buf)
@ -392,9 +398,7 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
case <-p.donec:
return
}
if p.logger.V(5) {
p.logger.Errorf("read error %q", err.Error())
}
p.lg.Debug("failed to read", zap.Error(err))
return
}
if nr == 0 {
@ -429,12 +433,20 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
default:
}
if blackholed {
if p.logger.V(5) {
if proxySend {
p.logger.Infof("dropped %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
} else {
p.logger.Infof("dropped %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
}
if proxySend {
p.lg.Debug(
"dropped",
zap.String("data-size", humanize.Bytes(uint64(nr))),
zap.String("from", p.From()),
zap.String("to", p.To()),
)
} else {
p.lg.Debug(
"dropped",
zap.String("data-size", humanize.Bytes(uint64(nr))),
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
continue
}
@ -487,12 +499,10 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
case <-p.donec:
return
}
if p.logger.V(5) {
if proxySend {
p.logger.Errorf("write error while sending (%q)", err.Error())
} else {
p.logger.Errorf("write error while receiving (%q)", err.Error())
}
if proxySend {
p.lg.Debug("failed to write while sending", zap.Error(err))
} else {
p.lg.Debug("failed to write while receiving", zap.Error(err))
}
return
}
@ -509,41 +519,65 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
return
}
if proxySend {
p.logger.Errorf("write error while sending (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw)
p.lg.Debug(
"failed to write while sending; read/write bytes are different",
zap.Int("read-bytes", nr),
zap.Int("write-bytes", nw),
zap.Error(io.ErrShortWrite),
)
} else {
p.logger.Errorf("write error while receiving (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw)
p.lg.Debug(
"failed to write while receiving; read/write bytes are different",
zap.Int("read-bytes", nr),
zap.Int("write-bytes", nw),
zap.Error(io.ErrShortWrite),
)
}
return
}
if p.logger.V(5) {
if proxySend {
p.logger.Infof("transmitted %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
} else {
p.logger.Infof("received %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
}
if proxySend {
p.lg.Debug(
"transmitted",
zap.String("data-size", humanize.Bytes(uint64(nr))),
zap.String("from", p.From()),
zap.String("to", p.To()),
)
} else {
p.lg.Debug(
"received",
zap.String("data-size", humanize.Bytes(uint64(nr))),
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
}
}
func (p *proxy) Ready() <-chan struct{} { return p.readyc }
func (p *proxy) Done() <-chan struct{} { return p.donec }
func (p *proxy) Error() <-chan error { return p.errc }
func (p *proxy) Close() (err error) {
func (p *proxyServer) Ready() <-chan struct{} { return p.readyc }
func (p *proxyServer) Done() <-chan struct{} { return p.donec }
func (p *proxyServer) Error() <-chan error { return p.errc }
func (p *proxyServer) Close() (err error) {
p.closeOnce.Do(func() {
close(p.donec)
p.listenerMu.Lock()
if p.listener != nil {
err = p.listener.Close()
p.logger.Infof("closed proxy listener on %q", p.From())
p.lg.Info(
"closed proxy listener",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
p.lg.Sync()
p.listenerMu.Unlock()
})
p.closeWg.Wait()
return err
}
func (p *proxy) DelayAccept(latency, rv time.Duration) {
func (p *proxyServer) DelayAccept(latency, rv time.Duration) {
if latency <= 0 {
return
}
@ -551,25 +585,39 @@ func (p *proxy) DelayAccept(latency, rv time.Duration) {
p.latencyAcceptMu.Lock()
p.latencyAccept = d
p.latencyAcceptMu.Unlock()
p.logger.Infof("set accept latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To())
p.lg.Info(
"set accept latency",
zap.Duration("latency", d),
zap.Duration("given-latency", latency),
zap.Duration("given-latency-random-variable", rv),
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) UndelayAccept() {
func (p *proxyServer) UndelayAccept() {
p.latencyAcceptMu.Lock()
d := p.latencyAccept
p.latencyAccept = 0
p.latencyAcceptMu.Unlock()
p.logger.Infof("removed accept latency %v [%s -> %s]", d, p.From(), p.To())
p.lg.Info(
"removed accept latency",
zap.Duration("latency", d),
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) LatencyAccept() time.Duration {
func (p *proxyServer) LatencyAccept() time.Duration {
p.latencyAcceptMu.RLock()
d := p.latencyAccept
p.latencyAcceptMu.RUnlock()
return d
}
func (p *proxy) DelayTx(latency, rv time.Duration) {
func (p *proxyServer) DelayTx(latency, rv time.Duration) {
if latency <= 0 {
return
}
@ -577,25 +625,39 @@ func (p *proxy) DelayTx(latency, rv time.Duration) {
p.latencyTxMu.Lock()
p.latencyTx = d
p.latencyTxMu.Unlock()
p.logger.Infof("set transmit latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To())
p.lg.Info(
"set transmit latency",
zap.Duration("latency", d),
zap.Duration("given-latency", latency),
zap.Duration("given-latency-random-variable", rv),
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) UndelayTx() {
func (p *proxyServer) UndelayTx() {
p.latencyTxMu.Lock()
d := p.latencyTx
p.latencyTx = 0
p.latencyTxMu.Unlock()
p.logger.Infof("removed transmit latency %v [%s -> %s]", d, p.From(), p.To())
p.lg.Info(
"removed transmit latency",
zap.Duration("latency", d),
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) LatencyTx() time.Duration {
func (p *proxyServer) LatencyTx() time.Duration {
p.latencyTxMu.RLock()
d := p.latencyTx
p.latencyTxMu.RUnlock()
return d
}
func (p *proxy) DelayRx(latency, rv time.Duration) {
func (p *proxyServer) DelayRx(latency, rv time.Duration) {
if latency <= 0 {
return
}
@ -603,18 +665,32 @@ func (p *proxy) DelayRx(latency, rv time.Duration) {
p.latencyRxMu.Lock()
p.latencyRx = d
p.latencyRxMu.Unlock()
p.logger.Infof("set receive latency %v(%v±%v) [%s <- %s]", d, latency, rv, p.From(), p.To())
p.lg.Info(
"set receive latency",
zap.Duration("latency", d),
zap.Duration("given-latency", latency),
zap.Duration("given-latency-random-variable", rv),
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) UndelayRx() {
func (p *proxyServer) UndelayRx() {
p.latencyRxMu.Lock()
d := p.latencyRx
p.latencyRx = 0
p.latencyRxMu.Unlock()
p.logger.Infof("removed receive latency %v [%s <- %s]", d, p.From(), p.To())
p.lg.Info(
"removed receive latency",
zap.Duration("latency", d),
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) LatencyRx() time.Duration {
func (p *proxyServer) LatencyRx() time.Duration {
p.latencyRxMu.RLock()
d := p.latencyRx
p.latencyRxMu.RUnlock()
@ -640,14 +716,19 @@ func computeLatency(lat, rv time.Duration) time.Duration {
return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
}
func (p *proxy) PauseAccept() {
func (p *proxyServer) PauseAccept() {
p.acceptMu.Lock()
p.pauseAcceptc = make(chan struct{})
p.acceptMu.Unlock()
p.logger.Infof("paused accepting new connections [%s -> %s]", p.From(), p.To())
p.lg.Info(
"paused accepting new connections",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) UnpauseAccept() {
func (p *proxyServer) UnpauseAccept() {
p.acceptMu.Lock()
select {
case <-p.pauseAcceptc: // already unpaused
@ -658,17 +739,27 @@ func (p *proxy) UnpauseAccept() {
close(p.pauseAcceptc)
}
p.acceptMu.Unlock()
p.logger.Infof("unpaused accepting new connections [%s -> %s]", p.From(), p.To())
p.lg.Info(
"unpaused accepting new connections",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) PauseTx() {
func (p *proxyServer) PauseTx() {
p.txMu.Lock()
p.pauseTxc = make(chan struct{})
p.txMu.Unlock()
p.logger.Infof("paused transmit listen [%s -> %s]", p.From(), p.To())
p.lg.Info(
"paused transmit listen",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) UnpauseTx() {
func (p *proxyServer) UnpauseTx() {
p.txMu.Lock()
select {
case <-p.pauseTxc: // already unpaused
@ -679,17 +770,27 @@ func (p *proxy) UnpauseTx() {
close(p.pauseTxc)
}
p.txMu.Unlock()
p.logger.Infof("unpaused transmit listen [%s -> %s]", p.From(), p.To())
p.lg.Info(
"unpaused transmit listen",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) PauseRx() {
func (p *proxyServer) PauseRx() {
p.rxMu.Lock()
p.pauseRxc = make(chan struct{})
p.rxMu.Unlock()
p.logger.Infof("paused receive listen [%s <- %s]", p.From(), p.To())
p.lg.Info(
"paused receive listen",
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) UnpauseRx() {
func (p *proxyServer) UnpauseRx() {
p.rxMu.Lock()
select {
case <-p.pauseRxc: // already unpaused
@ -700,10 +801,15 @@ func (p *proxy) UnpauseRx() {
close(p.pauseRxc)
}
p.rxMu.Unlock()
p.logger.Infof("unpaused receive listen [%s <- %s]", p.From(), p.To())
p.lg.Info(
"unpaused receive listen",
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) BlackholeTx() {
func (p *proxyServer) BlackholeTx() {
p.txMu.Lock()
select {
case <-p.blackholeTxc: // already blackholed
@ -714,17 +820,27 @@ func (p *proxy) BlackholeTx() {
close(p.blackholeTxc)
}
p.txMu.Unlock()
p.logger.Infof("blackholed transmit [%s -> %s]", p.From(), p.To())
p.lg.Info(
"blackholed transmit",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) UnblackholeTx() {
func (p *proxyServer) UnblackholeTx() {
p.txMu.Lock()
p.blackholeTxc = make(chan struct{})
p.txMu.Unlock()
p.logger.Infof("unblackholed transmit [%s -> %s]", p.From(), p.To())
p.lg.Info(
"unblackholed transmit",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) BlackholeRx() {
func (p *proxyServer) BlackholeRx() {
p.rxMu.Lock()
select {
case <-p.blackholeRxc: // already blackholed
@ -735,45 +851,74 @@ func (p *proxy) BlackholeRx() {
close(p.blackholeRxc)
}
p.rxMu.Unlock()
p.logger.Infof("blackholed receive [%s <- %s]", p.From(), p.To())
p.lg.Info(
"blackholed receive",
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) UnblackholeRx() {
func (p *proxyServer) UnblackholeRx() {
p.rxMu.Lock()
p.blackholeRxc = make(chan struct{})
p.rxMu.Unlock()
p.logger.Infof("unblackholed receive [%s <- %s]", p.From(), p.To())
p.lg.Info(
"unblackholed receive",
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) CorruptTx(f func([]byte) []byte) {
func (p *proxyServer) CorruptTx(f func([]byte) []byte) {
p.corruptTxMu.Lock()
p.corruptTx = f
p.corruptTxMu.Unlock()
p.logger.Infof("corrupting transmit [%s -> %s]", p.From(), p.To())
p.lg.Info(
"corrupting transmit",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) UncorruptTx() {
func (p *proxyServer) UncorruptTx() {
p.corruptTxMu.Lock()
p.corruptTx = nil
p.corruptTxMu.Unlock()
p.logger.Infof("stopped corrupting transmit [%s -> %s]", p.From(), p.To())
p.lg.Info(
"stopped corrupting transmit",
zap.String("from", p.From()),
zap.String("to", p.To()),
)
}
func (p *proxy) CorruptRx(f func([]byte) []byte) {
func (p *proxyServer) CorruptRx(f func([]byte) []byte) {
p.corruptRxMu.Lock()
p.corruptRx = f
p.corruptRxMu.Unlock()
p.logger.Infof("corrupting receive [%s <- %s]", p.From(), p.To())
p.lg.Info(
"corrupting receive",
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) UncorruptRx() {
func (p *proxyServer) UncorruptRx() {
p.corruptRxMu.Lock()
p.corruptRx = nil
p.corruptRxMu.Unlock()
p.logger.Infof("stopped corrupting receive [%s <- %s]", p.From(), p.To())
p.lg.Info(
"stopped corrupting receive",
zap.String("from", p.To()),
zap.String("to", p.From()),
)
}
func (p *proxy) ResetListener() error {
func (p *proxyServer) ResetListener() error {
p.listenerMu.Lock()
defer p.listenerMu.Unlock()
@ -787,7 +932,7 @@ func (p *proxy) ResetListener() error {
var ln net.Listener
var err error
if !p.tlsInfo.Empty() {
ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
} else {
ln, err = net.Listen(p.from.Scheme, p.from.Host)
}
@ -796,6 +941,9 @@ func (p *proxy) ResetListener() error {
}
p.listener = ln
p.logger.Infof("reset listener %q", p.From())
p.lg.Info(
"reset listener on",
zap.String("from", p.From()),
)
return nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
package proxy
import (
"bytes"
@ -28,28 +28,33 @@ import (
"testing"
"time"
"google.golang.org/grpc/grpclog"
"github.com/coreos/etcd/pkg/transport"
"go.uber.org/zap"
)
var testTLSInfo = TLSInfo{
// enable DebugLevel
var testLogger = zap.NewExample()
var testTLSInfo = transport.TLSInfo{
KeyFile: "./fixtures/server.key.insecure",
CertFile: "./fixtures/server.crt",
TrustedCAFile: "./fixtures/ca.crt",
ClientCertAuth: true,
}
func TestProxy_Unix_Insecure(t *testing.T) { testProxy(t, "unix", false, false) }
func TestProxy_TCP_Insecure(t *testing.T) { testProxy(t, "tcp", false, false) }
func TestProxy_Unix_Secure(t *testing.T) { testProxy(t, "unix", true, false) }
func TestProxy_TCP_Secure(t *testing.T) { testProxy(t, "tcp", true, false) }
func TestProxy_Unix_Insecure_DelayTx(t *testing.T) { testProxy(t, "unix", false, true) }
func TestProxy_TCP_Insecure_DelayTx(t *testing.T) { testProxy(t, "tcp", false, true) }
func TestProxy_Unix_Secure_DelayTx(t *testing.T) { testProxy(t, "unix", true, true) }
func TestProxy_TCP_Secure_DelayTx(t *testing.T) { testProxy(t, "tcp", true, true) }
func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) }
func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) }
func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) }
func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) }
func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) }
func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) }
func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) }
func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) }
func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
if scheme == "tcp" {
ln1, ln2 := listen(t, "tcp", "localhost:0", TLSInfo{}), listen(t, "tcp", "localhost:0", TLSInfo{})
ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{})
srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
ln1.Close()
ln2.Close()
@ -61,20 +66,20 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
}
tlsInfo := testTLSInfo
if !secure {
tlsInfo = TLSInfo{}
tlsInfo = transport.TLSInfo{}
}
ln := listen(t, scheme, dstAddr, tlsInfo)
defer ln.Close()
cfg := ProxyConfig{
cfg := ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
}
if secure {
cfg.TLSInfo = testTLSInfo
}
p := NewProxy(cfg)
p := NewServer(cfg)
<-p.Ready()
defer p.Close()
@ -159,9 +164,9 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
}
}
func TestProxy_Unix_Insecure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, false) }
func TestProxy_Unix_Secure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, true) }
func testProxyDelayAccept(t *testing.T, secure bool) {
func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
func testServerDelayAccept(t *testing.T, secure bool) {
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
@ -169,21 +174,21 @@ func testProxyDelayAccept(t *testing.T, secure bool) {
}()
tlsInfo := testTLSInfo
if !secure {
tlsInfo = TLSInfo{}
tlsInfo = transport.TLSInfo{}
}
scheme := "unix"
ln := listen(t, scheme, dstAddr, tlsInfo)
defer ln.Close()
cfg := ProxyConfig{
cfg := ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
}
if secure {
cfg.TLSInfo = testTLSInfo
}
p := NewProxy(cfg)
p := NewServer(cfg)
<-p.Ready()
defer p.Close()
@ -218,20 +223,20 @@ func testProxyDelayAccept(t *testing.T, secure bool) {
}
}
func TestProxy_PauseTx(t *testing.T) {
func TestServer_PauseTx(t *testing.T) {
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, TLSInfo{})
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()
p := NewProxy(ProxyConfig{
p := NewServer(ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
})
<-p.Ready()
defer p.Close()
@ -239,7 +244,7 @@ func TestProxy_PauseTx(t *testing.T) {
p.PauseTx()
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
recvc := make(chan []byte)
go func() {
@ -264,20 +269,20 @@ func TestProxy_PauseTx(t *testing.T) {
}
}
func TestProxy_BlackholeTx(t *testing.T) {
func TestServer_BlackholeTx(t *testing.T) {
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, TLSInfo{})
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()
p := NewProxy(ProxyConfig{
p := NewServer(ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
})
<-p.Ready()
defer p.Close()
@ -285,7 +290,7 @@ func TestProxy_BlackholeTx(t *testing.T) {
p.BlackholeTx()
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
recvc := make(chan []byte)
go func() {
@ -302,7 +307,7 @@ func TestProxy_BlackholeTx(t *testing.T) {
// expect different data, old data dropped
data[0]++
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
select {
case d := <-recvc:
@ -314,20 +319,20 @@ func TestProxy_BlackholeTx(t *testing.T) {
}
}
func TestProxy_CorruptTx(t *testing.T) {
func TestServer_CorruptTx(t *testing.T) {
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, TLSInfo{})
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()
p := NewProxy(ProxyConfig{
p := NewServer(ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
})
<-p.Ready()
defer p.Close()
@ -337,48 +342,48 @@ func TestProxy_CorruptTx(t *testing.T) {
return d
})
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); bytes.Equal(d, data) {
t.Fatalf("expected corrupted data, got %q", string(d))
}
p.UncorruptTx()
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); !bytes.Equal(d, data) {
t.Fatalf("expected uncorrupted data, got %q", string(d))
}
}
func TestProxy_Shutdown(t *testing.T) {
func TestServer_Shutdown(t *testing.T) {
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, TLSInfo{})
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()
p := NewProxy(ProxyConfig{
p := NewServer(ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
})
<-p.Ready()
defer p.Close()
px, _ := p.(*proxy)
px, _ := p.(*proxyServer)
px.listener.Close()
time.Sleep(200 * time.Millisecond)
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); !bytes.Equal(d, data) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
}
func TestProxy_ShutdownListener(t *testing.T) {
func TestServer_ShutdownListener(t *testing.T) {
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
@ -386,13 +391,13 @@ func TestProxy_ShutdownListener(t *testing.T) {
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, TLSInfo{})
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()
p := NewProxy(ProxyConfig{
p := NewServer(ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
})
<-p.Ready()
defer p.Close()
@ -401,23 +406,23 @@ func TestProxy_ShutdownListener(t *testing.T) {
ln.Close()
time.Sleep(200 * time.Millisecond)
ln = listen(t, scheme, dstAddr, TLSInfo{})
ln = listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()
data := []byte("Hello World!")
send(t, data, scheme, srcAddr, TLSInfo{})
send(t, data, scheme, srcAddr, transport.TLSInfo{})
if d := receive(t, ln); !bytes.Equal(d, data) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
}
func TestProxyHTTP_Insecure_DelayTx(t *testing.T) { testProxyHTTP(t, false, true) }
func TestProxyHTTP_Secure_DelayTx(t *testing.T) { testProxyHTTP(t, true, true) }
func TestProxyHTTP_Insecure_DelayRx(t *testing.T) { testProxyHTTP(t, false, false) }
func TestProxyHTTP_Secure_DelayRx(t *testing.T) { testProxyHTTP(t, true, false) }
func testProxyHTTP(t *testing.T, secure, delayTx bool) {
func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) }
func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) }
func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) }
func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) }
func testServerHTTP(t *testing.T, secure, delayTx bool) {
scheme := "tcp"
ln1, ln2 := listen(t, scheme, "localhost:0", TLSInfo{}), listen(t, scheme, "localhost:0", TLSInfo{})
ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{})
srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
ln1.Close()
ln2.Close()
@ -461,15 +466,15 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
}()
time.Sleep(200 * time.Millisecond)
cfg := ProxyConfig{
cfg := ServerConfig{
Logger: testLogger,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
}
if secure {
cfg.TLSInfo = testTLSInfo
}
p := NewProxy(cfg)
p := NewServer(cfg)
<-p.Ready()
defer p.Close()
@ -478,7 +483,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
now := time.Now()
var resp *http.Response
if secure {
tp, terr := NewTransport(testTLSInfo, 3*time.Second)
tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
if terr != nil {
t.Fatal(terr)
}
@ -514,7 +519,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
now = time.Now()
if secure {
tp, terr := NewTransport(testTLSInfo, 3*time.Second)
tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
if terr != nil {
t.Fatal(terr)
}
@ -550,10 +555,10 @@ func newUnixAddr() string {
return addr
}
func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener) {
func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) {
var err error
if !tlsInfo.Empty() {
ln, err = NewListener(addr, scheme, &tlsInfo)
ln, err = transport.NewListener(addr, scheme, &tlsInfo)
} else {
ln, err = net.Listen(scheme, addr)
}
@ -563,11 +568,11 @@ func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener
return ln
}
func send(t *testing.T, data []byte, scheme, addr string, tlsInfo TLSInfo) {
func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) {
var out net.Conn
var err error
if !tlsInfo.Empty() {
tp, terr := NewTransport(tlsInfo, 3*time.Second)
tp, terr := transport.NewTransport(tlsInfo, 3*time.Second)
if terr != nil {
t.Fatal(terr)
}