From bafe960dba2c02c98f215cfbebf74a51e73a36ea Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 10 Sep 2014 15:22:50 -0700 Subject: [PATCH 1/4] etcdhttp: add Peers.Endpoints --- etcdserver/etcdhttp/http.go | 24 +++++++++----- etcdserver/etcdhttp/http_test.go | 54 ++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 392237414..6d52bf15a 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -85,6 +85,20 @@ func (ps Peers) IDs() []int64 { return ids } +// Endpoints returns a list of all peer addresses. Each address is +// prefixed with "http://". The returned list is sorted (asc). +func (ps Peers) Endpoints() []string { + endpoints := make([]string, 0) + for _, addrs := range ps { + for _, addr := range addrs { + endpoints = append(endpoints, addScheme(addr)) + } + } + sort.Strings(endpoints) + + return endpoints +} + var errClosed = errors.New("etcdhttp: client closed connection") const DefaultTimeout = 500 * time.Millisecond @@ -209,14 +223,8 @@ func (h Handler) serveMachines(w http.ResponseWriter, r *http.Request) { allow(w, "GET", "HEAD") return } - urls := make([]string, 0) - for _, addrs := range h.Peers { - for _, addr := range addrs { - urls = append(urls, addScheme(addr)) - } - } - sort.Strings(urls) - w.Write([]byte(strings.Join(urls, ", "))) + endpoints := h.Peers.Endpoints() + w.Write([]byte(strings.Join(endpoints, ", "))) } func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) { diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 8b2493f9f..2b1cebd20 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -397,3 +397,57 @@ func TestServeMachines(t *testing.T) { t.Errorf("header = %d, want %d", writer.Code, http.StatusOK) } } + +func TestPeersEndpoints(t *testing.T) { + tests := []struct { + peers Peers + endpoints []string + }{ + // single peer with a single address + { + peers: Peers(map[int64][]string{ + 1: []string{"192.0.2.1"}, + }), + endpoints: []string{"http://192.0.2.1"}, + }, + + // single peer with a single address with a port + { + peers: Peers(map[int64][]string{ + 1: []string{"192.0.2.1:8001"}, + }), + endpoints: []string{"http://192.0.2.1:8001"}, + }, + + // several peers explicitly unsorted + { + peers: Peers(map[int64][]string{ + 2: []string{"192.0.2.3", "192.0.2.4"}, + 3: []string{"192.0.2.5", "192.0.2.6"}, + 1: []string{"192.0.2.1", "192.0.2.2"}, + }), + endpoints: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, + }, + + // no peers + { + peers: Peers(map[int64][]string{}), + endpoints: []string{}, + }, + + // peer with no endpoints + { + peers: Peers(map[int64][]string{ + 3: []string{}, + }), + endpoints: []string{}, + }, + } + + for i, tt := range tests { + endpoints := tt.peers.Endpoints() + if !reflect.DeepEqual(tt.endpoints, endpoints) { + t.Errorf("#%d: peers.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints) + } + } +} From a3334eed23475433fbdcfcfd331b989347a1c922 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 10 Sep 2014 11:06:31 -0700 Subject: [PATCH 2/4] main: break out startEtcd func --- main.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index f7cfab6e2..8c8e86ea6 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,13 @@ func init() { func main() { flag.Parse() + h := startEtcd() + + http.Handle("/", h) + log.Fatal(http.ListenAndServe(*laddr, nil)) +} + +func startEtcd() http.Handler { id, err := strconv.ParseInt(*fid, 0, 64) if err != nil { log.Fatal(err) @@ -67,13 +74,14 @@ func main() { Ticker: tk.C, } etcdserver.Start(s) - h := &etcdhttp.Handler{ + + h := etcdhttp.Handler{ Timeout: *timeout, Server: s, Peers: *peers, } - http.Handle("/", h) - log.Fatal(http.ListenAndServe(*laddr, nil)) + + return &h } // startRaft starts a raft node from the given wal dir. From e5a482266f93dd44d5634fefd2c0d35b06dc3a83 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 10 Sep 2014 15:23:35 -0700 Subject: [PATCH 3/4] proxy: introduce director The director class drives an httputil.ReverseProxy. This is used when etcd is deployed in proxy mode. --- proxy/proxy.go | 64 ++++++++++++++++++++++++++++++++++++++++ proxy/proxy_test.go | 71 +++++++++++++++++++++++++++++++++++++++++++++ test | 2 +- 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 proxy/proxy.go create mode 100644 proxy/proxy_test.go diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 000000000..167b46ad5 --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,64 @@ +package proxy + +import ( + "errors" + "fmt" + "net/http" + "net/http/httputil" + "net/url" +) + +func NewHandler(endpoints []string) (*httputil.ReverseProxy, error) { + d, err := newDirector(endpoints) + if err != nil { + return nil, err + } + + proxy := httputil.ReverseProxy{ + Director: d.direct, + Transport: &http.Transport{}, + FlushInterval: 0, + } + + return &proxy, nil +} + +func newDirector(endpoints []string) (*director, error) { + if len(endpoints) == 0 { + return nil, errors.New("one or more endpoints required") + } + + urls := make([]url.URL, len(endpoints)) + for i, e := range endpoints { + u, err := url.Parse(e) + if err != nil { + return nil, fmt.Errorf("invalid endpoint %q: %v", e, err) + } + + if u.Scheme == "" { + return nil, fmt.Errorf("invalid endpoint %q: scheme required", e) + } + + if u.Host == "" { + return nil, fmt.Errorf("invalid endpoint %q: host empty", e) + } + + urls[i] = *u + } + + d := director{ + endpoints: urls, + } + + return &d, nil +} + +type director struct { + endpoints []url.URL +} + +func (d *director) direct(req *http.Request) { + choice := d.endpoints[0] + req.URL.Scheme = choice.Scheme + req.URL.Host = choice.Host +} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go new file mode 100644 index 000000000..707660cc2 --- /dev/null +++ b/proxy/proxy_test.go @@ -0,0 +1,71 @@ +package proxy + +import ( + "net/http" + "net/url" + "reflect" + "testing" +) + +func TestNewDirector(t *testing.T) { + tests := []struct { + good bool + endpoints []string + }{ + {true, []string{"http://192.0.2.8"}}, + {true, []string{"http://192.0.2.8:8001"}}, + {true, []string{"http://example.com"}}, + {true, []string{"http://example.com:8001"}}, + {true, []string{"http://192.0.2.8:8001", "http://example.com:8002"}}, + + {false, []string{"192.0.2.8"}}, + {false, []string{"192.0.2.8:8001"}}, + {false, []string{""}}, + } + + for i, tt := range tests { + _, err := newDirector(tt.endpoints) + if tt.good != (err == nil) { + t.Errorf("#%d: expected success = %t, got err = %v", i, tt.good, err) + } + } +} + +func TestDirectorDirect(t *testing.T) { + d := &director{ + endpoints: []url.URL{ + url.URL{ + Scheme: "http", + Host: "bar.example.com", + }, + }, + } + + req := &http.Request{ + Method: "GET", + Host: "foo.example.com", + URL: &url.URL{ + Host: "foo.example.com", + Path: "/v2/keys/baz", + }, + } + + d.direct(req) + + want := &http.Request{ + Method: "GET", + // this field must not change + Host: "foo.example.com", + URL: &url.URL{ + // the Scheme field is updated per the director's first endpoint + Scheme: "http", + // the Host field is updated per the director's first endpoint + Host: "bar.example.com", + Path: "/v2/keys/baz", + }, + } + + if !reflect.DeepEqual(want, req) { + t.Fatalf("HTTP request does not match expected criteria: want=%#v got=%#v", want, req) + } +} diff --git a/test b/test index f3d422bd2..d41c7f69b 100755 --- a/test +++ b/test @@ -14,7 +14,7 @@ COVER=${COVER:-"-cover"} source ./build -TESTABLE="wal snap etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional raft store" +TESTABLE="wal snap etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional proxy raft store" FORMATTABLE="$TESTABLE cors.go main.go" # user has not provided PKG override From 7415d530205faf0555a33b101c289f74e739d21a Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 10 Sep 2014 11:19:11 -0700 Subject: [PATCH 4/4] proxy: add proxy-mode functionality to etcd daemon --- main.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index 8c8e86ea6..dc9ec4214 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/proxy" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" @@ -23,10 +24,11 @@ const ( ) var ( - fid = flag.String("id", "0x1", "ID of this server") - timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") - laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')") - dir = flag.String("data-dir", "", "Path to the data directory") + fid = flag.String("id", "0x1", "ID of this server") + timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") + laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')") + dir = flag.String("data-dir", "", "Path to the data directory") + proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.") peers = &etcdhttp.Peers{} ) @@ -39,7 +41,12 @@ func init() { func main() { flag.Parse() - h := startEtcd() + var h http.Handler + if *proxyMode { + h = startProxy() + } else { + h = startEtcd() + } http.Handle("/", h) log.Fatal(http.ListenAndServe(*laddr, nil)) @@ -115,3 +122,12 @@ func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) { n := raft.Restart(id, peerIDs, 10, 1, st, ents) return n, w } + +func startProxy() http.Handler { + h, err := proxy.NewHandler((*peers).Endpoints()) + if err != nil { + log.Fatal(err) + } + + return h +}