mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Godep: update go-etcd version
This commit is contained in:
parent
7ad2b22498
commit
f6f7ef6b3a
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -22,8 +22,8 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/coreos/go-etcd/etcd",
|
"ImportPath": "github.com/coreos/go-etcd/etcd",
|
||||||
"Comment": "v0.2.0-rc1-130-g6aa2da5",
|
"Comment": "v2.0.0-7-g73a8ef7",
|
||||||
"Rev": "6aa2da5a7a905609c93036b9307185a04a5a84a5"
|
"Rev": "73a8ef737e8ea002281a28b4cb92a1de121ad4c6"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/coreos/go-semver/semver",
|
"ImportPath": "github.com/coreos/go-semver/semver",
|
||||||
|
70
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go
generated
vendored
70
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go
generated
vendored
@ -7,11 +7,13 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,6 +30,10 @@ const (
|
|||||||
defaultBufferSize = 10
|
defaultBufferSize = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
rand.Seed(int64(time.Now().Nanosecond()))
|
||||||
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
CertFile string `json:"certFile"`
|
CertFile string `json:"certFile"`
|
||||||
KeyFile string `json:"keyFile"`
|
KeyFile string `json:"keyFile"`
|
||||||
@ -36,10 +42,17 @@ type Config struct {
|
|||||||
Consistency string `json:"consistency"`
|
Consistency string `json:"consistency"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type credentials struct {
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
config Config `json:"config"`
|
config Config `json:"config"`
|
||||||
cluster *Cluster `json:"cluster"`
|
cluster *Cluster `json:"cluster"`
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
credentials *credentials
|
||||||
|
transport *http.Transport
|
||||||
persistence io.Writer
|
persistence io.Writer
|
||||||
cURLch chan string
|
cURLch chan string
|
||||||
// CheckRetry can be used to control the policy for failed requests
|
// CheckRetry can be used to control the policy for failed requests
|
||||||
@ -64,8 +77,7 @@ func NewClient(machines []string) *Client {
|
|||||||
config := Config{
|
config := Config{
|
||||||
// default timeout is one second
|
// default timeout is one second
|
||||||
DialTimeout: time.Second,
|
DialTimeout: time.Second,
|
||||||
// default consistency level is STRONG
|
Consistency: WEAK_CONSISTENCY,
|
||||||
Consistency: STRONG_CONSISTENCY,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &Client{
|
client := &Client{
|
||||||
@ -89,8 +101,7 @@ func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error)
|
|||||||
config := Config{
|
config := Config{
|
||||||
// default timeout is one second
|
// default timeout is one second
|
||||||
DialTimeout: time.Second,
|
DialTimeout: time.Second,
|
||||||
// default consistency level is STRONG
|
Consistency: WEAK_CONSISTENCY,
|
||||||
Consistency: STRONG_CONSISTENCY,
|
|
||||||
CertFile: cert,
|
CertFile: cert,
|
||||||
KeyFile: key,
|
KeyFile: key,
|
||||||
CaCertFile: make([]string, 0),
|
CaCertFile: make([]string, 0),
|
||||||
@ -166,17 +177,27 @@ func NewClientFromReader(reader io.Reader) (*Client, error) {
|
|||||||
// Override the Client's HTTP Transport object
|
// Override the Client's HTTP Transport object
|
||||||
func (c *Client) SetTransport(tr *http.Transport) {
|
func (c *Client) SetTransport(tr *http.Transport) {
|
||||||
c.httpClient.Transport = tr
|
c.httpClient.Transport = tr
|
||||||
|
c.transport = tr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) SetCredentials(username, password string) {
|
||||||
|
c.credentials = &credentials{username, password}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Close() {
|
||||||
|
c.transport.DisableKeepAlives = true
|
||||||
|
c.transport.CloseIdleConnections()
|
||||||
}
|
}
|
||||||
|
|
||||||
// initHTTPClient initializes a HTTP client for etcd client
|
// initHTTPClient initializes a HTTP client for etcd client
|
||||||
func (c *Client) initHTTPClient() {
|
func (c *Client) initHTTPClient() {
|
||||||
tr := &http.Transport{
|
c.transport = &http.Transport{
|
||||||
Dial: c.dial,
|
Dial: c.dial,
|
||||||
TLSClientConfig: &tls.Config{
|
TLSClientConfig: &tls.Config{
|
||||||
InsecureSkipVerify: true,
|
InsecureSkipVerify: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
c.httpClient = &http.Client{Transport: tr}
|
c.httpClient = &http.Client{Transport: c.transport}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initHTTPClient initializes a HTTPS client for etcd client
|
// initHTTPClient initializes a HTTPS client for etcd client
|
||||||
@ -292,11 +313,28 @@ func (c *Client) SyncCluster() bool {
|
|||||||
// internalSyncCluster syncs cluster information using the given machine list.
|
// internalSyncCluster syncs cluster information using the given machine list.
|
||||||
func (c *Client) internalSyncCluster(machines []string) bool {
|
func (c *Client) internalSyncCluster(machines []string) bool {
|
||||||
for _, machine := range machines {
|
for _, machine := range machines {
|
||||||
|
httpPath := c.createHttpPath(machine, path.Join(version, "members"))
|
||||||
|
resp, err := c.httpClient.Get(httpPath)
|
||||||
|
if err != nil {
|
||||||
|
// try another machine in the cluster
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK { // fall-back to old endpoint
|
||||||
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
|
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
|
||||||
resp, err := c.httpClient.Get(httpPath)
|
resp, err := c.httpClient.Get(httpPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// try another machine in the cluster
|
// try another machine in the cluster
|
||||||
continue
|
continue
|
||||||
|
}
|
||||||
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
// try another machine in the cluster
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// update Machines List
|
||||||
|
c.cluster.updateFromStr(string(b))
|
||||||
} else {
|
} else {
|
||||||
b, err := ioutil.ReadAll(resp.Body)
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
@ -305,18 +343,26 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// update Machines List
|
var mCollection memberCollection
|
||||||
c.cluster.updateFromStr(string(b))
|
if err := json.Unmarshal(b, &mCollection); err != nil {
|
||||||
|
// try another machine
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// update leader
|
urls := make([]string, 0)
|
||||||
// the first one in the machine list is the leader
|
for _, m := range mCollection {
|
||||||
c.cluster.switchLeader(0)
|
urls = append(urls, m.ClientURLs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// update Machines List
|
||||||
|
c.cluster.updateFromStr(strings.Join(urls, ","))
|
||||||
|
}
|
||||||
|
|
||||||
logger.Debug("sync.machines ", c.cluster.Machines)
|
logger.Debug("sync.machines ", c.cluster.Machines)
|
||||||
c.saveConfig()
|
c.saveConfig()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
18
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go
generated
vendored
18
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go
generated
vendored
@ -10,7 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// To pass this test, we need to create a cluster of 3 machines
|
// To pass this test, we need to create a cluster of 3 machines
|
||||||
// The server should be listening on 127.0.0.1:4001, 4002, 4003
|
// The server should be listening on localhost:4001, 4002, 4003
|
||||||
func TestSync(t *testing.T) {
|
func TestSync(t *testing.T) {
|
||||||
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
|
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
|
||||||
|
|
||||||
@ -36,8 +36,8 @@ func TestSync(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if host != "127.0.0.1" {
|
if host != "localhost" {
|
||||||
t.Fatal("Host must be 127.0.0.1")
|
t.Fatal("Host must be localhost")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,3 +94,15 @@ func TestPersistence(t *testing.T) {
|
|||||||
t.Fatalf("The two configs should be equal!")
|
t.Fatalf("The two configs should be equal!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientRetry(t *testing.T) {
|
||||||
|
c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"})
|
||||||
|
// use first endpoint as the picked url
|
||||||
|
c.cluster.picked = 0
|
||||||
|
if _, err := c.Set("foo", "bar", 5); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := c.Delete("foo", true); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
32
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go
generated
vendored
32
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go
generated
vendored
@ -1,13 +1,14 @@
|
|||||||
package etcd
|
package etcd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/url"
|
"math/rand"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Cluster struct {
|
type Cluster struct {
|
||||||
Leader string `json:"leader"`
|
Leader string `json:"leader"`
|
||||||
Machines []string `json:"machines"`
|
Machines []string `json:"machines"`
|
||||||
|
picked int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCluster(machines []string) *Cluster {
|
func NewCluster(machines []string) *Cluster {
|
||||||
@ -18,34 +19,19 @@ func NewCluster(machines []string) *Cluster {
|
|||||||
|
|
||||||
// default leader and machines
|
// default leader and machines
|
||||||
return &Cluster{
|
return &Cluster{
|
||||||
Leader: machines[0],
|
Leader: "",
|
||||||
Machines: machines,
|
Machines: machines,
|
||||||
|
picked: rand.Intn(len(machines)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// switchLeader switch the current leader to machines[num]
|
func (cl *Cluster) failure() { cl.picked = rand.Intn(len(cl.Machines)) }
|
||||||
func (cl *Cluster) switchLeader(num int) {
|
func (cl *Cluster) pick() string { return cl.Machines[cl.picked] }
|
||||||
logger.Debugf("switch.leader[from %v to %v]",
|
|
||||||
cl.Leader, cl.Machines[num])
|
|
||||||
|
|
||||||
cl.Leader = cl.Machines[num]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cl *Cluster) updateFromStr(machines string) {
|
func (cl *Cluster) updateFromStr(machines string) {
|
||||||
cl.Machines = strings.Split(machines, ",")
|
cl.Machines = strings.Split(machines, ",")
|
||||||
|
for i := range cl.Machines {
|
||||||
|
cl.Machines[i] = strings.TrimSpace(cl.Machines[i])
|
||||||
}
|
}
|
||||||
|
cl.picked = rand.Intn(len(cl.Machines))
|
||||||
func (cl *Cluster) updateLeader(leader string) {
|
|
||||||
logger.Debugf("update.leader[%s,%s]", cl.Leader, leader)
|
|
||||||
cl.Leader = leader
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cl *Cluster) updateLeaderFromURL(u *url.URL) {
|
|
||||||
var leader string
|
|
||||||
if u.Scheme == "" {
|
|
||||||
leader = "http://" + u.Host
|
|
||||||
} else {
|
|
||||||
leader = u.Scheme + "://" + u.Host
|
|
||||||
}
|
|
||||||
cl.updateLeader(leader)
|
|
||||||
}
|
}
|
||||||
|
1
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/config.json
generated
vendored
Normal file
1
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/config.json
generated
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
{"config":{"certFile":"","keyFile":"","caCertFiles":null,"timeout":1000000000,"consistency":"STRONG"},"cluster":{"leader":"http://127.0.0.1:4001","machines":["http://127.0.0.1:4001"]}}
|
1
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go
generated
vendored
1
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go
generated
vendored
@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
ErrCodeEtcdNotReachable = 501
|
ErrCodeEtcdNotReachable = 501
|
||||||
|
ErrCodeUnhandledHTTPStatus = 502
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
5
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get.go
generated
vendored
5
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get.go
generated
vendored
@ -18,9 +18,14 @@ func (c *Client) Get(key string, sort, recursive bool) (*Response, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) {
|
func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) {
|
||||||
|
var q bool
|
||||||
|
if c.config.Consistency == STRONG_CONSISTENCY {
|
||||||
|
q = true
|
||||||
|
}
|
||||||
ops := Options{
|
ops := Options{
|
||||||
"recursive": recursive,
|
"recursive": recursive,
|
||||||
"sorted": sort,
|
"sorted": sort,
|
||||||
|
"quorum": q,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.get(key, ops)
|
return c.get(key, ops)
|
||||||
|
30
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go
generated
vendored
Normal file
30
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go
generated
vendored
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package etcd
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
type Member struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
PeerURLs []string `json:"peerURLs"`
|
||||||
|
ClientURLs []string `json:"clientURLs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type memberCollection []Member
|
||||||
|
|
||||||
|
func (c *memberCollection) UnmarshalJSON(data []byte) error {
|
||||||
|
d := struct {
|
||||||
|
Members []Member
|
||||||
|
}{}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(data, &d); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.Members == nil {
|
||||||
|
*c = make([]Member, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
*c = d.Members
|
||||||
|
return nil
|
||||||
|
}
|
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMemberCollectionUnmarshal(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
body []byte
|
||||||
|
want memberCollection
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
body: []byte(`{"members":[]}`),
|
||||||
|
want: memberCollection([]Member{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
|
||||||
|
want: memberCollection(
|
||||||
|
[]Member{
|
||||||
|
{
|
||||||
|
ID: "2745e2525fce8fe",
|
||||||
|
Name: "node3",
|
||||||
|
PeerURLs: []string{
|
||||||
|
"http://127.0.0.1:7003",
|
||||||
|
},
|
||||||
|
ClientURLs: []string{
|
||||||
|
"http://127.0.0.1:4003",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "42134f434382925",
|
||||||
|
Name: "node1",
|
||||||
|
PeerURLs: []string{
|
||||||
|
"http://127.0.0.1:2380",
|
||||||
|
"http://127.0.0.1:7001",
|
||||||
|
},
|
||||||
|
ClientURLs: []string{
|
||||||
|
"http://127.0.0.1:2379",
|
||||||
|
"http://127.0.0.1:4001",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "94088180e21eb87b",
|
||||||
|
Name: "node2",
|
||||||
|
PeerURLs: []string{
|
||||||
|
"http://127.0.0.1:7002",
|
||||||
|
},
|
||||||
|
ClientURLs: []string{
|
||||||
|
"http://127.0.0.1:4002",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, tt := range tests {
|
||||||
|
var got memberCollection
|
||||||
|
err := json.Unmarshal(tt.body, &got)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("#%d: unexpected error: %v", i, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(tt.want, got) {
|
||||||
|
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
2
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/options.go
generated
vendored
2
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/options.go
generated
vendored
@ -18,7 +18,7 @@ type validOptions map[string]reflect.Kind
|
|||||||
var (
|
var (
|
||||||
VALID_GET_OPTIONS = validOptions{
|
VALID_GET_OPTIONS = validOptions{
|
||||||
"recursive": reflect.Bool,
|
"recursive": reflect.Bool,
|
||||||
"consistent": reflect.Bool,
|
"quorum": reflect.Bool,
|
||||||
"sorted": reflect.Bool,
|
"sorted": reflect.Bool,
|
||||||
"wait": reflect.Bool,
|
"wait": reflect.Bool,
|
||||||
"waitIndex": reflect.Uint64,
|
"waitIndex": reflect.Uint64,
|
||||||
|
89
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go
generated
vendored
89
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go
generated
vendored
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
@ -39,15 +38,9 @@ func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan
|
|||||||
// getCancelable issues a cancelable GET request
|
// getCancelable issues a cancelable GET request
|
||||||
func (c *Client) getCancelable(key string, options Options,
|
func (c *Client) getCancelable(key string, options Options,
|
||||||
cancel <-chan bool) (*RawResponse, error) {
|
cancel <-chan bool) (*RawResponse, error) {
|
||||||
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
|
logger.Debugf("get %s [%s]", key, c.cluster.pick())
|
||||||
p := keyToPath(key)
|
p := keyToPath(key)
|
||||||
|
|
||||||
// If consistency level is set to STRONG, append
|
|
||||||
// the `consistent` query string.
|
|
||||||
if c.config.Consistency == STRONG_CONSISTENCY {
|
|
||||||
options["consistent"] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
str, err := options.toParameters(VALID_GET_OPTIONS)
|
str, err := options.toParameters(VALID_GET_OPTIONS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -73,7 +66,7 @@ func (c *Client) get(key string, options Options) (*RawResponse, error) {
|
|||||||
func (c *Client) put(key string, value string, ttl uint64,
|
func (c *Client) put(key string, value string, ttl uint64,
|
||||||
options Options) (*RawResponse, error) {
|
options Options) (*RawResponse, error) {
|
||||||
|
|
||||||
logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
|
||||||
p := keyToPath(key)
|
p := keyToPath(key)
|
||||||
|
|
||||||
str, err := options.toParameters(VALID_PUT_OPTIONS)
|
str, err := options.toParameters(VALID_PUT_OPTIONS)
|
||||||
@ -94,7 +87,7 @@ func (c *Client) put(key string, value string, ttl uint64,
|
|||||||
|
|
||||||
// post issues a POST request
|
// post issues a POST request
|
||||||
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
|
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
|
||||||
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
|
||||||
p := keyToPath(key)
|
p := keyToPath(key)
|
||||||
|
|
||||||
req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
|
req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
|
||||||
@ -109,7 +102,7 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error
|
|||||||
|
|
||||||
// delete issues a DELETE request
|
// delete issues a DELETE request
|
||||||
func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
||||||
logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
|
logger.Debugf("delete %s [%s]", key, c.cluster.pick())
|
||||||
p := keyToPath(key)
|
p := keyToPath(key)
|
||||||
|
|
||||||
str, err := options.toParameters(VALID_DELETE_OPTIONS)
|
str, err := options.toParameters(VALID_DELETE_OPTIONS)
|
||||||
@ -130,7 +123,6 @@ func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
|||||||
|
|
||||||
// SendRequest sends a HTTP request and returns a Response as defined by etcd
|
// SendRequest sends a HTTP request and returns a Response as defined by etcd
|
||||||
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
||||||
|
|
||||||
var req *http.Request
|
var req *http.Request
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
var httpPath string
|
var httpPath string
|
||||||
@ -196,13 +188,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||||||
|
|
||||||
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
||||||
|
|
||||||
if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
|
// get httpPath if not set
|
||||||
// If it's a GET and consistency level is set to WEAK,
|
if httpPath == "" {
|
||||||
// then use a random machine.
|
httpPath = c.getHttpPath(rr.RelativePath)
|
||||||
httpPath = c.getHttpPath(true, rr.RelativePath)
|
|
||||||
} else {
|
|
||||||
// Else use the leader.
|
|
||||||
httpPath = c.getHttpPath(false, rr.RelativePath)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a cURL command if curlChan is set
|
// Return a cURL command if curlChan is set
|
||||||
@ -211,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||||||
for key, value := range rr.Values {
|
for key, value := range rr.Values {
|
||||||
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
||||||
}
|
}
|
||||||
|
if c.credentials != nil {
|
||||||
|
command += fmt.Sprintf(" -u %s", c.credentials.username)
|
||||||
|
}
|
||||||
c.sendCURL(command)
|
c.sendCURL(command)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -240,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.credentials != nil {
|
||||||
|
req.SetBasicAuth(c.credentials.username, c.credentials.password)
|
||||||
|
}
|
||||||
|
|
||||||
resp, err = c.httpClient.Do(req)
|
resp, err = c.httpClient.Do(req)
|
||||||
|
// clear previous httpPath
|
||||||
|
httpPath = ""
|
||||||
defer func() {
|
defer func() {
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
@ -264,7 +261,7 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||||||
return nil, checkErr
|
return nil, checkErr
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cluster.switchLeader(attempt % len(c.cluster.Machines))
|
c.cluster.failure()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -295,17 +292,14 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if resp is TemporaryRedirect, set the new leader and retry
|
|
||||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||||
u, err := resp.Location()
|
u, err := resp.Location()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning(err)
|
logger.Warning(err)
|
||||||
} else {
|
} else {
|
||||||
// Update cluster leader based on redirect location
|
// set httpPath for following redirection
|
||||||
// because it should point to the leader address
|
httpPath = u.String()
|
||||||
c.cluster.updateLeaderFromURL(u)
|
|
||||||
logger.Debug("recv.response.relocate ", u.String())
|
|
||||||
}
|
}
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
continue
|
continue
|
||||||
@ -333,34 +327,45 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||||||
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
||||||
err error) error {
|
err error) error {
|
||||||
|
|
||||||
if numReqs >= 2*len(cluster.Machines) {
|
if numReqs > 2*len(cluster.Machines) {
|
||||||
return newError(ErrCodeEtcdNotReachable,
|
errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err)
|
||||||
"Tried to connect to each peer twice and failed", 0)
|
return newError(ErrCodeEtcdNotReachable, errStr, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
code := lastResp.StatusCode
|
if isEmptyResponse(lastResp) {
|
||||||
if code == http.StatusInternalServerError {
|
// always retry if it failed to get response from one machine
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !shouldRetry(lastResp) {
|
||||||
|
body := []byte("nil")
|
||||||
|
if lastResp.Body != nil {
|
||||||
|
if b, err := ioutil.ReadAll(lastResp.Body); err == nil {
|
||||||
|
body = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body)
|
||||||
|
return newError(ErrCodeUnhandledHTTPStatus, errStr, 0)
|
||||||
|
}
|
||||||
|
// sleep some time and expect leader election finish
|
||||||
time.Sleep(time.Millisecond * 200)
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
logger.Warning("bad response status code", lastResp.StatusCode)
|
||||||
}
|
|
||||||
|
|
||||||
logger.Warning("bad response status code", code)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) getHttpPath(random bool, s ...string) string {
|
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
||||||
var machine string
|
|
||||||
if random {
|
// shouldRetry returns whether the reponse deserves retry.
|
||||||
machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
|
func shouldRetry(r http.Response) bool {
|
||||||
} else {
|
// TODO: only retry when the cluster is in leader election
|
||||||
machine = c.cluster.Leader
|
// We cannot do it exactly because etcd doesn't support it well.
|
||||||
|
return r.StatusCode == http.StatusInternalServerError
|
||||||
}
|
}
|
||||||
|
|
||||||
fullPath := machine + "/" + version
|
func (c *Client) getHttpPath(s ...string) string {
|
||||||
|
fullPath := c.cluster.pick() + "/" + version
|
||||||
for _, seg := range s {
|
for _, seg := range s {
|
||||||
fullPath = fullPath + "/" + seg
|
fullPath = fullPath + "/" + seg
|
||||||
}
|
}
|
||||||
|
|
||||||
return fullPath
|
return fullPath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
6
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go
generated
vendored
6
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go
generated
vendored
@ -19,7 +19,7 @@ func TestSetCurlChan(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5",
|
expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5",
|
||||||
c.cluster.Leader)
|
c.cluster.pick())
|
||||||
actual := c.RecvCURL()
|
actual := c.RecvCURL()
|
||||||
if expected != actual {
|
if expected != actual {
|
||||||
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
|
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
|
||||||
@ -32,8 +32,8 @@ func TestSetCurlChan(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&recursive=false&sorted=false",
|
expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?quorum=true&recursive=false&sorted=false",
|
||||||
c.cluster.Leader)
|
c.cluster.pick())
|
||||||
actual = c.RecvCURL()
|
actual = c.RecvCURL()
|
||||||
if expected != actual {
|
if expected != actual {
|
||||||
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
|
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
|
||||||
|
5
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go
generated
vendored
5
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go
generated
vendored
@ -1,3 +1,6 @@
|
|||||||
package etcd
|
package etcd
|
||||||
|
|
||||||
const version = "v2"
|
const (
|
||||||
|
version = "v2"
|
||||||
|
packageVersion = "v2.0.0+git"
|
||||||
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user