bump(github.com/coreos/go-etcd): 0cc84e9bc81c45e074864360adc549e61a3a7f83

This commit is contained in:
Ben Johnson 2013-10-14 10:05:39 -06:00
parent 375f7a73b9
commit 1843f7bda5
11 changed files with 110 additions and 15 deletions

View File

@ -36,12 +36,16 @@ type Client struct {
}
// Setup a basic conf and cluster
func NewClient() *Client {
func NewClient(machines []string) *Client {
// if an empty slice was sent in then just assume localhost
if len(machines) == 0 {
machines = []string{"http://127.0.0.1:4001"}
}
// default leader and machines
cluster := Cluster{
Leader: "http://127.0.0.1:4001",
Machines: []string{"http://127.0.0.1:4001"},
Leader: machines[0],
Machines: machines,
}
config := Config{
@ -107,6 +111,10 @@ func (c *Client) SetCluster(machines []string) bool {
return success
}
func (c *Client) GetCluster() []string {
return c.cluster.Machines
}
// sycn cluster information using the existing machine list
func (c *Client) SyncCluster() bool {
success := c.internalSyncCluster(c.cluster.Machines)
@ -128,14 +136,16 @@ func (c *Client) internalSyncCluster(machines []string) bool {
// try another machine in the cluster
continue
}
// update Machines List
c.cluster.Machines = strings.Split(string(b), ",")
c.cluster.Machines = strings.Split(string(b), ", ")
// update leader
// the first one in the machine list is the leader
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
c.cluster.Leader = c.cluster.Machines[0]
logger.Debug("sync.machines ", c.cluster.Machines)
return true
}
}
@ -146,6 +156,9 @@ func (c *Client) internalSyncCluster(machines []string) bool {
func (c *Client) createHttpPath(serverName string, _path string) string {
u, _ := url.Parse(serverName)
u.Path = path.Join(u.Path, "/", _path)
if u.Scheme == "" {
u.Scheme = "http"
}
return u.String()
}

View File

@ -3,6 +3,8 @@ package etcd
import (
"fmt"
"testing"
"net/url"
"net"
)
// To pass this test, we need to create a cluster of 3 machines
@ -10,13 +12,31 @@ import (
func TestSync(t *testing.T) {
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
c := NewClient()
c := NewClient(nil)
success := c.SyncCluster()
if !success {
t.Fatal("cannot sync machines")
}
for _, m := range(c.GetCluster()) {
u, err := url.Parse(m)
if err != nil {
t.Fatal(err)
}
if u.Scheme != "http" {
t.Fatal("scheme must be http")
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
t.Fatal(err)
}
if host != "127.0.0.1" {
t.Fatal("Host must be 127.0.0.1")
}
}
badMachines := []string{"abc", "edef"}
success = c.SetCluster(badMachines)

View File

@ -6,7 +6,7 @@ import (
func TestDelete(t *testing.T) {
c := NewClient()
c := NewClient(nil)
c.Set("foo", "bar", 100)
result, err := c.Delete("foo")

View File

@ -7,7 +7,7 @@ import (
func TestGet(t *testing.T) {
c := NewClient()
c := NewClient(nil)
c.Set("foo", "bar", 100)

View File

@ -6,7 +6,7 @@ import (
)
func TestList(t *testing.T) {
c := NewClient()
c := NewClient(nil)
c.Set("foo_list/foo", "bar", 100)
c.Set("foo_list/fooo", "barbar", 100)

View File

@ -6,7 +6,7 @@ import (
)
func TestSet(t *testing.T) {
c := NewClient()
c := NewClient(nil)
result, err := c.Set("foo", "bar", 100)

View File

@ -6,7 +6,7 @@ import (
)
func TestTestAndSet(t *testing.T) {
c := NewClient()
c := NewClient(nil)
c.Set("foo_testAndSet", "bar", 100)

View File

@ -16,6 +16,11 @@ type respAndErr struct {
err error
}
// Errors introduced by the Watch command.
var (
ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel")
)
// Watch any change under the given prefix.
// When a sinceIndex is given, watch will try to scan from that index to the last index
// and will return any changes under the given prefix during the history
@ -66,7 +71,7 @@ func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*stor
resp, err = res.resp, res.err
case <-stop:
resp, err = nil, errors.New("User stoped watch")
resp, err = nil, ErrWatchStoppedByUser
}
} else {
resp, err = c.sendWatchRequest(key, sinceIndex)

View File

@ -8,7 +8,7 @@ import (
)
func TestWatch(t *testing.T) {
c := NewClient()
c := NewClient(nil)
go setHelper("bar", c)
@ -35,9 +35,12 @@ func TestWatch(t *testing.T) {
go setLoop("bar", c)
go reciver(ch, stop)
go receiver(ch, stop)
c.Watch("watch_foo", 0, ch, stop)
_, err = c.Watch("watch_foo", 0, ch, stop)
if err != ErrWatchStoppedByUser {
t.Fatalf("Watch returned a non-user stop error")
}
}
func setHelper(value string, c *Client) {
@ -54,7 +57,7 @@ func setLoop(value string, c *Client) {
}
}
func reciver(c chan *store.Response, stop chan bool) {
func receiver(c chan *store.Response, stop chan bool) {
for i := 0; i < 10; i++ {
<-c
}

View File

@ -0,0 +1,3 @@
Example script from the sync-cluster bug https://github.com/coreos/go-etcd/issues/27
TODO: turn this into a test case

View File

@ -0,0 +1,51 @@
package main
import (
"fmt"
"github.com/coreos/go-etcd/etcd"
"strconv"
"time"
)
func main() {
fmt.Println("etcd-client started")
c := etcd.NewClient(nil)
c.SetCluster([]string{
"http://127.0.0.1:4001",
"http://127.0.0.1:4002",
"http://127.0.0.1:4003",
})
ticker := time.NewTicker(time.Second * 3)
for {
select {
case d := <-ticker.C:
n := d.Second()
if n <= 0 {
n = 60
}
for ok := c.SyncCluster(); ok == false; {
fmt.Println("SyncCluster failed, trying again")
time.Sleep(100 * time.Millisecond)
}
result, err := c.Set("foo", "exp_"+strconv.Itoa(n), 0)
if err != nil {
fmt.Println("set error", err)
} else {
fmt.Printf("set %+v\n", result)
}
ss, err := c.Get("foo")
if err != nil {
fmt.Println("get error", err)
} else {
fmt.Println(len(ss))
}
}
}
}