mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
pkg/proxy: move from "pkg/transport"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
a831d15751
commit
7f812ca8e1
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package transport
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -25,13 +25,15 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Proxy defines proxy layer that simulates common network faults,
|
||||
// Server defines proxy server layer that simulates common network faults,
|
||||
// such as latency spikes, packet drop/corruption, etc..
|
||||
type Proxy interface {
|
||||
type Server interface {
|
||||
// From returns proxy source address in "scheme://host:port" format.
|
||||
From() string
|
||||
// To returns proxy destination address in "scheme://host:port" format.
|
||||
@ -100,11 +102,11 @@ type Proxy interface {
|
||||
ResetListener() error
|
||||
}
|
||||
|
||||
type proxy struct {
|
||||
logger *zap.Logger
|
||||
type proxyServer struct {
|
||||
lg *zap.Logger
|
||||
|
||||
from, to url.URL
|
||||
tlsInfo TLSInfo
|
||||
tlsInfo transport.TLSInfo
|
||||
dialTimeout time.Duration
|
||||
bufferSize int
|
||||
retryInterval time.Duration
|
||||
@ -141,12 +143,12 @@ type proxy struct {
|
||||
blackholeRxc chan struct{}
|
||||
}
|
||||
|
||||
// ProxyConfig defines proxy configuration.
|
||||
type ProxyConfig struct {
|
||||
// ServerConfig defines proxy server configuration.
|
||||
type ServerConfig struct {
|
||||
Logger *zap.Logger
|
||||
From url.URL
|
||||
To url.URL
|
||||
TLSInfo TLSInfo
|
||||
TLSInfo transport.TLSInfo
|
||||
DialTimeout time.Duration
|
||||
BufferSize int
|
||||
RetryInterval time.Duration
|
||||
@ -167,11 +169,11 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
// NewProxy returns a proxy implementation with no iptables/tc dependencies.
|
||||
// NewServer returns a proxy implementation with no iptables/tc dependencies.
|
||||
// The proxy layer overhead is <1ms.
|
||||
func NewProxy(cfg ProxyConfig) Proxy {
|
||||
p := &proxy{
|
||||
logger: cfg.Logger,
|
||||
func NewServer(cfg ServerConfig) Server {
|
||||
p := &proxyServer{
|
||||
lg: cfg.Logger,
|
||||
|
||||
from: cfg.From,
|
||||
to: cfg.To,
|
||||
@ -199,8 +201,8 @@ func NewProxy(cfg ProxyConfig) Proxy {
|
||||
if p.retryInterval == 0 {
|
||||
p.retryInterval = defaultRetryInterval
|
||||
}
|
||||
if p.logger == nil {
|
||||
p.logger = defaultLogger
|
||||
if p.lg == nil {
|
||||
p.lg = defaultLogger
|
||||
}
|
||||
close(p.pauseAcceptc)
|
||||
close(p.pauseTxc)
|
||||
@ -216,7 +218,7 @@ func NewProxy(cfg ProxyConfig) Proxy {
|
||||
var ln net.Listener
|
||||
var err error
|
||||
if !p.tlsInfo.Empty() {
|
||||
ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
|
||||
ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
|
||||
} else {
|
||||
ln, err = net.Listen(p.from.Scheme, p.from.Host)
|
||||
}
|
||||
@ -230,15 +232,15 @@ func NewProxy(cfg ProxyConfig) Proxy {
|
||||
p.closeWg.Add(1)
|
||||
go p.listenAndServe()
|
||||
|
||||
p.logger.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To()))
|
||||
p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To()))
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *proxy) From() string {
|
||||
func (p *proxyServer) From() string {
|
||||
return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host)
|
||||
}
|
||||
|
||||
func (p *proxy) To() string {
|
||||
func (p *proxyServer) To() string {
|
||||
return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host)
|
||||
}
|
||||
|
||||
@ -247,10 +249,10 @@ func (p *proxy) To() string {
|
||||
// - https://github.com/coreos/etcd/issues/5614
|
||||
// - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034
|
||||
|
||||
func (p *proxy) listenAndServe() {
|
||||
func (p *proxyServer) listenAndServe() {
|
||||
defer p.closeWg.Done()
|
||||
|
||||
p.logger.Info("proxy is listening on", zap.String("from", p.From()))
|
||||
p.lg.Info("proxy is listening on", zap.String("from", p.From()))
|
||||
close(p.readyc)
|
||||
|
||||
for {
|
||||
@ -290,7 +292,7 @@ func (p *proxy) listenAndServe() {
|
||||
case <-p.donec:
|
||||
return
|
||||
}
|
||||
p.logger.Debug("listener accept error", zap.Error(err))
|
||||
p.lg.Debug("listener accept error", zap.Error(err))
|
||||
|
||||
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||
select {
|
||||
@ -298,7 +300,7 @@ func (p *proxy) listenAndServe() {
|
||||
case <-p.donec:
|
||||
return
|
||||
}
|
||||
p.logger.Debug("listener is closed; retry listening on", zap.String("from", p.From()))
|
||||
p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From()))
|
||||
|
||||
if err = p.ResetListener(); err != nil {
|
||||
select {
|
||||
@ -311,7 +313,7 @@ func (p *proxy) listenAndServe() {
|
||||
case <-p.donec:
|
||||
return
|
||||
}
|
||||
p.logger.Warn("failed to reset listener", zap.Error(err))
|
||||
p.lg.Warn("failed to reset listener", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
@ -321,7 +323,7 @@ func (p *proxy) listenAndServe() {
|
||||
var out net.Conn
|
||||
if !p.tlsInfo.Empty() {
|
||||
var tp *http.Transport
|
||||
tp, err = NewTransport(p.tlsInfo, p.dialTimeout)
|
||||
tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout)
|
||||
if err != nil {
|
||||
select {
|
||||
case p.errc <- err:
|
||||
@ -350,7 +352,7 @@ func (p *proxy) listenAndServe() {
|
||||
case <-p.donec:
|
||||
return
|
||||
}
|
||||
p.logger.Debug("failed to dial", zap.Error(err))
|
||||
p.lg.Debug("failed to dial", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
@ -369,9 +371,9 @@ func (p *proxy) listenAndServe() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proxy) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
|
||||
func (p *proxy) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
|
||||
func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
|
||||
func (p *proxyServer) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
|
||||
func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
buf := make([]byte, p.bufferSize)
|
||||
for {
|
||||
nr, err := src.Read(buf)
|
||||
@ -396,7 +398,7 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
case <-p.donec:
|
||||
return
|
||||
}
|
||||
p.logger.Debug("failed to read", zap.Error(err))
|
||||
p.lg.Debug("failed to read", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if nr == 0 {
|
||||
@ -432,14 +434,14 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
}
|
||||
if blackholed {
|
||||
if proxySend {
|
||||
p.logger.Debug(
|
||||
p.lg.Debug(
|
||||
"dropped",
|
||||
zap.String("data-size", humanize.Bytes(uint64(nr))),
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
} else {
|
||||
p.logger.Debug(
|
||||
p.lg.Debug(
|
||||
"dropped",
|
||||
zap.String("data-size", humanize.Bytes(uint64(nr))),
|
||||
zap.String("from", p.To()),
|
||||
@ -498,9 +500,9 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
return
|
||||
}
|
||||
if proxySend {
|
||||
p.logger.Debug("failed to write while sending", zap.Error(err))
|
||||
p.lg.Debug("failed to write while sending", zap.Error(err))
|
||||
} else {
|
||||
p.logger.Debug("failed to write while receiving", zap.Error(err))
|
||||
p.lg.Debug("failed to write while receiving", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -517,14 +519,14 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
return
|
||||
}
|
||||
if proxySend {
|
||||
p.logger.Debug(
|
||||
p.lg.Debug(
|
||||
"failed to write while sending; read/write bytes are different",
|
||||
zap.Int("read-bytes", nr),
|
||||
zap.Int("write-bytes", nw),
|
||||
zap.Error(io.ErrShortWrite),
|
||||
)
|
||||
} else {
|
||||
p.logger.Debug(
|
||||
p.lg.Debug(
|
||||
"failed to write while receiving; read/write bytes are different",
|
||||
zap.Int("read-bytes", nr),
|
||||
zap.Int("write-bytes", nw),
|
||||
@ -535,14 +537,14 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
}
|
||||
|
||||
if proxySend {
|
||||
p.logger.Debug(
|
||||
p.lg.Debug(
|
||||
"transmitted",
|
||||
zap.String("data-size", humanize.Bytes(uint64(nr))),
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
} else {
|
||||
p.logger.Debug(
|
||||
p.lg.Debug(
|
||||
"received",
|
||||
zap.String("data-size", humanize.Bytes(uint64(nr))),
|
||||
zap.String("from", p.To()),
|
||||
@ -553,29 +555,29 @@ func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proxy) Ready() <-chan struct{} { return p.readyc }
|
||||
func (p *proxy) Done() <-chan struct{} { return p.donec }
|
||||
func (p *proxy) Error() <-chan error { return p.errc }
|
||||
func (p *proxy) Close() (err error) {
|
||||
func (p *proxyServer) Ready() <-chan struct{} { return p.readyc }
|
||||
func (p *proxyServer) Done() <-chan struct{} { return p.donec }
|
||||
func (p *proxyServer) Error() <-chan error { return p.errc }
|
||||
func (p *proxyServer) Close() (err error) {
|
||||
p.closeOnce.Do(func() {
|
||||
close(p.donec)
|
||||
p.listenerMu.Lock()
|
||||
if p.listener != nil {
|
||||
err = p.listener.Close()
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"closed proxy listener",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
p.logger.Sync()
|
||||
p.lg.Sync()
|
||||
p.listenerMu.Unlock()
|
||||
})
|
||||
p.closeWg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *proxy) DelayAccept(latency, rv time.Duration) {
|
||||
func (p *proxyServer) DelayAccept(latency, rv time.Duration) {
|
||||
if latency <= 0 {
|
||||
return
|
||||
}
|
||||
@ -584,7 +586,7 @@ func (p *proxy) DelayAccept(latency, rv time.Duration) {
|
||||
p.latencyAccept = d
|
||||
p.latencyAcceptMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"set accept latency",
|
||||
zap.Duration("latency", d),
|
||||
zap.Duration("given-latency", latency),
|
||||
@ -594,13 +596,13 @@ func (p *proxy) DelayAccept(latency, rv time.Duration) {
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UndelayAccept() {
|
||||
func (p *proxyServer) UndelayAccept() {
|
||||
p.latencyAcceptMu.Lock()
|
||||
d := p.latencyAccept
|
||||
p.latencyAccept = 0
|
||||
p.latencyAcceptMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"removed accept latency",
|
||||
zap.Duration("latency", d),
|
||||
zap.String("from", p.From()),
|
||||
@ -608,14 +610,14 @@ func (p *proxy) UndelayAccept() {
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) LatencyAccept() time.Duration {
|
||||
func (p *proxyServer) LatencyAccept() time.Duration {
|
||||
p.latencyAcceptMu.RLock()
|
||||
d := p.latencyAccept
|
||||
p.latencyAcceptMu.RUnlock()
|
||||
return d
|
||||
}
|
||||
|
||||
func (p *proxy) DelayTx(latency, rv time.Duration) {
|
||||
func (p *proxyServer) DelayTx(latency, rv time.Duration) {
|
||||
if latency <= 0 {
|
||||
return
|
||||
}
|
||||
@ -624,7 +626,7 @@ func (p *proxy) DelayTx(latency, rv time.Duration) {
|
||||
p.latencyTx = d
|
||||
p.latencyTxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"set transmit latency",
|
||||
zap.Duration("latency", d),
|
||||
zap.Duration("given-latency", latency),
|
||||
@ -634,13 +636,13 @@ func (p *proxy) DelayTx(latency, rv time.Duration) {
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UndelayTx() {
|
||||
func (p *proxyServer) UndelayTx() {
|
||||
p.latencyTxMu.Lock()
|
||||
d := p.latencyTx
|
||||
p.latencyTx = 0
|
||||
p.latencyTxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"removed transmit latency",
|
||||
zap.Duration("latency", d),
|
||||
zap.String("from", p.From()),
|
||||
@ -648,14 +650,14 @@ func (p *proxy) UndelayTx() {
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) LatencyTx() time.Duration {
|
||||
func (p *proxyServer) LatencyTx() time.Duration {
|
||||
p.latencyTxMu.RLock()
|
||||
d := p.latencyTx
|
||||
p.latencyTxMu.RUnlock()
|
||||
return d
|
||||
}
|
||||
|
||||
func (p *proxy) DelayRx(latency, rv time.Duration) {
|
||||
func (p *proxyServer) DelayRx(latency, rv time.Duration) {
|
||||
if latency <= 0 {
|
||||
return
|
||||
}
|
||||
@ -664,7 +666,7 @@ func (p *proxy) DelayRx(latency, rv time.Duration) {
|
||||
p.latencyRx = d
|
||||
p.latencyRxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"set receive latency",
|
||||
zap.Duration("latency", d),
|
||||
zap.Duration("given-latency", latency),
|
||||
@ -674,13 +676,13 @@ func (p *proxy) DelayRx(latency, rv time.Duration) {
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UndelayRx() {
|
||||
func (p *proxyServer) UndelayRx() {
|
||||
p.latencyRxMu.Lock()
|
||||
d := p.latencyRx
|
||||
p.latencyRx = 0
|
||||
p.latencyRxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"removed receive latency",
|
||||
zap.Duration("latency", d),
|
||||
zap.String("from", p.To()),
|
||||
@ -688,7 +690,7 @@ func (p *proxy) UndelayRx() {
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) LatencyRx() time.Duration {
|
||||
func (p *proxyServer) LatencyRx() time.Duration {
|
||||
p.latencyRxMu.RLock()
|
||||
d := p.latencyRx
|
||||
p.latencyRxMu.RUnlock()
|
||||
@ -714,19 +716,19 @@ func computeLatency(lat, rv time.Duration) time.Duration {
|
||||
return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
|
||||
}
|
||||
|
||||
func (p *proxy) PauseAccept() {
|
||||
func (p *proxyServer) PauseAccept() {
|
||||
p.acceptMu.Lock()
|
||||
p.pauseAcceptc = make(chan struct{})
|
||||
p.acceptMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"paused accepting new connections",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UnpauseAccept() {
|
||||
func (p *proxyServer) UnpauseAccept() {
|
||||
p.acceptMu.Lock()
|
||||
select {
|
||||
case <-p.pauseAcceptc: // already unpaused
|
||||
@ -738,26 +740,26 @@ func (p *proxy) UnpauseAccept() {
|
||||
}
|
||||
p.acceptMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"unpaused accepting new connections",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) PauseTx() {
|
||||
func (p *proxyServer) PauseTx() {
|
||||
p.txMu.Lock()
|
||||
p.pauseTxc = make(chan struct{})
|
||||
p.txMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"paused transmit listen",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UnpauseTx() {
|
||||
func (p *proxyServer) UnpauseTx() {
|
||||
p.txMu.Lock()
|
||||
select {
|
||||
case <-p.pauseTxc: // already unpaused
|
||||
@ -769,26 +771,26 @@ func (p *proxy) UnpauseTx() {
|
||||
}
|
||||
p.txMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"unpaused transmit listen",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) PauseRx() {
|
||||
func (p *proxyServer) PauseRx() {
|
||||
p.rxMu.Lock()
|
||||
p.pauseRxc = make(chan struct{})
|
||||
p.rxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"paused receive listen",
|
||||
zap.String("from", p.To()),
|
||||
zap.String("to", p.From()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UnpauseRx() {
|
||||
func (p *proxyServer) UnpauseRx() {
|
||||
p.rxMu.Lock()
|
||||
select {
|
||||
case <-p.pauseRxc: // already unpaused
|
||||
@ -800,14 +802,14 @@ func (p *proxy) UnpauseRx() {
|
||||
}
|
||||
p.rxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"unpaused receive listen",
|
||||
zap.String("from", p.To()),
|
||||
zap.String("to", p.From()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) BlackholeTx() {
|
||||
func (p *proxyServer) BlackholeTx() {
|
||||
p.txMu.Lock()
|
||||
select {
|
||||
case <-p.blackholeTxc: // already blackholed
|
||||
@ -819,26 +821,26 @@ func (p *proxy) BlackholeTx() {
|
||||
}
|
||||
p.txMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"blackholed transmit",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UnblackholeTx() {
|
||||
func (p *proxyServer) UnblackholeTx() {
|
||||
p.txMu.Lock()
|
||||
p.blackholeTxc = make(chan struct{})
|
||||
p.txMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"unblackholed transmit",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) BlackholeRx() {
|
||||
func (p *proxyServer) BlackholeRx() {
|
||||
p.rxMu.Lock()
|
||||
select {
|
||||
case <-p.blackholeRxc: // already blackholed
|
||||
@ -850,73 +852,73 @@ func (p *proxy) BlackholeRx() {
|
||||
}
|
||||
p.rxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"blackholed receive",
|
||||
zap.String("from", p.To()),
|
||||
zap.String("to", p.From()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UnblackholeRx() {
|
||||
func (p *proxyServer) UnblackholeRx() {
|
||||
p.rxMu.Lock()
|
||||
p.blackholeRxc = make(chan struct{})
|
||||
p.rxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"unblackholed receive",
|
||||
zap.String("from", p.To()),
|
||||
zap.String("to", p.From()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) CorruptTx(f func([]byte) []byte) {
|
||||
func (p *proxyServer) CorruptTx(f func([]byte) []byte) {
|
||||
p.corruptTxMu.Lock()
|
||||
p.corruptTx = f
|
||||
p.corruptTxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"corrupting transmit",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UncorruptTx() {
|
||||
func (p *proxyServer) UncorruptTx() {
|
||||
p.corruptTxMu.Lock()
|
||||
p.corruptTx = nil
|
||||
p.corruptTxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"stopped corrupting transmit",
|
||||
zap.String("from", p.From()),
|
||||
zap.String("to", p.To()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) CorruptRx(f func([]byte) []byte) {
|
||||
func (p *proxyServer) CorruptRx(f func([]byte) []byte) {
|
||||
p.corruptRxMu.Lock()
|
||||
p.corruptRx = f
|
||||
p.corruptRxMu.Unlock()
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"corrupting receive",
|
||||
zap.String("from", p.To()),
|
||||
zap.String("to", p.From()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) UncorruptRx() {
|
||||
func (p *proxyServer) UncorruptRx() {
|
||||
p.corruptRxMu.Lock()
|
||||
p.corruptRx = nil
|
||||
p.corruptRxMu.Unlock()
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"stopped corrupting receive",
|
||||
zap.String("from", p.To()),
|
||||
zap.String("to", p.From()),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *proxy) ResetListener() error {
|
||||
func (p *proxyServer) ResetListener() error {
|
||||
p.listenerMu.Lock()
|
||||
defer p.listenerMu.Unlock()
|
||||
|
||||
@ -930,7 +932,7 @@ func (p *proxy) ResetListener() error {
|
||||
var ln net.Listener
|
||||
var err error
|
||||
if !p.tlsInfo.Empty() {
|
||||
ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
|
||||
ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
|
||||
} else {
|
||||
ln, err = net.Listen(p.from.Scheme, p.from.Host)
|
||||
}
|
||||
@ -939,7 +941,7 @@ func (p *proxy) ResetListener() error {
|
||||
}
|
||||
p.listener = ln
|
||||
|
||||
p.logger.Info(
|
||||
p.lg.Info(
|
||||
"reset listener on",
|
||||
zap.String("from", p.From()),
|
||||
)
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package transport
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@ -28,31 +28,33 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// enable DebugLevel
|
||||
var testLogger = zap.NewExample()
|
||||
|
||||
var testTLSInfo = TLSInfo{
|
||||
var testTLSInfo = transport.TLSInfo{
|
||||
KeyFile: "./fixtures/server.key.insecure",
|
||||
CertFile: "./fixtures/server.crt",
|
||||
TrustedCAFile: "./fixtures/ca.crt",
|
||||
ClientCertAuth: true,
|
||||
}
|
||||
|
||||
func TestProxy_Unix_Insecure(t *testing.T) { testProxy(t, "unix", false, false) }
|
||||
func TestProxy_TCP_Insecure(t *testing.T) { testProxy(t, "tcp", false, false) }
|
||||
func TestProxy_Unix_Secure(t *testing.T) { testProxy(t, "unix", true, false) }
|
||||
func TestProxy_TCP_Secure(t *testing.T) { testProxy(t, "tcp", true, false) }
|
||||
func TestProxy_Unix_Insecure_DelayTx(t *testing.T) { testProxy(t, "unix", false, true) }
|
||||
func TestProxy_TCP_Insecure_DelayTx(t *testing.T) { testProxy(t, "tcp", false, true) }
|
||||
func TestProxy_Unix_Secure_DelayTx(t *testing.T) { testProxy(t, "unix", true, true) }
|
||||
func TestProxy_TCP_Secure_DelayTx(t *testing.T) { testProxy(t, "tcp", true, true) }
|
||||
func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
|
||||
func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) }
|
||||
func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) }
|
||||
func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) }
|
||||
func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) }
|
||||
func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) }
|
||||
func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) }
|
||||
func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) }
|
||||
func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) }
|
||||
func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
if scheme == "tcp" {
|
||||
ln1, ln2 := listen(t, "tcp", "localhost:0", TLSInfo{}), listen(t, "tcp", "localhost:0", TLSInfo{})
|
||||
ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{})
|
||||
srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
|
||||
ln1.Close()
|
||||
ln2.Close()
|
||||
@ -64,12 +66,12 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
|
||||
}
|
||||
tlsInfo := testTLSInfo
|
||||
if !secure {
|
||||
tlsInfo = TLSInfo{}
|
||||
tlsInfo = transport.TLSInfo{}
|
||||
}
|
||||
ln := listen(t, scheme, dstAddr, tlsInfo)
|
||||
defer ln.Close()
|
||||
|
||||
cfg := ProxyConfig{
|
||||
cfg := ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -77,7 +79,7 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
|
||||
if secure {
|
||||
cfg.TLSInfo = testTLSInfo
|
||||
}
|
||||
p := NewProxy(cfg)
|
||||
p := NewServer(cfg)
|
||||
<-p.Ready()
|
||||
defer p.Close()
|
||||
|
||||
@ -162,9 +164,9 @@ func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_Unix_Insecure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, false) }
|
||||
func TestProxy_Unix_Secure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, true) }
|
||||
func testProxyDelayAccept(t *testing.T, secure bool) {
|
||||
func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
|
||||
func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
|
||||
func testServerDelayAccept(t *testing.T, secure bool) {
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
defer func() {
|
||||
os.RemoveAll(srcAddr)
|
||||
@ -172,13 +174,13 @@ func testProxyDelayAccept(t *testing.T, secure bool) {
|
||||
}()
|
||||
tlsInfo := testTLSInfo
|
||||
if !secure {
|
||||
tlsInfo = TLSInfo{}
|
||||
tlsInfo = transport.TLSInfo{}
|
||||
}
|
||||
scheme := "unix"
|
||||
ln := listen(t, scheme, dstAddr, tlsInfo)
|
||||
defer ln.Close()
|
||||
|
||||
cfg := ProxyConfig{
|
||||
cfg := ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -186,7 +188,7 @@ func testProxyDelayAccept(t *testing.T, secure bool) {
|
||||
if secure {
|
||||
cfg.TLSInfo = testTLSInfo
|
||||
}
|
||||
p := NewProxy(cfg)
|
||||
p := NewServer(cfg)
|
||||
<-p.Ready()
|
||||
defer p.Close()
|
||||
|
||||
@ -221,17 +223,17 @@ func testProxyDelayAccept(t *testing.T, secure bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_PauseTx(t *testing.T) {
|
||||
func TestServer_PauseTx(t *testing.T) {
|
||||
scheme := "unix"
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
defer func() {
|
||||
os.RemoveAll(srcAddr)
|
||||
os.RemoveAll(dstAddr)
|
||||
}()
|
||||
ln := listen(t, scheme, dstAddr, TLSInfo{})
|
||||
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
|
||||
defer ln.Close()
|
||||
|
||||
p := NewProxy(ProxyConfig{
|
||||
p := NewServer(ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -242,7 +244,7 @@ func TestProxy_PauseTx(t *testing.T) {
|
||||
p.PauseTx()
|
||||
|
||||
data := []byte("Hello World!")
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
|
||||
recvc := make(chan []byte)
|
||||
go func() {
|
||||
@ -267,17 +269,17 @@ func TestProxy_PauseTx(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_BlackholeTx(t *testing.T) {
|
||||
func TestServer_BlackholeTx(t *testing.T) {
|
||||
scheme := "unix"
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
defer func() {
|
||||
os.RemoveAll(srcAddr)
|
||||
os.RemoveAll(dstAddr)
|
||||
}()
|
||||
ln := listen(t, scheme, dstAddr, TLSInfo{})
|
||||
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
|
||||
defer ln.Close()
|
||||
|
||||
p := NewProxy(ProxyConfig{
|
||||
p := NewServer(ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -288,7 +290,7 @@ func TestProxy_BlackholeTx(t *testing.T) {
|
||||
p.BlackholeTx()
|
||||
|
||||
data := []byte("Hello World!")
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
|
||||
recvc := make(chan []byte)
|
||||
go func() {
|
||||
@ -305,7 +307,7 @@ func TestProxy_BlackholeTx(t *testing.T) {
|
||||
|
||||
// expect different data, old data dropped
|
||||
data[0]++
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
|
||||
select {
|
||||
case d := <-recvc:
|
||||
@ -317,17 +319,17 @@ func TestProxy_BlackholeTx(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_CorruptTx(t *testing.T) {
|
||||
func TestServer_CorruptTx(t *testing.T) {
|
||||
scheme := "unix"
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
defer func() {
|
||||
os.RemoveAll(srcAddr)
|
||||
os.RemoveAll(dstAddr)
|
||||
}()
|
||||
ln := listen(t, scheme, dstAddr, TLSInfo{})
|
||||
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
|
||||
defer ln.Close()
|
||||
|
||||
p := NewProxy(ProxyConfig{
|
||||
p := NewServer(ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -340,29 +342,29 @@ func TestProxy_CorruptTx(t *testing.T) {
|
||||
return d
|
||||
})
|
||||
data := []byte("Hello World!")
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
if d := receive(t, ln); bytes.Equal(d, data) {
|
||||
t.Fatalf("expected corrupted data, got %q", string(d))
|
||||
}
|
||||
|
||||
p.UncorruptTx()
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
if d := receive(t, ln); !bytes.Equal(d, data) {
|
||||
t.Fatalf("expected uncorrupted data, got %q", string(d))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_Shutdown(t *testing.T) {
|
||||
func TestServer_Shutdown(t *testing.T) {
|
||||
scheme := "unix"
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
defer func() {
|
||||
os.RemoveAll(srcAddr)
|
||||
os.RemoveAll(dstAddr)
|
||||
}()
|
||||
ln := listen(t, scheme, dstAddr, TLSInfo{})
|
||||
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
|
||||
defer ln.Close()
|
||||
|
||||
p := NewProxy(ProxyConfig{
|
||||
p := NewServer(ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -370,18 +372,18 @@ func TestProxy_Shutdown(t *testing.T) {
|
||||
<-p.Ready()
|
||||
defer p.Close()
|
||||
|
||||
px, _ := p.(*proxy)
|
||||
px, _ := p.(*proxyServer)
|
||||
px.listener.Close()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
data := []byte("Hello World!")
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
if d := receive(t, ln); !bytes.Equal(d, data) {
|
||||
t.Fatalf("expected %q, got %q", string(data), string(d))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxy_ShutdownListener(t *testing.T) {
|
||||
func TestServer_ShutdownListener(t *testing.T) {
|
||||
scheme := "unix"
|
||||
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
|
||||
defer func() {
|
||||
@ -389,10 +391,10 @@ func TestProxy_ShutdownListener(t *testing.T) {
|
||||
os.RemoveAll(dstAddr)
|
||||
}()
|
||||
|
||||
ln := listen(t, scheme, dstAddr, TLSInfo{})
|
||||
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
|
||||
defer ln.Close()
|
||||
|
||||
p := NewProxy(ProxyConfig{
|
||||
p := NewServer(ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -404,23 +406,23 @@ func TestProxy_ShutdownListener(t *testing.T) {
|
||||
ln.Close()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
ln = listen(t, scheme, dstAddr, TLSInfo{})
|
||||
ln = listen(t, scheme, dstAddr, transport.TLSInfo{})
|
||||
defer ln.Close()
|
||||
|
||||
data := []byte("Hello World!")
|
||||
send(t, data, scheme, srcAddr, TLSInfo{})
|
||||
send(t, data, scheme, srcAddr, transport.TLSInfo{})
|
||||
if d := receive(t, ln); !bytes.Equal(d, data) {
|
||||
t.Fatalf("expected %q, got %q", string(data), string(d))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxyHTTP_Insecure_DelayTx(t *testing.T) { testProxyHTTP(t, false, true) }
|
||||
func TestProxyHTTP_Secure_DelayTx(t *testing.T) { testProxyHTTP(t, true, true) }
|
||||
func TestProxyHTTP_Insecure_DelayRx(t *testing.T) { testProxyHTTP(t, false, false) }
|
||||
func TestProxyHTTP_Secure_DelayRx(t *testing.T) { testProxyHTTP(t, true, false) }
|
||||
func testProxyHTTP(t *testing.T, secure, delayTx bool) {
|
||||
func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) }
|
||||
func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) }
|
||||
func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) }
|
||||
func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) }
|
||||
func testServerHTTP(t *testing.T, secure, delayTx bool) {
|
||||
scheme := "tcp"
|
||||
ln1, ln2 := listen(t, scheme, "localhost:0", TLSInfo{}), listen(t, scheme, "localhost:0", TLSInfo{})
|
||||
ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{})
|
||||
srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
|
||||
ln1.Close()
|
||||
ln2.Close()
|
||||
@ -464,7 +466,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
|
||||
}()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
cfg := ProxyConfig{
|
||||
cfg := ServerConfig{
|
||||
Logger: testLogger,
|
||||
From: url.URL{Scheme: scheme, Host: srcAddr},
|
||||
To: url.URL{Scheme: scheme, Host: dstAddr},
|
||||
@ -472,7 +474,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
|
||||
if secure {
|
||||
cfg.TLSInfo = testTLSInfo
|
||||
}
|
||||
p := NewProxy(cfg)
|
||||
p := NewServer(cfg)
|
||||
<-p.Ready()
|
||||
defer p.Close()
|
||||
|
||||
@ -481,7 +483,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
|
||||
now := time.Now()
|
||||
var resp *http.Response
|
||||
if secure {
|
||||
tp, terr := NewTransport(testTLSInfo, 3*time.Second)
|
||||
tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
|
||||
if terr != nil {
|
||||
t.Fatal(terr)
|
||||
}
|
||||
@ -517,7 +519,7 @@ func testProxyHTTP(t *testing.T, secure, delayTx bool) {
|
||||
|
||||
now = time.Now()
|
||||
if secure {
|
||||
tp, terr := NewTransport(testTLSInfo, 3*time.Second)
|
||||
tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
|
||||
if terr != nil {
|
||||
t.Fatal(terr)
|
||||
}
|
||||
@ -553,10 +555,10 @@ func newUnixAddr() string {
|
||||
return addr
|
||||
}
|
||||
|
||||
func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener) {
|
||||
func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) {
|
||||
var err error
|
||||
if !tlsInfo.Empty() {
|
||||
ln, err = NewListener(addr, scheme, &tlsInfo)
|
||||
ln, err = transport.NewListener(addr, scheme, &tlsInfo)
|
||||
} else {
|
||||
ln, err = net.Listen(scheme, addr)
|
||||
}
|
||||
@ -566,11 +568,11 @@ func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener
|
||||
return ln
|
||||
}
|
||||
|
||||
func send(t *testing.T, data []byte, scheme, addr string, tlsInfo TLSInfo) {
|
||||
func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) {
|
||||
var out net.Conn
|
||||
var err error
|
||||
if !tlsInfo.Empty() {
|
||||
tp, terr := NewTransport(tlsInfo, 3*time.Second)
|
||||
tp, terr := transport.NewTransport(tlsInfo, 3*time.Second)
|
||||
if terr != nil {
|
||||
t.Fatal(terr)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user