Merge pull request #1398 from jonboulle/proxy

proxy not working on master
This commit is contained in:
Jonathan Boulle 2014-10-24 15:55:31 -07:00
commit e630910e31
7 changed files with 80 additions and 58 deletions

View File

@ -2,4 +2,4 @@
etcd1: bin/etcd -name node1 -listen-client-urls http://127.0.0.1:4001 -advertise-client-urls http://127.0.0.1:4001 -listen-peer-urls http://127.0.0.1:7001 -initial-advertise-peer-urls http://127.0.0.1:7001 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' -initial-cluster-state new
etcd2: bin/etcd -name node2 -listen-client-urls http://127.0.0.1:4002 -advertise-client-urls http://127.0.0.1:4002 -listen-peer-urls http://127.0.0.1:7002 -initial-advertise-peer-urls http://127.0.0.1:7002 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' -initial-cluster-state new
etcd3: bin/etcd -name node3 -listen-client-urls http://127.0.0.1:4003 -advertise-client-urls http://127.0.0.1:4003 -listen-peer-urls http://127.0.0.1:7003 -initial-advertise-peer-urls http://127.0.0.1:7003 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003' -initial-cluster-state new
#proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers 'localhost:7001,localhost:7002,localhost:7003'
proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -initial-cluster 'node1=http://localhost:7001,node2=http://localhost:7002,node3=http://localhost:7003'

View File

