From 31264e7eb517045e01632f22b9d78772e695be2d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 13 Oct 2014 21:06:03 -0700 Subject: [PATCH 1/3] etcdhttp: add PUT and DELETE on /v2/admin/members/ --- etcdserver/etcdhttp/http.go | 43 ++++++++ etcdserver/etcdhttp/http_test.go | 180 ++++++++++++++++++++++++++++++- etcdserver/server.go | 4 + 3 files changed, 223 insertions(+), 4 deletions(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index afc291070..62aa806b3 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -24,6 +24,7 @@ const ( keysPrefix = "/v2/keys" membersPrefix = "/v2/members" deprecatedMachinesPrefix = "/v2/machines" + adminMembersPrefix = "/v2/admin/members/" raftPrefix = "/raft" // time to wait for response from EtcdServer requests @@ -51,6 +52,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { // TODO: add serveMembers mux.HandleFunc(membersPrefix, sh.serveMachines) mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines) + mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers) mux.HandleFunc("/", http.NotFound) return mux } @@ -118,6 +120,47 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) { w.Write([]byte(strings.Join(endpoints, ", "))) } +func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r.Method, "PUT", "DELETE") { + return + } + ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) + defer cancel() + idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix) + id, err := strconv.ParseUint(idStr, 16, 64) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + switch r.Method { + case "PUT": + r.ParseForm() + peerURLs := r.PostForm["PeerURLs"] + log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs) + m := etcdserver.Member{ + ID: id, + RaftAttributes: etcdserver.RaftAttributes{ + PeerURLs: peerURLs, + }, + } + if err := h.server.AddMember(ctx, m); err != nil { + log.Printf("etcdhttp: error adding node %x: %v", id, err) + writeError(w, err) + return + } + w.WriteHeader(http.StatusCreated) + case "DELETE": + log.Printf("etcdhttp: remove node %x", id) + if err := h.server.RemoveMember(ctx, id); err != nil { + log.Printf("etcdhttp: error removing node %x: %v", id, err) + writeError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "POST") { return diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 48479df0c..a8ca721a0 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -703,7 +703,7 @@ func TestAllowMethod(t *testing.T) { } // errServer implements the etcd.Server interface for testing. -// It returns the given error from any Do/Process calls. +// It returns the given error from any Do/Process/AddMember/RemoveMember calls. type errServer struct { err error } @@ -716,6 +716,12 @@ func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { } func (fs *errServer) Start() {} func (fs *errServer) Stop() {} +func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { + return fs.err +} +func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { + return fs.err +} // errReader implements io.Reader to facilitate a broken request. type errReader struct{} @@ -839,9 +845,11 @@ type resServer struct { func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { return rs.res, nil } -func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} +func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } +func (rs *resServer) Start() {} +func (rs *resServer) Stop() {} +func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } func mustMarshalEvent(t *testing.T, ev *store.Event) string { b := new(bytes.Buffer) @@ -1234,6 +1242,170 @@ func TestHandleWatchStreaming(t *testing.T) { } } +func TestServeAdminMembersFail(t *testing.T) { + tests := []struct { + req *http.Request + server etcdserver.Server + + wcode int + }{ + { + // bad method + &http.Request{ + Method: "CONNECT", + }, + &resServer{}, + + http.StatusMethodNotAllowed, + }, + { + // bad method + &http.Request{ + Method: "TRACE", + }, + &resServer{}, + + http.StatusMethodNotAllowed, + }, + { + // parse id error + &http.Request{ + URL: mustNewURL(t, path.Join(adminMembersPrefix, "badID")), + Method: "PUT", + }, + &resServer{}, + + http.StatusBadRequest, + }, + { + // etcdserver.AddMember error + &http.Request{ + URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")), + Method: "PUT", + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + { + // etcdserver.RemoveMember error + &http.Request{ + URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")), + Method: "DELETE", + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + } + for i, tt := range tests { + h := &serverHandler{ + server: tt.server, + } + rw := httptest.NewRecorder() + h.serveAdminMembers(rw, tt.req) + if rw.Code != tt.wcode { + t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +type action struct { + name string + params []interface{} +} + +type serverRecorder struct { + actions []action +} + +func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { + s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}}) + return etcdserver.Response{}, nil +} +func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error { + s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}}) + return nil +} +func (s *serverRecorder) Start() {} +func (s *serverRecorder) Stop() {} +func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error { + s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}}) + return nil +} +func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { + s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}}) + return nil +} + +func TestServeAdminMembersPut(t *testing.T) { + u := mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")) + form := url.Values{"PeerURLs": []string{"http://a", "http://b"}} + body := strings.NewReader(form.Encode()) + req, err := http.NewRequest("PUT", u.String(), body) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + s := &serverRecorder{} + h := &serverHandler{ + server: s, + } + rw := httptest.NewRecorder() + + h.serveAdminMembers(rw, req) + + wcode := http.StatusCreated + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + g := rw.Body.String() + if g != "" { + t.Errorf("got body=%q, want %q", g, "") + } + wm := etcdserver.Member{ + ID: 0xBEEF, + RaftAttributes: etcdserver.RaftAttributes{ + PeerURLs: []string{"http://a", "http://b"}, + }, + } + wactions := []action{{name: "AddMember", params: []interface{}{wm}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + +func TestServeAdminMembersDelete(t *testing.T) { + req := &http.Request{ + Method: "DELETE", + URL: mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")), + } + s := &serverRecorder{} + h := &serverHandler{ + server: s, + } + rw := httptest.NewRecorder() + + h.serveAdminMembers(rw, req) + + wcode := http.StatusNoContent + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + g := rw.Body.String() + if g != "" { + t.Errorf("got body=%q, want %q", g, "") + } + wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + type fakeCluster struct { members []etcdserver.Member } diff --git a/etcdserver/server.go b/etcdserver/server.go index 0f4727776..d1ce7ca48 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -74,6 +74,10 @@ type Server interface { // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. Process(ctx context.Context, m raftpb.Message) error + // AddMember attempts to add a member into the cluster. + AddMember(ctx context.Context, memb Member) error + // RemoveMember attempts to remove a member from the cluster. + RemoveMember(ctx context.Context, id uint64) error } type RaftTimer interface { From 99e35554c0a633124de016ecb967e398062929f8 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 14 Oct 2014 11:09:14 -0700 Subject: [PATCH 2/3] etcdhttp: add doc for member management http endpoint --- etcdserver/etcdhttp/doc.go | 67 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 etcdserver/etcdhttp/doc.go diff --git a/etcdserver/etcdhttp/doc.go b/etcdserver/etcdhttp/doc.go new file mode 100644 index 000000000..bcd7ef405 --- /dev/null +++ b/etcdserver/etcdhttp/doc.go @@ -0,0 +1,67 @@ +// Copyright 2014 CoreOS Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + +## Add Member + +### Request + +`curl http://${remote_client_url}/v2/admin/members/{$id} -XPUT -d 'PeerURLs=${peer_url_1}&PeerURLs=${peer_url_2}'` + +Parameter `remote_client_url` is serving client url of the cluster. +Parameter `id` is the identification of new member in hexadecimal. +Parameter `peer_url_` is peer urls of the new member. + +### Response + +Categorized by HTTP status code. + +#### HTTP 201 + +The member is created successfully. + +#### HTTP 400 + +etcd cannot parse out the request. + +#### HTTP 500 + +etcd fails to create the new member. + +## Remove Member + +### Request + +`curl http://${remote_client_url}/v2/admin/members/{$id} -XDELETE` + +Parameter `remote_client_url` is serving client url of the cluster. +Parameter `id` is the identification of member to be removed in hexadecimal. + +### Response + +#### HTTP 204 + +The member is removed successfully. + +#### HTTP 400 + +etcd cannot parse out the request. + +#### HTTP 500 + +etcd fails to remove the member. + +*/ +package etcdhttp From 6d0658c8caca56cb48be147a0984a95632b6fb9b Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 14 Oct 2014 14:50:11 -0700 Subject: [PATCH 3/3] etcdhttp: check error returned by ParseForm --- etcdserver/etcdhttp/http.go | 5 ++++- etcdserver/etcdhttp/http_test.go | 12 ++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 62aa806b3..7202e2be0 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -135,7 +135,10 @@ func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) switch r.Method { case "PUT": - r.ParseForm() + if err := r.ParseForm(); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } peerURLs := r.PostForm["PeerURLs"] log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs) m := etcdserver.Member{ diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index a8ca721a0..ffb728fb9 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -1277,11 +1278,22 @@ func TestServeAdminMembersFail(t *testing.T) { http.StatusBadRequest, }, + { + // parse body error + &http.Request{ + URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")), + Method: "PUT", + }, + &resServer{}, + + http.StatusBadRequest, + }, { // etcdserver.AddMember error &http.Request{ URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")), Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader("")), }, &errServer{ errors.New("blah"),