diff --git a/server/server.go b/server/server.go index f1adcbf86..984927110 100644 --- a/server/server.go +++ b/server/server.go @@ -6,10 +6,12 @@ import ( "net/http" "net/url" "strings" + "time" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" "github.com/coreos/etcd/server/v1" + "github.com/coreos/etcd/server/v2" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "github.com/gorilla/mux" @@ -44,12 +46,24 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI peerServer: peerServer, } - // Install the routes for each version of the API. + // Install the routes. + s.handleFunc("/version", s.GetVersionHandler).Methods("GET") s.installV1() + s.installV2() return s } +// The current state of the server in the cluster. +func (s *Server) State() string { + return s.peerServer.State() +} + +// The node name of the leader in the cluster. +func (s *Server) Leader() string { + return s.peerServer.Leader() +} + // The current Raft committed index. func (s *Server) CommitIndex() uint64 { return s.peerServer.CommitIndex() @@ -65,6 +79,11 @@ func (s *Server) URL() string { return s.url } +// Retrives the Peer URL for a given node name. +func (s *Server) PeerURL(name string) (string, bool) { + return s.registry.PeerURL(name) +} + // Returns a reference to the Store. func (s *Server) Store() *store.Store { return s.store @@ -77,7 +96,21 @@ func (s *Server) installV1() { s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST") s.handleFunc("/v1/leader", s.GetLeaderHandler).Methods("GET") s.handleFunc("/v1/machines", s.GetMachinesHandler).Methods("GET") - s.handleFunc("/v1/stats", s.GetStatsHandler).Methods("GET") + s.handleFunc("/v1/stats/self", s.GetStatsHandler).Methods("GET") + s.handleFunc("/v1/stats/leader", s.GetLeaderStatsHandler).Methods("GET") + s.handleFunc("/v1/stats/store", s.GetStoreStatsHandler).Methods("GET") +} + +func (s *Server) installV2() { + s.handleFuncV2("/v2/keys/{key:.*}", v2.GetKeyHandler).Methods("GET") + s.handleFuncV2("/v2/keys/{key:.*}", v2.CreateKeyHandler).Methods("POST") + s.handleFuncV2("/v2/keys/{key:.*}", v2.UpdateKeyHandler).Methods("PUT") + s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteKeyHandler).Methods("DELETE") + s.handleFunc("/v2/leader", s.GetLeaderHandler).Methods("GET") + s.handleFunc("/v2/machines", s.GetMachinesHandler).Methods("GET") + s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET") + s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET") + s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET") } // Adds a v1 server handler to the router. @@ -87,6 +120,13 @@ func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Req }) } +// Adds a v2 server handler to the router. +func (s *Server) handleFuncV2(path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route { + return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request) error { + return f(w, req, s) + }) +} + // Adds a server handler to the router. func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route { r := s.Handler.(*mux.Router) @@ -181,6 +221,13 @@ func (s *Server) OriginAllowed(origin string) bool { return s.corsOrigins["*"] || s.corsOrigins[origin] } +// Handler to return the current version of etcd. +func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "etcd %s", releaseVersion) + return nil +} + // Handler to return the current leader's raft address func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error { leader := s.peerServer.Leader() @@ -228,3 +275,30 @@ func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) w.Write(s.store.JsonStats()) return nil } + +// Executes a speed test to evaluate the performance of update replication. +func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) error { + count := 1000 + c := make(chan bool, count) + for i := 0; i < count; i++ { + go func() { + for j := 0; j < 10; j++ { + c := &store.UpdateCommand{ + Key: "foo", + Value: "bar", + ExpireTime: time.Unix(0, 0), + } + s.peerServer.Do(c) + } + c <- true + }() + } + + for i := 0; i < count; i++ { + <-c + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("speed test success")) + return nil +} diff --git a/server/v2/create_key_handler.go b/server/v2/create_key_handler.go new file mode 100644 index 000000000..8a9fbf50e --- /dev/null +++ b/server/v2/create_key_handler.go @@ -0,0 +1,29 @@ +package v2 + +import ( + "net/http" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" +) + +func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { + vars := mux.Vars(req) + key := "/" + vars["key"] + + value := req.FormValue("value") + expireTime, err := store.TTL(req.FormValue("ttl")) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) + } + + c := &store.CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + IncrementalSuffix: (req.FormValue("incremental") == "true"), + } + + return s.Dispatch(c, w, req) +} diff --git a/server/v2/delete_key_handler.go b/server/v2/delete_key_handler.go new file mode 100644 index 000000000..e3bdf2b2d --- /dev/null +++ b/server/v2/delete_key_handler.go @@ -0,0 +1,20 @@ +package v2 + +import ( + "net/http" + + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" +) + +func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { + vars := mux.Vars(req) + key := "/" + vars["key"] + + c := &store.DeleteCommand{ + Key: key, + Recursive: (req.FormValue("recursive") == "true"), + } + + return s.Dispatch(c, w, req) +} diff --git a/server/v2/get_key_handler.go b/server/v2/get_key_handler.go new file mode 100644 index 000000000..e4d9b7207 --- /dev/null +++ b/server/v2/get_key_handler.go @@ -0,0 +1,69 @@ +package v2 + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "github.com/gorilla/mux" +) + +func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { + var err error + var event *store.Event + + vars := mux.Vars(req) + key := "/" + vars["key"] + + // Help client to redirect the request to the current leader + if req.FormValue("consistent") == "true" && s.State() != raft.Leader { + leader := s.Leader() + hostname, _ := s.PeerURL(leader) + url := hostname + req.URL.Path + log.Debugf("Redirect to %s", url) + http.Redirect(w, req, url, http.StatusTemporaryRedirect) + return nil + } + + recursive := (req.FormValue("recursive") == "true") + sorted := (req.FormValue("sorted") == "true") + + if req.FormValue("wait") == "true" { // watch + // Create a command to watch from a given index (default 0). + var sinceIndex uint64 = 0 + if req.Method == "POST" { + sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) + } + } + + // Start the watcher on the store. + c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term()) + if err != nil { + return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm) + } + event = <-c + + } else { //get + // Retrieve the key from the store. + event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term()) + if err != nil { + return err + } + } + + w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) + w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) + w.WriteHeader(http.StatusOK) + + b, _ := json.Marshal(event) + w.Write(b) + + return nil +} diff --git a/server/v2/handlers.go b/server/v2/handlers.go deleted file mode 100644 index d49542005..000000000 --- a/server/v2/handlers.go +++ /dev/null @@ -1,336 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "net/http" - "strconv" - "strings" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" -) - -func NewEtcdMuxer() *http.ServeMux { - // external commands - router := mux.NewRouter() - etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer)) - etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler)) - etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler)) - etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler)) - etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler)) - etcdMux.HandleFunc("/test/", TestHttpHandler) - - // backward support - etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1)) - etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler)) - etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler)) - etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler)) - - return etcdMux -} - -type errorHandler func(http.ResponseWriter, *http.Request) error - -// addCorsHeader parses the request Origin header and loops through the user -// provided allowed origins and sets the Access-Control-Allow-Origin header if -// there is a match. -func addCorsHeader(w http.ResponseWriter, r *http.Request) { - val, ok := corsList["*"] - if val && ok { - w.Header().Add("Access-Control-Allow-Origin", "*") - return - } - - requestOrigin := r.Header.Get("Origin") - val, ok = corsList[requestOrigin] - if val && ok { - w.Header().Add("Access-Control-Allow-Origin", requestOrigin) - return - } -} - -func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - addCorsHeader(w, r) - if e := fn(w, r); e != nil { - if etcdErr, ok := e.(*etcdErr.Error); ok { - debug("Return error: ", (*etcdErr).Error()) - etcdErr.Write(w) - } else { - http.Error(w, e.Error(), http.StatusInternalServerError) - } - } -} - -// Multiplex GET/POST/DELETE request to corresponding handlers -func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error { - - switch req.Method { - case "GET": - return e.GetHttpHandler(w, req) - case "POST": - return e.CreateHttpHandler(w, req) - case "PUT": - return e.UpdateHttpHandler(w, req) - case "DELETE": - return e.DeleteHttpHandler(w, req) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - return nil - } - - return nil -} - -//-------------------------------------- -// State sensitive handlers -// Set/Delete will dispatch to leader -//-------------------------------------- - -func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := getNodePath(req.URL.Path) - - debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - - value := req.FormValue("value") - - expireTime, err := store.TTL(req.FormValue("ttl")) - - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) - } - - command := &CreateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } - - if req.FormValue("incremental") == "true" { - command.IncrementalSuffix = true - } - - return e.dispatchEtcdCommand(command, w, req) - -} - -func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := getNodePath(req.URL.Path) - - debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - - req.ParseForm() - - value := req.Form.Get("value") - - expireTime, err := store.TTL(req.Form.Get("ttl")) - - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) - } - - // update should give at least one option - if value == "" && expireTime.Sub(store.Permanent) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) - } - - prevValue, valueOk := req.Form["prevValue"] - - prevIndexStr, indexOk := req.Form["prevIndex"] - - if !valueOk && !indexOk { // update without test - command := &UpdateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } - - return e.dispatchEtcdCommand(command, w, req) - - } else { // update with test - var prevIndex uint64 - - if indexOk { - prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) - - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) - } - } else { - prevIndex = 0 - } - - command := &TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue[0], - PrevIndex: prevIndex, - } - - return e.dispatchEtcdCommand(command, w, req) - } - -} - -// Delete Handler -func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := getNodePath(req.URL.Path) - - debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - - command := &DeleteCommand{ - Key: key, - } - - if req.FormValue("recursive") == "true" { - command.Recursive = true - } - - return e.dispatchEtcdCommand(command, w, req) -} - -// Dispatch the command to leader -func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error { - return e.raftServer.dispatch(c, w, req, nameToEtcdURL) -} - -//-------------------------------------- -// State non-sensitive handlers -// command with consistent option will -// still dispatch to the leader -//-------------------------------------- - -// Handler to return the current version of etcd -func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, "etcd %s", releaseVersion) - - return nil -} - -func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error { - var err error - var event interface{} - - r := e.raftServer - - debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - - if req.FormValue("consistent") == "true" && r.State() != raft.Leader { - // help client to redirect the request to the current leader - leader := r.Leader() - hostname, _ := nameToEtcdURL(leader) - redirect(hostname, w, req) - return nil - } - - key := getNodePath(req.URL.Path) - - recursive := req.FormValue("recursive") - - if req.FormValue("wait") == "true" { // watch - command := &WatchCommand{ - Key: key, - } - - if recursive == "true" { - command.Recursive = true - } - - indexStr := req.FormValue("wait_index") - if indexStr != "" { - sinceIndex, err := strconv.ParseUint(indexStr, 10, 64) - - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) - } - - command.SinceIndex = sinceIndex - } - - event, err = command.Apply(r.Server) - - } else { //get - - command := &GetCommand{ - Key: key, - } - - sorted := req.FormValue("sorted") - if sorted == "true" { - command.Sorted = true - } - - if recursive == "true" { - command.Recursive = true - } - - event, err = command.Apply(r.Server) - } - - if err != nil { - return err - - } else { - event, _ := event.(*store.Event) - bytes, _ := json.Marshal(event) - - w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) - w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) - w.WriteHeader(http.StatusOK) - - w.Write(bytes) - - return nil - } - -} - -func getNodePath(urlPath string) string { - pathPrefixLen := len("/" + version + "/keys") - return urlPath[pathPrefixLen:] -} - - -//-------------------------------------- -// Testing -//-------------------------------------- - -// TestHandler -func TestHttpHandler(w http.ResponseWriter, req *http.Request) { - testType := req.URL.Path[len("/test/"):] - - if testType == "speed" { - directSet() - w.WriteHeader(http.StatusOK) - w.Write([]byte("speed test success")) - - return - } - - w.WriteHeader(http.StatusBadRequest) -} - -func directSet() { - c := make(chan bool, 1000) - for i := 0; i < 1000; i++ { - go send(c) - } - - for i := 0; i < 1000; i++ { - <-c - } -} - -func send(c chan bool) { - for i := 0; i < 10; i++ { - command := &UpdateCommand{} - command.Key = "foo" - command.Value = "bar" - command.ExpireTime = time.Unix(0, 0) - //r.Do(command) - } - c <- true -} diff --git a/server/v2/update_key_handler.go b/server/v2/update_key_handler.go new file mode 100644 index 000000000..64e60cca5 --- /dev/null +++ b/server/v2/update_key_handler.go @@ -0,0 +1,64 @@ +package v2 + +import ( + "net/http" + "strconv" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "github.com/gorilla/mux" +) + +func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { + vars := mux.Vars(req) + key := "/" + vars["key"] + + req.ParseForm() + + value := req.Form.Get("value") + expireTime, err := store.TTL(req.Form.Get("ttl")) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) + } + + // Update should give at least one option + if value == "" && expireTime.Sub(store.Permanent) == 0 { + return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) + } + + prevValue, valueOk := req.Form["prevValue"] + prevIndexStr, indexOk := req.Form["prevIndex"] + + var c raft.Command + if !valueOk && !indexOk { // update without test + c = &store.UpdateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } + + } else { // update with test + var prevIndex uint64 + + if indexOk { + prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) + + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) + } + } else { + prevIndex = 0 + } + + c = &store.TestAndSetCommand{ + Key: key, + Value: value, + PrevValue: prevValue[0], + PrevIndex: prevIndex, + } + } + + return s.Dispatch(c, w, req) +} diff --git a/server/v2/v2.go b/server/v2/v2.go new file mode 100644 index 000000000..439f6078e --- /dev/null +++ b/server/v2/v2.go @@ -0,0 +1,18 @@ +package v2 + +import ( + "net/http" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +// The Server interface provides all the methods required for the v2 API. +type Server interface { + State() string + Leader() string + CommitIndex() uint64 + Term() uint64 + PeerURL(string) (string, bool) + Store() *store.Store + Dispatch(raft.Command, http.ResponseWriter, *http.Request) error +}