diff --git a/README.md b/README.md index f6a4aa86b..4514c4edf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # etcd +README version 0.1.0 [![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd) @@ -272,7 +273,7 @@ Next, lets configure etcd to use this keypair: You can now test the configuration using https: ```sh -curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar +curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v ``` You should be able to see the handshake succeed. @@ -302,7 +303,7 @@ We can also do authentication using CA certs. The clients will provide their cer Try the same request to this server: ```sh -curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar +curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v ``` The request should be rejected by the server. @@ -347,6 +348,9 @@ We use -s to specify server port and -c to specify client port and -d to specify ./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d nodes/node1 -n node1 ``` +**Note:** If you want to run etcd on external IP address and still have access locally you need to add `-cl 0.0.0.0` so that it will listen on both external and localhost addresses. +A similar argument `-sl` is used to setup the listening address for the server port. + Let the join two more nodes to this cluster using the -C argument: ```sh @@ -363,7 +367,7 @@ curl -L http://127.0.0.1:4001/v1/machines We should see there are three nodes in the cluster ``` -0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003 +http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003 ``` The machine list is also available via this API: @@ -373,7 +377,7 @@ curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines ``` ```json -[{"action":"GET","key":"/machines/node1","value":"0.0.0.0,7001,4001","index":4},{"action":"GET","key":"/machines/node3","value":"0.0.0.0,7002,4002","index":4},{"action":"GET","key":"/machines/node4","value":"0.0.0.0,7003,4003","index":4}] +[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001&etcd=http://127.0.0.1:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002&etcd=http://127.0.0.1:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003&etcd=http://127.0.0.1:4003","index":4}] ``` The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```. @@ -386,7 +390,7 @@ curl -L http://127.0.0.1:4001/v1/leader The first server we set up should be the leader, if it has not dead during these commands. ``` -0.0.0.0:7001 +http://127.0.0.1:7001 ``` Now we can do normal SET and GET operations on keys as we explored earlier. @@ -414,7 +418,13 @@ curl -L http://127.0.0.1:4001/v1/leader ``` ``` -0.0.0.0:7002 or 0.0.0.0:7003 +http://127.0.0.1:7002 +``` + +or + +``` +http://127.0.0.1:7003 ``` You should be able to see this: @@ -455,6 +465,16 @@ If you are using SSL for server to server communication, you must use it on all - [go-etcd](https://github.com/coreos/go-etcd) +**Java libraries** + +- [justinsb/jetcd](https://github.com/justinsb/jetcd) +- [diwakergupta/jetcd](https://github.com/diwakergupta/jetcd) + + +**Python libraries** + +- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py) + **Node libraries** - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd) diff --git a/etcd.go b/etcd.go index 6b6c0dc35..7149a1af2 100644 --- a/etcd.go +++ b/etcd.go @@ -3,7 +3,9 @@ package main import ( "crypto/tls" "flag" + "fmt" "io/ioutil" + "net/url" "os" "strings" "time" @@ -41,6 +43,9 @@ var ( maxClusterSize int cpuprofile string + + cors string + corsList map[string]bool ) func init() { @@ -51,8 +56,10 @@ func init() { flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)") - flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication") - flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication") + flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the advertised public hostname:port for etcd client communication") + flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the advertised public hostname:port for raft server communication") + flag.StringVar(&argInfo.EtcdListenHost, "cl", "", "the listening hostname for etcd client communication (defaults to advertised ip)") + flag.StringVar(&argInfo.RaftListenHost, "sl", "", "the listening hostname for raft server communication (defaults to advertised ip)") flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface") flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile") @@ -76,6 +83,8 @@ func init() { flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster") flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file") + + flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')") } const ( @@ -108,6 +117,9 @@ type Info struct { EtcdURL string `json:"etcdURL"` WebURL string `json:"webURL"` + RaftListenHost string `json:"raftListenHost"` + EtcdListenHost string `json:"etcdListenHost"` + RaftTLS TLSInfo `json:"raftTLS"` EtcdTLS TLSInfo `json:"etcdTLS"` } @@ -148,6 +160,8 @@ func main() { raft.SetLogLevel(raft.Debug) } + parseCorsFlag() + if machines != "" { cluster = strings.Split(machines, ",") } else if machinesFile != "" { @@ -179,6 +193,9 @@ func main() { argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme) argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http") + argInfo.RaftListenHost = sanitizeListenHost(argInfo.RaftListenHost, argInfo.RaftURL) + argInfo.EtcdListenHost = sanitizeListenHost(argInfo.EtcdListenHost, argInfo.EtcdURL) + // Read server info from file or grab it from user. if err := os.MkdirAll(dirPath, 0744); err != nil { fatalf("Unable to create path: %s", err) @@ -191,11 +208,29 @@ func main() { snapConf = newSnapshotConf() // Create etcd and raft server - e = newEtcdServer(info.Name, info.EtcdURL, &etcdTLSConfig, &info.EtcdTLS) - r = newRaftServer(info.Name, info.RaftURL, &raftTLSConfig, &info.RaftTLS) + e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS) + r = newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS) startWebInterface() r.ListenAndServe() e.ListenAndServe() } + +// parseCorsFlag gathers up the cors whitelist and puts it into the corsList. +func parseCorsFlag() { + if cors != "" { + corsList = make(map[string]bool) + list := strings.Split(cors, ",") + for _, v := range list { + fmt.Println(v) + if v != "*" { + _, err := url.Parse(v) + if err != nil { + panic(fmt.Sprintf("bad cors url: %s", err)) + } + } + corsList[v] = true + } + } +} diff --git a/etcd_handlers.go b/etcd_handlers.go index e074319f8..b7cf0e791 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -30,7 +30,26 @@ func NewEtcdMuxer() *http.ServeMux { type errorHandler func(http.ResponseWriter, *http.Request) error +// addCorsHeader parses the request Origin header and loops through the user +// provided allowed origins and sets the Access-Control-Allow-Origin header if +// there is a match. +func addCorsHeader(w http.ResponseWriter, r *http.Request) { + val, ok := corsList["*"] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", "*") + return + } + + requestOrigin := r.Header.Get("Origin") + val, ok = corsList[requestOrigin] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", requestOrigin) + return + } +} + func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + addCorsHeader(w, r) if e := fn(w, r); e != nil { if etcdErr, ok := e.(etcdErr.Error); ok { debug("Return error: ", etcdErr.Error()) @@ -74,15 +93,15 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) - value := req.FormValue("value") + req.ParseForm() + + value := req.Form.Get("value") if len(value) == 0 { return etcdErr.NewError(200, "Set") } - prevValue := req.FormValue("prevValue") - - strDuration := req.FormValue("ttl") + strDuration := req.Form.Get("ttl") expireTime, err := durationToExpireTime(strDuration) @@ -90,11 +109,11 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { return etcdErr.NewError(202, "Set") } - if len(prevValue) != 0 { + if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 { command := &TestAndSetCommand{ Key: key, Value: value, - PrevValue: prevValue, + PrevValue: prevValueArr[0], ExpireTime: expireTime, } diff --git a/etcd_server.go b/etcd_server.go index 51ff1e9e8..2cef01558 100644 --- a/etcd_server.go +++ b/etcd_server.go @@ -2,7 +2,6 @@ package main import ( "net/http" - "net/url" ) type etcdServer struct { @@ -15,18 +14,12 @@ type etcdServer struct { var e *etcdServer -func newEtcdServer(name string, urlStr string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer { - u, err := url.Parse(urlStr) - - if err != nil { - fatalf("invalid url '%s': %s", e.url, err) - } - +func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer { return &etcdServer{ Server: http.Server{ Handler: NewEtcdMuxer(), TLSConfig: &tlsConf.Server, - Addr: u.Host, + Addr: listenHost, }, name: name, url: urlStr, diff --git a/etcd_test.go b/etcd_test.go index caa6af84d..32c320ad6 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -55,6 +55,32 @@ func TestSingleNode(t *testing.T) { } t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) } + + // Add a test-and-set test + + // First, we'll test we can change the value if we get it write + result, match, err := c.TestAndSet("foo", "bar", "foobar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + // Next, we'll make sure we can't set it without the correct prior value + _, _, err = c.TestAndSet("foo", "bar", "foofoo", 100) + + if err == nil { + t.Fatalf("Set 4 expecting error when setting key with incorrect previous value") + } + + // Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match + _, _, err = c.TestAndSet("foo", "", "barbar", 100) + + if err == nil { + t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value") + } } // TestInternalVersionFail will ensure that etcd does not come up if the internal raft diff --git a/raft_server.go b/raft_server.go index 3d11d0899..262ffc972 100644 --- a/raft_server.go +++ b/raft_server.go @@ -21,6 +21,7 @@ type raftServer struct { joinIndex uint64 name string url string + listenHost string tlsConf *TLSConfig tlsInfo *TLSInfo peersStats map[string]*raftPeerStats @@ -29,10 +30,10 @@ type raftServer struct { var r *raftServer -func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { +func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client) + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) @@ -147,15 +148,14 @@ func startAsFollower() { // Start to listen and response raft command func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { - u, _ := url.Parse(r.url) - infof("raft server [%s:%s]", r.name, u) + infof("raft server [%s:%s]", r.name, r.listenHost) raftMux := http.NewServeMux() server := &http.Server{ Handler: raftMux, TLSConfig: &tlsConf, - Addr: u.Host, + Addr: r.listenHost, } // internal commands @@ -181,7 +181,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { // getVersion fetches the raft version of a peer. This works for now but we // will need to do something more sophisticated later when we allow mixed // version clusters. -func getVersion(t transporter, versionURL url.URL) (string, error) { +func getVersion(t *transporter, versionURL url.URL) (string, error) { resp, err := t.Get(versionURL.String()) if err != nil { @@ -210,6 +210,7 @@ func joinCluster(cluster []string) bool { if _, ok := err.(etcdErr.Error); ok { fatal(err) } + debugf("cannot join to cluster via machine %s %s", machine, err) } } @@ -221,7 +222,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { var b bytes.Buffer // t must be ok - t, _ := r.Transporter().(transporter) + t, _ := r.Transporter().(*transporter) // Our version must match the leaders version versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} diff --git a/raft_stats.go b/raft_stats.go index 3794ef52c..06687d89f 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -168,7 +168,7 @@ func (q *statsQueue) Rate() (float64, float64) { return 0, 0 } - if time.Now.Sub(back.Time()) > time.Second { + if time.Now().Sub(back.Time()) > time.Second { q.Clear() return 0, 0 } diff --git a/transporter.go b/transporter.go index e82d3668c..461741ce6 100644 --- a/transporter.go +++ b/transporter.go @@ -15,13 +15,20 @@ import ( // Transporter layer for communication between raft nodes type transporter struct { - client *http.Client + client *http.Client + timeout time.Duration +} + +// response struct +type transporterResponse struct { + resp *http.Response + err error } // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config) transporter { +func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter { t := transporter{} tr := &http.Transport{ @@ -34,8 +41,9 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter { } t.client = &http.Client{Transport: tr} + t.timeout = timeout - return t + return &t } // Dial with timeout @@ -44,7 +52,7 @@ func dialTimeout(network, addr string) (net.Conn, error) { } // Sends AppendEntries RPCs to a peer when the server is the leader. -func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { +func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse var b bytes.Buffer @@ -92,7 +100,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe } // Sends RequestVote RPCs to a peer when the server is the candidate. -func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { +func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { var rvrsp *raft.RequestVoteResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -118,7 +126,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. -func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { +func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { var aersp *raft.SnapshotResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -146,7 +154,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. -func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { +func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { var aersp *raft.SnapshotRecoveryResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -173,11 +181,46 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft } // Send server side POST request -func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { - return t.client.Post(path, "application/json", body) +func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) { + + c := make(chan *transporterResponse, 1) + + go func() { + tr := new(transporterResponse) + tr.resp, tr.err = t.client.Post(path, "application/json", body) + c <- tr + }() + + return t.waitResponse(c) + } // Send server side GET request -func (t transporter) Get(path string) (*http.Response, error) { - return t.client.Get(path) +func (t *transporter) Get(path string) (*http.Response, error) { + + c := make(chan *transporterResponse, 1) + + go func() { + tr := new(transporterResponse) + tr.resp, tr.err = t.client.Get(path) + c <- tr + }() + + return t.waitResponse(c) +} + +func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) { + + timeoutChan := time.After(t.timeout) + + select { + case <-timeoutChan: + return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout) + + case r := <-responseChan: + return r.resp, r.err + } + + // for complier + return nil, nil } diff --git a/transporter_test.go b/transporter_test.go new file mode 100644 index 000000000..e440a094f --- /dev/null +++ b/transporter_test.go @@ -0,0 +1,36 @@ +package main + +import ( + "crypto/tls" + "testing" + "time" +) + +func TestTransporterTimeout(t *testing.T) { + + conf := tls.Config{} + + ts := newTransporter("http", conf, time.Second) + + ts.Get("http://google.com") + _, err := ts.Get("http://google.com:9999") // it doesn't exisit + if err == nil || err.Error() != "Wait Response Timeout: 1s" { + t.Fatal("timeout error: ", err.Error()) + } + + _, err = ts.Post("http://google.com:9999", nil) // it doesn't exisit + if err == nil || err.Error() != "Wait Response Timeout: 1s" { + t.Fatal("timeout error: ", err.Error()) + } + + _, err = ts.Get("http://www.google.com") + if err != nil { + t.Fatal("get error") + } + + _, err = ts.Post("http://www.google.com", nil) + if err != nil { + t.Fatal("post error") + } + +} diff --git a/util.go b/util.go index a7745b0d5..579f1c675 100644 --- a/util.go +++ b/util.go @@ -107,6 +107,27 @@ func sanitizeURL(host string, defaultScheme string) string { return p.String() } +// sanitizeListenHost cleans up the ListenHost parameter and appends a port +// if necessary based on the advertised port. +func sanitizeListenHost(listen string, advertised string) string { + aurl, err := url.Parse(advertised) + if err != nil { + fatal(err) + } + + ahost, aport, err := net.SplitHostPort(aurl.Host) + if err != nil { + fatal(err) + } + + // If the listen host isn't set use the advertised host + if listen == "" { + listen = ahost + } + + return net.JoinHostPort(listen, aport) +} + func check(err error) { if err != nil { fatal(err)