From 2b623cf0fae70652c00422d4f88ca837b8e36672 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 21 Sep 2014 11:26:59 -0700 Subject: [PATCH] discovery: init commit --- client/client.go | 6 +- client/http.go | 4 +- discovery/discovery.go | 136 ++++++++++++++++ discovery/discovery_test.go | 317 ++++++++++++++++++++++++++++++++++++ discovery/doc.go | 20 +++ 5 files changed, 478 insertions(+), 5 deletions(-) create mode 100644 discovery/discovery.go create mode 100644 discovery/discovery_test.go create mode 100644 discovery/doc.go diff --git a/client/client.go b/client/client.go index 0fbf0d880..4f34032a9 100644 --- a/client/client.go +++ b/client/client.go @@ -16,8 +16,8 @@ var ( type Client interface { Create(key, value string, ttl time.Duration) (*Response, error) Get(key string) (*Response, error) - Watch(key string) Watcher - RecursiveWatch(key string) Watcher + Watch(key string, idx uint64) Watcher + RecursiveWatch(key string, idx uint64) Watcher } type Watcher interface { @@ -30,7 +30,7 @@ type Response struct { PrevNode *Node `json:"prevNode"` } -type Nodes []Node +type Nodes []*Node type Node struct { Key string `json:"key"` Value string `json:"value"` diff --git a/client/http.go b/client/http.go index 41ce4b19e..89e607908 100644 --- a/client/http.go +++ b/client/http.go @@ -127,7 +127,7 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, [] return resp, body, err } -func (c *httpClient) Watch(key string, idx uint64) *httpWatcher { +func (c *httpClient) Watch(key string, idx uint64) Watcher { return &httpWatcher{ httpClient: *c, nextWait: waitAction{ @@ -138,7 +138,7 @@ func (c *httpClient) Watch(key string, idx uint64) *httpWatcher { } } -func (c *httpClient) RecursiveWatch(key string, idx uint64) *httpWatcher { +func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher { return &httpWatcher{ httpClient: *c, nextWait: waitAction{ diff --git a/discovery/discovery.go b/discovery/discovery.go new file mode 100644 index 000000000..3dedd8984 --- /dev/null +++ b/discovery/discovery.go @@ -0,0 +1,136 @@ +package discovery + +import ( + "errors" + "fmt" + "path" + "sort" + "strconv" + "strings" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/etcdserver/etcdhttp" +) + +var ( + ErrInvalidURL = errors.New("discovery: invalid URL") + ErrBadCluster = errors.New("discovery: bad key/value inside cluster") + ErrSizeNotFound = errors.New("discovery: size key not found") + ErrTokenNotFound = errors.New("discovery: token not found") + ErrDuplicateID = errors.New("discovery: found duplicate id") + ErrFullCluster = errors.New("discovery: cluster is full") +) + +type discovery struct { + cluster string + id int64 + ctx []byte + c client.Client +} + +func (d *discovery) discover() (*etcdhttp.Peers, error) { + if err := d.createSelf(); err != nil { + return nil, err + } + + nodes, size, err := d.checkCluster() + if err != nil { + return nil, err + } + + all, err := d.waitNodes(nodes, size) + if err != nil { + return nil, err + } + + return nodesToPeers(all) +} + +func (d *discovery) createSelf() error { + self := path.Join("/", d.cluster, fmt.Sprintf("%d", d.id)) + // create self key + resp, err := d.c.Create(self, string(d.ctx), 0) + if err != nil { + return err + } + + // ensure self appears on the server we connected to + w := d.c.Watch(self, resp.Node.CreatedIndex) + if _, err = w.Next(); err != nil { + return err + } + return nil +} + +func (d *discovery) checkCluster() (client.Nodes, int, error) { + self := path.Join("/", d.cluster, fmt.Sprintf("%d", d.id)) + resp, err := d.c.Get(d.cluster) + if err != nil { + return nil, 0, err + } + nodes := resp.Node.Nodes + snodes := SortableNodes{nodes} + sort.Sort(snodes) + + // find cluster size + if nodes[0].Key != path.Join("/", d.cluster, "size") { + return nil, 0, ErrSizeNotFound + } + size, err := strconv.Atoi(nodes[0].Value) + if err != nil { + return nil, 0, ErrBadCluster + } + + // remove size key from nodes + nodes = nodes[1:] + + // find self position + for i := range nodes { + if nodes[i].Key == self { + break + } + if i >= size-1 { + return nil, size, ErrFullCluster + } + } + return nodes, size, nil +} + +func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) { + if len(nodes) > size { + nodes = nodes[:size] + } + w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex) + all := make(client.Nodes, len(nodes)) + copy(all, nodes) + // wait for others + for len(all) < size { + resp, err := w.Next() + if err != nil { + return nil, err + } + all = append(all, resp.Node) + } + return all, nil +} + +func nodesToPeers(ns client.Nodes) (*etcdhttp.Peers, error) { + s := make([]string, len(ns)) + for i, n := range ns { + s[i] = n.Value + } + + var peers etcdhttp.Peers + if err := peers.Set(strings.Join(s, "&")); err != nil { + return nil, err + } + return &peers, nil +} + +type SortableNodes struct{ client.Nodes } + +func (ns SortableNodes) Len() int { return len(ns.Nodes) } +func (ns SortableNodes) Less(i, j int) bool { + return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex +} +func (ns SortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] } diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go new file mode 100644 index 000000000..c4aad602f --- /dev/null +++ b/discovery/discovery_test.go @@ -0,0 +1,317 @@ +package discovery + +import ( + "errors" + + "reflect" + "testing" + "time" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/etcdserver/etcdhttp" +) + +func TestCheckCluster(t *testing.T) { + cluster := "1000" + self := "/1000/1" + + tests := []struct { + nodes []*client.Node + werr error + wsize int + }{ + { + // self is in the size range + client.Nodes{ + {Key: "/1000/size", Value: "3", CreatedIndex: 1}, + {Key: self, CreatedIndex: 2}, + {Key: "/1000/2", CreatedIndex: 3}, + {Key: "/1000/3", CreatedIndex: 4}, + {Key: "/1000/4", CreatedIndex: 5}, + }, + nil, + 3, + }, + { + // self is in the size range + client.Nodes{ + {Key: "/1000/size", Value: "3", CreatedIndex: 1}, + {Key: "/1000/2", CreatedIndex: 2}, + {Key: "/1000/3", CreatedIndex: 3}, + {Key: self, CreatedIndex: 4}, + {Key: "/1000/4", CreatedIndex: 5}, + }, + nil, + 3, + }, + { + // self is out of the size range + client.Nodes{ + {Key: "/1000/size", Value: "3", CreatedIndex: 1}, + {Key: "/1000/2", CreatedIndex: 2}, + {Key: "/1000/3", CreatedIndex: 3}, + {Key: "/1000/4", CreatedIndex: 4}, + {Key: self, CreatedIndex: 5}, + }, + ErrFullCluster, + 3, + }, + { + // self is not in the cluster + client.Nodes{ + {Key: "/1000/size", Value: "3", CreatedIndex: 1}, + {Key: "/1000/2", CreatedIndex: 2}, + {Key: "/1000/3", CreatedIndex: 3}, + }, + nil, + 3, + }, + { + client.Nodes{ + {Key: "/1000/size", Value: "3", CreatedIndex: 1}, + {Key: "/1000/2", CreatedIndex: 2}, + {Key: "/1000/3", CreatedIndex: 3}, + {Key: "/1000/4", CreatedIndex: 4}, + }, + ErrFullCluster, + 3, + }, + { + // bad size key + client.Nodes{ + {Key: "/1000/size", Value: "bad", CreatedIndex: 1}, + }, + ErrBadCluster, + 0, + }, + { + // no size key + client.Nodes{ + {Key: self, CreatedIndex: 1}, + }, + ErrSizeNotFound, + 0, + }, + } + + for i, tt := range tests { + resp := &client.Response{ + Node: &client.Node{ + Key: cluster, + Nodes: tt.nodes, + }, + } + + c := &clientWithResp{ + rs: []*client.Response{resp}, + } + + d := discovery{cluster: cluster, id: 1, c: c} + ns, size, err := d.checkCluster() + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if reflect.DeepEqual(ns, tt.nodes) { + t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes) + } + if size != tt.wsize { + t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize) + } + } +} + +func TestWaitNodes(t *testing.T) { + all := client.Nodes{ + {Key: "/1000/1", CreatedIndex: 2}, + {Key: "/1000/2", CreatedIndex: 3}, + {Key: "/1000/3", CreatedIndex: 4}, + } + + tests := []struct { + nodes client.Nodes + size int + rs []*client.Response + + werr error + wall client.Nodes + }{ + { + all, + 3, + []*client.Response{}, + nil, + all, + }, + { + all[:1], + 3, + []*client.Response{ + {Node: &client.Node{Key: "/1000/2", CreatedIndex: 3}}, + {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}}, + }, + nil, + all, + }, + { + all[:2], + 3, + []*client.Response{ + {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}}, + }, + nil, + all, + }, + } + + for i, tt := range tests { + c := &clientWithResp{nil, &watcherWithResp{tt.rs}} + d := &discovery{cluster: "1000", c: c} + g, err := d.waitNodes(tt.nodes, tt.size) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if !reflect.DeepEqual(g, tt.wall) { + t.Errorf("#%d: all = %v, want %v", i, g, tt.wall) + } + } +} + +func TestCreateSelf(t *testing.T) { + rs := []*client.Response{{Node: &client.Node{Key: "1000/1", CreatedIndex: 2}}} + + w := &watcherWithResp{rs} + errw := &watcherWithErr{errors.New("watch err")} + + c := &clientWithResp{rs, w} + errc := &clientWithErr{errors.New("create err"), w} + errwc := &clientWithResp{rs, errw} + + tests := []struct { + c client.Client + werr error + }{ + // no error + {c, nil}, + // client.create returns an error + {errc, errc.err}, + // watcher.next retuens an error + {errwc, errw.err}, + } + + for i, tt := range tests { + d := discovery{cluster: "1000", c: tt.c} + if err := d.createSelf(); err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, nil) + } + } +} + +func TestNodesToPeers(t *testing.T) { + nodes := client.Nodes{ + {Key: "/1000/1", Value: "1=1.1.1.1", CreatedIndex: 1}, + {Key: "/1000/2", Value: "2=2.2.2.2", CreatedIndex: 2}, + {Key: "/1000/3", Value: "3=3.3.3.3", CreatedIndex: 3}, + } + w := &etcdhttp.Peers{} + w.Set("1=1.1.1.1&2=2.2.2.2&3=3.3.3.3") + + badnodes := client.Nodes{{Key: "1000/1", Value: "1=1.1.1.1&???", CreatedIndex: 1}} + + tests := []struct { + ns client.Nodes + wp *etcdhttp.Peers + we bool + }{ + {nodes, w, false}, + {badnodes, nil, true}, + } + + for i, tt := range tests { + peers, err := nodesToPeers(tt.ns) + if tt.we { + if err == nil { + t.Fatalf("#%d: err = %v, want not nil", i, err) + } + } else { + if err != nil { + t.Fatalf("#%d: err = %v, want nil", i, err) + } + } + if !reflect.DeepEqual(peers, tt.wp) { + t.Errorf("#%d: peers = %v, want %v", i, peers, tt.wp) + } + } +} + +type clientWithResp struct { + rs []*client.Response + w client.Watcher +} + +func (c *clientWithResp) Create(key string, value string, ttl time.Duration) (*client.Response, error) { + if len(c.rs) == 0 { + return &client.Response{}, nil + } + r := c.rs[0] + c.rs = c.rs[1:] + return r, nil +} + +func (c *clientWithResp) Get(key string) (*client.Response, error) { + if len(c.rs) == 0 { + return &client.Response{}, nil + } + r := c.rs[0] + c.rs = c.rs[1:] + return r, nil +} + +func (c *clientWithResp) Watch(key string, waitIndex uint64) client.Watcher { + return c.w +} + +func (c *clientWithResp) RecursiveWatch(key string, waitIndex uint64) client.Watcher { + return c.w +} + +type clientWithErr struct { + err error + w client.Watcher +} + +func (c *clientWithErr) Create(key string, value string, ttl time.Duration) (*client.Response, error) { + return &client.Response{}, c.err +} + +func (c *clientWithErr) Get(key string) (*client.Response, error) { + return &client.Response{}, c.err +} + +func (c *clientWithErr) Watch(key string, waitIndex uint64) client.Watcher { + return c.w +} + +func (c *clientWithErr) RecursiveWatch(key string, waitIndex uint64) client.Watcher { + return c.w +} + +type watcherWithResp struct { + rs []*client.Response +} + +func (w *watcherWithResp) Next() (*client.Response, error) { + if len(w.rs) == 0 { + return &client.Response{}, nil + } + r := w.rs[0] + w.rs = w.rs[1:] + return r, nil +} + +type watcherWithErr struct { + err error +} + +func (w *watcherWithErr) Next() (*client.Response, error) { + return &client.Response{}, w.err +} diff --git a/discovery/doc.go b/discovery/doc.go new file mode 100644 index 000000000..eb231e238 --- /dev/null +++ b/discovery/doc.go @@ -0,0 +1,20 @@ +// Copyright 2014 CoreOS Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package discovery provides an implementation of the cluster discovery that +is used by etcd. + +*/ +package discovery