From a912ddcf20da324e19277b4f3af5f3d6ac6f76d8 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 2 Jan 2018 08:55:14 -0800 Subject: [PATCH] etcd-agent: use "pkg/transport.Proxy" Signed-off-by: Gyuho Lee --- tools/functional-tester/etcd-agent/agent.go | 151 +++++++++++++++++++- 1 file changed, 145 insertions(+), 6 deletions(-) diff --git a/tools/functional-tester/etcd-agent/agent.go b/tools/functional-tester/etcd-agent/agent.go index beb276c61..97c6f0c3e 100644 --- a/tools/functional-tester/etcd-agent/agent.go +++ b/tools/functional-tester/etcd-agent/agent.go @@ -15,14 +15,19 @@ package main import ( + "fmt" + "net" + "net/url" "os" "os/exec" "path/filepath" + "strconv" + "sync" "syscall" "time" "github.com/coreos/etcd/pkg/fileutil" - "github.com/coreos/etcd/pkg/netutil" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" ) @@ -40,6 +45,9 @@ type Agent struct { logfile *os.File cfg AgentConfig + + pmu sync.Mutex + advertisePortToProxy map[int]transport.Proxy } type AgentConfig struct { @@ -68,7 +76,13 @@ func newAgent(cfg AgentConfig) (*Agent, error) { return nil, err } - return &Agent{state: stateUninitialized, cmd: c, logfile: f, cfg: cfg}, nil + return &Agent{ + state: stateUninitialized, + cmd: c, + logfile: f, + cfg: cfg, + advertisePortToProxy: make(map[int]transport.Proxy), + }, nil } // start starts a new etcd process with the given args. @@ -84,6 +98,85 @@ func (a *Agent) start(args ...string) error { } a.state = stateStarted + + a.pmu.Lock() + defer a.pmu.Unlock() + if len(a.advertisePortToProxy) == 0 { + // enough time for etcd start before setting up proxy + time.Sleep(time.Second) + var ( + err error + s string + listenClientURL *url.URL + advertiseClientURL *url.URL + advertiseClientURLPort int + listenPeerURL *url.URL + advertisePeerURL *url.URL + advertisePeerURLPort int + ) + for i := range args { + switch args[i] { + case "--listen-client-urls": + listenClientURL, err = url.Parse(args[i+1]) + if err != nil { + return err + } + case "--advertise-client-urls": + advertiseClientURL, err = url.Parse(args[i+1]) + if err != nil { + return err + } + _, s, err = net.SplitHostPort(advertiseClientURL.Host) + if err != nil { + return err + } + advertiseClientURLPort, err = strconv.Atoi(s) + if err != nil { + return err + } + case "--listen-peer-urls": + listenPeerURL, err = url.Parse(args[i+1]) + if err != nil { + return err + } + case "--initial-advertise-peer-urls": + advertisePeerURL, err = url.Parse(args[i+1]) + if err != nil { + return err + } + _, s, err = net.SplitHostPort(advertisePeerURL.Host) + if err != nil { + return err + } + advertisePeerURLPort, err = strconv.Atoi(s) + if err != nil { + return err + } + } + } + + clientProxy := transport.NewProxy(transport.ProxyConfig{ + From: *advertiseClientURL, + To: *listenClientURL, + }) + select { + case err = <-clientProxy.Error(): + return err + case <-time.After(time.Second): + } + a.advertisePortToProxy[advertiseClientURLPort] = clientProxy + + peerProxy := transport.NewProxy(transport.ProxyConfig{ + From: *advertisePeerURL, + To: *listenPeerURL, + }) + select { + case err = <-peerProxy.Error(): + return err + case <-time.After(time.Second): + } + a.advertisePortToProxy[advertisePeerURLPort] = peerProxy + } return nil } @@ -93,6 +186,24 @@ func (a *Agent) stopWithSig(sig os.Signal) error { return nil } + a.pmu.Lock() + if len(a.advertisePortToProxy) > 0 { + for _, p := range a.advertisePortToProxy { + if err := p.Close(); err != nil { + a.pmu.Unlock() + return err + } + select { + case <-p.Done(): + // enough time to release port + time.Sleep(time.Second) + case <-time.After(time.Second): + } + } + a.advertisePortToProxy = make(map[int]transport.Proxy) + } + a.pmu.Unlock() + err := stopWithSig(a.cmd, sig) if err != nil { return err @@ -177,18 +288,46 @@ func (a *Agent) terminate() error { } func (a *Agent) dropPort(port int) error { - return netutil.DropPort(port) + a.pmu.Lock() + defer a.pmu.Unlock() + + p, ok := a.advertisePortToProxy[port] + if !ok { + return fmt.Errorf("%d does not have proxy", port) + } + p.BlackholeTx() + p.BlackholeRx() + return nil } func (a *Agent) recoverPort(port int) error { - return netutil.RecoverPort(port) + a.pmu.Lock() + defer a.pmu.Unlock() + + p, ok := a.advertisePortToProxy[port] + if !ok { + return fmt.Errorf("%d does not have proxy", port) + } + p.UnblackholeTx() + p.UnblackholeRx() + return nil } func (a *Agent) setLatency(ms, rv int) error { + a.pmu.Lock() + defer a.pmu.Unlock() + if ms == 0 { - return netutil.RemoveLatency() + for _, p := range a.advertisePortToProxy { + p.UndelayTx() + p.UndelayRx() + } } - return netutil.SetLatency(ms, rv) + for _, p := range a.advertisePortToProxy { + p.DelayTx(time.Duration(ms)*time.Millisecond, time.Duration(rv)*time.Millisecond) + p.DelayRx(time.Duration(ms)*time.Millisecond, time.Duration(rv)*time.Millisecond) + } + return nil } func (a *Agent) status() client.Status {