From 849f88edbfc50d197aefb1a0699c6941ed10fee7 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 2 Jan 2018 08:48:00 -0800 Subject: [PATCH] etcd-test-proxy: initial commit Signed-off-by: Gyuho Lee --- tools/etcd-test-proxy/Procfile | 14 +++ tools/etcd-test-proxy/README.md | 184 +++++++++++++++++++++++++++ tools/etcd-test-proxy/main.go | 216 ++++++++++++++++++++++++++++++++ 3 files changed, 414 insertions(+) create mode 100644 tools/etcd-test-proxy/Procfile create mode 100644 tools/etcd-test-proxy/README.md create mode 100644 tools/etcd-test-proxy/main.go diff --git a/tools/etcd-test-proxy/Procfile b/tools/etcd-test-proxy/Procfile new file mode 100644 index 000000000..e820c1ea6 --- /dev/null +++ b/tools/etcd-test-proxy/Procfile @@ -0,0 +1,14 @@ +s1: bin/etcd --name s1 --data-dir /tmp/etcd-test-proxy-data.s1 --listen-client-urls http://127.0.0.1:1379 --advertise-client-urls http://127.0.0.1:13790 --listen-peer-urls http://127.0.0.1:1380 --initial-advertise-peer-urls http://127.0.0.1:13800 --initial-cluster-token tkn --initial-cluster 's1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800' --initial-cluster-state new + +s1-client-proxy: bin/etcd-test-proxy --from localhost:13790 --to localhost:1379 --http-port 1378 +s1-peer-proxy: bin/etcd-test-proxy --from localhost:13800 --to localhost:1380 --http-port 1381 + +s2: bin/etcd --name s2 --data-dir /tmp/etcd-test-proxy-data.s2 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:23790 --listen-peer-urls http://127.0.0.1:2380 --initial-advertise-peer-urls http://127.0.0.1:23800 --initial-cluster-token tkn --initial-cluster 's1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800' --initial-cluster-state new + +s2-client-proxy: bin/etcd-test-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 +s2-peer-proxy: bin/etcd-test-proxy --from localhost:23800 --to localhost:2380 --http-port 2381 + +s3: bin/etcd --name s3 --data-dir /tmp/etcd-test-proxy-data.s3 --listen-client-urls http://127.0.0.1:3379 --advertise-client-urls http://127.0.0.1:33790 --listen-peer-urls http://127.0.0.1:3380 --initial-advertise-peer-urls http://127.0.0.1:33800 --initial-cluster-token tkn --initial-cluster 's1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800' --initial-cluster-state new + +s3-client-proxy: bin/etcd-test-proxy --from localhost:33790 --to localhost:3379 --http-port 3378 +s3-client-proxy: bin/etcd-test-proxy --from localhost:33800 --to localhost:3380 --http-port 3381 diff --git a/tools/etcd-test-proxy/README.md b/tools/etcd-test-proxy/README.md new file mode 100644 index 000000000..e7836809d --- /dev/null +++ b/tools/etcd-test-proxy/README.md @@ -0,0 +1,184 @@ +#### etcd-test-proxy + +Proxy layer that simulates various network conditions. + +Test locally + +```bash +$ ./build +$ ./bin/etcd + +$ make build-etcd-test-proxy -f ./hack/scripts-dev/Makefile + +$ ./bin/etcd-test-proxy --help +$ ./bin/etcd-test-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose + +$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:2379 put foo bar +$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar +``` + +Proxy overhead per request is under 500μs + +```bash +$ go build -v -o ./bin/benchmark ./cmd/tools/benchmark + +$ ./bin/benchmark \ + --endpoints localhost:2379 \ + --conns 5 \ + --clients 15 \ + put \ + --key-size 48 \ + --val-size 50000 \ + --total 10000 + +< tcp://localhost:2379] + +$ ETCDCTL_API=3 ./bin/etcdctl \ + --endpoints localhost:23790 \ + put foo bar +# Error: context deadline exceeded + +$ curl -L http://localhost:2378/pause-tx -X DELETE +# unpaused forwarding [tcp://localhost:23790 -> tcp://localhost:2379] +``` + +Drop client packets + +```bash +$ curl -L http://localhost:2378/blackhole-tx -X PUT +# blackholed; dropping packets [tcp://localhost:23790 -> tcp://localhost:2379] + +$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar +# Error: context deadline exceeded + +$ curl -L http://localhost:2378/blackhole-tx -X DELETE +# unblackholed; restart forwarding [tcp://localhost:23790 -> tcp://localhost:2379] +``` + +Trigger leader election + +```bash +$ ./build +$ make build-etcd-test-proxy -f ./hack/scripts-dev/Makefile + +$ rm -rf /tmp/etcd-test-proxy-data.s* +$ goreman -f ./tools/etcd-test-proxy/Procfile start + +$ ETCDCTL_API=3 ./bin/etcdctl \ + --endpoints localhost:13790,localhost:23790,localhost:33790 \ + member list + +# isolate s1 when s1 is the current leader +$ curl -L http://localhost:1381/blackhole-tx -X PUT +$ curl -L http://localhost:1381/blackhole-rx -X PUT +# s1 becomes follower after election timeout +``` diff --git a/tools/etcd-test-proxy/main.go b/tools/etcd-test-proxy/main.go new file mode 100644 index 000000000..812edcbaf --- /dev/null +++ b/tools/etcd-test-proxy/main.go @@ -0,0 +1,216 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "net/url" + "os" + "os/signal" + "syscall" + "time" + + "github.com/coreos/etcd/pkg/transport" + + "google.golang.org/grpc/grpclog" +) + +var from string +var to string +var httpPort int +var verbose bool + +func main() { + // TODO: support TLS + flag.StringVar(&from, "from", "localhost:23790", "Address URL to proxy from.") + flag.StringVar(&to, "to", "localhost:2379", "Address URL to forward.") + flag.IntVar(&httpPort, "http-port", 2378, "Port to serve etcd-test-proxy API.") + flag.BoolVar(&verbose, "verbose", false, "'true' to run proxy in verbose mode.") + + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of %q:\n", os.Args[0]) + fmt.Fprintln(os.Stderr, ` +etcd-test-proxy simulates various network conditions for etcd testing purposes. +See README.md for more examples. + +Example: + +# build etcd +$ ./build +$ ./bin/etcd + +# build etcd-test-proxy +$ make build-etcd-test-proxy -f ./hack/scripts-dev/Makefile + +# to test etcd with proxy layer +$ ./bin/etcd-test-proxy --help +$ ./bin/etcd-test-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose + +$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:2379 put foo bar +$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar +`) + flag.PrintDefaults() + } + + flag.Parse() + + cfg := transport.ProxyConfig{ + From: url.URL{Scheme: "tcp", Host: from}, + To: url.URL{Scheme: "tcp", Host: to}, + } + if verbose { + cfg.Logger = grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5) + } + p := transport.NewProxy(cfg) + <-p.Ready() + defer p.Close() + + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(fmt.Sprintf("proxying [%s -> %s]\n", p.From(), p.To()))) + }) + mux.HandleFunc("/delay-tx", func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + w.Write([]byte(fmt.Sprintf("current send latency %v\n", p.LatencyTx()))) + case http.MethodPut, http.MethodPost: + if err := req.ParseForm(); err != nil { + w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error()))) + return + } + lat, err := time.ParseDuration(req.PostForm.Get("latency")) + if err != nil { + w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error()))) + return + } + rv, err := time.ParseDuration(req.PostForm.Get("random-variable")) + if err != nil { + w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error()))) + return + } + p.DelayTx(lat, rv) + w.Write([]byte(fmt.Sprintf("added send latency %v±%v (current latency %v)\n", lat, rv, p.LatencyTx()))) + case http.MethodDelete: + lat := p.LatencyTx() + p.UndelayTx() + w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat))) + default: + w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method))) + } + }) + mux.HandleFunc("/delay-rx", func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + w.Write([]byte(fmt.Sprintf("current receive latency %v\n", p.LatencyRx()))) + case http.MethodPut, http.MethodPost: + if err := req.ParseForm(); err != nil { + w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error()))) + return + } + lat, err := time.ParseDuration(req.PostForm.Get("latency")) + if err != nil { + w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error()))) + return + } + rv, err := time.ParseDuration(req.PostForm.Get("random-variable")) + if err != nil { + w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error()))) + return + } + p.DelayRx(lat, rv) + w.Write([]byte(fmt.Sprintf("added receive latency %v±%v (current latency %v)\n", lat, rv, p.LatencyRx()))) + case http.MethodDelete: + lat := p.LatencyRx() + p.UndelayRx() + w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat))) + default: + w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method))) + } + }) + mux.HandleFunc("/pause-tx", func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodPut, http.MethodPost: + p.PauseTx() + w.Write([]byte(fmt.Sprintf("paused forwarding [%s -> %s]\n", p.From(), p.To()))) + case http.MethodDelete: + p.UnpauseTx() + w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s -> %s]\n", p.From(), p.To()))) + default: + w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method))) + } + }) + mux.HandleFunc("/pause-rx", func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodPut, http.MethodPost: + p.PauseRx() + w.Write([]byte(fmt.Sprintf("paused forwarding [%s <- %s]\n", p.From(), p.To()))) + case http.MethodDelete: + p.UnpauseRx() + w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s <- %s]\n", p.From(), p.To()))) + default: + w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method))) + } + }) + mux.HandleFunc("/blackhole-tx", func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodPut, http.MethodPost: + p.BlackholeTx() + w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s -> %s]\n", p.From(), p.To()))) + case http.MethodDelete: + p.UnblackholeTx() + w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s -> %s]\n", p.From(), p.To()))) + default: + w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method))) + } + }) + mux.HandleFunc("/blackhole-rx", func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodPut, http.MethodPost: + p.BlackholeRx() + w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s <- %s]\n", p.From(), p.To()))) + case http.MethodDelete: + p.UnblackholeRx() + w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s <- %s]\n", p.From(), p.To()))) + default: + w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method))) + } + }) + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", httpPort), + Handler: mux, + } + defer srv.Close() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(sig) + + go func() { + s := <-sig + fmt.Printf("\n\nreceived signal %q, shutting down HTTP server\n\n", s) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + err := srv.Shutdown(ctx) + cancel() + fmt.Printf("gracefully stopped HTTP server with %v\n\n", err) + os.Exit(0) + }() + + fmt.Printf("\nserving HTTP server http://localhost:%d\n\n", httpPort) + err := srv.ListenAndServe() + fmt.Printf("HTTP server exit with error %v\n", err) +}