@ -19,6 +19,7 @@ package etcdserver
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"math/rand"
@ -184,7 +185,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
haveWAL := wal.Exist(cfg.WALDir())
switch {
case !haveWAL && cfg.ClusterState == ClusterStateValueExisting:
cl := getClusterFromPeers(cfg.Cluster.PeerURLs())
cl, err := GetClusterFromPeers(cfg.Cluster.PeerURLs())
if err != nil {
log.Fatal(err)
}
if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil {
log.Fatalf("etcdserver: %v", err)
}
@ -669,7 +673,7 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
s.storage.Cut()
}
func getClusterFromPeers(urls []string) *Cluster {
func GetClusterFromPeers(urls []string) (*Cluster, error) {
for _, u := range urls {
resp, err := http.Get(u + "/members")
if err != nil {
@ -691,10 +695,9 @@ func getClusterFromPeers(urls []string) *Cluster {
log.Printf("etcdserver: parse uint error: %v", err)
continue
}
return NewClusterFromMembers("", id, membs)
return NewClusterFromMembers("", id, membs), nil
}
log.Fatalf("etcdserver: could not retrieve cluster information from the given urls")
return nil
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
}
func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) {

15
main.go
View File

@ -232,11 +232,18 @@ func startProxy() {
log.Fatal(err)
}
ph, err := proxy.NewHandler(pt, cls.PeerURLs())
if err != nil {
log.Fatal(err)
// TODO(jonboulle): update peerURLs dynamically (i.e. when updating
// clientURLs) instead of just using the initial fixed list here
peerURLs := cls.PeerURLs()
uf := func() []string {
cls, err := etcdserver.GetClusterFromPeers(peerURLs)
if err != nil {
log.Printf("etcd: %v", err)
return []string{}
}
return cls.ClientURLs()
}
ph := proxy.NewHandler(pt, uf)
ph = &pkg.CORSHandler{
Handler: ph,
Info: cors,

View File

@ -17,7 +17,6 @@
package proxy
import (
"errors"
"log"
"net/url"
"sync"
@ -28,28 +27,52 @@ const (
// amount of time an endpoint will be held in a failed
// state before being reconsidered for proxied requests
endpointFailureWait = 5 * time.Second
// how often the proxy will attempt to refresh its set of endpoints
refreshEndpoints = 30 * time.Second
)
func newDirector(scheme string, addrs []string) (*director, error) {
if len(addrs) == 0 {
return nil, errors.New("one or more upstream addresses required")
func newDirector(urlsFunc GetProxyURLs) *director {
d := &director{
uf: urlsFunc,
}
endpoints := make([]*endpoint, len(addrs))
for i, addr := range addrs {
u := url.URL{Scheme: scheme, Host: addr}
endpoints[i] = newEndpoint(u)
}
d := director{ep: endpoints}
return &d, nil
d.refresh()
go func() {
for {
select {
case <-time.After(refreshEndpoints):
d.refresh()
}
}
}()
return d
}
type director struct {
sync.Mutex
ep []*endpoint
uf GetProxyURLs
}
func (d *director) refresh() {
urls := d.uf()
d.Lock()
defer d.Unlock()
var endpoints []*endpoint
for _, u := range urls {
uu, err := url.Parse(u)
if err != nil {
log.Printf("upstream URL invalid: %v", err)
continue
}
endpoints = append(endpoints, newEndpoint(*uu))
}
d.ep = endpoints
}
func (d *director) endpoints() []*endpoint {
d.Lock()
defer d.Unlock()
filtered := make([]*endpoint, 0)
for _, ep := range d.ep {
if ep.Available {

View File

@ -24,41 +24,36 @@ import (
func TestNewDirectorScheme(t *testing.T) {
tests := []struct {
scheme string
addrs []string
want []string
urls []string
want []string
}{
{
scheme: "http",
addrs: []string{"192.0.2.8:4002", "example.com:8080"},
want: []string{"http://192.0.2.8:4002", "http://example.com:8080"},
urls: []string{"http://192.0.2.8:4002", "http://example.com:8080"},
want: []string{"http://192.0.2.8:4002", "http://example.com:8080"},
},
{
scheme: "https",
addrs: []string{"192.0.2.8:4002", "example.com:8080"},
want: []string{"https://192.0.2.8:4002", "https://example.com:8080"},
urls: []string{"https://192.0.2.8:4002", "https://example.com:8080"},
want: []string{"https://192.0.2.8:4002", "https://example.com:8080"},
},
// accept addrs without a port
// accept urls without a port
{
scheme: "http",
addrs: []string{"192.0.2.8"},
want: []string{"http://192.0.2.8"},
urls: []string{"http://192.0.2.8"},
want: []string{"http://192.0.2.8"},
},
// accept addrs even if they are garbage
// accept urls even if they are garbage
{
scheme: "http",
addrs: []string{"."},
want: []string{"http://."},
urls: []string{"http://."},
want: []string{"http://."},
},
}
for i, tt := range tests {
got, err := newDirector(tt.scheme, tt.addrs)
if err != nil {
t.Errorf("#%d: newDirectory returned unexpected error: %v", i, err)
uf := func() []string {
return tt.urls
}
got := newDirector(uf)
for ii, wep := range tt.want {
gep := got.ep[ii].URL.String()

View File

@ -20,23 +20,17 @@ import (
"net/http"
)
func NewHandler(t *http.Transport, addrs []string) (http.Handler, error) {
scheme := "http"
if t.TLSClientConfig != nil {
scheme = "https"
}
// GetProxyURLs is a function which should return the current set of URLs to
// which client requests should be proxied. This function will be queried
// periodically by the proxy Handler to refresh the set of available
// backends.
type GetProxyURLs func() []string
d, err := newDirector(scheme, addrs)
if err != nil {
return nil, err
}
rp := reverseProxy{
director: d,
func NewHandler(t *http.Transport, urlsFunc GetProxyURLs) http.Handler {
return &reverseProxy{
director: newDirector(urlsFunc),
transport: t,
}
return &rp, nil
}
func readonlyHandlerFunc(next http.Handler) func(http.ResponseWriter, *http.Request) {

View File

@ -78,7 +78,7 @@ func TestReverseProxyServe(t *testing.T) {
for i, tt := range tests {
rp := reverseProxy{
director: &director{tt.eps},
director: &director{ep: tt.eps},
transport: tt.rt,
}