diff --git a/Procfile b/Procfile index e0ba5c860..3b4de56b3 100644 --- a/Procfile +++ b/Procfile @@ -2,4 +2,4 @@ etcd1: bin/etcd -name node1 -listen-client-urls http://127.0.0.1:4001 -advertise-client-urls http://127.0.0.1:4001 -listen-peer-urls http://127.0.0.1:7001 -initial-advertise-peer-urls http://127.0.0.1:7001 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' -initial-cluster-state new etcd2: bin/etcd -name node2 -listen-client-urls http://127.0.0.1:4002 -advertise-client-urls http://127.0.0.1:4002 -listen-peer-urls http://127.0.0.1:7002 -initial-advertise-peer-urls http://127.0.0.1:7002 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' -initial-cluster-state new etcd3: bin/etcd -name node3 -listen-client-urls http://127.0.0.1:4003 -advertise-client-urls http://127.0.0.1:4003 -listen-peer-urls http://127.0.0.1:7003 -initial-advertise-peer-urls http://127.0.0.1:7003 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' -initial-cluster-state new -#proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers 'localhost:7001,localhost:7002,localhost:7003' +proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' diff --git a/etcdserver/server.go b/etcdserver/server.go index bc7c2713c..27dac2641 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,7 @@ package etcdserver import ( "encoding/json" "errors" + "fmt" "io/ioutil" "log" "math/rand" @@ -184,7 +185,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer { haveWAL := wal.Exist(cfg.WALDir()) switch { case !haveWAL && cfg.ClusterState == ClusterStateValueExisting: - cl := getClusterFromPeers(cfg.Cluster.PeerURLs()) + cl, err := GetClusterFromPeers(cfg.Cluster.PeerURLs()) + if err != nil { + log.Fatal(err) + } if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil { log.Fatalf("etcdserver: %v", err) } @@ -669,7 +673,7 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { s.storage.Cut() } -func getClusterFromPeers(urls []string) *Cluster { +func GetClusterFromPeers(urls []string) (*Cluster, error) { for _, u := range urls { resp, err := http.Get(u + "/members") if err != nil { @@ -691,10 +695,9 @@ func getClusterFromPeers(urls []string) *Cluster { log.Printf("etcdserver: parse uint error: %v", err) continue } - return NewClusterFromMembers("", id, membs) + return NewClusterFromMembers("", id, membs), nil } - log.Fatalf("etcdserver: could not retrieve cluster information from the given urls") - return nil + return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") } func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) { diff --git a/main.go b/main.go index 120aa6263..96c42fc33 100644 --- a/main.go +++ b/main.go @@ -232,11 +232,18 @@ func startProxy() { log.Fatal(err) } - ph, err := proxy.NewHandler(pt, cls.PeerURLs()) - if err != nil { - log.Fatal(err) + // TODO(jonboulle): update peerURLs dynamically (i.e. when updating + // clientURLs) instead of just using the initial fixed list here + peerURLs := cls.PeerURLs() + uf := func() []string { + cls, err := etcdserver.GetClusterFromPeers(peerURLs) + if err != nil { + log.Printf("etcd: %v", err) + return []string{} + } + return cls.ClientURLs() } - + ph := proxy.NewHandler(pt, uf) ph = &pkg.CORSHandler{ Handler: ph, Info: cors, diff --git a/proxy/director.go b/proxy/director.go index 73e2a24f3..c8f23a4c8 100644 --- a/proxy/director.go +++ b/proxy/director.go @@ -17,7 +17,6 @@ package proxy import ( - "errors" "log" "net/url" "sync" @@ -28,28 +27,52 @@ const ( // amount of time an endpoint will be held in a failed // state before being reconsidered for proxied requests endpointFailureWait = 5 * time.Second + + // how often the proxy will attempt to refresh its set of endpoints + refreshEndpoints = 30 * time.Second ) -func newDirector(scheme string, addrs []string) (*director, error) { - if len(addrs) == 0 { - return nil, errors.New("one or more upstream addresses required") +func newDirector(urlsFunc GetProxyURLs) *director { + d := &director{ + uf: urlsFunc, } - - endpoints := make([]*endpoint, len(addrs)) - for i, addr := range addrs { - u := url.URL{Scheme: scheme, Host: addr} - endpoints[i] = newEndpoint(u) - } - - d := director{ep: endpoints} - return &d, nil + d.refresh() + go func() { + for { + select { + case <-time.After(refreshEndpoints): + d.refresh() + } + } + }() + return d } type director struct { + sync.Mutex ep []*endpoint + uf GetProxyURLs +} + +func (d *director) refresh() { + urls := d.uf() + d.Lock() + defer d.Unlock() + var endpoints []*endpoint + for _, u := range urls { + uu, err := url.Parse(u) + if err != nil { + log.Printf("upstream URL invalid: %v", err) + continue + } + endpoints = append(endpoints, newEndpoint(*uu)) + } + d.ep = endpoints } func (d *director) endpoints() []*endpoint { + d.Lock() + defer d.Unlock() filtered := make([]*endpoint, 0) for _, ep := range d.ep { if ep.Available { diff --git a/proxy/director_test.go b/proxy/director_test.go index 49f6c2354..7561849a0 100644 --- a/proxy/director_test.go +++ b/proxy/director_test.go @@ -24,41 +24,36 @@ import ( func TestNewDirectorScheme(t *testing.T) { tests := []struct { - scheme string - addrs []string - want []string + urls []string + want []string }{ { - scheme: "http", - addrs: []string{"192.0.2.8:4002", "example.com:8080"}, - want: []string{"http://192.0.2.8:4002", "http://example.com:8080"}, + urls: []string{"http://192.0.2.8:4002", "http://example.com:8080"}, + want: []string{"http://192.0.2.8:4002", "http://example.com:8080"}, }, { - scheme: "https", - addrs: []string{"192.0.2.8:4002", "example.com:8080"}, - want: []string{"https://192.0.2.8:4002", "https://example.com:8080"}, + urls: []string{"https://192.0.2.8:4002", "https://example.com:8080"}, + want: []string{"https://192.0.2.8:4002", "https://example.com:8080"}, }, - // accept addrs without a port + // accept urls without a port { - scheme: "http", - addrs: []string{"192.0.2.8"}, - want: []string{"http://192.0.2.8"}, + urls: []string{"http://192.0.2.8"}, + want: []string{"http://192.0.2.8"}, }, - // accept addrs even if they are garbage + // accept urls even if they are garbage { - scheme: "http", - addrs: []string{"."}, - want: []string{"http://."}, + urls: []string{"http://."}, + want: []string{"http://."}, }, } for i, tt := range tests { - got, err := newDirector(tt.scheme, tt.addrs) - if err != nil { - t.Errorf("#%d: newDirectory returned unexpected error: %v", i, err) + uf := func() []string { + return tt.urls } + got := newDirector(uf) for ii, wep := range tt.want { gep := got.ep[ii].URL.String() diff --git a/proxy/proxy.go b/proxy/proxy.go index b54fa2457..10d26face 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -20,23 +20,17 @@ import ( "net/http" ) -func NewHandler(t *http.Transport, addrs []string) (http.Handler, error) { - scheme := "http" - if t.TLSClientConfig != nil { - scheme = "https" - } +// GetProxyURLs is a function which should return the current set of URLs to +// which client requests should be proxied. This function will be queried +// periodically by the proxy Handler to refresh the set of available +// backends. +type GetProxyURLs func() []string - d, err := newDirector(scheme, addrs) - if err != nil { - return nil, err - } - - rp := reverseProxy{ - director: d, +func NewHandler(t *http.Transport, urlsFunc GetProxyURLs) http.Handler { + return &reverseProxy{ + director: newDirector(urlsFunc), transport: t, } - - return &rp, nil } func readonlyHandlerFunc(next http.Handler) func(http.ResponseWriter, *http.Request) { diff --git a/proxy/reverse_test.go b/proxy/reverse_test.go index 8d6f87d07..0a74dd8a7 100644 --- a/proxy/reverse_test.go +++ b/proxy/reverse_test.go @@ -78,7 +78,7 @@ func TestReverseProxyServe(t *testing.T) { for i, tt := range tests { rp := reverseProxy{ - director: &director{tt.eps}, + director: &director{ep: tt.eps}, transport: tt.rt, }