mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
bump deps
This commit is contained in:
1
third_party/github.com/coreos/go-etcd/.gitignore
vendored
Normal file
1
third_party/github.com/coreos/go-etcd/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
config.json
|
||||
43
third_party/github.com/coreos/go-etcd/README.md
vendored
43
third_party/github.com/coreos/go-etcd/README.md
vendored
@@ -1,12 +1,8 @@
|
||||
# go-etcd
|
||||
|
||||
golang client library for etcd
|
||||
The official etcd v0.2 client library for Go.
|
||||
|
||||
This etcd client library is under heavy development. Check back soon for more
|
||||
docs. In the meantime, check out [etcd](https://github.com/coreos/etcd) for
|
||||
details on the client protocol.
|
||||
|
||||
For usage see example below or look at godoc: [go-etcd/etcd](http://godoc.org/github.com/coreos/go-etcd/etcd)
|
||||
For usage, please refer to: [go-etcd/etcd](http://godoc.org/github.com/coreos/go-etcd/etcd).
|
||||
|
||||
## Install
|
||||
|
||||
@@ -14,37 +10,6 @@ For usage see example below or look at godoc: [go-etcd/etcd](http://godoc.org/gi
|
||||
go get github.com/coreos/go-etcd/etcd
|
||||
```
|
||||
|
||||
## Examples
|
||||
## License
|
||||
|
||||
Returning error values are not showed for the sake of simplicity, but you
|
||||
should check them.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
func main() {
|
||||
c := etcd.NewClient() // default binds to http://0.0.0.0:4001
|
||||
|
||||
// SET the value "bar" to the key "foo" with zero TTL
|
||||
// returns a: *Response
|
||||
res, _ := c.Set("foo", "bar", 0)
|
||||
fmt.Printf("set response: %+v\n", res)
|
||||
|
||||
// GET the value that is stored for the key "foo"
|
||||
// return a slice: []*Response
|
||||
values, _ := c.Get("foo")
|
||||
for i, res := range values { // .. and print them out
|
||||
fmt.Printf("[%d] get response: %+v\n", i, res)
|
||||
}
|
||||
|
||||
// DELETE the key "foo"
|
||||
// returns a: *Response
|
||||
res, _ = c.Delete("foo")
|
||||
fmt.Printf("delete response: %+v\n", res)
|
||||
}
|
||||
```
|
||||
See LICENSE file.
|
||||
|
||||
11
third_party/github.com/coreos/go-etcd/etcd/add_child.go
vendored
Normal file
11
third_party/github.com/coreos/go-etcd/etcd/add_child.go
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
package etcd
|
||||
|
||||
// Add a new directory with a random etcd-generated key under the given path.
|
||||
func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) {
|
||||
return c.post(key, "", ttl)
|
||||
}
|
||||
|
||||
// Add a new file with a random etcd-generated key under the given path.
|
||||
func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) {
|
||||
return c.post(key, value, ttl)
|
||||
}
|
||||
73
third_party/github.com/coreos/go-etcd/etcd/add_child_test.go
vendored
Normal file
73
third_party/github.com/coreos/go-etcd/etcd/add_child_test.go
vendored
Normal file
@@ -0,0 +1,73 @@
|
||||
package etcd
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestAddChild(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("fooDir")
|
||||
c.DeleteAll("nonexistentDir")
|
||||
}()
|
||||
|
||||
c.SetDir("fooDir", 5)
|
||||
|
||||
_, err := c.AddChild("fooDir", "v0", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = c.AddChild("fooDir", "v1", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
resp, err := c.Get("fooDir", true)
|
||||
// The child with v0 should proceed the child with v1 because it's added
|
||||
// earlier, so it should have a lower key.
|
||||
if !(len(resp.Kvs) == 2 && (resp.Kvs[0].Value == "v0" && resp.Kvs[1].Value == "v1")) {
|
||||
t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+
|
||||
" The response was: %#v", resp)
|
||||
}
|
||||
|
||||
// Creating a child under a nonexistent directory should succeed.
|
||||
// The directory should be created.
|
||||
resp, err = c.AddChild("nonexistentDir", "foo", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddChildDir(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("fooDir")
|
||||
c.DeleteAll("nonexistentDir")
|
||||
}()
|
||||
|
||||
c.SetDir("fooDir", 5)
|
||||
|
||||
_, err := c.AddChildDir("fooDir", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = c.AddChildDir("fooDir", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
resp, err := c.Get("fooDir", true)
|
||||
// The child with v0 should proceed the child with v1 because it's added
|
||||
// earlier, so it should have a lower key.
|
||||
if !(len(resp.Kvs) == 2 && (len(resp.Kvs[0].KVPairs) == 0 && len(resp.Kvs[1].KVPairs) == 0)) {
|
||||
t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+
|
||||
" The response was: %#v", resp)
|
||||
}
|
||||
|
||||
// Creating a child under a nonexistent directory should succeed.
|
||||
// The directory should be created.
|
||||
resp, err = c.AddChildDir("nonexistentDir", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
314
third_party/github.com/coreos/go-etcd/etcd/client.go
vendored
314
third_party/github.com/coreos/go-etcd/etcd/client.go
vendored
@@ -2,12 +2,16 @@ package etcd
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
@@ -17,25 +21,43 @@ const (
|
||||
HTTPS
|
||||
)
|
||||
|
||||
// See SetConsistency for how to use these constants.
|
||||
const (
|
||||
// Using strings rather than iota because the consistency level
|
||||
// could be persisted to disk, so it'd be better to use
|
||||
// human-readable values.
|
||||
STRONG_CONSISTENCY = "STRONG"
|
||||
WEAK_CONSISTENCY = "WEAK"
|
||||
)
|
||||
|
||||
type Cluster struct {
|
||||
Leader string
|
||||
Machines []string
|
||||
Leader string `json:"leader"`
|
||||
Machines []string `json:"machines"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
CertFile string
|
||||
KeyFile string
|
||||
Scheme string
|
||||
Timeout time.Duration
|
||||
CertFile string `json:"certFile"`
|
||||
KeyFile string `json:"keyFile"`
|
||||
Scheme string `json:"scheme"`
|
||||
Timeout time.Duration `json:"timeout"`
|
||||
Consistency string `json: "consistency"`
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
cluster Cluster
|
||||
config Config
|
||||
httpClient *http.Client
|
||||
cluster Cluster `json:"cluster"`
|
||||
config Config `json:"config"`
|
||||
httpClient *http.Client
|
||||
persistence io.Writer
|
||||
}
|
||||
|
||||
// Setup a basic conf and cluster
|
||||
type options map[string]interface{}
|
||||
|
||||
// An internally-used data structure that represents a mapping
|
||||
// between valid options and their kinds
|
||||
type validOptions map[string]reflect.Kind
|
||||
|
||||
// NewClient create a basic client that is configured to be used
|
||||
// with the given machine list.
|
||||
func NewClient(machines []string) *Client {
|
||||
// if an empty slice was sent in then just assume localhost
|
||||
if len(machines) == 0 {
|
||||
@@ -53,30 +75,168 @@ func NewClient(machines []string) *Client {
|
||||
Scheme: "http",
|
||||
// default timeout is one second
|
||||
Timeout: time.Second,
|
||||
// default consistency level is STRONG
|
||||
Consistency: STRONG_CONSISTENCY,
|
||||
}
|
||||
|
||||
tr := &http.Transport{
|
||||
Dial: dialTimeout,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
client := &Client{
|
||||
cluster: cluster,
|
||||
config: config,
|
||||
}
|
||||
|
||||
return &Client{
|
||||
cluster: cluster,
|
||||
config: config,
|
||||
httpClient: &http.Client{Transport: tr},
|
||||
err := setupHttpClient(client)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
func (c *Client) SetCertAndKey(cert string, key string) (bool, error) {
|
||||
// NewClientFile creates a client from a given file path.
|
||||
// The given file is expected to use the JSON format.
|
||||
func NewClientFile(fpath string) (*Client, error) {
|
||||
fi, err := os.Open(fpath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err := fi.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return NewClientReader(fi)
|
||||
}
|
||||
|
||||
// NewClientReader creates a Client configured from a given reader.
|
||||
// The config is expected to use the JSON format.
|
||||
func NewClientReader(reader io.Reader) (*Client, error) {
|
||||
var client Client
|
||||
|
||||
b, err := ioutil.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(b, &client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = setupHttpClient(&client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &client, nil
|
||||
}
|
||||
|
||||
func setupHttpClient(client *Client) error {
|
||||
if client.config.CertFile != "" && client.config.KeyFile != "" {
|
||||
err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
client.config.CertFile = ""
|
||||
client.config.KeyFile = ""
|
||||
tr := &http.Transport{
|
||||
Dial: dialTimeout,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
}
|
||||
client.httpClient = &http.Client{Transport: tr}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetPersistence sets a writer to which the config will be
|
||||
// written every time it's changed.
|
||||
func (c *Client) SetPersistence(writer io.Writer) {
|
||||
c.persistence = writer
|
||||
}
|
||||
|
||||
// SetConsistency changes the consistency level of the client.
|
||||
//
|
||||
// When consistency is set to STRONG_CONSISTENCY, all requests,
|
||||
// including GET, are sent to the leader. This means that, assuming
|
||||
// the absence of leader failures, GET requests are guranteed to see
|
||||
// the changes made by previous requests.
|
||||
//
|
||||
// When consistency is set to WEAK_CONSISTENCY, other requests
|
||||
// are still sent to the leader, but GET requests are sent to a
|
||||
// random server from the server pool. This reduces the read
|
||||
// load on the leader, but it's not guranteed that the GET requests
|
||||
// will see changes made by previous requests (they might have not
|
||||
// yet been commited on non-leader servers).
|
||||
func (c *Client) SetConsistency(consistency string) error {
|
||||
if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) {
|
||||
return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.")
|
||||
}
|
||||
c.config.Consistency = consistency
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalJSON implements the Marshaller interface
|
||||
// as defined by the standard JSON package.
|
||||
func (c *Client) MarshalJSON() ([]byte, error) {
|
||||
b, err := json.Marshal(struct {
|
||||
Config Config `json:"config"`
|
||||
Cluster Cluster `json:"cluster"`
|
||||
}{
|
||||
Config: c.config,
|
||||
Cluster: c.cluster,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the Unmarshaller interface
|
||||
// as defined by the standard JSON package.
|
||||
func (c *Client) UnmarshalJSON(b []byte) error {
|
||||
temp := struct {
|
||||
Config Config `json: "config"`
|
||||
Cluster Cluster `json: "cluster"`
|
||||
}{}
|
||||
err := json.Unmarshal(b, &temp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.cluster = temp.Cluster
|
||||
c.config = temp.Config
|
||||
return nil
|
||||
}
|
||||
|
||||
// saveConfig saves the current config using c.persistence.
|
||||
func (c *Client) saveConfig() error {
|
||||
if c.persistence != nil {
|
||||
b, err := json.Marshal(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.persistence.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) SetCertAndKey(cert string, key string) error {
|
||||
if cert != "" && key != "" {
|
||||
tlsCert, err := tls.LoadX509KeyPair(cert, key)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
return err
|
||||
}
|
||||
|
||||
tr := &http.Transport{
|
||||
@@ -88,24 +248,27 @@ func (c *Client) SetCertAndKey(cert string, key string) (bool, error) {
|
||||
}
|
||||
|
||||
c.httpClient = &http.Client{Transport: tr}
|
||||
return true, nil
|
||||
c.saveConfig()
|
||||
return nil
|
||||
}
|
||||
return false, errors.New("Require both cert and key path")
|
||||
return errors.New("Require both cert and key path")
|
||||
}
|
||||
|
||||
func (c *Client) SetScheme(scheme int) (bool, error) {
|
||||
func (c *Client) SetScheme(scheme int) error {
|
||||
if scheme == HTTP {
|
||||
c.config.Scheme = "http"
|
||||
return true, nil
|
||||
c.saveConfig()
|
||||
return nil
|
||||
}
|
||||
if scheme == HTTPS {
|
||||
c.config.Scheme = "https"
|
||||
return true, nil
|
||||
c.saveConfig()
|
||||
return nil
|
||||
}
|
||||
return false, errors.New("Unknown Scheme")
|
||||
return errors.New("Unknown Scheme")
|
||||
}
|
||||
|
||||
// Try to sync from the given machine
|
||||
// SetCluster updates config using the given machine list.
|
||||
func (c *Client) SetCluster(machines []string) bool {
|
||||
success := c.internalSyncCluster(machines)
|
||||
return success
|
||||
@@ -115,13 +278,13 @@ func (c *Client) GetCluster() []string {
|
||||
return c.cluster.Machines
|
||||
}
|
||||
|
||||
// sycn cluster information using the existing machine list
|
||||
// SyncCluster updates config using the internal machine list.
|
||||
func (c *Client) SyncCluster() bool {
|
||||
success := c.internalSyncCluster(c.cluster.Machines)
|
||||
return success
|
||||
}
|
||||
|
||||
// sync cluster information by providing machine list
|
||||
// internalSyncCluster syncs cluster information using the given machine list.
|
||||
func (c *Client) internalSyncCluster(machines []string) bool {
|
||||
for _, machine := range machines {
|
||||
httpPath := c.createHttpPath(machine, version+"/machines")
|
||||
@@ -146,16 +309,19 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
||||
c.cluster.Leader = c.cluster.Machines[0]
|
||||
|
||||
logger.Debug("sync.machines ", c.cluster.Machines)
|
||||
c.saveConfig()
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// serverName should contain both hostName and port
|
||||
// createHttpPath creates a complete HTTP URL.
|
||||
// serverName should contain both the host name and a port number, if any.
|
||||
func (c *Client) createHttpPath(serverName string, _path string) string {
|
||||
u, _ := url.Parse(serverName)
|
||||
u.Path = path.Join(u.Path, "/", _path)
|
||||
|
||||
if u.Scheme == "" {
|
||||
u.Scheme = "http"
|
||||
}
|
||||
@@ -167,18 +333,6 @@ func dialTimeout(network, addr string) (net.Conn, error) {
|
||||
return net.DialTimeout(network, addr, time.Second)
|
||||
}
|
||||
|
||||
func (c *Client) getHttpPath(s ...string) string {
|
||||
u, _ := url.Parse(c.cluster.Leader)
|
||||
|
||||
u.Path = path.Join(u.Path, "/", version)
|
||||
|
||||
for _, seg := range s {
|
||||
u.Path = path.Join(u.Path, seg)
|
||||
}
|
||||
|
||||
return u.String()
|
||||
}
|
||||
|
||||
func (c *Client) updateLeader(httpPath string) {
|
||||
u, _ := url.Parse(httpPath)
|
||||
|
||||
@@ -191,77 +345,5 @@ func (c *Client) updateLeader(httpPath string) {
|
||||
|
||||
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
|
||||
c.cluster.Leader = leader
|
||||
}
|
||||
|
||||
// Wrap GET, POST and internal error handling
|
||||
func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) {
|
||||
|
||||
var resp *http.Response
|
||||
var err error
|
||||
var req *http.Request
|
||||
|
||||
retry := 0
|
||||
// if we connect to a follower, we will retry until we found a leader
|
||||
for {
|
||||
|
||||
httpPath := c.getHttpPath(_path)
|
||||
|
||||
logger.Debug("send.request.to ", httpPath)
|
||||
if body == "" {
|
||||
|
||||
req, _ = http.NewRequest(method, httpPath, nil)
|
||||
|
||||
} else {
|
||||
req, _ = http.NewRequest(method, httpPath, strings.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
|
||||
}
|
||||
|
||||
resp, err = c.httpClient.Do(req)
|
||||
|
||||
logger.Debug("recv.response.from ", httpPath)
|
||||
// network error, change a machine!
|
||||
if err != nil {
|
||||
retry++
|
||||
if retry > 2*len(c.cluster.Machines) {
|
||||
return nil, errors.New("Cannot reach servers")
|
||||
}
|
||||
num := retry % len(c.cluster.Machines)
|
||||
logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")
|
||||
c.cluster.Leader = c.cluster.Machines[num]
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
continue
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||
httpPath := resp.Header.Get("Location")
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if httpPath == "" {
|
||||
return nil, errors.New("Cannot get redirection location")
|
||||
}
|
||||
|
||||
c.updateLeader(httpPath)
|
||||
logger.Debug("send.redirect")
|
||||
// try to connect the leader
|
||||
continue
|
||||
} else if resp.StatusCode == http.StatusInternalServerError {
|
||||
resp.Body.Close()
|
||||
|
||||
retry++
|
||||
if retry > 2*len(c.cluster.Machines) {
|
||||
return nil, errors.New("Cannot reach servers")
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
logger.Debug("send.return.response ", httpPath)
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
logger.Debug("error.from ", httpPath, " ", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
c.saveConfig()
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"net/url"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// To pass this test, we need to create a cluster of 3 machines
|
||||
@@ -19,7 +21,7 @@ func TestSync(t *testing.T) {
|
||||
t.Fatal("cannot sync machines")
|
||||
}
|
||||
|
||||
for _, m := range(c.GetCluster()) {
|
||||
for _, m := range c.GetCluster() {
|
||||
u, err := url.Parse(m)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -27,7 +29,7 @@ func TestSync(t *testing.T) {
|
||||
if u.Scheme != "http" {
|
||||
t.Fatal("scheme must be http")
|
||||
}
|
||||
|
||||
|
||||
host, _, err := net.SplitHostPort(u.Host)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -56,3 +58,37 @@ func TestSync(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPersistence(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
c.SyncCluster()
|
||||
|
||||
fo, err := os.Create("config.json")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := fo.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
c.SetPersistence(fo)
|
||||
err = c.saveConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
c2, err := NewClientFile("config.json")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify that the two clients have the same config
|
||||
b1, _ := json.Marshal(c)
|
||||
b2, _ := json.Marshal(c2)
|
||||
|
||||
if string(b1) != string(b2) {
|
||||
t.Fatalf("The two configs should be equal!")
|
||||
}
|
||||
}
|
||||
|
||||
18
third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go
vendored
Normal file
18
third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
package etcd
|
||||
|
||||
import "fmt"
|
||||
|
||||
func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) {
|
||||
if prevValue == "" && prevIndex == 0 {
|
||||
return nil, fmt.Errorf("You must give either prevValue or prevIndex.")
|
||||
}
|
||||
|
||||
options := options{}
|
||||
if prevValue != "" {
|
||||
options["prevValue"] = prevValue
|
||||
}
|
||||
if prevIndex != 0 {
|
||||
options["prevIndex"] = prevIndex
|
||||
}
|
||||
return c.put(key, value, ttl, options)
|
||||
}
|
||||
51
third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go
vendored
Normal file
51
third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go
vendored
Normal file
@@ -0,0 +1,51 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCompareAndSwap(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
}()
|
||||
|
||||
c.Set("foo", "bar", 5)
|
||||
|
||||
// This should succeed
|
||||
resp, err := c.CompareAndSwap("foo", "bar2", 5, "bar", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Value == "bar2" && resp.PrevValue == "bar" &&
|
||||
resp.Key == "/foo" && resp.TTL == 5) {
|
||||
t.Fatalf("CompareAndSwap 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail because it gives an incorrect prevValue
|
||||
resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0)
|
||||
if err == nil {
|
||||
t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp)
|
||||
}
|
||||
|
||||
resp, err = c.Set("foo", "bar", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// This should succeed
|
||||
resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.ModifiedIndex)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Value == "bar2" && resp.PrevValue == "bar" &&
|
||||
resp.Key == "/foo" && resp.TTL == 5) {
|
||||
t.Fatalf("CompareAndSwap 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail because it gives an incorrect prevIndex
|
||||
resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514)
|
||||
if err == nil {
|
||||
t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp)
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,8 @@ var logger *log.Logger
|
||||
|
||||
func init() {
|
||||
setLogger(log.PriErr)
|
||||
// Uncomment the following line if you want to see lots of logs
|
||||
// OpenDebug()
|
||||
}
|
||||
|
||||
func OpenDebug() {
|
||||
|
||||
@@ -1,40 +1,17 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
)
|
||||
|
||||
func (c *Client) Delete(key string) (*Response, error) {
|
||||
|
||||
resp, err := c.sendRequest("DELETE", path.Join("keys", key), "")
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
var result Response
|
||||
|
||||
err = json.Unmarshal(b, &result)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
|
||||
// DeleteAll deletes everything under the given key. If the key
|
||||
// points to a file, the file will be deleted. If the key points
|
||||
// to a directory, then everything under the directory, include
|
||||
// all child directories, will be deleted.
|
||||
func (c *Client) DeleteAll(key string) (*Response, error) {
|
||||
return c.delete(key, options{
|
||||
"recursive": true,
|
||||
})
|
||||
}
|
||||
|
||||
// Delete deletes the given key. If the key points to a
|
||||
// directory, the method will fail.
|
||||
func (c *Client) Delete(key string) (*Response, error) {
|
||||
return c.delete(key, nil)
|
||||
}
|
||||
|
||||
@@ -5,18 +5,60 @@ import (
|
||||
)
|
||||
|
||||
func TestDelete(t *testing.T) {
|
||||
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
}()
|
||||
|
||||
c.Set("foo", "bar", 100)
|
||||
result, err := c.Delete("foo")
|
||||
c.Set("foo", "bar", 5)
|
||||
resp, err := c.Delete("foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if result.PrevValue != "bar" || result.Value != "" {
|
||||
t.Fatalf("Delete failed with %s %s", result.PrevValue,
|
||||
result.Value)
|
||||
if !(resp.PrevValue == "bar" && resp.Value == "") {
|
||||
t.Fatalf("Delete failed with %s %s", resp.PrevValue,
|
||||
resp.Value)
|
||||
}
|
||||
|
||||
resp, err = c.Delete("foo")
|
||||
if err == nil {
|
||||
t.Fatalf("Delete should have failed because the key foo did not exist. "+
|
||||
"The response was: %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteAll(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
c.DeleteAll("fooDir")
|
||||
}()
|
||||
|
||||
c.Set("foo", "bar", 5)
|
||||
resp, err := c.DeleteAll("foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !(resp.PrevValue == "bar" && resp.Value == "") {
|
||||
t.Fatalf("DeleteAll 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
c.SetDir("fooDir", 5)
|
||||
c.Set("fooDir/foo", "bar", 5)
|
||||
resp, err = c.DeleteAll("fooDir")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !(resp.PrevValue == "" && resp.Value == "") {
|
||||
t.Fatalf("DeleteAll 2 failed: %#v", resp)
|
||||
}
|
||||
|
||||
resp, err = c.DeleteAll("foo")
|
||||
if err == nil {
|
||||
t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+
|
||||
"The response was: %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,82 +1,23 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
)
|
||||
|
||||
func (c *Client) Get(key string) ([]*Response, error) {
|
||||
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
|
||||
resp, err := c.sendRequest("GET", path.Join("keys", key), "")
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
return convertGetResponse(b)
|
||||
|
||||
// GetDir gets the all contents under the given key.
|
||||
// If the key points to a file, the file is returned.
|
||||
// If the key points to a directory, everything under it is returnd,
|
||||
// including all contents under all child directories.
|
||||
func (c *Client) GetAll(key string, sort bool) (*Response, error) {
|
||||
return c.get(key, options{
|
||||
"recursive": true,
|
||||
"sorted": sort,
|
||||
})
|
||||
}
|
||||
|
||||
// GetTo gets the value of the key from a given machine address.
|
||||
// If the given machine is not available it returns an error.
|
||||
// Mainly use for testing purpose
|
||||
func (c *Client) GetFrom(key string, addr string) ([]*Response, error) {
|
||||
httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
|
||||
|
||||
resp, err := c.httpClient.Get(httpPath)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
return convertGetResponse(b)
|
||||
}
|
||||
|
||||
// Convert byte stream to response.
|
||||
func convertGetResponse(b []byte) ([]*Response, error) {
|
||||
|
||||
var results []*Response
|
||||
var result *Response
|
||||
|
||||
err := json.Unmarshal(b, &result)
|
||||
|
||||
if err != nil {
|
||||
err = json.Unmarshal(b, &results)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
} else {
|
||||
results = make([]*Response, 1)
|
||||
results[0] = result
|
||||
}
|
||||
return results, nil
|
||||
// Get gets the file or directory associated with the given key.
|
||||
// If the key points to a directory, files and directories under
|
||||
// it will be returned in sorted or unsorted order, depending on
|
||||
// the sort flag. Note that contents under child directories
|
||||
// will not be returned. To get those contents, use GetAll.
|
||||
func (c *Client) Get(key string, sort bool) (*Response, error) {
|
||||
return c.get(key, options{
|
||||
"sorted": sort,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,46 +1,99 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
}()
|
||||
|
||||
c.Set("foo", "bar", 100)
|
||||
c.Set("foo", "bar", 5)
|
||||
|
||||
// wait for commit
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
result, err := c.Get("foo", false)
|
||||
|
||||
results, err := c.Get("foo")
|
||||
|
||||
if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
results, err = c.Get("goo")
|
||||
if result.Key != "/foo" || result.Value != "bar" {
|
||||
t.Fatalf("Get failed with %s %s %v", result.Key, result.Value, result.TTL)
|
||||
}
|
||||
|
||||
result, err = c.Get("goo", false)
|
||||
if err == nil {
|
||||
t.Fatalf("should not be able to get non-exist key")
|
||||
}
|
||||
}
|
||||
|
||||
results, err = c.GetFrom("foo", "0.0.0.0:4001")
|
||||
func TestGetAll(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("fooDir")
|
||||
}()
|
||||
|
||||
if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL)
|
||||
c.SetDir("fooDir", 5)
|
||||
c.Set("fooDir/k0", "v0", 5)
|
||||
c.Set("fooDir/k1", "v1", 5)
|
||||
|
||||
// Return kv-pairs in sorted order
|
||||
result, err := c.Get("fooDir", true)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
results, err = c.GetFrom("foo", "0.0.0.0:4009")
|
||||
expected := kvPairs{
|
||||
KeyValuePair{
|
||||
Key: "/fooDir/k0",
|
||||
Value: "v0",
|
||||
},
|
||||
KeyValuePair{
|
||||
Key: "/fooDir/k1",
|
||||
Value: "v1",
|
||||
},
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("should not get from port 4009")
|
||||
if !reflect.DeepEqual(result.Kvs, expected) {
|
||||
t.Fatalf("(actual) %v != (expected) %v", result.Kvs, expected)
|
||||
}
|
||||
|
||||
// Test the `recursive` option
|
||||
c.SetDir("fooDir/childDir", 5)
|
||||
c.Set("fooDir/childDir/k2", "v2", 5)
|
||||
|
||||
// Return kv-pairs in sorted order
|
||||
result, err = c.GetAll("fooDir", true)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected = kvPairs{
|
||||
KeyValuePair{
|
||||
Key: "/fooDir/childDir",
|
||||
Dir: true,
|
||||
KVPairs: kvPairs{
|
||||
KeyValuePair{
|
||||
Key: "/fooDir/childDir/k2",
|
||||
Value: "v2",
|
||||
},
|
||||
},
|
||||
},
|
||||
KeyValuePair{
|
||||
Key: "/fooDir/k0",
|
||||
Value: "v0",
|
||||
},
|
||||
KeyValuePair{
|
||||
Key: "/fooDir/k1",
|
||||
Value: "v1",
|
||||
},
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(result.Kvs, expected) {
|
||||
t.Fatalf("(actual) %v != (expected) %v", result.Kvs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
|
||||
c.Set("foo_list/foo", "bar", 100)
|
||||
c.Set("foo_list/fooo", "barbar", 100)
|
||||
c.Set("foo_list/foooo/foo", "barbarbar", 100)
|
||||
// wait for commit
|
||||
time.Sleep(time.Second)
|
||||
|
||||
_, err := c.Get("foo_list")
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
}
|
||||
290
third_party/github.com/coreos/go-etcd/etcd/requests.go
vendored
Normal file
290
third_party/github.com/coreos/go-etcd/etcd/requests.go
vendored
Normal file
@@ -0,0 +1,290 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Valid options for GET, PUT, POST, DELETE
|
||||
// Using CAPITALIZED_UNDERSCORE to emphasize that these
|
||||
// values are meant to be used as constants.
|
||||
var (
|
||||
VALID_GET_OPTIONS = validOptions{
|
||||
"recursive": reflect.Bool,
|
||||
"consistent": reflect.Bool,
|
||||
"sorted": reflect.Bool,
|
||||
"wait": reflect.Bool,
|
||||
"waitIndex": reflect.Uint64,
|
||||
}
|
||||
|
||||
VALID_PUT_OPTIONS = validOptions{
|
||||
"prevValue": reflect.String,
|
||||
"prevIndex": reflect.Uint64,
|
||||
"prevExist": reflect.Bool,
|
||||
}
|
||||
|
||||
VALID_POST_OPTIONS = validOptions{}
|
||||
|
||||
VALID_DELETE_OPTIONS = validOptions{
|
||||
"recursive": reflect.Bool,
|
||||
}
|
||||
|
||||
curlChan chan string
|
||||
)
|
||||
|
||||
// SetCurlChan sets a channel to which cURL commands which can be used to
|
||||
// re-produce requests are sent. This is useful for debugging.
|
||||
func SetCurlChan(c chan string) {
|
||||
curlChan = c
|
||||
}
|
||||
|
||||
// get issues a GET request
|
||||
func (c *Client) get(key string, options options) (*Response, error) {
|
||||
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
|
||||
|
||||
p := path.Join("keys", key)
|
||||
// If consistency level is set to STRONG, append
|
||||
// the `consistent` query string.
|
||||
if c.config.Consistency == STRONG_CONSISTENCY {
|
||||
options["consistent"] = true
|
||||
}
|
||||
if options != nil {
|
||||
str, err := optionsToString(options, VALID_GET_OPTIONS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p += str
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest("GET", p, url.Values{})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// put issues a PUT request
|
||||
func (c *Client) put(key string, value string, ttl uint64, options options) (*Response, error) {
|
||||
logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
||||
v := url.Values{}
|
||||
|
||||
if value != "" {
|
||||
v.Set("value", value)
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
v.Set("ttl", fmt.Sprintf("%v", ttl))
|
||||
}
|
||||
|
||||
p := path.Join("keys", key)
|
||||
if options != nil {
|
||||
str, err := optionsToString(options, VALID_PUT_OPTIONS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p += str
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest("PUT", p, v)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// post issues a POST request
|
||||
func (c *Client) post(key string, value string, ttl uint64) (*Response, error) {
|
||||
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
||||
v := url.Values{}
|
||||
|
||||
if value != "" {
|
||||
v.Set("value", value)
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
v.Set("ttl", fmt.Sprintf("%v", ttl))
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest("POST", path.Join("keys", key), v)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// delete issues a DELETE request
|
||||
func (c *Client) delete(key string, options options) (*Response, error) {
|
||||
logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
|
||||
v := url.Values{}
|
||||
|
||||
p := path.Join("keys", key)
|
||||
if options != nil {
|
||||
str, err := optionsToString(options, VALID_DELETE_OPTIONS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p += str
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest("DELETE", p, v)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// sendRequest sends a HTTP request and returns a Response as defined by etcd
|
||||
func (c *Client) sendRequest(method string, _path string, values url.Values) (*Response, error) {
|
||||
var body string = values.Encode()
|
||||
var resp *http.Response
|
||||
var req *http.Request
|
||||
|
||||
retry := 0
|
||||
// if we connect to a follower, we will retry until we found a leader
|
||||
for {
|
||||
var httpPath string
|
||||
|
||||
// If _path has schema already, then it's assumed to be
|
||||
// a complete URL and therefore needs no further processing.
|
||||
u, err := url.Parse(_path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if u.Scheme != "" {
|
||||
httpPath = _path
|
||||
} else {
|
||||
if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
|
||||
// If it's a GET and consistency level is set to WEAK,
|
||||
// then use a random machine.
|
||||
httpPath = c.getHttpPath(true, _path)
|
||||
} else {
|
||||
// Else use the leader.
|
||||
httpPath = c.getHttpPath(false, _path)
|
||||
}
|
||||
}
|
||||
|
||||
// Return a cURL command if curlChan is set
|
||||
if curlChan != nil {
|
||||
command := fmt.Sprintf("curl -X %s %s", method, httpPath)
|
||||
for key, value := range values {
|
||||
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
||||
}
|
||||
curlChan <- command
|
||||
}
|
||||
|
||||
logger.Debug("send.request.to ", httpPath, " | method ", method)
|
||||
if body == "" {
|
||||
|
||||
req, _ = http.NewRequest(method, httpPath, nil)
|
||||
|
||||
} else {
|
||||
req, _ = http.NewRequest(method, httpPath, strings.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
|
||||
}
|
||||
|
||||
resp, err = c.httpClient.Do(req)
|
||||
|
||||
logger.Debug("recv.response.from ", httpPath)
|
||||
// network error, change a machine!
|
||||
if err != nil {
|
||||
retry++
|
||||
if retry > 2*len(c.cluster.Machines) {
|
||||
return nil, errors.New("Cannot reach servers")
|
||||
}
|
||||
num := retry % len(c.cluster.Machines)
|
||||
logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")
|
||||
c.cluster.Leader = c.cluster.Machines[num]
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
continue
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||
httpPath := resp.Header.Get("Location")
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if httpPath == "" {
|
||||
return nil, errors.New("Cannot get redirection location")
|
||||
}
|
||||
|
||||
c.updateLeader(httpPath)
|
||||
logger.Debug("send.redirect")
|
||||
// try to connect the leader
|
||||
continue
|
||||
} else if resp.StatusCode == http.StatusInternalServerError {
|
||||
resp.Body.Close()
|
||||
|
||||
retry++
|
||||
if retry > 2*len(c.cluster.Machines) {
|
||||
return nil, errors.New("Cannot reach servers")
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
logger.Debug("send.return.response ", httpPath)
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
logger.Debug("error.from ", httpPath, " ", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Convert HTTP response to etcd response
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !(resp.StatusCode == http.StatusOK ||
|
||||
resp.StatusCode == http.StatusCreated) {
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
var result Response
|
||||
|
||||
err = json.Unmarshal(b, &result)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (c *Client) getHttpPath(random bool, s ...string) string {
|
||||
var machine string
|
||||
if random {
|
||||
machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
|
||||
} else {
|
||||
machine = c.cluster.Leader
|
||||
}
|
||||
|
||||
fullPath := machine + "/" + version
|
||||
for _, seg := range s {
|
||||
fullPath = fullPath + "/" + seg
|
||||
}
|
||||
|
||||
return fullPath
|
||||
}
|
||||
@@ -6,11 +6,12 @@ import (
|
||||
|
||||
// The response object from the server.
|
||||
type Response struct {
|
||||
Action string `json:"action"`
|
||||
Key string `json:"key"`
|
||||
Dir bool `json:"dir,omitempty"`
|
||||
PrevValue string `json:"prevValue,omitempty"`
|
||||
Value string `json:"value,omitempty"`
|
||||
Action string `json:"action"`
|
||||
Key string `json:"key"`
|
||||
Dir bool `json:"dir,omitempty"`
|
||||
PrevValue string `json:"prevValue,omitempty"`
|
||||
Value string `json:"value,omitempty"`
|
||||
Kvs kvPairs `json:"kvs,omitempty"`
|
||||
|
||||
// If the key did not exist before the action,
|
||||
// this field should be set to true
|
||||
@@ -22,5 +23,28 @@ type Response struct {
|
||||
TTL int64 `json:"ttl,omitempty"`
|
||||
|
||||
// The command index of the raft machine when the command is executed
|
||||
Index uint64 `json:"index"`
|
||||
ModifiedIndex uint64 `json:"modifiedIndex"`
|
||||
}
|
||||
|
||||
// When user list a directory, we add all the node into key-value pair slice
|
||||
type KeyValuePair struct {
|
||||
Key string `json:"key, omitempty"`
|
||||
Value string `json:"value,omitempty"`
|
||||
Dir bool `json:"dir,omitempty"`
|
||||
KVPairs kvPairs `json:"kvs,omitempty"`
|
||||
}
|
||||
|
||||
type kvPairs []KeyValuePair
|
||||
|
||||
// interfaces for sorting
|
||||
func (kvs kvPairs) Len() int {
|
||||
return len(kvs)
|
||||
}
|
||||
|
||||
func (kvs kvPairs) Less(i, j int) bool {
|
||||
return kvs[i].Key < kvs[j].Key
|
||||
}
|
||||
|
||||
func (kvs kvPairs) Swap(i, j int) {
|
||||
kvs[i], kvs[j] = kvs[j], kvs[i]
|
||||
}
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
)
|
||||
|
||||
func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) {
|
||||
logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
||||
v := url.Values{}
|
||||
v.Set("value", value)
|
||||
|
||||
if ttl > 0 {
|
||||
v.Set("ttl", fmt.Sprintf("%v", ttl))
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode())
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
return convertSetResponse(b)
|
||||
|
||||
}
|
||||
|
||||
// SetTo sets the value of the key to a given machine address.
|
||||
// If the given machine is not available or is not leader it returns an error
|
||||
// Mainly use for testing purpose.
|
||||
func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*Response, error) {
|
||||
v := url.Values{}
|
||||
v.Set("value", value)
|
||||
|
||||
if ttl > 0 {
|
||||
v.Set("ttl", fmt.Sprintf("%v", ttl))
|
||||
}
|
||||
|
||||
httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
|
||||
|
||||
resp, err := c.httpClient.PostForm(httpPath, v)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
return convertSetResponse(b)
|
||||
}
|
||||
|
||||
// Convert byte stream to response.
|
||||
func convertSetResponse(b []byte) (*Response, error) {
|
||||
var result Response
|
||||
|
||||
err := json.Unmarshal(b, &result)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
43
third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go
vendored
Normal file
43
third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go
vendored
Normal file
@@ -0,0 +1,43 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSetCurlChan(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
}()
|
||||
|
||||
curlChan := make(chan string, 1)
|
||||
SetCurlChan(curlChan)
|
||||
|
||||
_, err := c.Set("foo", "bar", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5",
|
||||
c.cluster.Leader)
|
||||
actual := <-curlChan
|
||||
if expected != actual {
|
||||
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
|
||||
actual, expected)
|
||||
}
|
||||
|
||||
c.SetConsistency(STRONG_CONSISTENCY)
|
||||
_, err = c.Get("foo", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&sorted=false",
|
||||
c.cluster.Leader)
|
||||
actual = <-curlChan
|
||||
if expected != actual {
|
||||
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
|
||||
actual, expected)
|
||||
}
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
|
||||
result, err := c.Set("foo", "bar", 100)
|
||||
|
||||
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
result, err = c.Set("foo", "bar", 100)
|
||||
|
||||
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
|
||||
}
|
||||
|
||||
result, err = c.SetTo("toFoo", "bar", 100, "0.0.0.0:4001")
|
||||
|
||||
if err != nil || result.Key != "/toFoo" || result.Value != "bar" || result.TTL != 99 {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Fatalf("SetTo failed with %s %s %v", result.Key, result.Value, result.TTL)
|
||||
}
|
||||
|
||||
}
|
||||
43
third_party/github.com/coreos/go-etcd/etcd/set_update_create.go
vendored
Normal file
43
third_party/github.com/coreos/go-etcd/etcd/set_update_create.go
vendored
Normal file
@@ -0,0 +1,43 @@
|
||||
package etcd
|
||||
|
||||
// SetDir sets the given key to a directory.
|
||||
func (c *Client) SetDir(key string, ttl uint64) (*Response, error) {
|
||||
return c.put(key, "", ttl, nil)
|
||||
}
|
||||
|
||||
// UpdateDir updates the given key to a directory. It succeeds only if the
|
||||
// given key already exists.
|
||||
func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) {
|
||||
return c.put(key, "", ttl, options{
|
||||
"prevExist": true,
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateDir creates a directory under the given key. It succeeds only if
|
||||
// the given key does not yet exist.
|
||||
func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) {
|
||||
return c.put(key, "", ttl, options{
|
||||
"prevExist": false,
|
||||
})
|
||||
}
|
||||
|
||||
// Set sets the given key to the given value.
|
||||
func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) {
|
||||
return c.put(key, value, ttl, nil)
|
||||
}
|
||||
|
||||
// Update updates the given key to the given value. It succeeds only if the
|
||||
// given key already exists.
|
||||
func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) {
|
||||
return c.put(key, value, ttl, options{
|
||||
"prevExist": true,
|
||||
})
|
||||
}
|
||||
|
||||
// Create creates a file with the given value under the given key. It succeeds
|
||||
// only if the given key does not yet exist.
|
||||
func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) {
|
||||
return c.put(key, value, ttl, options{
|
||||
"prevExist": false,
|
||||
})
|
||||
}
|
||||
183
third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go
vendored
Normal file
183
third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go
vendored
Normal file
@@ -0,0 +1,183 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
}()
|
||||
|
||||
resp, err := c.Set("foo", "bar", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.Key != "/foo" || resp.Value != "bar" || resp.TTL != 5 {
|
||||
t.Fatalf("Set 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
resp, err = c.Set("foo", "bar2", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/foo" && resp.Value == "bar2" &&
|
||||
resp.PrevValue == "bar" && resp.TTL == 5) {
|
||||
t.Fatalf("Set 2 failed: %#v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdate(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
c.DeleteAll("nonexistent")
|
||||
}()
|
||||
|
||||
resp, err := c.Set("foo", "bar", 5)
|
||||
t.Logf("%#v", resp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// This should succeed.
|
||||
resp, err = c.Update("foo", "wakawaka", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !(resp.Action == "update" && resp.Key == "/foo" &&
|
||||
resp.PrevValue == "bar" && resp.TTL == 5) {
|
||||
t.Fatalf("Update 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail because the key does not exist.
|
||||
resp, err = c.Update("nonexistent", "whatever", 5)
|
||||
if err == nil {
|
||||
t.Fatalf("The key %v did not exist, so the update should have failed."+
|
||||
"The response was: %#v", resp.Key, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("newKey")
|
||||
}()
|
||||
|
||||
newKey := "/newKey"
|
||||
newValue := "/newValue"
|
||||
|
||||
// This should succeed
|
||||
resp, err := c.Create(newKey, newValue, 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !(resp.Action == "create" && resp.Key == newKey &&
|
||||
resp.Value == newValue && resp.PrevValue == "" && resp.TTL == 5) {
|
||||
t.Fatalf("Create 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail, because the key is already there
|
||||
resp, err = c.Create(newKey, newValue, 5)
|
||||
if err == nil {
|
||||
t.Fatalf("The key %v did exist, so the creation should have failed."+
|
||||
"The response was: %#v", resp.Key, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDir(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("foo")
|
||||
c.DeleteAll("fooDir")
|
||||
}()
|
||||
|
||||
resp, err := c.SetDir("fooDir", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/fooDir" && resp.Value == "" && resp.TTL == 5) {
|
||||
t.Fatalf("SetDir 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail because /fooDir already points to a directory
|
||||
resp, err = c.SetDir("/fooDir", 5)
|
||||
if err == nil {
|
||||
t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+
|
||||
"The response was: %#v", resp)
|
||||
}
|
||||
|
||||
_, err = c.Set("foo", "bar", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// This should succeed
|
||||
resp, err = c.SetDir("foo", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/foo" && resp.Value == "" &&
|
||||
resp.PrevValue == "bar" && resp.TTL == 5) {
|
||||
t.Fatalf("SetDir 2 failed: %#v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateDir(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("fooDir")
|
||||
}()
|
||||
|
||||
resp, err := c.SetDir("fooDir", 5)
|
||||
t.Logf("%#v", resp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// This should succeed.
|
||||
resp, err = c.UpdateDir("fooDir", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !(resp.Action == "update" && resp.Key == "/fooDir" &&
|
||||
resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) {
|
||||
t.Fatalf("UpdateDir 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail because the key does not exist.
|
||||
resp, err = c.UpdateDir("nonexistentDir", 5)
|
||||
if err == nil {
|
||||
t.Fatalf("The key %v did not exist, so the update should have failed."+
|
||||
"The response was: %#v", resp.Key, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateDir(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("fooDir")
|
||||
}()
|
||||
|
||||
// This should succeed
|
||||
resp, err := c.CreateDir("fooDir", 5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !(resp.Action == "create" && resp.Key == "/fooDir" &&
|
||||
resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) {
|
||||
t.Fatalf("CreateDir 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
// This should fail, because the key is already there
|
||||
resp, err = c.CreateDir("fooDir", 5)
|
||||
if err == nil {
|
||||
t.Fatalf("The key %v did exist, so the creation should have failed."+
|
||||
"The response was: %#v", resp.Key, resp)
|
||||
}
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
)
|
||||
|
||||
func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*Response, bool, error) {
|
||||
logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader)
|
||||
v := url.Values{}
|
||||
v.Set("value", value)
|
||||
v.Set("prevValue", prevValue)
|
||||
|
||||
if ttl > 0 {
|
||||
v.Set("ttl", fmt.Sprintf("%v", ttl))
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode())
|
||||
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, false, handleError(b)
|
||||
}
|
||||
|
||||
var result Response
|
||||
|
||||
err = json.Unmarshal(b, &result)
|
||||
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if result.PrevValue == prevValue && result.Value == value {
|
||||
|
||||
return &result, true, nil
|
||||
}
|
||||
|
||||
return &result, false, nil
|
||||
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTestAndSet(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
|
||||
c.Set("foo_testAndSet", "bar", 100)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
results := make(chan bool, 3)
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
testAndSet("foo_testAndSet", "bar", "barbar", results, c)
|
||||
}
|
||||
|
||||
count := 0
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
result := <-results
|
||||
if result {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
if count != 1 {
|
||||
t.Fatalf("test and set fails %v", count)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testAndSet(key string, prevValue string, value string, ch chan bool, c *Client) {
|
||||
_, success, _ := c.TestAndSet(key, prevValue, value, 0)
|
||||
ch <- success
|
||||
}
|
||||
33
third_party/github.com/coreos/go-etcd/etcd/utils.go
vendored
Normal file
33
third_party/github.com/coreos/go-etcd/etcd/utils.go
vendored
Normal file
@@ -0,0 +1,33 @@
|
||||
// Utility functions
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// Convert options to a string of HTML parameters
|
||||
func optionsToString(options options, vops validOptions) (string, error) {
|
||||
p := "?"
|
||||
v := url.Values{}
|
||||
for opKey, opVal := range options {
|
||||
// Check if the given option is valid (that it exists)
|
||||
kind := vops[opKey]
|
||||
if kind == reflect.Invalid {
|
||||
return "", fmt.Errorf("Invalid option: %v", opKey)
|
||||
}
|
||||
|
||||
// Check if the given option is of the valid type
|
||||
t := reflect.TypeOf(opVal)
|
||||
if kind != t.Kind() {
|
||||
return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.",
|
||||
opKey, kind, t.Kind())
|
||||
}
|
||||
|
||||
v.Set(opKey, fmt.Sprintf("%v", opVal))
|
||||
}
|
||||
p += v.Encode()
|
||||
return p, nil
|
||||
}
|
||||
@@ -1,3 +1,3 @@
|
||||
package etcd
|
||||
|
||||
const version = "v1"
|
||||
const version = "v2"
|
||||
|
||||
130
third_party/github.com/coreos/go-etcd/etcd/watch.go
vendored
130
third_party/github.com/coreos/go-etcd/etcd/watch.go
vendored
@@ -1,42 +1,47 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
)
|
||||
|
||||
type respAndErr struct {
|
||||
resp *http.Response
|
||||
err error
|
||||
}
|
||||
|
||||
// Errors introduced by the Watch command.
|
||||
var (
|
||||
ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel")
|
||||
)
|
||||
|
||||
// Watch any change under the given prefix.
|
||||
// When a sinceIndex is given, watch will try to scan from that index to the last index
|
||||
// and will return any changes under the given prefix during the history
|
||||
// WatchAll returns the first change under the given prefix since the given index. To
|
||||
// watch for the latest change, set waitIndex = 0.
|
||||
//
|
||||
// If the prefix points to a directory, any change under it, including all child directories,
|
||||
// will be returned.
|
||||
//
|
||||
// If a receiver channel is given, it will be a long-term watch. Watch will block at the
|
||||
// channel. And after someone receive the channel, it will go on to watch that prefix.
|
||||
// If a stop channel is given, client can close long-term watch using the stop channel
|
||||
func (c *Client) WatchAll(prefix string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) {
|
||||
return c.watch(prefix, waitIndex, true, receiver, stop)
|
||||
}
|
||||
|
||||
func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) {
|
||||
// Watch returns the first change to the given key since the given index. To
|
||||
// watch for the latest change, set waitIndex = 0.
|
||||
//
|
||||
// If a receiver channel is given, it will be a long-term watch. Watch will block at the
|
||||
// channel. And after someone receive the channel, it will go on to watch that
|
||||
// prefix. If a stop channel is given, client can close long-term watch using
|
||||
// the stop channel
|
||||
func (c *Client) Watch(key string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) {
|
||||
return c.watch(key, waitIndex, false, receiver, stop)
|
||||
}
|
||||
|
||||
func (c *Client) watch(prefix string, waitIndex uint64, recursive bool, receiver chan *Response, stop chan bool) (*Response, error) {
|
||||
logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader)
|
||||
if receiver == nil {
|
||||
return c.watchOnce(prefix, sinceIndex, stop)
|
||||
|
||||
return c.watchOnce(prefix, waitIndex, recursive, stop)
|
||||
} else {
|
||||
for {
|
||||
resp, err := c.watchOnce(prefix, sinceIndex, stop)
|
||||
resp, err := c.watchOnce(prefix, waitIndex, recursive, stop)
|
||||
if resp != nil {
|
||||
sinceIndex = resp.Index + 1
|
||||
waitIndex = resp.ModifiedIndex + 1
|
||||
receiver <- resp
|
||||
} else {
|
||||
return nil, err
|
||||
@@ -49,70 +54,37 @@ func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response
|
||||
|
||||
// helper func
|
||||
// return when there is change under the given prefix
|
||||
func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*Response, error) {
|
||||
func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*Response, error) {
|
||||
|
||||
var resp *http.Response
|
||||
var err error
|
||||
respChan := make(chan *Response)
|
||||
errChan := make(chan error)
|
||||
|
||||
if stop != nil {
|
||||
ch := make(chan respAndErr)
|
||||
|
||||
go func() {
|
||||
resp, err = c.sendWatchRequest(key, sinceIndex)
|
||||
|
||||
ch <- respAndErr{resp, err}
|
||||
}()
|
||||
|
||||
// select at stop or continue to receive
|
||||
select {
|
||||
|
||||
case res := <-ch:
|
||||
resp, err = res.resp, res.err
|
||||
|
||||
case <-stop:
|
||||
resp, err = nil, ErrWatchStoppedByUser
|
||||
go func() {
|
||||
options := options{
|
||||
"wait": true,
|
||||
}
|
||||
if waitIndex > 0 {
|
||||
options["waitIndex"] = waitIndex
|
||||
}
|
||||
if recursive {
|
||||
options["recursive"] = true
|
||||
}
|
||||
} else {
|
||||
resp, err = c.sendWatchRequest(key, sinceIndex)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
resp, err := c.get(key, options)
|
||||
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
respChan <- resp
|
||||
}()
|
||||
|
||||
select {
|
||||
case resp := <-respChan:
|
||||
return resp, nil
|
||||
case err := <-errChan:
|
||||
return nil, err
|
||||
case <-stop:
|
||||
return nil, ErrWatchStoppedByUser
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
|
||||
return nil, handleError(b)
|
||||
}
|
||||
|
||||
var result Response
|
||||
|
||||
err = json.Unmarshal(b, &result)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (c *Client) sendWatchRequest(key string, sinceIndex uint64) (*http.Response, error) {
|
||||
if sinceIndex == 0 {
|
||||
resp, err := c.sendRequest("GET", path.Join("watch", key), "")
|
||||
return resp, err
|
||||
} else {
|
||||
v := url.Values{}
|
||||
v.Set("index", fmt.Sprintf("%v", sinceIndex))
|
||||
resp, err := c.sendRequest("POST", path.Join("watch", key), v.Encode())
|
||||
return resp, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,31 +8,34 @@ import (
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("watch_foo")
|
||||
}()
|
||||
|
||||
go setHelper("bar", c)
|
||||
go setHelper("watch_foo", "bar", c)
|
||||
|
||||
result, err := c.Watch("watch_foo", 0, nil, nil)
|
||||
|
||||
if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Fatalf("Watch failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index)
|
||||
resp, err := c.Watch("watch_foo", 0, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/watch_foo" && resp.Value == "bar") {
|
||||
t.Fatalf("Watch 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
result, err = c.Watch("watch_foo", result.Index, nil, nil)
|
||||
go setHelper("watch_foo", "bar", c)
|
||||
|
||||
if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index)
|
||||
resp, err = c.Watch("watch_foo", resp.ModifiedIndex, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/watch_foo" && resp.Value == "bar") {
|
||||
t.Fatalf("Watch 2 failed: %#v", resp)
|
||||
}
|
||||
|
||||
ch := make(chan *Response, 10)
|
||||
stop := make(chan bool, 1)
|
||||
|
||||
go setLoop("bar", c)
|
||||
go setLoop("watch_foo", "bar", c)
|
||||
|
||||
go receiver(ch, stop)
|
||||
|
||||
@@ -42,16 +45,55 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func setHelper(value string, c *Client) {
|
||||
time.Sleep(time.Second)
|
||||
c.Set("watch_foo/foo", value, 100)
|
||||
func TestWatchAll(t *testing.T) {
|
||||
c := NewClient(nil)
|
||||
defer func() {
|
||||
c.DeleteAll("watch_foo")
|
||||
}()
|
||||
|
||||
go setHelper("watch_foo/foo", "bar", c)
|
||||
|
||||
resp, err := c.WatchAll("watch_foo", 0, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") {
|
||||
t.Fatalf("WatchAll 1 failed: %#v", resp)
|
||||
}
|
||||
|
||||
go setHelper("watch_foo/foo", "bar", c)
|
||||
|
||||
resp, err = c.WatchAll("watch_foo", resp.ModifiedIndex, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") {
|
||||
t.Fatalf("WatchAll 2 failed: %#v", resp)
|
||||
}
|
||||
|
||||
ch := make(chan *Response, 10)
|
||||
stop := make(chan bool, 1)
|
||||
|
||||
go setLoop("watch_foo/foo", "bar", c)
|
||||
|
||||
go receiver(ch, stop)
|
||||
|
||||
_, err = c.WatchAll("watch_foo", 0, ch, stop)
|
||||
if err != ErrWatchStoppedByUser {
|
||||
t.Fatalf("Watch returned a non-user stop error")
|
||||
}
|
||||
}
|
||||
|
||||
func setLoop(value string, c *Client) {
|
||||
func setHelper(key, value string, c *Client) {
|
||||
time.Sleep(time.Second)
|
||||
c.Set(key, value, 100)
|
||||
}
|
||||
|
||||
func setLoop(key, value string, c *Client) {
|
||||
time.Sleep(time.Second)
|
||||
for i := 0; i < 10; i++ {
|
||||
newValue := fmt.Sprintf("%s_%v", value, i)
|
||||
c.Set("watch_foo/foo", newValue, 100)
|
||||
c.Set(key, newValue, 100)
|
||||
time.Sleep(time.Second / 10)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
var count = 0
|
||||
|
||||
func main() {
|
||||
|
||||
good := 0
|
||||
bad := 0
|
||||
|
||||
ch := make(chan bool, 10)
|
||||
// set up a lock
|
||||
c := etcd.NewClient()
|
||||
c.Set("lock", "unlock", 0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
go t(i, ch, etcd.NewClient())
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if <-ch {
|
||||
good++
|
||||
} else {
|
||||
bad++
|
||||
}
|
||||
}
|
||||
fmt.Println("good: ", good, "bad: ", bad)
|
||||
}
|
||||
|
||||
func t(num int, ch chan bool, c *etcd.Client) {
|
||||
for i := 0; i < 100; i++ {
|
||||
if lock(c) {
|
||||
// a stupid spin lock
|
||||
count++
|
||||
fmt.Println(num, " got the lock and update count to", count)
|
||||
unlock(c)
|
||||
fmt.Println(num, " released the lock")
|
||||
} else {
|
||||
ch <- false
|
||||
return
|
||||
}
|
||||
}
|
||||
ch <- true
|
||||
}
|
||||
|
||||
// A stupid spin lock
|
||||
func lock(c *etcd.Client) bool {
|
||||
for {
|
||||
_, success, _ := c.TestAndSet("lock", "unlock", "lock", 0)
|
||||
|
||||
if success != true {
|
||||
fmt.Println("tried lock failed!")
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func unlock(c *etcd.Client) {
|
||||
for {
|
||||
_, err := c.Set("lock", "unlock", 0)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"time"
|
||||
)
|
||||
|
||||
var count = 0
|
||||
|
||||
func main() {
|
||||
ch := make(chan bool, 10)
|
||||
// set up a lock
|
||||
for i := 0; i < 100; i++ {
|
||||
go t(i, ch, etcd.NewClient())
|
||||
}
|
||||
start := time.Now()
|
||||
for i := 0; i < 100; i++ {
|
||||
<-ch
|
||||
}
|
||||
fmt.Println(time.Now().Sub(start), ": ", 100*50, "commands")
|
||||
}
|
||||
|
||||
func t(num int, ch chan bool, c *etcd.Client) {
|
||||
c.SyncCluster()
|
||||
for i := 0; i < 50; i++ {
|
||||
str := fmt.Sprintf("foo_%d", num*i)
|
||||
c.Set(str, "10", 0)
|
||||
}
|
||||
ch <- true
|
||||
}
|
||||
@@ -1,3 +0,0 @@
|
||||
Example script from the sync-cluster bug https://github.com/coreos/go-etcd/issues/27
|
||||
|
||||
TODO: turn this into a test case
|
||||
@@ -1,51 +0,0 @@
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("etcd-client started")
|
||||
c := etcd.NewClient(nil)
|
||||
c.SetCluster([]string{
|
||||
"http://127.0.0.1:4001",
|
||||
"http://127.0.0.1:4002",
|
||||
"http://127.0.0.1:4003",
|
||||
})
|
||||
|
||||
ticker := time.NewTicker(time.Second * 3)
|
||||
|
||||
for {
|
||||
select {
|
||||
case d := <-ticker.C:
|
||||
n := d.Second()
|
||||
if n <= 0 {
|
||||
n = 60
|
||||
}
|
||||
|
||||
for ok := c.SyncCluster(); ok == false; {
|
||||
fmt.Println("SyncCluster failed, trying again")
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
result, err := c.Set("foo", "exp_"+strconv.Itoa(n), 0)
|
||||
if err != nil {
|
||||
fmt.Println("set error", err)
|
||||
} else {
|
||||
fmt.Printf("set %+v\n", result)
|
||||
}
|
||||
|
||||
ss, err := c.Get("foo")
|
||||
if err != nil {
|
||||
fmt.Println("get error", err)
|
||||
} else {
|
||||
fmt.Println(len(ss))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user