From 435611cf0d50cd6ee0eae7486fd3719accf59aa1 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Sat, 25 Oct 2014 10:54:41 -0700 Subject: [PATCH] etcdserver/etcdhttp: break apart http.go --- etcdserver/etcdhttp/client.go | 482 ++++++++ etcdserver/etcdhttp/client_test.go | 1503 +++++++++++++++++++++++ etcdserver/etcdhttp/http.go | 532 --------- etcdserver/etcdhttp/http_test.go | 1780 +--------------------------- etcdserver/etcdhttp/peer.go | 111 ++ etcdserver/etcdhttp/peer_test.go | 264 +++++ 6 files changed, 2389 insertions(+), 2283 deletions(-) create mode 100644 etcdserver/etcdhttp/client.go create mode 100644 etcdserver/etcdhttp/client_test.go create mode 100644 etcdserver/etcdhttp/peer.go create mode 100644 etcdserver/etcdhttp/peer_test.go diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go new file mode 100644 index 000000000..8c030fa75 --- /dev/null +++ b/etcdserver/etcdhttp/client.go @@ -0,0 +1,482 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdhttp + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/store" + "github.com/coreos/etcd/version" +) + +const ( + keysPrefix = "/v2/keys" + deprecatedMachinesPrefix = "/v2/machines" + adminMembersPrefix = "/v2/admin/members/" + statsPrefix = "/v2/stats" + versionPrefix = "/version" +) + +// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. +func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { + sh := &serverHandler{ + server: server, + clusterInfo: server.Cluster, + stats: server, + timer: server, + timeout: defaultServerTimeout, + clock: clockwork.NewRealClock(), + } + mux := http.NewServeMux() + mux.HandleFunc(keysPrefix, sh.serveKeys) + mux.HandleFunc(keysPrefix+"/", sh.serveKeys) + mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats) + mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats) + mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats) + mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines) + mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers) + mux.HandleFunc(versionPrefix, sh.serveVersion) + mux.HandleFunc("/", http.NotFound) + return mux +} + +func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() + + rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock()) + if err != nil { + writeError(w, err) + return + } + + resp, err := h.server.Do(ctx, rr) + if err != nil { + err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix) + writeError(w, err) + return + } + + switch { + case resp.Event != nil: + if err := writeKeyEvent(w, resp.Event, h.timer); err != nil { + // Should never be reached + log.Printf("error writing event: %v", err) + } + case resp.Watcher != nil: + ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) + defer cancel() + handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer) + default: + writeError(w, errors.New("received response with no Event/Watcher!")) + } +} + +// serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'. +func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET", "HEAD") { + return + } + endpoints := h.clusterInfo.ClientURLs() + w.Write([]byte(strings.Join(endpoints, ", "))) +} + +func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET", "POST", "DELETE") { + return + } + ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) + defer cancel() + + switch r.Method { + case "GET": + if s := strings.TrimPrefix(r.URL.Path, adminMembersPrefix); s != "" { + http.NotFound(w, r) + return + } + ms := struct { + Members []*etcdserver.Member `json:"members"` + }{ + Members: h.clusterInfo.Members(), + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(ms); err != nil { + log.Printf("etcdhttp: %v", err) + } + case "POST": + ctype := r.Header.Get("Content-Type") + if ctype != "application/json" { + http.Error(w, fmt.Sprintf("bad Content-Type %s, accept application/json", ctype), http.StatusBadRequest) + return + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + raftAttr := etcdserver.RaftAttributes{} + if err := json.Unmarshal(b, &raftAttr); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + validURLs, err := types.NewURLs(raftAttr.PeerURLs) + if err != nil { + http.Error(w, "bad peer urls", http.StatusBadRequest) + return + } + now := h.clock.Now() + m := etcdserver.NewMember("", validURLs, "", &now) + if err := h.server.AddMember(ctx, *m); err != nil { + log.Printf("etcdhttp: error adding node %x: %v", m.ID, err) + writeError(w, err) + return + } + log.Printf("etcdhttp: added node %x with peer urls %v", m.ID, raftAttr.PeerURLs) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + if err := json.NewEncoder(w).Encode(m); err != nil { + log.Printf("etcdhttp: %v", err) + } + case "DELETE": + idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix) + id, err := strconv.ParseUint(idStr, 16, 64) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + log.Printf("etcdhttp: remove node %x", id) + if err := h.server.RemoveMember(ctx, id); err != nil { + log.Printf("etcdhttp: error removing node %x: %v", id, err) + writeError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + +func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET") { + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(h.stats.StoreStats()) +} + +func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET") { + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(h.stats.SelfStats()) +} + +func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET") { + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(h.stats.LeaderStats()) +} + +func (h serverHandler) serveVersion(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET") { + return + } + w.Write([]byte("etcd " + version.Version)) +} + +// parseKeyRequest converts a received http.Request on keysPrefix to +// a server Request, performing validation of supplied fields as appropriate. +// If any validation fails, an empty Request and non-nil error is returned. +func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) { + emptyReq := etcdserverpb.Request{} + + err := r.ParseForm() + if err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidForm, + err.Error(), + ) + } + + if !strings.HasPrefix(r.URL.Path, keysPrefix) { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidForm, + "incorrect key prefix", + ) + } + p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):]) + + var pIdx, wIdx uint64 + if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeIndexNaN, + `invalid value for "prevIndex"`, + ) + } + if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeIndexNaN, + `invalid value for "waitIndex"`, + ) + } + + var rec, sort, wait, dir, quorum, stream bool + if rec, err = getBool(r.Form, "recursive"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `invalid value for "recursive"`, + ) + } + if sort, err = getBool(r.Form, "sorted"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `invalid value for "sorted"`, + ) + } + if wait, err = getBool(r.Form, "wait"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `invalid value for "wait"`, + ) + } + // TODO(jonboulle): define what parameters dir is/isn't compatible with? + if dir, err = getBool(r.Form, "dir"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `invalid value for "dir"`, + ) + } + if quorum, err = getBool(r.Form, "quorum"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `invalid value for "quorum"`, + ) + } + if stream, err = getBool(r.Form, "stream"); err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `invalid value for "stream"`, + ) + } + + if wait && r.Method != "GET" { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `"wait" can only be used with GET requests`, + ) + } + + pV := r.FormValue("prevValue") + if _, ok := r.Form["prevValue"]; ok && pV == "" { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + `"prevValue" cannot be empty`, + ) + } + + // TTL is nullable, so leave it null if not specified + // or an empty string + var ttl *uint64 + if len(r.FormValue("ttl")) > 0 { + i, err := getUint64(r.Form, "ttl") + if err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeTTLNaN, + `invalid value for "ttl"`, + ) + } + ttl = &i + } + + // prevExist is nullable, so leave it null if not specified + var pe *bool + if _, ok := r.Form["prevExist"]; ok { + bv, err := getBool(r.Form, "prevExist") + if err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + "invalid value for prevExist", + ) + } + pe = &bv + } + + rr := etcdserverpb.Request{ + ID: id, + Method: r.Method, + Path: p, + Val: r.FormValue("value"), + Dir: dir, + PrevValue: pV, + PrevIndex: pIdx, + PrevExist: pe, + Wait: wait, + Since: wIdx, + Recursive: rec, + Sorted: sort, + Quorum: quorum, + Stream: stream, + } + + if pe != nil { + rr.PrevExist = pe + } + + // Null TTL is equivalent to unset Expiration + if ttl != nil { + expr := time.Duration(*ttl) * time.Second + rr.Expiration = clock.Now().Add(expr).UnixNano() + } + + return rr, nil +} + +// writeKeyEvent trims the prefix of key path in a single Event under +// StoreKeysPrefix, serializes it and writes the resulting JSON to the given +// ResponseWriter, along with the appropriate headers. +func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error { + if ev == nil { + return errors.New("cannot write empty Event!") + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) + w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) + w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) + + if ev.IsCreated() { + w.WriteHeader(http.StatusCreated) + } + + ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) + return json.NewEncoder(w).Encode(ev) +} + +func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { + defer wa.Remove() + ech := wa.EventChan() + var nch <-chan bool + if x, ok := w.(http.CloseNotifier); ok { + nch = x.CloseNotify() + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) + w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) + w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) + w.WriteHeader(http.StatusOK) + + // Ensure headers are flushed early, in case of long polling + w.(http.Flusher).Flush() + + for { + select { + case <-nch: + // Client closed connection. Nothing to do. + return + case <-ctx.Done(): + // Timed out. net/http will close the connection for us, so nothing to do. + return + case ev, ok := <-ech: + if !ok { + // If the channel is closed this may be an indication of + // that notifications are much more than we are able to + // send to the client in time. Then we simply end streaming. + return + } + ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) + if err := json.NewEncoder(w).Encode(ev); err != nil { + // Should never be reached + log.Printf("error writing event: %v\n", err) + return + } + if !stream { + return + } + w.(http.Flusher).Flush() + } + } +} + +func trimEventPrefix(ev *store.Event, prefix string) *store.Event { + if ev == nil { + return nil + } + ev.Node = trimNodeExternPrefix(ev.Node, prefix) + ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, prefix) + return ev +} + +func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern { + if n == nil { + return nil + } + n.Key = strings.TrimPrefix(n.Key, prefix) + for _, nn := range n.Nodes { + nn = trimNodeExternPrefix(nn, prefix) + } + return n +} + +func trimErrorPrefix(err error, prefix string) error { + if e, ok := err.(*etcdErr.Error); ok { + e.Cause = strings.TrimPrefix(e.Cause, prefix) + } + return err +} + +// getUint64 extracts a uint64 by the given key from a Form. If the key does +// not exist in the form, 0 is returned. If the key exists but the value is +// badly formed, an error is returned. If multiple values are present only the +// first is considered. +func getUint64(form url.Values, key string) (i uint64, err error) { + if vals, ok := form[key]; ok { + i, err = strconv.ParseUint(vals[0], 10, 64) + } + return +} + +// getBool extracts a bool by the given key from a Form. If the key does not +// exist in the form, false is returned. If the key exists but the value is +// badly formed, an error is returned. If multiple values are present only the +// first is considered. +func getBool(form url.Values, key string) (b bool, err error) { + if vals, ok := form[key]; ok { + b, err = strconv.ParseBool(vals[0]) + } + return +} diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go new file mode 100644 index 000000000..4845f614d --- /dev/null +++ b/etcdserver/etcdhttp/client_test.go @@ -0,0 +1,1503 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdhttp + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "path" + "reflect" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/store" + "github.com/coreos/etcd/version" +) + +func mustMarshalEvent(t *testing.T, ev *store.Event) string { + b := new(bytes.Buffer) + if err := json.NewEncoder(b).Encode(ev); err != nil { + t.Fatalf("error marshalling event %#v: %v", ev, err) + } + return b.String() +} + +// mustNewForm takes a set of Values and constructs a PUT *http.Request, +// with a URL constructed from appending the given path to the standard keysPrefix +func mustNewForm(t *testing.T, p string, vals url.Values) *http.Request { + u := mustNewURL(t, path.Join(keysPrefix, p)) + req, err := http.NewRequest("PUT", u.String(), strings.NewReader(vals.Encode())) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + if err != nil { + t.Fatalf("error creating new request: %v", err) + } + return req +} + +// mustNewRequest takes a path, appends it to the standard keysPrefix, and constructs +// a GET *http.Request referencing the resulting URL +func mustNewRequest(t *testing.T, p string) *http.Request { + return mustNewMethodRequest(t, "GET", p) +} + +func mustNewMethodRequest(t *testing.T, m, p string) *http.Request { + return &http.Request{ + Method: m, + URL: mustNewURL(t, path.Join(keysPrefix, p)), + } +} + +type serverRecorder struct { + actions []action +} + +func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { + s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}}) + return etcdserver.Response{}, nil +} +func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error { + s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}}) + return nil +} +func (s *serverRecorder) Start() {} +func (s *serverRecorder) Stop() {} +func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error { + s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}}) + return nil +} +func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { + s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}}) + return nil +} + +type action struct { + name string + params []interface{} +} + +// flushingRecorder provides a channel to allow users to block until the Recorder is Flushed() +type flushingRecorder struct { + *httptest.ResponseRecorder + ch chan struct{} +} + +func (fr *flushingRecorder) Flush() { + fr.ResponseRecorder.Flush() + fr.ch <- struct{}{} +} + +// resServer implements the etcd.Server interface for testing. +// It returns the given responsefrom any Do calls, and nil error +type resServer struct { + res etcdserver.Response +} + +func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { + return rs.res, nil +} +func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } +func (rs *resServer) Start() {} +func (rs *resServer) Stop() {} +func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } + +func boolp(b bool) *bool { return &b } + +type dummyRaftTimer struct{} + +func (drt dummyRaftTimer) Index() uint64 { return uint64(100) } +func (drt dummyRaftTimer) Term() uint64 { return uint64(5) } + +type dummyWatcher struct { + echan chan *store.Event + sidx uint64 +} + +func (w *dummyWatcher) EventChan() chan *store.Event { + return w.echan +} +func (w *dummyWatcher) StartIndex() uint64 { return w.sidx } +func (w *dummyWatcher) Remove() {} + +func TestBadParseRequest(t *testing.T) { + tests := []struct { + in *http.Request + wcode int + }{ + { + // parseForm failure + &http.Request{ + Body: nil, + Method: "PUT", + }, + etcdErr.EcodeInvalidForm, + }, + { + // bad key prefix + &http.Request{ + URL: mustNewURL(t, "/badprefix/"), + }, + etcdErr.EcodeInvalidForm, + }, + // bad values for prevIndex, waitIndex, ttl + { + mustNewForm(t, "foo", url.Values{"prevIndex": []string{"garbage"}}), + etcdErr.EcodeIndexNaN, + }, + { + mustNewForm(t, "foo", url.Values{"prevIndex": []string{"1.5"}}), + etcdErr.EcodeIndexNaN, + }, + { + mustNewForm(t, "foo", url.Values{"prevIndex": []string{"-1"}}), + etcdErr.EcodeIndexNaN, + }, + { + mustNewForm(t, "foo", url.Values{"waitIndex": []string{"garbage"}}), + etcdErr.EcodeIndexNaN, + }, + { + mustNewForm(t, "foo", url.Values{"waitIndex": []string{"??"}}), + etcdErr.EcodeIndexNaN, + }, + { + mustNewForm(t, "foo", url.Values{"ttl": []string{"-1"}}), + etcdErr.EcodeTTLNaN, + }, + // bad values for recursive, sorted, wait, prevExist, dir, stream + { + mustNewForm(t, "foo", url.Values{"recursive": []string{"hahaha"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"recursive": []string{"1234"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"recursive": []string{"?"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"sorted": []string{"?"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"sorted": []string{"x"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"wait": []string{"?!"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"wait": []string{"yes"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"prevExist": []string{"yes"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"prevExist": []string{"#2"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"dir": []string{"no"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"dir": []string{"file"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"quorum": []string{"no"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"quorum": []string{"file"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"stream": []string{"zzz"}}), + etcdErr.EcodeInvalidField, + }, + { + mustNewForm(t, "foo", url.Values{"stream": []string{"something"}}), + etcdErr.EcodeInvalidField, + }, + // prevValue cannot be empty + { + mustNewForm(t, "foo", url.Values{"prevValue": []string{""}}), + etcdErr.EcodeInvalidField, + }, + // wait is only valid with GET requests + { + mustNewMethodRequest(t, "HEAD", "foo?wait=true"), + etcdErr.EcodeInvalidField, + }, + // query values are considered + { + mustNewRequest(t, "foo?prevExist=wrong"), + etcdErr.EcodeInvalidField, + }, + { + mustNewRequest(t, "foo?ttl=wrong"), + etcdErr.EcodeTTLNaN, + }, + // but body takes precedence if both are specified + { + mustNewForm( + t, + "foo?ttl=12", + url.Values{"ttl": []string{"garbage"}}, + ), + etcdErr.EcodeTTLNaN, + }, + { + mustNewForm( + t, + "foo?prevExist=false", + url.Values{"prevExist": []string{"yes"}}, + ), + etcdErr.EcodeInvalidField, + }, + } + for i, tt := range tests { + got, err := parseKeyRequest(tt.in, 1234, clockwork.NewFakeClock()) + if err == nil { + t.Errorf("#%d: unexpected nil error!", i) + continue + } + ee, ok := err.(*etcdErr.Error) + if !ok { + t.Errorf("#%d: err is not etcd.Error!", i) + continue + } + if ee.ErrorCode != tt.wcode { + t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode) + t.Logf("cause: %#v", ee.Cause) + } + if !reflect.DeepEqual(got, etcdserverpb.Request{}) { + t.Errorf("#%d: unexpected non-empty Request: %#v", i, got) + } + } +} + +func TestGoodParseRequest(t *testing.T) { + fc := clockwork.NewFakeClock() + fc.Advance(1111) + tests := []struct { + in *http.Request + w etcdserverpb.Request + }{ + { + // good prefix, all other values default + mustNewRequest(t, "foo"), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // value specified + mustNewForm( + t, + "foo", + url.Values{"value": []string{"some_value"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + Val: "some_value", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // prevIndex specified + mustNewForm( + t, + "foo", + url.Values{"prevIndex": []string{"98765"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + PrevIndex: 98765, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // recursive specified + mustNewForm( + t, + "foo", + url.Values{"recursive": []string{"true"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + Recursive: true, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // sorted specified + mustNewForm( + t, + "foo", + url.Values{"sorted": []string{"true"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + Sorted: true, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // quorum specified + mustNewForm( + t, + "foo", + url.Values{"quorum": []string{"true"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + Quorum: true, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // wait specified + mustNewRequest(t, "foo?wait=true"), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Wait: true, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // empty TTL specified + mustNewRequest(t, "foo?ttl="), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + Expiration: 0, + }, + }, + { + // non-empty TTL specified + mustNewRequest(t, "foo?ttl=5678"), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + Expiration: fc.Now().Add(5678 * time.Second).UnixNano(), + }, + }, + { + // zero TTL specified + mustNewRequest(t, "foo?ttl=0"), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + Expiration: fc.Now().UnixNano(), + }, + }, + { + // dir specified + mustNewRequest(t, "foo?dir=true"), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Dir: true, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // dir specified negatively + mustNewRequest(t, "foo?dir=false"), + etcdserverpb.Request{ + ID: 1234, + Method: "GET", + Dir: false, + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // prevExist should be non-null if specified + mustNewForm( + t, + "foo", + url.Values{"prevExist": []string{"true"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + PrevExist: boolp(true), + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + { + // prevExist should be non-null if specified + mustNewForm( + t, + "foo", + url.Values{"prevExist": []string{"false"}}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + PrevExist: boolp(false), + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + // mix various fields + { + mustNewForm( + t, + "foo", + url.Values{ + "value": []string{"some value"}, + "prevExist": []string{"true"}, + "prevValue": []string{"previous value"}, + }, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + PrevExist: boolp(true), + PrevValue: "previous value", + Val: "some value", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + // query parameters should be used if given + { + mustNewForm( + t, + "foo?prevValue=woof", + url.Values{}, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + PrevValue: "woof", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + // but form values should take precedence over query parameters + { + mustNewForm( + t, + "foo?prevValue=woof", + url.Values{ + "prevValue": []string{"miaow"}, + }, + ), + etcdserverpb.Request{ + ID: 1234, + Method: "PUT", + PrevValue: "miaow", + Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), + }, + }, + } + + for i, tt := range tests { + got, err := parseKeyRequest(tt.in, 1234, fc) + if err != nil { + t.Errorf("#%d: err = %v, want %v", i, err, nil) + } + if !reflect.DeepEqual(got, tt.w) { + t.Errorf("#%d: request=%#v, want %#v", i, got, tt.w) + } + } +} + +func TestServeAdminMembers(t *testing.T) { + memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} + memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} + cluster := &fakeCluster{ + members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, + } + h := &serverHandler{ + server: &serverRecorder{}, + clock: clockwork.NewFakeClock(), + clusterInfo: cluster, + } + + msb, err := json.Marshal( + struct { + Members []etcdserver.Member `json:"members"` + }{ + Members: []etcdserver.Member{memb1, memb2}, + }, + ) + if err != nil { + t.Fatal(err) + } + wms := string(msb) + "\n" + + tests := []struct { + path string + wcode int + wct string + wbody string + }{ + {adminMembersPrefix, http.StatusOK, "application/json", wms}, + {path.Join(adminMembersPrefix, "100"), http.StatusNotFound, "text/plain; charset=utf-8", "404 page not found\n"}, + {path.Join(adminMembersPrefix, "foobar"), http.StatusNotFound, "text/plain; charset=utf-8", "404 page not found\n"}, + } + + for i, tt := range tests { + req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil) + if err != nil { + t.Fatal(err) + } + rw := httptest.NewRecorder() + h.serveAdminMembers(rw, req) + + if rw.Code != tt.wcode { + t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) + } + if gct := rw.Header().Get("Content-Type"); gct != tt.wct { + t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct) + } + if rw.Body.String() != tt.wbody { + t.Errorf("#%d: body = %q, want %q", i, rw.Body.String(), tt.wbody) + } + } +} + +func TestServeAdminMembersPut(t *testing.T) { + u := mustNewURL(t, adminMembersPrefix) + raftAttr := etcdserver.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}} + b, err := json.Marshal(raftAttr) + if err != nil { + t.Fatal(err) + } + body := bytes.NewReader(b) + req, err := http.NewRequest("POST", u.String(), body) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + s := &serverRecorder{} + h := &serverHandler{ + server: s, + clock: clockwork.NewFakeClock(), + } + rw := httptest.NewRecorder() + + h.serveAdminMembers(rw, req) + + wcode := http.StatusCreated + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + wm := etcdserver.Member{ + ID: 3064321551348478165, + RaftAttributes: raftAttr, + } + + wb, err := json.Marshal(wm) + if err != nil { + t.Fatal(err) + } + wct := "application/json" + if gct := rw.Header().Get("Content-Type"); gct != wct { + t.Errorf("content-type = %s, want %s", gct, wct) + } + g := rw.Body.String() + w := string(wb) + "\n" + if g != w { + t.Errorf("got body=%q, want %q", g, w) + } + wactions := []action{{name: "AddMember", params: []interface{}{wm}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + +func TestServeAdminMembersDelete(t *testing.T) { + req := &http.Request{ + Method: "DELETE", + URL: mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")), + } + s := &serverRecorder{} + h := &serverHandler{ + server: s, + } + rw := httptest.NewRecorder() + + h.serveAdminMembers(rw, req) + + wcode := http.StatusNoContent + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + g := rw.Body.String() + if g != "" { + t.Errorf("got body=%q, want %q", g, "") + } + wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + +func TestServeAdminMembersFail(t *testing.T) { + tests := []struct { + req *http.Request + server etcdserver.Server + + wcode int + }{ + { + // bad method + &http.Request{ + Method: "CONNECT", + }, + &resServer{}, + + http.StatusMethodNotAllowed, + }, + { + // bad method + &http.Request{ + Method: "TRACE", + }, + &resServer{}, + + http.StatusMethodNotAllowed, + }, + { + // parse body error + &http.Request{ + URL: mustNewURL(t, adminMembersPrefix), + Method: "POST", + Body: ioutil.NopCloser(strings.NewReader("bad json")), + }, + &resServer{}, + + http.StatusBadRequest, + }, + { + // bad content type + &http.Request{ + URL: mustNewURL(t, adminMembersPrefix), + Method: "POST", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/bad"}}, + }, + &errServer{}, + + http.StatusBadRequest, + }, + { + // bad url + &http.Request{ + URL: mustNewURL(t, adminMembersPrefix), + Method: "POST", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{}, + + http.StatusBadRequest, + }, + { + // etcdserver.AddMember error + &http.Request{ + URL: mustNewURL(t, adminMembersPrefix), + Method: "POST", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + { + // etcdserver.RemoveMember error + &http.Request{ + URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")), + Method: "DELETE", + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + } + for i, tt := range tests { + h := &serverHandler{ + server: tt.server, + clock: clockwork.NewFakeClock(), + } + rw := httptest.NewRecorder() + h.serveAdminMembers(rw, tt.req) + if rw.Code != tt.wcode { + t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +func TestWriteEvent(t *testing.T) { + // nil event should not panic + rw := httptest.NewRecorder() + writeKeyEvent(rw, nil, dummyRaftTimer{}) + h := rw.Header() + if len(h) > 0 { + t.Fatalf("unexpected non-empty headers: %#v", h) + } + b := rw.Body.String() + if len(b) > 0 { + t.Fatalf("unexpected non-empty body: %q", b) + } + + tests := []struct { + ev *store.Event + idx string + // TODO(jonboulle): check body as well as just status code + code int + err error + }{ + // standard case, standard 200 response + { + &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + PrevNode: &store.NodeExtern{}, + }, + "0", + http.StatusOK, + nil, + }, + // check new nodes return StatusCreated + { + &store.Event{ + Action: store.Create, + Node: &store.NodeExtern{}, + PrevNode: &store.NodeExtern{}, + }, + "0", + http.StatusCreated, + nil, + }, + } + + for i, tt := range tests { + rw := httptest.NewRecorder() + writeKeyEvent(rw, tt.ev, dummyRaftTimer{}) + if gct := rw.Header().Get("Content-Type"); gct != "application/json" { + t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct) + } + if gri := rw.Header().Get("X-Raft-Index"); gri != "100" { + t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100") + } + if grt := rw.Header().Get("X-Raft-Term"); grt != "5" { + t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5") + } + if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx { + t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx) + } + if rw.Code != tt.code { + t.Errorf("case %d: bad response code: got %d, want %v", i, rw.Code, tt.code) + } + + } +} + +func TestV2DeprecatedMachinesEndpoint(t *testing.T) { + tests := []struct { + method string + wcode int + }{ + {"GET", http.StatusOK}, + {"HEAD", http.StatusOK}, + {"POST", http.StatusMethodNotAllowed}, + } + + m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}}) + s := httptest.NewServer(m) + defer s.Close() + + for _, tt := range tests { + req, err := http.NewRequest(tt.method, s.URL+deprecatedMachinesPrefix, nil) + if err != nil { + t.Fatal(err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + + if resp.StatusCode != tt.wcode { + t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, tt.wcode) + } + } +} + +func TestServeMachines(t *testing.T) { + cluster := &fakeCluster{ + clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"}, + } + writer := httptest.NewRecorder() + req, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatal(err) + } + h := &serverHandler{clusterInfo: cluster} + h.serveMachines(writer, req) + w := "http://localhost:8080, http://localhost:8081, http://localhost:8082" + if g := writer.Body.String(); g != w { + t.Errorf("body = %s, want %s", g, w) + } + if writer.Code != http.StatusOK { + t.Errorf("code = %d, want %d", writer.Code, http.StatusOK) + } +} + +type dummyStats struct { + data []byte +} + +func (ds *dummyStats) SelfStats() []byte { return ds.data } +func (ds *dummyStats) LeaderStats() []byte { return ds.data } +func (ds *dummyStats) StoreStats() []byte { return ds.data } +func (ds *dummyStats) UpdateRecvApp(_ uint64, _ int64) {} + +func TestServeSelfStats(t *testing.T) { + wb := []byte("some statistics") + w := string(wb) + sh := &serverHandler{ + stats: &dummyStats{data: wb}, + } + rw := httptest.NewRecorder() + sh.serveSelfStats(rw, &http.Request{Method: "GET"}) + if rw.Code != http.StatusOK { + t.Errorf("code = %d, want %d", rw.Code, http.StatusOK) + } + wct := "application/json" + if gct := rw.Header().Get("Content-Type"); gct != wct { + t.Errorf("Content-Type = %q, want %q", gct, wct) + } + if g := rw.Body.String(); g != w { + t.Errorf("body = %s, want %s", g, w) + } +} + +func TestSelfServeStatsBad(t *testing.T) { + for _, m := range []string{"PUT", "POST", "DELETE"} { + sh := &serverHandler{} + rw := httptest.NewRecorder() + sh.serveSelfStats( + rw, + &http.Request{ + Method: m, + }, + ) + if rw.Code != http.StatusMethodNotAllowed { + t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed) + } + } +} + +func TestLeaderServeStatsBad(t *testing.T) { + for _, m := range []string{"PUT", "POST", "DELETE"} { + sh := &serverHandler{} + rw := httptest.NewRecorder() + sh.serveLeaderStats( + rw, + &http.Request{ + Method: m, + }, + ) + if rw.Code != http.StatusMethodNotAllowed { + t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed) + } + } +} + +func TestServeLeaderStats(t *testing.T) { + wb := []byte("some statistics") + w := string(wb) + sh := &serverHandler{ + stats: &dummyStats{data: wb}, + } + rw := httptest.NewRecorder() + sh.serveLeaderStats(rw, &http.Request{Method: "GET"}) + if rw.Code != http.StatusOK { + t.Errorf("code = %d, want %d", rw.Code, http.StatusOK) + } + wct := "application/json" + if gct := rw.Header().Get("Content-Type"); gct != wct { + t.Errorf("Content-Type = %q, want %q", gct, wct) + } + if g := rw.Body.String(); g != w { + t.Errorf("body = %s, want %s", g, w) + } +} + +func TestServeStoreStats(t *testing.T) { + wb := []byte("some statistics") + w := string(wb) + sh := &serverHandler{ + stats: &dummyStats{data: wb}, + } + rw := httptest.NewRecorder() + sh.serveStoreStats(rw, &http.Request{Method: "GET"}) + if rw.Code != http.StatusOK { + t.Errorf("code = %d, want %d", rw.Code, http.StatusOK) + } + wct := "application/json" + if gct := rw.Header().Get("Content-Type"); gct != wct { + t.Errorf("Content-Type = %q, want %q", gct, wct) + } + if g := rw.Body.String(); g != w { + t.Errorf("body = %s, want %s", g, w) + } + +} + +func TestServeVersion(t *testing.T) { + req, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatalf("error creating request: %v", err) + } + h := &serverHandler{} + rw := httptest.NewRecorder() + h.serveVersion(rw, req) + if rw.Code != http.StatusOK { + t.Errorf("code=%d, want %d", rw.Code, http.StatusOK) + } + w := fmt.Sprintf("etcd %s", version.Version) + if g := rw.Body.String(); g != w { + t.Fatalf("body = %q, want %q", g, w) + } +} + +func TestServeVersionFails(t *testing.T) { + for _, m := range []string{ + "CONNECT", "TRACE", "PUT", "POST", "HEAD", + } { + req, err := http.NewRequest(m, "", nil) + if err != nil { + t.Fatalf("error creating request: %v", err) + } + h := &serverHandler{} + rw := httptest.NewRecorder() + h.serveVersion(rw, req) + if rw.Code != http.StatusMethodNotAllowed { + t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed) + } + } +} + +func TestBadServeKeys(t *testing.T) { + testBadCases := []struct { + req *http.Request + server etcdserver.Server + + wcode int + wbody string + }{ + { + // bad method + &http.Request{ + Method: "CONNECT", + }, + &resServer{}, + + http.StatusMethodNotAllowed, + "Method Not Allowed", + }, + { + // bad method + &http.Request{ + Method: "TRACE", + }, + &resServer{}, + + http.StatusMethodNotAllowed, + "Method Not Allowed", + }, + { + // parseRequest error + &http.Request{ + Body: nil, + Method: "PUT", + }, + &resServer{}, + + http.StatusBadRequest, + `{"errorCode":210,"message":"Invalid POST form","cause":"missing form body","index":0}`, + }, + { + // etcdserver.Server error + mustNewRequest(t, "foo"), + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + "Internal Server Error", + }, + { + // etcdserver.Server etcd error + mustNewRequest(t, "foo"), + &errServer{ + etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0), + }, + + http.StatusNotFound, + `{"errorCode":100,"message":"Key not found","cause":"/pant","index":0}`, + }, + { + // non-event/watcher response from etcdserver.Server + mustNewRequest(t, "foo"), + &resServer{ + etcdserver.Response{}, + }, + + http.StatusInternalServerError, + "Internal Server Error", + }, + } + for i, tt := range testBadCases { + h := &serverHandler{ + timeout: 0, // context times out immediately + server: tt.server, + } + rw := httptest.NewRecorder() + h.serveKeys(rw, tt.req) + if rw.Code != tt.wcode { + t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) + } + if g := strings.TrimSuffix(rw.Body.String(), "\n"); g != tt.wbody { + t.Errorf("#%d: body = %s, want %s", i, g, tt.wbody) + } + } +} + +func TestServeKeysEvent(t *testing.T) { + req := mustNewRequest(t, "foo") + server := &resServer{ + etcdserver.Response{ + Event: &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }, + }, + } + h := &serverHandler{ + timeout: time.Hour, + server: server, + timer: &dummyRaftTimer{}, + } + rw := httptest.NewRecorder() + + h.serveKeys(rw, req) + + wcode := http.StatusOK + wbody := mustMarshalEvent( + t, + &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }, + ) + + if rw.Code != wcode { + t.Errorf("got code=%d, want %d", rw.Code, wcode) + } + g := rw.Body.String() + if g != wbody { + t.Errorf("got body=%#v, want %#v", g, wbody) + } +} + +func TestServeKeysWatch(t *testing.T) { + req := mustNewRequest(t, "/foo/bar") + ec := make(chan *store.Event) + dw := &dummyWatcher{ + echan: ec, + } + server := &resServer{ + etcdserver.Response{ + Watcher: dw, + }, + } + h := &serverHandler{ + timeout: time.Hour, + server: server, + timer: &dummyRaftTimer{}, + } + go func() { + ec <- &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + } + }() + rw := httptest.NewRecorder() + + h.serveKeys(rw, req) + + wcode := http.StatusOK + wbody := mustMarshalEvent( + t, + &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }, + ) + + if rw.Code != wcode { + t.Errorf("got code=%d, want %d", rw.Code, wcode) + } + g := rw.Body.String() + if g != wbody { + t.Errorf("got body=%#v, want %#v", g, wbody) + } +} + +type recordingCloseNotifier struct { + *httptest.ResponseRecorder + cn chan bool +} + +func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool { + return rcn.cn +} + +func TestHandleWatch(t *testing.T) { + defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) { + r := httptest.NewRecorder() + return r, r + } + noopEv := func(chan *store.Event) {} + + tests := []struct { + getCtx func() context.Context + getRwRr func() (http.ResponseWriter, *httptest.ResponseRecorder) + doToChan func(chan *store.Event) + + wbody string + }{ + { + // Normal case: one event + context.Background, + defaultRwRr, + func(ch chan *store.Event) { + ch <- &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + } + }, + + mustMarshalEvent( + t, + &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }, + ), + }, + { + // Channel is closed, no event + context.Background, + defaultRwRr, + func(ch chan *store.Event) { + close(ch) + }, + + "", + }, + { + // Simulate a timed-out context + func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + defaultRwRr, + noopEv, + + "", + }, + { + // Close-notifying request + context.Background, + func() (http.ResponseWriter, *httptest.ResponseRecorder) { + rw := &recordingCloseNotifier{ + ResponseRecorder: httptest.NewRecorder(), + cn: make(chan bool, 1), + } + rw.cn <- true + return rw, rw.ResponseRecorder + }, + noopEv, + + "", + }, + } + + for i, tt := range tests { + rw, rr := tt.getRwRr() + wa := &dummyWatcher{ + echan: make(chan *store.Event, 1), + sidx: 10, + } + tt.doToChan(wa.echan) + + handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{}) + + wcode := http.StatusOK + wct := "application/json" + wei := "10" + wri := "100" + wrt := "5" + + if rr.Code != wcode { + t.Errorf("#%d: got code=%d, want %d", i, rr.Code, wcode) + } + h := rr.Header() + if ct := h.Get("Content-Type"); ct != wct { + t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct) + } + if ei := h.Get("X-Etcd-Index"); ei != wei { + t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei) + } + if ri := h.Get("X-Raft-Index"); ri != wri { + t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri) + } + if rt := h.Get("X-Raft-Term"); rt != wrt { + t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt) + } + g := rr.Body.String() + if g != tt.wbody { + t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody) + } + } +} + +func TestHandleWatchStreaming(t *testing.T) { + rw := &flushingRecorder{ + httptest.NewRecorder(), + make(chan struct{}, 1), + } + wa := &dummyWatcher{ + echan: make(chan *store.Event), + } + + // Launch the streaming handler in the background with a cancellable context + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{}) + close(done) + }() + + // Expect one Flush for the headers etc. + select { + case <-rw.ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for flush") + } + + // Expect headers but no body + wcode := http.StatusOK + wct := "application/json" + wbody := "" + + if rw.Code != wcode { + t.Errorf("got code=%d, want %d", rw.Code, wcode) + } + h := rw.Header() + if ct := h.Get("Content-Type"); ct != wct { + t.Errorf("Content-Type=%q, want %q", ct, wct) + } + g := rw.Body.String() + if g != wbody { + t.Errorf("got body=%#v, want %#v", g, wbody) + } + + // Now send the first event + select { + case wa.echan <- &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }: + case <-time.After(time.Second): + t.Fatal("timed out waiting for send") + } + + // Wait for it to be flushed... + select { + case <-rw.ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for flush") + } + + // And check the body is as expected + wbody = mustMarshalEvent( + t, + &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }, + ) + g = rw.Body.String() + if g != wbody { + t.Errorf("got body=%#v, want %#v", g, wbody) + } + + // Rinse and repeat + select { + case wa.echan <- &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }: + case <-time.After(time.Second): + t.Fatal("timed out waiting for send") + } + + select { + case <-rw.ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for flush") + } + + // This time, we expect to see both events + wbody = wbody + wbody + g = rw.Body.String() + if g != wbody { + t.Errorf("got body=%#v, want %#v", g, wbody) + } + + // Finally, time out the connection and ensure the serving goroutine returns + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for done") + } +} + +func TestTrimEventPrefix(t *testing.T) { + pre := "/abc" + tests := []struct { + ev *store.Event + wev *store.Event + }{ + { + nil, + nil, + }, + { + &store.Event{}, + &store.Event{}, + }, + { + &store.Event{Node: &store.NodeExtern{Key: "/abc/def"}}, + &store.Event{Node: &store.NodeExtern{Key: "/def"}}, + }, + { + &store.Event{PrevNode: &store.NodeExtern{Key: "/abc/ghi"}}, + &store.Event{PrevNode: &store.NodeExtern{Key: "/ghi"}}, + }, + { + &store.Event{ + Node: &store.NodeExtern{Key: "/abc/def"}, + PrevNode: &store.NodeExtern{Key: "/abc/ghi"}, + }, + &store.Event{ + Node: &store.NodeExtern{Key: "/def"}, + PrevNode: &store.NodeExtern{Key: "/ghi"}, + }, + }, + } + for i, tt := range tests { + ev := trimEventPrefix(tt.ev, pre) + if !reflect.DeepEqual(ev, tt.wev) { + t.Errorf("#%d: event = %+v, want %+v", i, ev, tt.wev) + } + } +} + +func TestTrimNodeExternPrefix(t *testing.T) { + pre := "/abc" + tests := []struct { + n *store.NodeExtern + wn *store.NodeExtern + }{ + { + nil, + nil, + }, + { + &store.NodeExtern{Key: "/abc/def"}, + &store.NodeExtern{Key: "/def"}, + }, + { + &store.NodeExtern{ + Key: "/abc/def", + Nodes: []*store.NodeExtern{ + {Key: "/abc/def/1"}, + {Key: "/abc/def/2"}, + }, + }, + &store.NodeExtern{ + Key: "/def", + Nodes: []*store.NodeExtern{ + {Key: "/def/1"}, + {Key: "/def/2"}, + }, + }, + }, + } + for i, tt := range tests { + n := trimNodeExternPrefix(tt.n, pre) + if !reflect.DeepEqual(n, tt.wn) { + t.Errorf("#%d: node = %+v, want %+v", i, n, tt.wn) + } + } +} diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 760391f8f..5794fb82d 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -17,40 +17,18 @@ package etcdhttp import ( - "encoding/json" "errors" - "fmt" - "io/ioutil" "log" "net/http" - "net/url" - "path" - "strconv" "strings" "time" - "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/version" ) const ( - // prefixes of client endpoint - keysPrefix = "/v2/keys" - deprecatedMachinesPrefix = "/v2/machines" - adminMembersPrefix = "/v2/admin/members/" - statsPrefix = "/v2/stats" - versionPrefix = "/version" - // prefixes of peer endpoint - raftPrefix = "/raft" - membersPrefix = "/members" - // time to wait for response from EtcdServer requests defaultServerTimeout = 500 * time.Millisecond @@ -60,44 +38,6 @@ const ( var errClosed = errors.New("etcdhttp: client closed connection") -// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. -func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { - sh := &serverHandler{ - server: server, - clusterInfo: server.Cluster, - stats: server, - timer: server, - timeout: defaultServerTimeout, - clock: clockwork.NewRealClock(), - } - mux := http.NewServeMux() - mux.HandleFunc(keysPrefix, sh.serveKeys) - mux.HandleFunc(keysPrefix+"/", sh.serveKeys) - mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats) - mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats) - mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats) - mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines) - mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers) - mux.HandleFunc(versionPrefix, sh.serveVersion) - mux.HandleFunc("/", http.NotFound) - return mux -} - -// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. -func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - sh := &serverHandler{ - server: server, - stats: server, - clusterInfo: server.Cluster, - clock: clockwork.NewRealClock(), - } - mux := http.NewServeMux() - mux.HandleFunc(raftPrefix, sh.serveRaft) - mux.HandleFunc(membersPrefix, sh.serveMembers) - mux.HandleFunc("/", http.NotFound) - return mux -} - // serverHandler provides http.Handlers for etcd client and raft communication. type serverHandler struct { timeout time.Duration @@ -108,385 +48,6 @@ type serverHandler struct { clock clockwork.Clock } -func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), h.timeout) - defer cancel() - - rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock()) - if err != nil { - writeError(w, err) - return - } - - resp, err := h.server.Do(ctx, rr) - if err != nil { - err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix) - writeError(w, err) - return - } - - switch { - case resp.Event != nil: - if err := writeKeyEvent(w, resp.Event, h.timer); err != nil { - // Should never be reached - log.Printf("error writing event: %v", err) - } - case resp.Watcher != nil: - ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) - defer cancel() - handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer) - default: - writeError(w, errors.New("received response with no Event/Watcher!")) - } -} - -// serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'. -func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "HEAD") { - return - } - endpoints := h.clusterInfo.ClientURLs() - w.Write([]byte(strings.Join(endpoints, ", "))) -} - -func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "POST", "DELETE") { - return - } - ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) - defer cancel() - - switch r.Method { - case "GET": - if s := strings.TrimPrefix(r.URL.Path, adminMembersPrefix); s != "" { - http.NotFound(w, r) - return - } - ms := struct { - Members []*etcdserver.Member `json:"members"` - }{ - Members: h.clusterInfo.Members(), - } - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(ms); err != nil { - log.Printf("etcdhttp: %v", err) - } - case "POST": - ctype := r.Header.Get("Content-Type") - if ctype != "application/json" { - http.Error(w, fmt.Sprintf("bad Content-Type %s, accept application/json", ctype), http.StatusBadRequest) - return - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - raftAttr := etcdserver.RaftAttributes{} - if err := json.Unmarshal(b, &raftAttr); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - validURLs, err := types.NewURLs(raftAttr.PeerURLs) - if err != nil { - http.Error(w, "bad peer urls", http.StatusBadRequest) - return - } - now := h.clock.Now() - m := etcdserver.NewMember("", validURLs, "", &now) - if err := h.server.AddMember(ctx, *m); err != nil { - log.Printf("etcdhttp: error adding node %x: %v", m.ID, err) - writeError(w, err) - return - } - log.Printf("etcdhttp: added node %x with peer urls %v", m.ID, raftAttr.PeerURLs) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusCreated) - if err := json.NewEncoder(w).Encode(m); err != nil { - log.Printf("etcdhttp: %v", err) - } - case "DELETE": - idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix) - id, err := strconv.ParseUint(idStr, 16, 64) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - log.Printf("etcdhttp: remove node %x", id) - if err := h.server.RemoveMember(ctx, id); err != nil { - log.Printf("etcdhttp: error removing node %x: %v", id, err) - writeError(w, err) - return - } - w.WriteHeader(http.StatusNoContent) - } -} - -func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET") { - return - } - w.Header().Set("Content-Type", "application/json") - w.Write(h.stats.StoreStats()) -} - -func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET") { - return - } - w.Header().Set("Content-Type", "application/json") - w.Write(h.stats.SelfStats()) -} - -func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET") { - return - } - w.Header().Set("Content-Type", "application/json") - w.Write(h.stats.LeaderStats()) -} - -func (h serverHandler) serveVersion(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET") { - return - } - w.Write([]byte("etcd " + version.Version)) -} - -func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "POST") { - return - } - - wcid := strconv.FormatUint(h.clusterInfo.ID(), 16) - w.Header().Set("X-Etcd-Cluster-ID", wcid) - - gcid := r.Header.Get("X-Etcd-Cluster-ID") - if gcid != wcid { - log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) - http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) - return - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("etcdhttp: error reading raft message:", err) - http.Error(w, "error reading raft message", http.StatusBadRequest) - return - } - var m raftpb.Message - if err := m.Unmarshal(b); err != nil { - log.Println("etcdhttp: error unmarshaling raft message:", err) - http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) - return - } - if err := h.server.Process(context.TODO(), m); err != nil { - log.Println("etcdhttp: error processing raft message:", err) - switch err { - case etcdserver.ErrRemoved: - http.Error(w, "cannot process message from removed node", http.StatusForbidden) - default: - writeError(w, err) - } - return - } - if m.Type == raftpb.MsgApp { - h.stats.UpdateRecvApp(m.From, r.ContentLength) - } - w.WriteHeader(http.StatusNoContent) -} - -func (h serverHandler) serveMembers(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET") { - return - } - cid := strconv.FormatUint(h.clusterInfo.ID(), 16) - w.Header().Set("X-Etcd-Cluster-ID", cid) - - if r.URL.Path != membersPrefix { - http.Error(w, "bad path", http.StatusBadRequest) - return - } - ms := h.clusterInfo.Members() - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(ms); err != nil { - log.Printf("etcdhttp: %v", err) - } -} - -// parseKeyRequest converts a received http.Request on keysPrefix to -// a server Request, performing validation of supplied fields as appropriate. -// If any validation fails, an empty Request and non-nil error is returned. -func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) { - emptyReq := etcdserverpb.Request{} - - err := r.ParseForm() - if err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidForm, - err.Error(), - ) - } - - if !strings.HasPrefix(r.URL.Path, keysPrefix) { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidForm, - "incorrect key prefix", - ) - } - p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):]) - - var pIdx, wIdx uint64 - if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeIndexNaN, - `invalid value for "prevIndex"`, - ) - } - if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeIndexNaN, - `invalid value for "waitIndex"`, - ) - } - - var rec, sort, wait, dir, quorum, stream bool - if rec, err = getBool(r.Form, "recursive"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `invalid value for "recursive"`, - ) - } - if sort, err = getBool(r.Form, "sorted"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `invalid value for "sorted"`, - ) - } - if wait, err = getBool(r.Form, "wait"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `invalid value for "wait"`, - ) - } - // TODO(jonboulle): define what parameters dir is/isn't compatible with? - if dir, err = getBool(r.Form, "dir"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `invalid value for "dir"`, - ) - } - if quorum, err = getBool(r.Form, "quorum"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `invalid value for "quorum"`, - ) - } - if stream, err = getBool(r.Form, "stream"); err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `invalid value for "stream"`, - ) - } - - if wait && r.Method != "GET" { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `"wait" can only be used with GET requests`, - ) - } - - pV := r.FormValue("prevValue") - if _, ok := r.Form["prevValue"]; ok && pV == "" { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - `"prevValue" cannot be empty`, - ) - } - - // TTL is nullable, so leave it null if not specified - // or an empty string - var ttl *uint64 - if len(r.FormValue("ttl")) > 0 { - i, err := getUint64(r.Form, "ttl") - if err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeTTLNaN, - `invalid value for "ttl"`, - ) - } - ttl = &i - } - - // prevExist is nullable, so leave it null if not specified - var pe *bool - if _, ok := r.Form["prevExist"]; ok { - bv, err := getBool(r.Form, "prevExist") - if err != nil { - return emptyReq, etcdErr.NewRequestError( - etcdErr.EcodeInvalidField, - "invalid value for prevExist", - ) - } - pe = &bv - } - - rr := etcdserverpb.Request{ - ID: id, - Method: r.Method, - Path: p, - Val: r.FormValue("value"), - Dir: dir, - PrevValue: pV, - PrevIndex: pIdx, - PrevExist: pe, - Wait: wait, - Since: wIdx, - Recursive: rec, - Sorted: sort, - Quorum: quorum, - Stream: stream, - } - - if pe != nil { - rr.PrevExist = pe - } - - // Null TTL is equivalent to unset Expiration - if ttl != nil { - expr := time.Duration(*ttl) * time.Second - rr.Expiration = clock.Now().Add(expr).UnixNano() - } - - return rr, nil -} - -// getUint64 extracts a uint64 by the given key from a Form. If the key does -// not exist in the form, 0 is returned. If the key exists but the value is -// badly formed, an error is returned. If multiple values are present only the -// first is considered. -func getUint64(form url.Values, key string) (i uint64, err error) { - if vals, ok := form[key]; ok { - i, err = strconv.ParseUint(vals[0], 10, 64) - } - return -} - -// getBool extracts a bool by the given key from a Form. If the key does not -// exist in the form, false is returned. If the key exists but the value is -// badly formed, an error is returned. If multiple values are present only the -// first is considered. -func getBool(form url.Values, key string) (b bool, err error) { - if vals, ok := form[key]; ok { - b, err = strconv.ParseBool(vals[0]) - } - return -} - // writeError logs and writes the given Error to the ResponseWriter // If Error is an etcdErr, it is rendered to the ResponseWriter // Otherwise, it is assumed to be an InternalServerError @@ -502,72 +63,6 @@ func writeError(w http.ResponseWriter, err error) { } } -// writeKeyEvent trims the prefix of key path in a single Event under -// StoreKeysPrefix, serializes it and writes the resulting JSON to the given -// ResponseWriter, along with the appropriate headers. -func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error { - if ev == nil { - return errors.New("cannot write empty Event!") - } - w.Header().Set("Content-Type", "application/json") - w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) - w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) - w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) - - if ev.IsCreated() { - w.WriteHeader(http.StatusCreated) - } - - ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) - return json.NewEncoder(w).Encode(ev) -} - -func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { - defer wa.Remove() - ech := wa.EventChan() - var nch <-chan bool - if x, ok := w.(http.CloseNotifier); ok { - nch = x.CloseNotify() - } - - w.Header().Set("Content-Type", "application/json") - w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) - w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) - w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) - w.WriteHeader(http.StatusOK) - - // Ensure headers are flushed early, in case of long polling - w.(http.Flusher).Flush() - - for { - select { - case <-nch: - // Client closed connection. Nothing to do. - return - case <-ctx.Done(): - // Timed out. net/http will close the connection for us, so nothing to do. - return - case ev, ok := <-ech: - if !ok { - // If the channel is closed this may be an indication of - // that notifications are much more than we are able to - // send to the client in time. Then we simply end streaming. - return - } - ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) - if err := json.NewEncoder(w).Encode(ev); err != nil { - // Should never be reached - log.Printf("error writing event: %v\n", err) - return - } - if !stream { - return - } - w.(http.Flusher).Flush() - } - } -} - // allowMethod verifies that the given method is one of the allowed methods, // and if not, it writes an error to w. A boolean is returned indicating // whether or not the method is allowed. @@ -581,30 +76,3 @@ func allowMethod(w http.ResponseWriter, m string, ms ...string) bool { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return false } - -func trimEventPrefix(ev *store.Event, prefix string) *store.Event { - if ev == nil { - return nil - } - ev.Node = trimNodeExternPrefix(ev.Node, prefix) - ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, prefix) - return ev -} - -func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern { - if n == nil { - return nil - } - n.Key = strings.TrimPrefix(n.Key, prefix) - for _, nn := range n.Nodes { - nn = trimNodeExternPrefix(nn, prefix) - } - return n -} - -func trimErrorPrefix(err error, prefix string) error { - if e, ok := err.(*etcdErr.Error); ok { - e.Cause = strings.TrimPrefix(e.Cause, prefix) - } - return err -} diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index d570ea8d5..4912685ae 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -17,35 +17,20 @@ package etcdhttp import ( - "bytes" - "encoding/json" "errors" - "fmt" - "io" - "io/ioutil" "net/http" "net/http/httptest" "net/url" - "path" - "reflect" "sort" - "strconv" - "strings" "testing" - "time" "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/version" ) -func boolp(b bool) *bool { return &b } - func mustNewURL(t *testing.T, s string) *url.URL { u, err := url.Parse(s) if err != nil { @@ -54,450 +39,45 @@ func mustNewURL(t *testing.T, s string) *url.URL { return u } -// mustNewRequest takes a path, appends it to the standard keysPrefix, and constructs -// a GET *http.Request referencing the resulting URL -func mustNewRequest(t *testing.T, p string) *http.Request { - return mustNewMethodRequest(t, "GET", p) +type fakeCluster struct { + id uint64 + clientURLs []string + members map[uint64]*etcdserver.Member } -func mustNewMethodRequest(t *testing.T, m, p string) *http.Request { - return &http.Request{ - Method: m, - URL: mustNewURL(t, path.Join(keysPrefix, p)), +func (c *fakeCluster) ID() uint64 { return c.id } +func (c *fakeCluster) ClientURLs() []string { return c.clientURLs } +func (c *fakeCluster) Members() []*etcdserver.Member { + var sms etcdserver.SortableMemberSlice + for _, m := range c.members { + sms = append(sms, m) } + sort.Sort(sms) + return []*etcdserver.Member(sms) +} +func (c *fakeCluster) Member(id uint64) *etcdserver.Member { return c.members[id] } + +// errServer implements the etcd.Server interface for testing. +// It returns the given error from any Do/Process/AddMember/RemoveMember calls. +type errServer struct { + err error } -// mustNewForm takes a set of Values and constructs a PUT *http.Request, -// with a URL constructed from appending the given path to the standard keysPrefix -func mustNewForm(t *testing.T, p string, vals url.Values) *http.Request { - u := mustNewURL(t, path.Join(keysPrefix, p)) - req, err := http.NewRequest("PUT", u.String(), strings.NewReader(vals.Encode())) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - if err != nil { - t.Fatalf("error creating new request: %v", err) - } - return req +func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { + return etcdserver.Response{}, fs.err } - -func TestBadParseRequest(t *testing.T) { - tests := []struct { - in *http.Request - wcode int - }{ - { - // parseForm failure - &http.Request{ - Body: nil, - Method: "PUT", - }, - etcdErr.EcodeInvalidForm, - }, - { - // bad key prefix - &http.Request{ - URL: mustNewURL(t, "/badprefix/"), - }, - etcdErr.EcodeInvalidForm, - }, - // bad values for prevIndex, waitIndex, ttl - { - mustNewForm(t, "foo", url.Values{"prevIndex": []string{"garbage"}}), - etcdErr.EcodeIndexNaN, - }, - { - mustNewForm(t, "foo", url.Values{"prevIndex": []string{"1.5"}}), - etcdErr.EcodeIndexNaN, - }, - { - mustNewForm(t, "foo", url.Values{"prevIndex": []string{"-1"}}), - etcdErr.EcodeIndexNaN, - }, - { - mustNewForm(t, "foo", url.Values{"waitIndex": []string{"garbage"}}), - etcdErr.EcodeIndexNaN, - }, - { - mustNewForm(t, "foo", url.Values{"waitIndex": []string{"??"}}), - etcdErr.EcodeIndexNaN, - }, - { - mustNewForm(t, "foo", url.Values{"ttl": []string{"-1"}}), - etcdErr.EcodeTTLNaN, - }, - // bad values for recursive, sorted, wait, prevExist, dir, stream - { - mustNewForm(t, "foo", url.Values{"recursive": []string{"hahaha"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"recursive": []string{"1234"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"recursive": []string{"?"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"sorted": []string{"?"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"sorted": []string{"x"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"wait": []string{"?!"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"wait": []string{"yes"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"prevExist": []string{"yes"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"prevExist": []string{"#2"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"dir": []string{"no"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"dir": []string{"file"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"quorum": []string{"no"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"quorum": []string{"file"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"stream": []string{"zzz"}}), - etcdErr.EcodeInvalidField, - }, - { - mustNewForm(t, "foo", url.Values{"stream": []string{"something"}}), - etcdErr.EcodeInvalidField, - }, - // prevValue cannot be empty - { - mustNewForm(t, "foo", url.Values{"prevValue": []string{""}}), - etcdErr.EcodeInvalidField, - }, - // wait is only valid with GET requests - { - mustNewMethodRequest(t, "HEAD", "foo?wait=true"), - etcdErr.EcodeInvalidField, - }, - // query values are considered - { - mustNewRequest(t, "foo?prevExist=wrong"), - etcdErr.EcodeInvalidField, - }, - { - mustNewRequest(t, "foo?ttl=wrong"), - etcdErr.EcodeTTLNaN, - }, - // but body takes precedence if both are specified - { - mustNewForm( - t, - "foo?ttl=12", - url.Values{"ttl": []string{"garbage"}}, - ), - etcdErr.EcodeTTLNaN, - }, - { - mustNewForm( - t, - "foo?prevExist=false", - url.Values{"prevExist": []string{"yes"}}, - ), - etcdErr.EcodeInvalidField, - }, - } - for i, tt := range tests { - got, err := parseKeyRequest(tt.in, 1234, clockwork.NewFakeClock()) - if err == nil { - t.Errorf("#%d: unexpected nil error!", i) - continue - } - ee, ok := err.(*etcdErr.Error) - if !ok { - t.Errorf("#%d: err is not etcd.Error!", i) - continue - } - if ee.ErrorCode != tt.wcode { - t.Errorf("#%d: code=%d, want %v", i, ee.ErrorCode, tt.wcode) - t.Logf("cause: %#v", ee.Cause) - } - if !reflect.DeepEqual(got, etcdserverpb.Request{}) { - t.Errorf("#%d: unexpected non-empty Request: %#v", i, got) - } - } +func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { + return fs.err } - -func TestGoodParseRequest(t *testing.T) { - fc := clockwork.NewFakeClock() - fc.Advance(1111) - tests := []struct { - in *http.Request - w etcdserverpb.Request - }{ - { - // good prefix, all other values default - mustNewRequest(t, "foo"), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // value specified - mustNewForm( - t, - "foo", - url.Values{"value": []string{"some_value"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - Val: "some_value", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // prevIndex specified - mustNewForm( - t, - "foo", - url.Values{"prevIndex": []string{"98765"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - PrevIndex: 98765, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // recursive specified - mustNewForm( - t, - "foo", - url.Values{"recursive": []string{"true"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - Recursive: true, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // sorted specified - mustNewForm( - t, - "foo", - url.Values{"sorted": []string{"true"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - Sorted: true, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // quorum specified - mustNewForm( - t, - "foo", - url.Values{"quorum": []string{"true"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - Quorum: true, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // wait specified - mustNewRequest(t, "foo?wait=true"), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Wait: true, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // empty TTL specified - mustNewRequest(t, "foo?ttl="), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - Expiration: 0, - }, - }, - { - // non-empty TTL specified - mustNewRequest(t, "foo?ttl=5678"), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - Expiration: fc.Now().Add(5678 * time.Second).UnixNano(), - }, - }, - { - // zero TTL specified - mustNewRequest(t, "foo?ttl=0"), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - Expiration: fc.Now().UnixNano(), - }, - }, - { - // dir specified - mustNewRequest(t, "foo?dir=true"), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Dir: true, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // dir specified negatively - mustNewRequest(t, "foo?dir=false"), - etcdserverpb.Request{ - ID: 1234, - Method: "GET", - Dir: false, - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // prevExist should be non-null if specified - mustNewForm( - t, - "foo", - url.Values{"prevExist": []string{"true"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - PrevExist: boolp(true), - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - { - // prevExist should be non-null if specified - mustNewForm( - t, - "foo", - url.Values{"prevExist": []string{"false"}}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - PrevExist: boolp(false), - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - // mix various fields - { - mustNewForm( - t, - "foo", - url.Values{ - "value": []string{"some value"}, - "prevExist": []string{"true"}, - "prevValue": []string{"previous value"}, - }, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - PrevExist: boolp(true), - PrevValue: "previous value", - Val: "some value", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - // query parameters should be used if given - { - mustNewForm( - t, - "foo?prevValue=woof", - url.Values{}, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - PrevValue: "woof", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - // but form values should take precedence over query parameters - { - mustNewForm( - t, - "foo?prevValue=woof", - url.Values{ - "prevValue": []string{"miaow"}, - }, - ), - etcdserverpb.Request{ - ID: 1234, - Method: "PUT", - PrevValue: "miaow", - Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"), - }, - }, - } - - for i, tt := range tests { - got, err := parseKeyRequest(tt.in, 1234, fc) - if err != nil { - t.Errorf("#%d: err = %v, want %v", i, err, nil) - } - if !reflect.DeepEqual(got, tt.w) { - t.Errorf("#%d: request=%#v, want %#v", i, got, tt.w) - } - } +func (fs *errServer) Start() {} +func (fs *errServer) Stop() {} +func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { + return fs.err } - -// eventingWatcher immediately returns a simple event of the given action on its channel -type eventingWatcher struct { - action string +func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { + return fs.err } -func (w *eventingWatcher) EventChan() chan *store.Event { - ch := make(chan *store.Event) - go func() { - ch <- &store.Event{ - Action: w.action, - Node: &store.NodeExtern{}, - } - }() - return ch -} - -func (w *eventingWatcher) Remove() {} - func TestWriteError(t *testing.T) { // nil error should not panic rw := httptest.NewRecorder() @@ -544,274 +124,6 @@ func TestWriteError(t *testing.T) { } } -type dummyRaftTimer struct{} - -func (drt dummyRaftTimer) Index() uint64 { return uint64(100) } -func (drt dummyRaftTimer) Term() uint64 { return uint64(5) } - -func TestWriteEvent(t *testing.T) { - // nil event should not panic - rw := httptest.NewRecorder() - writeKeyEvent(rw, nil, dummyRaftTimer{}) - h := rw.Header() - if len(h) > 0 { - t.Fatalf("unexpected non-empty headers: %#v", h) - } - b := rw.Body.String() - if len(b) > 0 { - t.Fatalf("unexpected non-empty body: %q", b) - } - - tests := []struct { - ev *store.Event - idx string - // TODO(jonboulle): check body as well as just status code - code int - err error - }{ - // standard case, standard 200 response - { - &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - PrevNode: &store.NodeExtern{}, - }, - "0", - http.StatusOK, - nil, - }, - // check new nodes return StatusCreated - { - &store.Event{ - Action: store.Create, - Node: &store.NodeExtern{}, - PrevNode: &store.NodeExtern{}, - }, - "0", - http.StatusCreated, - nil, - }, - } - - for i, tt := range tests { - rw := httptest.NewRecorder() - writeKeyEvent(rw, tt.ev, dummyRaftTimer{}) - if gct := rw.Header().Get("Content-Type"); gct != "application/json" { - t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct) - } - if gri := rw.Header().Get("X-Raft-Index"); gri != "100" { - t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100") - } - if grt := rw.Header().Get("X-Raft-Term"); grt != "5" { - t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5") - } - if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx { - t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx) - } - if rw.Code != tt.code { - t.Errorf("case %d: bad response code: got %d, want %v", i, rw.Code, tt.code) - } - - } -} - -type dummyWatcher struct { - echan chan *store.Event - sidx uint64 -} - -func (w *dummyWatcher) EventChan() chan *store.Event { - return w.echan -} -func (w *dummyWatcher) StartIndex() uint64 { return w.sidx } -func (w *dummyWatcher) Remove() {} - -func TestV2DeprecatedMachinesEndpoint(t *testing.T) { - tests := []struct { - method string - wcode int - }{ - {"GET", http.StatusOK}, - {"HEAD", http.StatusOK}, - {"POST", http.StatusMethodNotAllowed}, - } - - m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}}) - s := httptest.NewServer(m) - defer s.Close() - - for _, tt := range tests { - req, err := http.NewRequest(tt.method, s.URL+deprecatedMachinesPrefix, nil) - if err != nil { - t.Fatal(err) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } - - if resp.StatusCode != tt.wcode { - t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, tt.wcode) - } - } -} - -func TestServeMachines(t *testing.T) { - cluster := &fakeCluster{ - clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"}, - } - writer := httptest.NewRecorder() - req, err := http.NewRequest("GET", "", nil) - if err != nil { - t.Fatal(err) - } - h := &serverHandler{clusterInfo: cluster} - h.serveMachines(writer, req) - w := "http://localhost:8080, http://localhost:8081, http://localhost:8082" - if g := writer.Body.String(); g != w { - t.Errorf("body = %s, want %s", g, w) - } - if writer.Code != http.StatusOK { - t.Errorf("code = %d, want %d", writer.Code, http.StatusOK) - } -} - -type dummyStats struct { - data []byte -} - -func (ds *dummyStats) SelfStats() []byte { return ds.data } -func (ds *dummyStats) LeaderStats() []byte { return ds.data } -func (ds *dummyStats) StoreStats() []byte { return ds.data } -func (ds *dummyStats) UpdateRecvApp(_ uint64, _ int64) {} - -func TestServeSelfStats(t *testing.T) { - wb := []byte("some statistics") - w := string(wb) - sh := &serverHandler{ - stats: &dummyStats{data: wb}, - } - rw := httptest.NewRecorder() - sh.serveSelfStats(rw, &http.Request{Method: "GET"}) - if rw.Code != http.StatusOK { - t.Errorf("code = %d, want %d", rw.Code, http.StatusOK) - } - wct := "application/json" - if gct := rw.Header().Get("Content-Type"); gct != wct { - t.Errorf("Content-Type = %q, want %q", gct, wct) - } - if g := rw.Body.String(); g != w { - t.Errorf("body = %s, want %s", g, w) - } -} - -func TestSelfServeStatsBad(t *testing.T) { - for _, m := range []string{"PUT", "POST", "DELETE"} { - sh := &serverHandler{} - rw := httptest.NewRecorder() - sh.serveSelfStats( - rw, - &http.Request{ - Method: m, - }, - ) - if rw.Code != http.StatusMethodNotAllowed { - t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed) - } - } -} - -func TestLeaderServeStatsBad(t *testing.T) { - for _, m := range []string{"PUT", "POST", "DELETE"} { - sh := &serverHandler{} - rw := httptest.NewRecorder() - sh.serveLeaderStats( - rw, - &http.Request{ - Method: m, - }, - ) - if rw.Code != http.StatusMethodNotAllowed { - t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed) - } - } -} - -func TestServeLeaderStats(t *testing.T) { - wb := []byte("some statistics") - w := string(wb) - sh := &serverHandler{ - stats: &dummyStats{data: wb}, - } - rw := httptest.NewRecorder() - sh.serveLeaderStats(rw, &http.Request{Method: "GET"}) - if rw.Code != http.StatusOK { - t.Errorf("code = %d, want %d", rw.Code, http.StatusOK) - } - wct := "application/json" - if gct := rw.Header().Get("Content-Type"); gct != wct { - t.Errorf("Content-Type = %q, want %q", gct, wct) - } - if g := rw.Body.String(); g != w { - t.Errorf("body = %s, want %s", g, w) - } -} - -func TestServeStoreStats(t *testing.T) { - wb := []byte("some statistics") - w := string(wb) - sh := &serverHandler{ - stats: &dummyStats{data: wb}, - } - rw := httptest.NewRecorder() - sh.serveStoreStats(rw, &http.Request{Method: "GET"}) - if rw.Code != http.StatusOK { - t.Errorf("code = %d, want %d", rw.Code, http.StatusOK) - } - wct := "application/json" - if gct := rw.Header().Get("Content-Type"); gct != wct { - t.Errorf("Content-Type = %q, want %q", gct, wct) - } - if g := rw.Body.String(); g != w { - t.Errorf("body = %s, want %s", g, w) - } - -} - -func TestServeVersion(t *testing.T) { - req, err := http.NewRequest("GET", "", nil) - if err != nil { - t.Fatalf("error creating request: %v", err) - } - h := &serverHandler{} - rw := httptest.NewRecorder() - h.serveVersion(rw, req) - if rw.Code != http.StatusOK { - t.Errorf("code=%d, want %d", rw.Code, http.StatusOK) - } - w := fmt.Sprintf("etcd %s", version.Version) - if g := rw.Body.String(); g != w { - t.Fatalf("body = %q, want %q", g, w) - } -} - -func TestServeVersionFails(t *testing.T) { - for _, m := range []string{ - "CONNECT", "TRACE", "PUT", "POST", "HEAD", - } { - req, err := http.NewRequest(m, "", nil) - if err != nil { - t.Fatalf("error creating request: %v", err) - } - h := &serverHandler{} - rw := httptest.NewRecorder() - h.serveVersion(rw, req) - if rw.Code != http.StatusMethodNotAllowed { - t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed) - } - } -} - func TestAllowMethod(t *testing.T) { tests := []struct { m string @@ -877,1037 +189,3 @@ func TestAllowMethod(t *testing.T) { } } } - -// errServer implements the etcd.Server interface for testing. -// It returns the given error from any Do/Process/AddMember/RemoveMember calls. -type errServer struct { - err error -} - -func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { - return etcdserver.Response{}, fs.err -} -func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { - return fs.err -} -func (fs *errServer) Start() {} -func (fs *errServer) Stop() {} -func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { - return fs.err -} -func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { - return fs.err -} - -// errReader implements io.Reader to facilitate a broken request. -type errReader struct{} - -func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } - -func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte { - json, err := m.Marshal() - if err != nil { - t.Fatalf("error marshalling raft Message: %#v", err) - } - return json -} - -func TestServeRaft(t *testing.T) { - testCases := []struct { - method string - body io.Reader - serverErr error - clusterID string - - wcode int - }{ - { - // bad method - "GET", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "PUT", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "DELETE", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad request body - "POST", - &errReader{}, - nil, - "0", - http.StatusBadRequest, - }, - { - // bad request protobuf - "POST", - strings.NewReader("malformed garbage"), - nil, - "0", - http.StatusBadRequest, - }, - { - // good request, etcdserver.Server internal error - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - errors.New("some error"), - "0", - http.StatusInternalServerError, - }, - { - // good request from removed member - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - etcdserver.ErrRemoved, - "0", - http.StatusForbidden, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "1", - http.StatusPreconditionFailed, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusNoContent, - }, - } - for i, tt := range testCases { - req, err := http.NewRequest(tt.method, "foo", tt.body) - if err != nil { - t.Fatalf("#%d: could not create request: %#v", i, err) - } - req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) - h := &serverHandler{ - timeout: time.Hour, - server: &errServer{tt.serverErr}, - clusterInfo: &fakeCluster{id: 0}, - } - rw := httptest.NewRecorder() - h.serveRaft(rw, req) - if rw.Code != tt.wcode { - t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) - } - } -} - -func TestServeMembersFails(t *testing.T) { - tests := []struct { - method string - wcode int - }{ - { - "POST", - http.StatusMethodNotAllowed, - }, - { - "DELETE", - http.StatusMethodNotAllowed, - }, - { - "BAD", - http.StatusMethodNotAllowed, - }, - } - for i, tt := range tests { - h := &serverHandler{} - rw := httptest.NewRecorder() - h.serveMembers(rw, &http.Request{Method: tt.method}) - if rw.Code != tt.wcode { - t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) - } - } -} - -func TestServeMembersGet(t *testing.T) { - memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} - memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} - cluster := &fakeCluster{ - id: 1, - members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, - } - h := &serverHandler{ - server: &serverRecorder{}, - clusterInfo: cluster, - } - - msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) - if err != nil { - t.Fatal(err) - } - wms := string(msb) + "\n" - - tests := []struct { - path string - wcode int - wct string - wbody string - }{ - {membersPrefix, http.StatusOK, "application/json", wms}, - {path.Join(membersPrefix, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"}, - } - - for i, tt := range tests { - req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil) - if err != nil { - t.Fatal(err) - } - rw := httptest.NewRecorder() - h.serveMembers(rw, req) - - if rw.Code != tt.wcode { - t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) - } - if gct := rw.Header().Get("Content-Type"); gct != tt.wct { - t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct) - } - if rw.Body.String() != tt.wbody { - t.Errorf("#%d: body = %s, want %s", i, rw.Body.String(), tt.wbody) - } - gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := strconv.FormatUint(cluster.ID(), 16) - if gcid != wcid { - t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) - } - } -} - -// resServer implements the etcd.Server interface for testing. -// It returns the given responsefrom any Do calls, and nil error -type resServer struct { - res etcdserver.Response -} - -func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { - return rs.res, nil -} -func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} -func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } -func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } - -func mustMarshalEvent(t *testing.T, ev *store.Event) string { - b := new(bytes.Buffer) - if err := json.NewEncoder(b).Encode(ev); err != nil { - t.Fatalf("error marshalling event %#v: %v", ev, err) - } - return b.String() -} - -func TestBadServeKeys(t *testing.T) { - testBadCases := []struct { - req *http.Request - server etcdserver.Server - - wcode int - wbody string - }{ - { - // bad method - &http.Request{ - Method: "CONNECT", - }, - &resServer{}, - - http.StatusMethodNotAllowed, - "Method Not Allowed", - }, - { - // bad method - &http.Request{ - Method: "TRACE", - }, - &resServer{}, - - http.StatusMethodNotAllowed, - "Method Not Allowed", - }, - { - // parseRequest error - &http.Request{ - Body: nil, - Method: "PUT", - }, - &resServer{}, - - http.StatusBadRequest, - `{"errorCode":210,"message":"Invalid POST form","cause":"missing form body","index":0}`, - }, - { - // etcdserver.Server error - mustNewRequest(t, "foo"), - &errServer{ - errors.New("blah"), - }, - - http.StatusInternalServerError, - "Internal Server Error", - }, - { - // etcdserver.Server etcd error - mustNewRequest(t, "foo"), - &errServer{ - etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0), - }, - - http.StatusNotFound, - `{"errorCode":100,"message":"Key not found","cause":"/pant","index":0}`, - }, - { - // non-event/watcher response from etcdserver.Server - mustNewRequest(t, "foo"), - &resServer{ - etcdserver.Response{}, - }, - - http.StatusInternalServerError, - "Internal Server Error", - }, - } - for i, tt := range testBadCases { - h := &serverHandler{ - timeout: 0, // context times out immediately - server: tt.server, - } - rw := httptest.NewRecorder() - h.serveKeys(rw, tt.req) - if rw.Code != tt.wcode { - t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) - } - if g := strings.TrimSuffix(rw.Body.String(), "\n"); g != tt.wbody { - t.Errorf("#%d: body = %s, want %s", i, g, tt.wbody) - } - } -} - -func TestServeKeysEvent(t *testing.T) { - req := mustNewRequest(t, "foo") - server := &resServer{ - etcdserver.Response{ - Event: &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }, - }, - } - h := &serverHandler{ - timeout: time.Hour, - server: server, - timer: &dummyRaftTimer{}, - } - rw := httptest.NewRecorder() - - h.serveKeys(rw, req) - - wcode := http.StatusOK - wbody := mustMarshalEvent( - t, - &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }, - ) - - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } -} - -func TestServeKeysWatch(t *testing.T) { - req := mustNewRequest(t, "/foo/bar") - ec := make(chan *store.Event) - dw := &dummyWatcher{ - echan: ec, - } - server := &resServer{ - etcdserver.Response{ - Watcher: dw, - }, - } - h := &serverHandler{ - timeout: time.Hour, - server: server, - timer: &dummyRaftTimer{}, - } - go func() { - ec <- &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - } - }() - rw := httptest.NewRecorder() - - h.serveKeys(rw, req) - - wcode := http.StatusOK - wbody := mustMarshalEvent( - t, - &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }, - ) - - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } -} - -type recordingCloseNotifier struct { - *httptest.ResponseRecorder - cn chan bool -} - -func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool { - return rcn.cn -} - -func TestHandleWatch(t *testing.T) { - defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) { - r := httptest.NewRecorder() - return r, r - } - noopEv := func(chan *store.Event) {} - - tests := []struct { - getCtx func() context.Context - getRwRr func() (http.ResponseWriter, *httptest.ResponseRecorder) - doToChan func(chan *store.Event) - - wbody string - }{ - { - // Normal case: one event - context.Background, - defaultRwRr, - func(ch chan *store.Event) { - ch <- &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - } - }, - - mustMarshalEvent( - t, - &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }, - ), - }, - { - // Channel is closed, no event - context.Background, - defaultRwRr, - func(ch chan *store.Event) { - close(ch) - }, - - "", - }, - { - // Simulate a timed-out context - func() context.Context { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - return ctx - }, - defaultRwRr, - noopEv, - - "", - }, - { - // Close-notifying request - context.Background, - func() (http.ResponseWriter, *httptest.ResponseRecorder) { - rw := &recordingCloseNotifier{ - ResponseRecorder: httptest.NewRecorder(), - cn: make(chan bool, 1), - } - rw.cn <- true - return rw, rw.ResponseRecorder - }, - noopEv, - - "", - }, - } - - for i, tt := range tests { - rw, rr := tt.getRwRr() - wa := &dummyWatcher{ - echan: make(chan *store.Event, 1), - sidx: 10, - } - tt.doToChan(wa.echan) - - handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{}) - - wcode := http.StatusOK - wct := "application/json" - wei := "10" - wri := "100" - wrt := "5" - - if rr.Code != wcode { - t.Errorf("#%d: got code=%d, want %d", i, rr.Code, wcode) - } - h := rr.Header() - if ct := h.Get("Content-Type"); ct != wct { - t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct) - } - if ei := h.Get("X-Etcd-Index"); ei != wei { - t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei) - } - if ri := h.Get("X-Raft-Index"); ri != wri { - t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri) - } - if rt := h.Get("X-Raft-Term"); rt != wrt { - t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt) - } - g := rr.Body.String() - if g != tt.wbody { - t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody) - } - } -} - -// flushingRecorder provides a channel to allow users to block until the Recorder is Flushed() -type flushingRecorder struct { - *httptest.ResponseRecorder - ch chan struct{} -} - -func (fr *flushingRecorder) Flush() { - fr.ResponseRecorder.Flush() - fr.ch <- struct{}{} -} - -func TestHandleWatchStreaming(t *testing.T) { - rw := &flushingRecorder{ - httptest.NewRecorder(), - make(chan struct{}, 1), - } - wa := &dummyWatcher{ - echan: make(chan *store.Event), - } - - // Launch the streaming handler in the background with a cancellable context - ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{}) - close(done) - }() - - // Expect one Flush for the headers etc. - select { - case <-rw.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for flush") - } - - // Expect headers but no body - wcode := http.StatusOK - wct := "application/json" - wbody := "" - - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) - } - h := rw.Header() - if ct := h.Get("Content-Type"); ct != wct { - t.Errorf("Content-Type=%q, want %q", ct, wct) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } - - // Now send the first event - select { - case wa.echan <- &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }: - case <-time.After(time.Second): - t.Fatal("timed out waiting for send") - } - - // Wait for it to be flushed... - select { - case <-rw.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for flush") - } - - // And check the body is as expected - wbody = mustMarshalEvent( - t, - &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }, - ) - g = rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } - - // Rinse and repeat - select { - case wa.echan <- &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }: - case <-time.After(time.Second): - t.Fatal("timed out waiting for send") - } - - select { - case <-rw.ch: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for flush") - } - - // This time, we expect to see both events - wbody = wbody + wbody - g = rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } - - // Finally, time out the connection and ensure the serving goroutine returns - cancel() - - select { - case <-done: - case <-time.After(time.Second): - t.Fatalf("timed out waiting for done") - } -} - -func TestServeAdminMembersFail(t *testing.T) { - tests := []struct { - req *http.Request - server etcdserver.Server - - wcode int - }{ - { - // bad method - &http.Request{ - Method: "CONNECT", - }, - &resServer{}, - - http.StatusMethodNotAllowed, - }, - { - // bad method - &http.Request{ - Method: "TRACE", - }, - &resServer{}, - - http.StatusMethodNotAllowed, - }, - { - // parse body error - &http.Request{ - URL: mustNewURL(t, adminMembersPrefix), - Method: "POST", - Body: ioutil.NopCloser(strings.NewReader("bad json")), - }, - &resServer{}, - - http.StatusBadRequest, - }, - { - // bad content type - &http.Request{ - URL: mustNewURL(t, adminMembersPrefix), - Method: "POST", - Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), - Header: map[string][]string{"Content-Type": []string{"application/bad"}}, - }, - &errServer{}, - - http.StatusBadRequest, - }, - { - // bad url - &http.Request{ - URL: mustNewURL(t, adminMembersPrefix), - Method: "POST", - Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)), - Header: map[string][]string{"Content-Type": []string{"application/json"}}, - }, - &errServer{}, - - http.StatusBadRequest, - }, - { - // etcdserver.AddMember error - &http.Request{ - URL: mustNewURL(t, adminMembersPrefix), - Method: "POST", - Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), - Header: map[string][]string{"Content-Type": []string{"application/json"}}, - }, - &errServer{ - errors.New("blah"), - }, - - http.StatusInternalServerError, - }, - { - // etcdserver.RemoveMember error - &http.Request{ - URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")), - Method: "DELETE", - }, - &errServer{ - errors.New("blah"), - }, - - http.StatusInternalServerError, - }, - } - for i, tt := range tests { - h := &serverHandler{ - server: tt.server, - clock: clockwork.NewFakeClock(), - } - rw := httptest.NewRecorder() - h.serveAdminMembers(rw, tt.req) - if rw.Code != tt.wcode { - t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) - } - } -} - -type action struct { - name string - params []interface{} -} - -type serverRecorder struct { - actions []action -} - -func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { - s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}}) - return etcdserver.Response{}, nil -} -func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error { - s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}}) - return nil -} -func (s *serverRecorder) Start() {} -func (s *serverRecorder) Stop() {} -func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error { - s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}}) - return nil -} -func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { - s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}}) - return nil -} - -func TestServeAdminMembers(t *testing.T) { - memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} - memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} - cluster := &fakeCluster{ - members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, - } - h := &serverHandler{ - server: &serverRecorder{}, - clock: clockwork.NewFakeClock(), - clusterInfo: cluster, - } - - msb, err := json.Marshal( - struct { - Members []etcdserver.Member `json:"members"` - }{ - Members: []etcdserver.Member{memb1, memb2}, - }, - ) - if err != nil { - t.Fatal(err) - } - wms := string(msb) + "\n" - - tests := []struct { - path string - wcode int - wct string - wbody string - }{ - {adminMembersPrefix, http.StatusOK, "application/json", wms}, - {path.Join(adminMembersPrefix, "100"), http.StatusNotFound, "text/plain; charset=utf-8", "404 page not found\n"}, - {path.Join(adminMembersPrefix, "foobar"), http.StatusNotFound, "text/plain; charset=utf-8", "404 page not found\n"}, - } - - for i, tt := range tests { - req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil) - if err != nil { - t.Fatal(err) - } - rw := httptest.NewRecorder() - h.serveAdminMembers(rw, req) - - if rw.Code != tt.wcode { - t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) - } - if gct := rw.Header().Get("Content-Type"); gct != tt.wct { - t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct) - } - if rw.Body.String() != tt.wbody { - t.Errorf("#%d: body = %q, want %q", i, rw.Body.String(), tt.wbody) - } - } -} - -func TestServeAdminMembersPut(t *testing.T) { - u := mustNewURL(t, adminMembersPrefix) - raftAttr := etcdserver.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}} - b, err := json.Marshal(raftAttr) - if err != nil { - t.Fatal(err) - } - body := bytes.NewReader(b) - req, err := http.NewRequest("POST", u.String(), body) - if err != nil { - t.Fatal(err) - } - req.Header.Set("Content-Type", "application/json") - s := &serverRecorder{} - h := &serverHandler{ - server: s, - clock: clockwork.NewFakeClock(), - } - rw := httptest.NewRecorder() - - h.serveAdminMembers(rw, req) - - wcode := http.StatusCreated - if rw.Code != wcode { - t.Errorf("code=%d, want %d", rw.Code, wcode) - } - wm := etcdserver.Member{ - ID: 3064321551348478165, - RaftAttributes: raftAttr, - } - - wb, err := json.Marshal(wm) - if err != nil { - t.Fatal(err) - } - wct := "application/json" - if gct := rw.Header().Get("Content-Type"); gct != wct { - t.Errorf("content-type = %s, want %s", gct, wct) - } - g := rw.Body.String() - w := string(wb) + "\n" - if g != w { - t.Errorf("got body=%q, want %q", g, w) - } - wactions := []action{{name: "AddMember", params: []interface{}{wm}}} - if !reflect.DeepEqual(s.actions, wactions) { - t.Errorf("actions = %+v, want %+v", s.actions, wactions) - } -} - -func TestServeAdminMembersDelete(t *testing.T) { - req := &http.Request{ - Method: "DELETE", - URL: mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")), - } - s := &serverRecorder{} - h := &serverHandler{ - server: s, - } - rw := httptest.NewRecorder() - - h.serveAdminMembers(rw, req) - - wcode := http.StatusNoContent - if rw.Code != wcode { - t.Errorf("code=%d, want %d", rw.Code, wcode) - } - g := rw.Body.String() - if g != "" { - t.Errorf("got body=%q, want %q", g, "") - } - wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}} - if !reflect.DeepEqual(s.actions, wactions) { - t.Errorf("actions = %+v, want %+v", s.actions, wactions) - } -} - -func TestTrimEventPrefix(t *testing.T) { - pre := "/abc" - tests := []struct { - ev *store.Event - wev *store.Event - }{ - { - nil, - nil, - }, - { - &store.Event{}, - &store.Event{}, - }, - { - &store.Event{Node: &store.NodeExtern{Key: "/abc/def"}}, - &store.Event{Node: &store.NodeExtern{Key: "/def"}}, - }, - { - &store.Event{PrevNode: &store.NodeExtern{Key: "/abc/ghi"}}, - &store.Event{PrevNode: &store.NodeExtern{Key: "/ghi"}}, - }, - { - &store.Event{ - Node: &store.NodeExtern{Key: "/abc/def"}, - PrevNode: &store.NodeExtern{Key: "/abc/ghi"}, - }, - &store.Event{ - Node: &store.NodeExtern{Key: "/def"}, - PrevNode: &store.NodeExtern{Key: "/ghi"}, - }, - }, - } - for i, tt := range tests { - ev := trimEventPrefix(tt.ev, pre) - if !reflect.DeepEqual(ev, tt.wev) { - t.Errorf("#%d: event = %+v, want %+v", i, ev, tt.wev) - } - } -} - -func TestTrimNodeExternPrefix(t *testing.T) { - pre := "/abc" - tests := []struct { - n *store.NodeExtern - wn *store.NodeExtern - }{ - { - nil, - nil, - }, - { - &store.NodeExtern{Key: "/abc/def"}, - &store.NodeExtern{Key: "/def"}, - }, - { - &store.NodeExtern{ - Key: "/abc/def", - Nodes: []*store.NodeExtern{ - {Key: "/abc/def/1"}, - {Key: "/abc/def/2"}, - }, - }, - &store.NodeExtern{ - Key: "/def", - Nodes: []*store.NodeExtern{ - {Key: "/def/1"}, - {Key: "/def/2"}, - }, - }, - }, - } - for i, tt := range tests { - n := trimNodeExternPrefix(tt.n, pre) - if !reflect.DeepEqual(n, tt.wn) { - t.Errorf("#%d: node = %+v, want %+v", i, n, tt.wn) - } - } -} - -type fakeCluster struct { - id uint64 - clientURLs []string - members map[uint64]*etcdserver.Member -} - -func (c *fakeCluster) ID() uint64 { return c.id } -func (c *fakeCluster) ClientURLs() []string { return c.clientURLs } -func (c *fakeCluster) Members() []*etcdserver.Member { - var sms etcdserver.SortableMemberSlice - for _, m := range c.members { - sms = append(sms, m) - } - sort.Sort(sms) - return []*etcdserver.Member(sms) -} -func (c *fakeCluster) Member(id uint64) *etcdserver.Member { return c.members[id] } diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go new file mode 100644 index 000000000..d2f056f12 --- /dev/null +++ b/etcdserver/etcdhttp/peer.go @@ -0,0 +1,111 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdhttp + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "strconv" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/raft/raftpb" +) + +const ( + raftPrefix = "/raft" + membersPrefix = "/members" +) + +// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. +func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { + sh := &serverHandler{ + server: server, + stats: server, + clusterInfo: server.Cluster, + clock: clockwork.NewRealClock(), + } + mux := http.NewServeMux() + mux.HandleFunc(raftPrefix, sh.serveRaft) + mux.HandleFunc(membersPrefix, sh.serveMembers) + mux.HandleFunc("/", http.NotFound) + return mux +} + +func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "POST") { + return + } + + wcid := strconv.FormatUint(h.clusterInfo.ID(), 16) + w.Header().Set("X-Etcd-Cluster-ID", wcid) + + gcid := r.Header.Get("X-Etcd-Cluster-ID") + if gcid != wcid { + log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) + http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) + return + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("etcdhttp: error reading raft message:", err) + http.Error(w, "error reading raft message", http.StatusBadRequest) + return + } + var m raftpb.Message + if err := m.Unmarshal(b); err != nil { + log.Println("etcdhttp: error unmarshaling raft message:", err) + http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) + return + } + if err := h.server.Process(context.TODO(), m); err != nil { + log.Println("etcdhttp: error processing raft message:", err) + switch err { + case etcdserver.ErrRemoved: + http.Error(w, "cannot process message from removed node", http.StatusForbidden) + default: + writeError(w, err) + } + return + } + if m.Type == raftpb.MsgApp { + h.stats.UpdateRecvApp(m.From, r.ContentLength) + } + w.WriteHeader(http.StatusNoContent) +} + +func (h serverHandler) serveMembers(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "GET") { + return + } + cid := strconv.FormatUint(h.clusterInfo.ID(), 16) + w.Header().Set("X-Etcd-Cluster-ID", cid) + + if r.URL.Path != membersPrefix { + http.Error(w, "bad path", http.StatusBadRequest) + return + } + ms := h.clusterInfo.Members() + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(ms); err != nil { + log.Printf("etcdhttp: %v", err) + } +} diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go new file mode 100644 index 000000000..7f6138c70 --- /dev/null +++ b/etcdserver/etcdhttp/peer_test.go @@ -0,0 +1,264 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdhttp + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "path" + "strconv" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/raft/raftpb" +) + +func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte { + json, err := m.Marshal() + if err != nil { + t.Fatalf("error marshalling raft Message: %#v", err) + } + return json +} + +// errReader implements io.Reader to facilitate a broken request. +type errReader struct{} + +func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } + +func TestServeRaft(t *testing.T) { + testCases := []struct { + method string + body io.Reader + serverErr error + clusterID string + + wcode int + }{ + { + // bad method + "GET", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + nil, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "PUT", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + nil, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "DELETE", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + nil, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad request body + "POST", + &errReader{}, + nil, + "0", + http.StatusBadRequest, + }, + { + // bad request protobuf + "POST", + strings.NewReader("malformed garbage"), + nil, + "0", + http.StatusBadRequest, + }, + { + // good request, etcdserver.Server internal error + "POST", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + errors.New("some error"), + "0", + http.StatusInternalServerError, + }, + { + // good request from removed member + "POST", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + etcdserver.ErrRemoved, + "0", + http.StatusForbidden, + }, + { + // good request + "POST", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + nil, + "1", + http.StatusPreconditionFailed, + }, + { + // good request + "POST", + bytes.NewReader( + mustMarshalMsg( + t, + raftpb.Message{}, + ), + ), + nil, + "0", + http.StatusNoContent, + }, + } + for i, tt := range testCases { + req, err := http.NewRequest(tt.method, "foo", tt.body) + if err != nil { + t.Fatalf("#%d: could not create request: %#v", i, err) + } + req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) + h := &serverHandler{ + timeout: time.Hour, + server: &errServer{tt.serverErr}, + clusterInfo: &fakeCluster{id: 0}, + } + rw := httptest.NewRecorder() + h.serveRaft(rw, req) + if rw.Code != tt.wcode { + t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +func TestServeMembersFails(t *testing.T) { + tests := []struct { + method string + wcode int + }{ + { + "POST", + http.StatusMethodNotAllowed, + }, + { + "DELETE", + http.StatusMethodNotAllowed, + }, + { + "BAD", + http.StatusMethodNotAllowed, + }, + } + for i, tt := range tests { + h := &serverHandler{} + rw := httptest.NewRecorder() + h.serveMembers(rw, &http.Request{Method: tt.method}) + if rw.Code != tt.wcode { + t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +func TestServeMembersGet(t *testing.T) { + memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} + memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} + cluster := &fakeCluster{ + id: 1, + members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, + } + h := &serverHandler{ + server: &serverRecorder{}, + clusterInfo: cluster, + } + + msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) + if err != nil { + t.Fatal(err) + } + wms := string(msb) + "\n" + + tests := []struct { + path string + wcode int + wct string + wbody string + }{ + {membersPrefix, http.StatusOK, "application/json", wms}, + {path.Join(membersPrefix, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"}, + } + + for i, tt := range tests { + req, err := http.NewRequest("GET", mustNewURL(t, tt.path).String(), nil) + if err != nil { + t.Fatal(err) + } + rw := httptest.NewRecorder() + h.serveMembers(rw, req) + + if rw.Code != tt.wcode { + t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) + } + if gct := rw.Header().Get("Content-Type"); gct != tt.wct { + t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct) + } + if rw.Body.String() != tt.wbody { + t.Errorf("#%d: body = %s, want %s", i, rw.Body.String(), tt.wbody) + } + gcid := rw.Header().Get("X-Etcd-Cluster-ID") + wcid := strconv.FormatUint(cluster.ID(), 16) + if gcid != wcid { + t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) + } + } +}