Merge pull request #1510 from xiangli-cmu/fix_discovery

discovery: fix discovery for not working on customized discovery service
This commit is contained in:
Xiang Li 2014-10-29 18:33:06 -07:00
commit 38617f5c9b
2 changed files with 22 additions and 5 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
"github.com/coreos/etcd/client" "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/strutil"
) )
var ( var (
@ -187,7 +188,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
nodes := make(client.Nodes, 0) nodes := make(client.Nodes, 0)
// append non-config keys to nodes // append non-config keys to nodes
for _, n := range resp.Node.Nodes { for _, n := range resp.Node.Nodes {
if !strings.Contains(n.Key, configKey) { if !(path.Base(n.Key) == path.Base(configKey)) {
nodes = append(nodes, n) nodes = append(nodes, n)
} }
} }
@ -197,7 +198,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
// find self position // find self position
for i := range nodes { for i := range nodes {
if strings.Contains(nodes[i].Key, d.selfKey()) { if path.Base(nodes[i].Key) == path.Base(d.selfKey()) {
break break
} }
if i >= size-1 { if i >= size-1 {
@ -241,8 +242,17 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error
w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1) w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1)
all := make(client.Nodes, len(nodes)) all := make(client.Nodes, len(nodes))
copy(all, nodes) copy(all, nodes)
for _, n := range all {
if path.Base(n.Key) == path.Base(d.selfKey()) {
log.Printf("discovery: found self %s in the cluster", path.Base(d.selfKey()))
} else {
log.Printf("discovery: found peer %s in the cluster", path.Base(n.Key))
}
}
// wait for others // wait for others
for len(all) < size { for len(all) < size {
log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all))
resp, err := w.Next() resp, err := w.Next()
if err != nil { if err != nil {
if err == client.ErrTimeout { if err == client.ErrTimeout {
@ -250,13 +260,15 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error
} }
return nil, err return nil, err
} }
log.Printf("discovery: found peer %s in the cluster", path.Base(resp.Node.Key))
all = append(all, resp.Node) all = append(all, resp.Node)
} }
log.Printf("discovery: found %d needed peer(s)", len(all))
return all, nil return all, nil
} }
func (d *discovery) selfKey() string { func (d *discovery) selfKey() string {
return path.Join("/", d.cluster, fmt.Sprintf("%d", d.id)) return path.Join("/", d.cluster, strutil.IDAsHex(d.id))
} }
func nodesToCluster(ns client.Nodes) string { func nodesToCluster(ns client.Nodes) string {

View File

@ -88,7 +88,7 @@ func TestProxyFuncFromEnv(t *testing.T) {
} }
func TestCheckCluster(t *testing.T) { func TestCheckCluster(t *testing.T) {
cluster := "1000" cluster := "/prefix/1000"
self := "/1000/1" self := "/1000/1"
tests := []struct { tests := []struct {
@ -100,6 +100,7 @@ func TestCheckCluster(t *testing.T) {
// self is in the size range // self is in the size range
[]*client.Node{ []*client.Node{
{Key: "/1000/_config/size", Value: "3", CreatedIndex: 1}, {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
{Key: "/1000/_config/"},
{Key: self, CreatedIndex: 2}, {Key: self, CreatedIndex: 2},
{Key: "/1000/2", CreatedIndex: 3}, {Key: "/1000/2", CreatedIndex: 3},
{Key: "/1000/3", CreatedIndex: 4}, {Key: "/1000/3", CreatedIndex: 4},
@ -112,6 +113,7 @@ func TestCheckCluster(t *testing.T) {
// self is in the size range // self is in the size range
[]*client.Node{ []*client.Node{
{Key: "/1000/_config/size", Value: "3", CreatedIndex: 1}, {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
{Key: "/1000/_config/"},
{Key: "/1000/2", CreatedIndex: 2}, {Key: "/1000/2", CreatedIndex: 2},
{Key: "/1000/3", CreatedIndex: 3}, {Key: "/1000/3", CreatedIndex: 3},
{Key: self, CreatedIndex: 4}, {Key: self, CreatedIndex: 4},
@ -124,6 +126,7 @@ func TestCheckCluster(t *testing.T) {
// self is out of the size range // self is out of the size range
[]*client.Node{ []*client.Node{
{Key: "/1000/_config/size", Value: "3", CreatedIndex: 1}, {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
{Key: "/1000/_config/"},
{Key: "/1000/2", CreatedIndex: 2}, {Key: "/1000/2", CreatedIndex: 2},
{Key: "/1000/3", CreatedIndex: 3}, {Key: "/1000/3", CreatedIndex: 3},
{Key: "/1000/4", CreatedIndex: 4}, {Key: "/1000/4", CreatedIndex: 4},
@ -136,6 +139,7 @@ func TestCheckCluster(t *testing.T) {
// self is not in the cluster // self is not in the cluster
[]*client.Node{ []*client.Node{
{Key: "/1000/_config/size", Value: "3", CreatedIndex: 1}, {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
{Key: "/1000/_config/"},
{Key: "/1000/2", CreatedIndex: 2}, {Key: "/1000/2", CreatedIndex: 2},
{Key: "/1000/3", CreatedIndex: 3}, {Key: "/1000/3", CreatedIndex: 3},
}, },
@ -145,6 +149,7 @@ func TestCheckCluster(t *testing.T) {
{ {
[]*client.Node{ []*client.Node{
{Key: "/1000/_config/size", Value: "3", CreatedIndex: 1}, {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
{Key: "/1000/_config/"},
{Key: "/1000/2", CreatedIndex: 2}, {Key: "/1000/2", CreatedIndex: 2},
{Key: "/1000/3", CreatedIndex: 3}, {Key: "/1000/3", CreatedIndex: 3},
{Key: "/1000/4", CreatedIndex: 4}, {Key: "/1000/4", CreatedIndex: 4},
@ -175,7 +180,7 @@ func TestCheckCluster(t *testing.T) {
rs = append(rs, &client.Response{ rs = append(rs, &client.Response{
Node: &client.Node{ Node: &client.Node{
Key: cluster, Key: cluster,
Nodes: tt.nodes, Nodes: tt.nodes[1:],
}, },
}) })
} }