diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index cbbd7ad9c..31d3c2a3a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -36,12 +36,16 @@ type Client struct { } // Setup a basic conf and cluster -func NewClient() *Client { +func NewClient(machines []string) *Client { + // if an empty slice was sent in then just assume localhost + if len(machines) == 0 { + machines = []string{"http://127.0.0.1:4001"} + } // default leader and machines cluster := Cluster{ - Leader: "http://127.0.0.1:4001", - Machines: []string{"http://127.0.0.1:4001"}, + Leader: machines[0], + Machines: machines, } config := Config{ @@ -107,6 +111,10 @@ func (c *Client) SetCluster(machines []string) bool { return success } +func (c *Client) GetCluster() []string { + return c.cluster.Machines +} + // sycn cluster information using the existing machine list func (c *Client) SyncCluster() bool { success := c.internalSyncCluster(c.cluster.Machines) @@ -128,14 +136,16 @@ func (c *Client) internalSyncCluster(machines []string) bool { // try another machine in the cluster continue } + // update Machines List - c.cluster.Machines = strings.Split(string(b), ",") + c.cluster.Machines = strings.Split(string(b), ", ") // update leader // the first one in the machine list is the leader logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) c.cluster.Leader = c.cluster.Machines[0] + logger.Debug("sync.machines ", c.cluster.Machines) return true } } @@ -146,6 +156,9 @@ func (c *Client) internalSyncCluster(machines []string) bool { func (c *Client) createHttpPath(serverName string, _path string) string { u, _ := url.Parse(serverName) u.Path = path.Join(u.Path, "/", _path) + if u.Scheme == "" { + u.Scheme = "http" + } return u.String() } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go index 45a99e96c..29f138113 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -3,6 +3,8 @@ package etcd import ( "fmt" "testing" + "net/url" + "net" ) // To pass this test, we need to create a cluster of 3 machines @@ -10,13 +12,31 @@ import ( func TestSync(t *testing.T) { fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") - c := NewClient() + c := NewClient(nil) success := c.SyncCluster() if !success { t.Fatal("cannot sync machines") } + for _, m := range(c.GetCluster()) { + u, err := url.Parse(m) + if err != nil { + t.Fatal(err) + } + if u.Scheme != "http" { + t.Fatal("scheme must be http") + } + + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + t.Fatal(err) + } + if host != "127.0.0.1" { + t.Fatal("Host must be 127.0.0.1") + } + } + badMachines := []string{"abc", "edef"} success = c.SetCluster(badMachines) diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go index a5f980167..52756d09f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -6,7 +6,7 @@ import ( func TestDelete(t *testing.T) { - c := NewClient() + c := NewClient(nil) c.Set("foo", "bar", 100) result, err := c.Delete("foo") diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go index 8e3852c9f..ff8137400 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -7,7 +7,7 @@ import ( func TestGet(t *testing.T) { - c := NewClient() + c := NewClient(nil) c.Set("foo", "bar", 100) diff --git a/third_party/github.com/coreos/go-etcd/etcd/list_test.go b/third_party/github.com/coreos/go-etcd/etcd/list_test.go index 1e98e7645..382bb356d 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/list_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/list_test.go @@ -6,7 +6,7 @@ import ( ) func TestList(t *testing.T) { - c := NewClient() + c := NewClient(nil) c.Set("foo_list/foo", "bar", 100) c.Set("foo_list/fooo", "barbar", 100) diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_test.go index dc46608d7..3809ee952 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set_test.go @@ -6,7 +6,7 @@ import ( ) func TestSet(t *testing.T) { - c := NewClient() + c := NewClient(nil) result, err := c.Set("foo", "bar", 100) diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go index ba6d0e8f5..5dbd854b5 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go @@ -6,7 +6,7 @@ import ( ) func TestTestAndSet(t *testing.T) { - c := NewClient() + c := NewClient(nil) c.Set("foo_testAndSet", "bar", 100) diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go index 5da5565c6..7f59ed065 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -16,6 +16,11 @@ type respAndErr struct { err error } +// Errors introduced by the Watch command. +var ( + ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") +) + // Watch any change under the given prefix. // When a sinceIndex is given, watch will try to scan from that index to the last index // and will return any changes under the given prefix during the history @@ -66,7 +71,7 @@ func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*stor resp, err = res.resp, res.err case <-stop: - resp, err = nil, errors.New("User stoped watch") + resp, err = nil, ErrWatchStoppedByUser } } else { resp, err = c.sendWatchRequest(key, sinceIndex) diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go index 5e18a2b29..0d9348518 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -8,7 +8,7 @@ import ( ) func TestWatch(t *testing.T) { - c := NewClient() + c := NewClient(nil) go setHelper("bar", c) @@ -35,9 +35,12 @@ func TestWatch(t *testing.T) { go setLoop("bar", c) - go reciver(ch, stop) + go receiver(ch, stop) - c.Watch("watch_foo", 0, ch, stop) + _, err = c.Watch("watch_foo", 0, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } } func setHelper(value string, c *Client) { @@ -54,7 +57,7 @@ func setLoop(value string, c *Client) { } } -func reciver(c chan *store.Response, stop chan bool) { +func receiver(c chan *store.Response, stop chan bool) { for i := 0; i < 10; i++ { <-c } diff --git a/third_party/github.com/coreos/go-etcd/examples/sync-cluster/README.md b/third_party/github.com/coreos/go-etcd/examples/sync-cluster/README.md new file mode 100644 index 000000000..145744feb --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/examples/sync-cluster/README.md @@ -0,0 +1,3 @@ +Example script from the sync-cluster bug https://github.com/coreos/go-etcd/issues/27 + +TODO: turn this into a test case diff --git a/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go b/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go new file mode 100644 index 000000000..8249b4bdc --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/examples/sync-cluster/sync-cluster.go @@ -0,0 +1,51 @@ + +package main + +import ( + "fmt" + "github.com/coreos/go-etcd/etcd" + "strconv" + "time" +) + +func main() { + fmt.Println("etcd-client started") + c := etcd.NewClient(nil) + c.SetCluster([]string{ + "http://127.0.0.1:4001", + "http://127.0.0.1:4002", + "http://127.0.0.1:4003", + }) + + ticker := time.NewTicker(time.Second * 3) + + for { + select { + case d := <-ticker.C: + n := d.Second() + if n <= 0 { + n = 60 + } + + for ok := c.SyncCluster(); ok == false; { + fmt.Println("SyncCluster failed, trying again") + time.Sleep(100 * time.Millisecond) + } + + result, err := c.Set("foo", "exp_"+strconv.Itoa(n), 0) + if err != nil { + fmt.Println("set error", err) + } else { + fmt.Printf("set %+v\n", result) + } + + ss, err := c.Get("foo") + if err != nil { + fmt.Println("get error", err) + } else { + fmt.Println(len(ss)) + } + + } + } +}