From 351e84aece21dade1818e07e0ce56a23380a33ef Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Fri, 23 Aug 2013 11:57:51 -0700 Subject: [PATCH 01/18] feat(etcd): add listen host parameter this separates out the listening IP from the advertised IP. This is necessary so that we can hit etcd on 127.0.0.1 but also advertise the right IP to the rest of the cluster. --- etcd.go | 16 ++++++++++++---- etcd_server.go | 11 ++--------- raft_server.go | 9 +++++---- util.go | 16 ++++++++++++++++ 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/etcd.go b/etcd.go index 39743d5ab..0bc3ef4ed 100644 --- a/etcd.go +++ b/etcd.go @@ -50,8 +50,10 @@ func init() { flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)") - flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication") - flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication") + flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the advertised public hostname:port for etcd client communication") + flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the advertised public hostname:port for raft server communication") + flag.StringVar(&argInfo.EtcdListenHost, "cl", "127.0.0.1", "the listening hostname for etcd client communication") + flag.StringVar(&argInfo.RaftListenHost, "sl", "127.0.0.1", "the listening hostname for raft server communication") flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface") flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile") @@ -107,6 +109,9 @@ type Info struct { EtcdURL string `json:"etcdURL"` WebURL string `json:"webURL"` + RaftListenHost string `json:"raftListenHost"` + EtcdListenHost string `json:"etcdListenHost"` + RaftTLS TLSInfo `json:"raftTLS"` EtcdTLS TLSInfo `json:"etcdTLS"` } @@ -178,6 +183,9 @@ func main() { argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme) argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http") + argInfo.RaftListenHost = sanitizeListenHost(argInfo.RaftListenHost, argInfo.RaftURL) + argInfo.EtcdListenHost = sanitizeListenHost(argInfo.EtcdListenHost, argInfo.EtcdURL) + // Read server info from file or grab it from user. if err := os.MkdirAll(dirPath, 0744); err != nil { fatalf("Unable to create path: %s", err) @@ -190,8 +198,8 @@ func main() { snapConf = newSnapshotConf() // Create etcd and raft server - e = newEtcdServer(info.Name, info.EtcdURL, &etcdTLSConfig, &info.EtcdTLS) - r = newRaftServer(info.Name, info.RaftURL, &raftTLSConfig, &info.RaftTLS) + e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS) + r = newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS) startWebInterface() r.ListenAndServe() diff --git a/etcd_server.go b/etcd_server.go index 51ff1e9e8..2cef01558 100644 --- a/etcd_server.go +++ b/etcd_server.go @@ -2,7 +2,6 @@ package main import ( "net/http" - "net/url" ) type etcdServer struct { @@ -15,18 +14,12 @@ type etcdServer struct { var e *etcdServer -func newEtcdServer(name string, urlStr string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer { - u, err := url.Parse(urlStr) - - if err != nil { - fatalf("invalid url '%s': %s", e.url, err) - } - +func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer { return &etcdServer{ Server: http.Server{ Handler: NewEtcdMuxer(), TLSConfig: &tlsConf.Server, - Addr: u.Host, + Addr: listenHost, }, name: name, url: urlStr, diff --git a/raft_server.go b/raft_server.go index a90c65f93..b3b5d3277 100644 --- a/raft_server.go +++ b/raft_server.go @@ -20,13 +20,14 @@ type raftServer struct { joinIndex uint64 name string url string + listenHost string tlsConf *TLSConfig tlsInfo *TLSInfo } var r *raftServer -func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { +func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client) @@ -41,6 +42,7 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo version: raftVersion, name: name, url: url, + listenHost: listenHost, tlsConf: tlsConf, tlsInfo: tlsInfo, } @@ -134,15 +136,14 @@ func startAsFollower() { // Start to listen and response raft command func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { - u, _ := url.Parse(r.url) - infof("raft server [%s:%s]", r.name, u) + infof("raft server [%s:%s]", r.name, r.listenHost) raftMux := http.NewServeMux() server := &http.Server{ Handler: raftMux, TLSConfig: &tlsConf, - Addr: u.Host, + Addr: r.listenHost, } // internal commands diff --git a/util.go b/util.go index 5f86cbaa9..8ddaa93cc 100644 --- a/util.go +++ b/util.go @@ -106,6 +106,22 @@ func sanitizeURL(host string, defaultScheme string) string { return p.String() } +// sanitizeListenHost cleans up the ListenHost parameter and appends a port +// if necessary based on the advertised port. +func sanitizeListenHost(listen string, advertised string) string { + aurl, err := url.Parse(advertised) + if err != nil { + fatal(err) + } + + _, aport, err := net.SplitHostPort(aurl.Host) + if err != nil { + fatal(err) + } + + return net.JoinHostPort(listen, aport) +} + func check(err error) { if err != nil { fatal(err) From b8d85e627e86dffec240cbec293af93c921ab838 Mon Sep 17 00:00:00 2001 From: Theo Hultberg Date: Tue, 27 Aug 2013 16:30:25 +0200 Subject: [PATCH 02/18] Change leader and machine result examples in readme --- README.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f6a4aa86b..687e48ced 100644 --- a/README.md +++ b/README.md @@ -363,7 +363,7 @@ curl -L http://127.0.0.1:4001/v1/machines We should see there are three nodes in the cluster ``` -0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003 +http://0.0.0.0:4001, http://0.0.0.0:4002, http://0.0.0.0:4003 ``` The machine list is also available via this API: @@ -373,7 +373,7 @@ curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines ``` ```json -[{"action":"GET","key":"/machines/node1","value":"0.0.0.0,7001,4001","index":4},{"action":"GET","key":"/machines/node3","value":"0.0.0.0,7002,4002","index":4},{"action":"GET","key":"/machines/node4","value":"0.0.0.0,7003,4003","index":4}] +[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://0.0.0.0:7001&etcd=http://0.0.0.0:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://0.0.0.0:7002&etcd=http://0.0.0.0:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://0.0.0.0:7003&etcd=http://0.0.0.0:4003","index":4}] ``` The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```. @@ -386,7 +386,7 @@ curl -L http://127.0.0.1:4001/v1/leader The first server we set up should be the leader, if it has not dead during these commands. ``` -0.0.0.0:7001 +http://0.0.0.0:7001 ``` Now we can do normal SET and GET operations on keys as we explored earlier. @@ -414,7 +414,13 @@ curl -L http://127.0.0.1:4001/v1/leader ``` ``` -0.0.0.0:7002 or 0.0.0.0:7003 +http://0.0.0.0:7002 +``` + +or + +``` +http://0.0.0.0:7003 ``` You should be able to see this: From 59599dc5194e7b03a2384d835957bbb741835025 Mon Sep 17 00:00:00 2001 From: Evan Date: Fri, 30 Aug 2013 23:52:37 -0400 Subject: [PATCH 03/18] Update README.md add README version change -F to -d for consistence add -v to show SSL handshake message --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 687e48ced..84cd8a4c1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # etcd +README version 0.1 [![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd) @@ -272,7 +273,7 @@ Next, lets configure etcd to use this keypair: You can now test the configuration using https: ```sh -curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar +curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v ``` You should be able to see the handshake succeed. @@ -302,7 +303,7 @@ We can also do authentication using CA certs. The clients will provide their cer Try the same request to this server: ```sh -curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar +curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v ``` The request should be rejected by the server. From bfeed190ea335ec8d7158ebf8a53edfc8d918b5c Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sat, 31 Aug 2013 21:53:08 -0700 Subject: [PATCH 04/18] feat(etcd): Default server listen and client listen to advertised IPs Map the advertised IP to the listening IP by default. This will make things nicer for the user. --- etcd.go | 4 ++-- util.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/etcd.go b/etcd.go index 0bc3ef4ed..46546e8cc 100644 --- a/etcd.go +++ b/etcd.go @@ -52,8 +52,8 @@ func init() { flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)") flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the advertised public hostname:port for etcd client communication") flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the advertised public hostname:port for raft server communication") - flag.StringVar(&argInfo.EtcdListenHost, "cl", "127.0.0.1", "the listening hostname for etcd client communication") - flag.StringVar(&argInfo.RaftListenHost, "sl", "127.0.0.1", "the listening hostname for raft server communication") + flag.StringVar(&argInfo.EtcdListenHost, "cl", "", "the listening hostname for etcd client communication (defaults to advertised ip)") + flag.StringVar(&argInfo.RaftListenHost, "sl", "", "the listening hostname for raft server communication (defaults to advertised ip)") flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface") flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile") diff --git a/util.go b/util.go index 8ddaa93cc..22cbed641 100644 --- a/util.go +++ b/util.go @@ -114,11 +114,16 @@ func sanitizeListenHost(listen string, advertised string) string { fatal(err) } - _, aport, err := net.SplitHostPort(aurl.Host) + ahost, aport, err := net.SplitHostPort(aurl.Host) if err != nil { fatal(err) } + // If the listen host isn't set use the advertised host + if listen == "" { + listen = ahost + } + return net.JoinHostPort(listen, aport) } From e28fd7cc2bee1c92980e8496fa1f09cbb72d74c4 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sat, 31 Aug 2013 21:53:58 -0700 Subject: [PATCH 05/18] fix(README): use 127.0.0.1 everywhere 0.0.0.0 used to be the default advertised ip, fix this everywhere. --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 687e48ced..32480ba6f 100644 --- a/README.md +++ b/README.md @@ -363,7 +363,7 @@ curl -L http://127.0.0.1:4001/v1/machines We should see there are three nodes in the cluster ``` -http://0.0.0.0:4001, http://0.0.0.0:4002, http://0.0.0.0:4003 +http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003 ``` The machine list is also available via this API: @@ -373,7 +373,7 @@ curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines ``` ```json -[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://0.0.0.0:7001&etcd=http://0.0.0.0:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://0.0.0.0:7002&etcd=http://0.0.0.0:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://0.0.0.0:7003&etcd=http://0.0.0.0:4003","index":4}] +[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001&etcd=http://127.0.0.1:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002&etcd=http://127.0.0.1:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003&etcd=http://127.0.0.1:4003","index":4}] ``` The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```. @@ -386,7 +386,7 @@ curl -L http://127.0.0.1:4001/v1/leader The first server we set up should be the leader, if it has not dead during these commands. ``` -http://0.0.0.0:7001 +http://127.0.0.1:7001 ``` Now we can do normal SET and GET operations on keys as we explored earlier. @@ -414,13 +414,13 @@ curl -L http://127.0.0.1:4001/v1/leader ``` ``` -http://0.0.0.0:7002 +http://127.0.0.1:7002 ``` or ``` -http://0.0.0.0:7003 +http://127.0.0.1:7003 ``` You should be able to see this: From de0a8c60ac52073267df2ced3d0250bee885db41 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sat, 31 Aug 2013 22:10:30 -0700 Subject: [PATCH 06/18] feat(README): document -cl flag --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 32480ba6f..05e789228 100644 --- a/README.md +++ b/README.md @@ -347,6 +347,9 @@ We use -s to specify server port and -c to specify client port and -d to specify ./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d nodes/node1 -n node1 ``` +**Note:** If you want to run etcd on external IP address and still have access locally you need to add `-cl 0.0.0.0` so that it will listen on both external and localhost addresses. +A similar argument `-sl` is used to setup the listening address for the server port. + Let the join two more nodes to this cluster using the -C argument: ```sh From a22bd2b8b27b095b3d19f7a62f4fdcd93cf95fb2 Mon Sep 17 00:00:00 2001 From: Evan Date: Sun, 1 Sep 2013 17:38:41 -0400 Subject: [PATCH 07/18] full version for readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 84cd8a4c1..832815ee0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # etcd -README version 0.1 +README version 0.1.0 [![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd) From a90bb85bb3e307d0c1f46912ca2d328b7f3b77aa Mon Sep 17 00:00:00 2001 From: Ivan7702 Date: Sun, 1 Sep 2013 17:48:29 -0400 Subject: [PATCH 08/18] modified: README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 687e48ced..43f621cb0 100644 --- a/README.md +++ b/README.md @@ -272,7 +272,8 @@ Next, lets configure etcd to use this keypair: You can now test the configuration using https: ```sh -curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar +curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d +value=bar -v ``` You should be able to see the handshake succeed. From 51941fa613b7d1c46ce9983a7886b224f8431ea5 Mon Sep 17 00:00:00 2001 From: Ivan7702 Date: Sun, 1 Sep 2013 21:41:57 -0400 Subject: [PATCH 09/18] add timeout for transportation layer --- raft_server.go | 26 ++++++++++---------- transporter.go | 59 ++++++++++++++++++++++++++++++++++++++++++--- transporter_test.go | 35 +++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 17 deletions(-) create mode 100644 transporter_test.go diff --git a/raft_server.go b/raft_server.go index b3b5d3277..fa3cb3a39 100644 --- a/raft_server.go +++ b/raft_server.go @@ -16,13 +16,13 @@ import ( type raftServer struct { *raft.Server - version string - joinIndex uint64 - name string - url string + version string + joinIndex uint64 + name string + url string listenHost string - tlsConf *TLSConfig - tlsInfo *TLSInfo + tlsConf *TLSConfig + tlsInfo *TLSInfo } var r *raftServer @@ -30,7 +30,7 @@ var r *raftServer func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client) + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.DefaultHeartbeatTimeout) // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) @@ -38,13 +38,13 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi check(err) return &raftServer{ - Server: server, - version: raftVersion, - name: name, - url: url, + Server: server, + version: raftVersion, + name: name, + url: url, listenHost: listenHost, - tlsConf: tlsConf, - tlsInfo: tlsInfo, + tlsConf: tlsConf, + tlsInfo: tlsInfo, } } diff --git a/transporter.go b/transporter.go index c49479bc8..66a179791 100644 --- a/transporter.go +++ b/transporter.go @@ -9,17 +9,19 @@ import ( "io" "net" "net/http" + "time" ) // Transporter layer for communication between raft nodes type transporter struct { - client *http.Client + client *http.Client + timeout time.Duration } // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config) transporter { +func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) transporter { t := transporter{} tr := &http.Transport{ @@ -32,6 +34,7 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter { } t.client = &http.Client{Transport: tr} + t.timeout = timeout return t } @@ -151,10 +154,58 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft // Send server side POST request func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { - return t.client.Post(path, "application/json", body) + + postChan := make(chan interface{}, 1) + + go func() { + resp, err := t.client.Post(path, "application/json", body) + if err == nil { + postChan <- resp + } else { + postChan <- err + } + }() + + return t.waitResponse(postChan) + } // Send server side GET request func (t transporter) Get(path string) (*http.Response, error) { - return t.client.Get(path) + + getChan := make(chan interface{}, 1) + + go func() { + resp, err := t.client.Get(path) + if err == nil { + getChan <- resp + } else { + getChan <- err + } + }() + + return t.waitResponse(getChan) +} + +func (t transporter) waitResponse(responseChan chan interface{}) (*http.Response, error) { + + timeoutChan := time.After(t.timeout) + + select { + case <-timeoutChan: + return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout) + + case r := <-responseChan: + switch r := r.(type) { + case error: + return nil, r + + case *http.Response: + return r, nil + + } + } + + // for complier + return nil, nil } diff --git a/transporter_test.go b/transporter_test.go new file mode 100644 index 000000000..88fefdcf7 --- /dev/null +++ b/transporter_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "crypto/tls" + "testing" + "time" +) + +func TestTransporterTimeout(t *testing.T) { + + conf := tls.Config{} + + ts := newTransporter("http", conf, time.Second) + + _, err := ts.Get("http://127.0.0.2:7000") + if err == nil || err.Error() != "Wait Response Timeout: 1s" { + t.Fatal("timeout error: ", err.Error()) + } + + _, err = ts.Post("http://127.0.0.2:7000", nil) + if err == nil || err.Error() != "Wait Response Timeout: 1s" { + t.Fatal("timeout error: ", err.Error()) + } + + _, err = ts.Get("http://www.google.com") + if err != nil { + t.Fatal("get error") + } + + _, err = ts.Post("http://www.google.com", nil) + if err != nil { + t.Fatal("post error") + } + +} From 0c39971363dcb3d84ed93b4f070a00f9c86d2561 Mon Sep 17 00:00:00 2001 From: Ivan7702 Date: Mon, 2 Sep 2013 16:50:53 -0400 Subject: [PATCH 10/18] change default heart beat to electionTimtout --- raft_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft_server.go b/raft_server.go index fa3cb3a39..fb1294840 100644 --- a/raft_server.go +++ b/raft_server.go @@ -30,7 +30,7 @@ var r *raftServer func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.DefaultHeartbeatTimeout) + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.electionTimeout) // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) From fec65d8717d6667e11d3c785c4e7a8516c669e3a Mon Sep 17 00:00:00 2001 From: Ivan7702 Date: Mon, 2 Sep 2013 17:05:35 -0400 Subject: [PATCH 11/18] change DefaultHeartbeat to DefaultElectionTimeout modified: raft_server.go modified: transporter_test.go --- raft_server.go | 4 ++-- transporter_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/raft_server.go b/raft_server.go index fb1294840..aeaa377a4 100644 --- a/raft_server.go +++ b/raft_server.go @@ -30,8 +30,8 @@ var r *raftServer func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.electionTimeout) - + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.DefaultElectionTimeout) + // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) diff --git a/transporter_test.go b/transporter_test.go index 88fefdcf7..7dec4ea2e 100644 --- a/transporter_test.go +++ b/transporter_test.go @@ -12,12 +12,12 @@ func TestTransporterTimeout(t *testing.T) { ts := newTransporter("http", conf, time.Second) - _, err := ts.Get("http://127.0.0.2:7000") + _, err := ts.Get("http://google.com:9999") // it doesn't exisit if err == nil || err.Error() != "Wait Response Timeout: 1s" { - t.Fatal("timeout error: ", err.Error()) + t.Fatal("timeout error: ", err.Error( )) } - _, err = ts.Post("http://127.0.0.2:7000", nil) + _, err = ts.Post("http://google.com:9999", nil) // it doesn't exisit if err == nil || err.Error() != "Wait Response Timeout: 1s" { t.Fatal("timeout error: ", err.Error()) } From 8eaa9500e90e39cc223a0611d638c2869e2107cc Mon Sep 17 00:00:00 2001 From: Ivan7702 Date: Mon, 2 Sep 2013 19:54:46 -0400 Subject: [PATCH 12/18] change heartbeattimeout to electiontimeout --- raft_server.go | 7 +++-- transporter.go | 67 ++++++++++++++++++++------------------------- transporter_test.go | 1 + 3 files changed, 35 insertions(+), 40 deletions(-) diff --git a/raft_server.go b/raft_server.go index aeaa377a4..c123d5c80 100644 --- a/raft_server.go +++ b/raft_server.go @@ -30,7 +30,7 @@ var r *raftServer func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.DefaultElectionTimeout) + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) @@ -169,7 +169,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { // getVersion fetches the raft version of a peer. This works for now but we // will need to do something more sophisticated later when we allow mixed // version clusters. -func getVersion(t transporter, versionURL url.URL) (string, error) { +func getVersion(t *transporter, versionURL url.URL) (string, error) { resp, err := t.Get(versionURL.String()) if err != nil { @@ -198,6 +198,7 @@ func joinCluster(cluster []string) bool { if _, ok := err.(etcdErr.Error); ok { fatal(err) } + debugf("cannot join to cluster via machine %s %s", machine, err) } } @@ -209,7 +210,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { var b bytes.Buffer // t must be ok - t, _ := r.Transporter().(transporter) + t, _ := r.Transporter().(*transporter) // Our version must match the leaders version versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} diff --git a/transporter.go b/transporter.go index 66a179791..0a871ad09 100644 --- a/transporter.go +++ b/transporter.go @@ -18,10 +18,16 @@ type transporter struct { timeout time.Duration } +// response struct +type transporterResponse struct { + resp *http.Response + err error +} + // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) transporter { +func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter { t := transporter{} tr := &http.Transport{ @@ -36,7 +42,7 @@ func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) tr t.client = &http.Client{Transport: tr} t.timeout = timeout - return t + return &t } // Dial with timeout @@ -45,7 +51,7 @@ func dialTimeout(network, addr string) (net.Conn, error) { } // Sends AppendEntries RPCs to a peer when the server is the leader. -func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { +func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -72,7 +78,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe } // Sends RequestVote RPCs to a peer when the server is the candidate. -func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { +func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { var rvrsp *raft.RequestVoteResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -98,7 +104,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. -func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { +func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { var aersp *raft.SnapshotResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -126,7 +132,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. -func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { +func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { var aersp *raft.SnapshotRecoveryResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -153,41 +159,35 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft } // Send server side POST request -func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { - - postChan := make(chan interface{}, 1) +func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) { + c := make(chan *transporterResponse, 1) + go func() { - resp, err := t.client.Post(path, "application/json", body) - if err == nil { - postChan <- resp - } else { - postChan <- err - } + tr := new(transporterResponse) + tr.resp, tr.err = t.client.Post(path, "application/json", body) + c <-tr }() - return t.waitResponse(postChan) + return t.waitResponse(c) } // Send server side GET request -func (t transporter) Get(path string) (*http.Response, error) { - - getChan := make(chan interface{}, 1) +func (t *transporter) Get(path string) (*http.Response, error) { + c := make(chan *transporterResponse, 1) + go func() { - resp, err := t.client.Get(path) - if err == nil { - getChan <- resp - } else { - getChan <- err - } + tr := new(transporterResponse) + tr.resp, tr.err = t.client.Get(path) + c <-tr }() - return t.waitResponse(getChan) + return t.waitResponse(c) } -func (t transporter) waitResponse(responseChan chan interface{}) (*http.Response, error) { +func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) { timeoutChan := time.After(t.timeout) @@ -195,17 +195,10 @@ func (t transporter) waitResponse(responseChan chan interface{}) (*http.Response case <-timeoutChan: return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout) - case r := <-responseChan: - switch r := r.(type) { - case error: - return nil, r - - case *http.Response: - return r, nil - - } + case r:= <-responseChan: + return r.resp, r.err } // for complier return nil, nil -} +} \ No newline at end of file diff --git a/transporter_test.go b/transporter_test.go index 7dec4ea2e..ac1ecc975 100644 --- a/transporter_test.go +++ b/transporter_test.go @@ -12,6 +12,7 @@ func TestTransporterTimeout(t *testing.T) { ts := newTransporter("http", conf, time.Second) + ts.Get("http://google.com") _, err := ts.Get("http://google.com:9999") // it doesn't exisit if err == nil || err.Error() != "Wait Response Timeout: 1s" { t.Fatal("timeout error: ", err.Error( )) From 90d7ebec47967e76c4986c71f2a5b80e78b7b3c1 Mon Sep 17 00:00:00 2001 From: Ivan7702 Date: Mon, 2 Sep 2013 19:58:45 -0400 Subject: [PATCH 13/18] gofmt --- raft_server.go | 4 ++-- transporter.go | 14 +++++++------- transporter_test.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/raft_server.go b/raft_server.go index c123d5c80..c8b86021c 100644 --- a/raft_server.go +++ b/raft_server.go @@ -31,7 +31,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi // Create transporter for raft raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) - + // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) @@ -198,7 +198,7 @@ func joinCluster(cluster []string) bool { if _, ok := err.(etcdErr.Error); ok { fatal(err) } - + debugf("cannot join to cluster via machine %s %s", machine, err) } } diff --git a/transporter.go b/transporter.go index 0a871ad09..b4564742c 100644 --- a/transporter.go +++ b/transporter.go @@ -21,7 +21,7 @@ type transporter struct { // response struct type transporterResponse struct { resp *http.Response - err error + err error } // Create transporter using by raft server @@ -162,11 +162,11 @@ func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raf func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) { c := make(chan *transporterResponse, 1) - + go func() { tr := new(transporterResponse) tr.resp, tr.err = t.client.Post(path, "application/json", body) - c <-tr + c <- tr }() return t.waitResponse(c) @@ -177,11 +177,11 @@ func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) func (t *transporter) Get(path string) (*http.Response, error) { c := make(chan *transporterResponse, 1) - + go func() { tr := new(transporterResponse) tr.resp, tr.err = t.client.Get(path) - c <-tr + c <- tr }() return t.waitResponse(c) @@ -195,10 +195,10 @@ func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*htt case <-timeoutChan: return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout) - case r:= <-responseChan: + case r := <-responseChan: return r.resp, r.err } // for complier return nil, nil -} \ No newline at end of file +} diff --git a/transporter_test.go b/transporter_test.go index ac1ecc975..e440a094f 100644 --- a/transporter_test.go +++ b/transporter_test.go @@ -15,7 +15,7 @@ func TestTransporterTimeout(t *testing.T) { ts.Get("http://google.com") _, err := ts.Get("http://google.com:9999") // it doesn't exisit if err == nil || err.Error() != "Wait Response Timeout: 1s" { - t.Fatal("timeout error: ", err.Error( )) + t.Fatal("timeout error: ", err.Error()) } _, err = ts.Post("http://google.com:9999", nil) // it doesn't exisit From 2f5015552e8db84d04cb75b0adb3f7b9d3be5c56 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Mon, 2 Sep 2013 22:17:39 -0700 Subject: [PATCH 14/18] feat(etcd_handlers): enable CORS When developing or using web frontends for etcd it will be necessary to enable Cross-Origin Resource Sharing. Add a flag that lets the user enable this feature via a whitelist. --- etcd.go | 27 +++++++++++++++++++++++++++ etcd_handlers.go | 19 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/etcd.go b/etcd.go index 46546e8cc..5a0b4dee6 100644 --- a/etcd.go +++ b/etcd.go @@ -3,9 +3,11 @@ package main import ( "crypto/tls" "flag" + "fmt" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "io/ioutil" + "net/url" "os" "strings" "time" @@ -40,6 +42,9 @@ var ( maxClusterSize int cpuprofile string + + cors string + corsList map[string]bool ) func init() { @@ -77,6 +82,8 @@ func init() { flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster") flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file") + + flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')") } const ( @@ -152,6 +159,8 @@ func main() { raft.SetLogLevel(raft.Debug) } + parseCorsFlag() + if machines != "" { cluster = strings.Split(machines, ",") } else if machinesFile != "" { @@ -206,3 +215,21 @@ func main() { e.ListenAndServe() } + +// parseCorsFlag gathers up the cors whitelist and puts it into the corsList. +func parseCorsFlag() { + if cors != "" { + corsList = make(map[string]bool) + list := strings.Split(cors, ",") + for _, v := range list { + fmt.Println(v) + if v != "*" { + _, err := url.Parse(v) + if err != nil { + panic(fmt.Sprintf("bad cors url: %s", err)) + } + } + corsList[v] = true + } + } +} diff --git a/etcd_handlers.go b/etcd_handlers.go index 60e7b35b5..02cb2316a 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -29,7 +29,26 @@ func NewEtcdMuxer() *http.ServeMux { type errorHandler func(http.ResponseWriter, *http.Request) error +// addCorsHeader parses the request Origin header and loops through the user +// provided allowed origins and sets the Access-Control-Allow-Origin header if +// there is a match. +func addCorsHeader(w http.ResponseWriter, r *http.Request) { + val, ok := corsList["*"] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", "*") + return + } + + requestOrigin := r.Header.Get("Origin") + val, ok = corsList[requestOrigin] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", requestOrigin) + return + } +} + func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + addCorsHeader(w, r) if e := fn(w, r); e != nil { if etcdErr, ok := e.(etcdErr.Error); ok { debug("Return error: ", etcdErr.Error()) From b300d2877ee97467a89769fd3e63829035bc404a Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 4 Sep 2013 17:31:27 -0700 Subject: [PATCH 15/18] README: add justinsb/jetcd project --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index cf883b0d1..4d4a0d765 100644 --- a/README.md +++ b/README.md @@ -465,6 +465,10 @@ If you are using SSL for server to server communication, you must use it on all - [go-etcd](https://github.com/coreos/go-etcd) +**Java libraries** + +- [justinsb/jetcd](https://github.com/justinsb/jetcd) + **Node libraries** - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd) From adbcbefe92087c30736652bef3036a1e81ac5a13 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Thu, 5 Sep 2013 08:42:30 -0700 Subject: [PATCH 16/18] feat(README): add python library transitorykris/etcd-py --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 4d4a0d765..a4ad5f1d4 100644 --- a/README.md +++ b/README.md @@ -469,6 +469,10 @@ If you are using SSL for server to server communication, you must use it on all - [justinsb/jetcd](https://github.com/justinsb/jetcd) +**Python libraries** + +- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py) + **Node libraries** - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd) From b366f1044688e3ee8cacc09c4fd1b9ffeb399167 Mon Sep 17 00:00:00 2001 From: Geoff Hayes Date: Thu, 5 Sep 2013 21:08:43 -0700 Subject: [PATCH 17/18] Blank prevValue in POST should be interpreted as a blank test-and-set, not a normal set --- etcd_handlers.go | 12 ++++++------ etcd_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/etcd_handlers.go b/etcd_handlers.go index 02cb2316a..ac523cc89 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -92,15 +92,15 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) - value := req.FormValue("value") + req.ParseForm() + + value := req.Form.Get("value") if len(value) == 0 { return etcdErr.NewError(200, "Set") } - prevValue := req.FormValue("prevValue") - - strDuration := req.FormValue("ttl") + strDuration := req.Form.Get("ttl") expireTime, err := durationToExpireTime(strDuration) @@ -108,11 +108,11 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { return etcdErr.NewError(202, "Set") } - if len(prevValue) != 0 { + if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 { command := &TestAndSetCommand{ Key: key, Value: value, - PrevValue: prevValue, + PrevValue: prevValueArr[0], ExpireTime: expireTime, } diff --git a/etcd_test.go b/etcd_test.go index e61e7e4a8..61db07023 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -54,6 +54,32 @@ func TestSingleNode(t *testing.T) { } t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) } + + // Add a test-and-set test + + // First, we'll test we can change the value if we get it write + result, match, err := c.TestAndSet("foo", "bar", "foobar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + // Next, we'll make sure we can't set it without the correct prior value + _, _, err = c.TestAndSet("foo", "bar", "foofoo", 100) + + if err == nil { + t.Fatalf("Set 4 expecting error when setting key with incorrect previous value") + } + + // Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match + _, _, err = c.TestAndSet("foo", "", "barbar", 100) + + if err == nil { + t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value") + } } // TestInternalVersionFail will ensure that etcd does not come up if the internal raft From a623effaf14bb3a035f749df0ec180e0eb6ebb4e Mon Sep 17 00:00:00 2001 From: Diwaker Gupta Date: Fri, 6 Sep 2013 19:34:22 -0700 Subject: [PATCH 18/18] Add another Java library for etcd https://github.com/diwakergupta/jetcd --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index a4ad5f1d4..4514c4edf 100644 --- a/README.md +++ b/README.md @@ -468,6 +468,8 @@ If you are using SSL for server to server communication, you must use it on all **Java libraries** - [justinsb/jetcd](https://github.com/justinsb/jetcd) +- [diwakergupta/jetcd](https://github.com/diwakergupta/jetcd) + **Python libraries**