mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
proxy: prints out when endpoints are found
This commit is contained in:
parent
2e051c1c61
commit
a8e72b6285
@ -15,18 +15,12 @@
|
|||||||
package e2e
|
package e2e
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect"
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
@ -194,7 +188,8 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
|
|||||||
go func(etcdp *etcdProcess) {
|
go func(etcdp *etcdProcess) {
|
||||||
rs := readyStr
|
rs := readyStr
|
||||||
if etcdp.cfg.isProxy {
|
if etcdp.cfg.isProxy {
|
||||||
rs = "proxy: listening for client requests on"
|
// rs = "proxy: listening for client requests on"
|
||||||
|
rs = "proxy: endpoints found"
|
||||||
}
|
}
|
||||||
ok, err := etcdp.proc.ExpectRegex(rs)
|
ok, err := etcdp.proc.ExpectRegex(rs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -216,72 +211,9 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if epc.cfg.proxySize > 0 {
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
ok, _ := isProxyReady(epc)
|
|
||||||
if ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return epc, nil
|
return epc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func isProxyReady(clus *etcdProcessCluster) (bool, error) {
|
|
||||||
if clus.cfg.proxySize == 0 {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
proxies := clus.proxies()
|
|
||||||
if len(proxies) == 0 {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
endpoint := proxies[0].cfg.acurl.String()
|
|
||||||
|
|
||||||
am := make(map[string]struct{})
|
|
||||||
as := []string{}
|
|
||||||
for _, cfg := range clus.cfg.etcdProcessConfigs() {
|
|
||||||
if cfg.isProxy {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
v := cfg.acurl.String()
|
|
||||||
if _, ok := am[v]; !ok {
|
|
||||||
am[v] = struct{}{}
|
|
||||||
as = append(as, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Strings(as)
|
|
||||||
|
|
||||||
emap1 := make(map[string][]string)
|
|
||||||
emap1["endpoints"] = as
|
|
||||||
|
|
||||||
resp, err := http.Get(endpoint + "/v2/config/local/proxy")
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
emap2 := make(map[string][]string)
|
|
||||||
dec := json.NewDecoder(resp.Body)
|
|
||||||
for {
|
|
||||||
if err := dec.Decode(&emap2); err == io.EOF {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if vs, ok := emap2["endpoints"]; !ok {
|
|
||||||
return false, nil
|
|
||||||
} else {
|
|
||||||
sort.Strings(vs)
|
|
||||||
emap2["endpoints"] = vs
|
|
||||||
}
|
|
||||||
|
|
||||||
return reflect.DeepEqual(emap1, emap2), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
|
func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
|
||||||
if fileutil.Exist("../bin/etcd") == false {
|
if fileutil.Exist("../bin/etcd") == false {
|
||||||
return nil, fmt.Errorf("could not find etcd binary")
|
return nil, fmt.Errorf("could not find etcd binary")
|
||||||
|
@ -26,6 +26,8 @@ import (
|
|||||||
// as in etcdmain/config.go.
|
// as in etcdmain/config.go.
|
||||||
const defaultRefreshInterval = 30000 * time.Millisecond
|
const defaultRefreshInterval = 30000 * time.Millisecond
|
||||||
|
|
||||||
|
var once sync.Once
|
||||||
|
|
||||||
func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
|
func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
|
||||||
d := &director{
|
d := &director{
|
||||||
uf: urlsFunc,
|
uf: urlsFunc,
|
||||||
@ -38,12 +40,22 @@ func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterv
|
|||||||
// and whenever there is no available proxy endpoints,
|
// and whenever there is no available proxy endpoints,
|
||||||
// give 1-second refreshInterval.
|
// give 1-second refreshInterval.
|
||||||
for {
|
for {
|
||||||
|
es := d.endpoints()
|
||||||
ri := refreshInterval
|
ri := refreshInterval
|
||||||
if ri >= defaultRefreshInterval {
|
if ri >= defaultRefreshInterval {
|
||||||
if len(d.endpoints()) == 0 {
|
if len(es) == 0 {
|
||||||
ri = time.Second
|
ri = time.Second
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(es) > 0 {
|
||||||
|
once.Do(func() {
|
||||||
|
var sl []string
|
||||||
|
for _, e := range es {
|
||||||
|
sl = append(sl, e.URL.String())
|
||||||
|
}
|
||||||
|
plog.Infof("endpoints found %q", sl)
|
||||||
|
})
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-time.After(ri):
|
case <-time.After(ri):
|
||||||
d.refresh()
|
d.refresh()
|
||||||
@ -69,7 +81,7 @@ func (d *director) refresh() {
|
|||||||
for _, u := range urls {
|
for _, u := range urls {
|
||||||
uu, err := url.Parse(u)
|
uu, err := url.Parse(u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("proxy: upstream URL invalid: %v", err)
|
plog.Printf("upstream URL invalid: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
|
endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user