package etcdhttp

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/store"
)

const (
	// URL prefixes served by the handlers in this package.
	keysPrefix               = "/v2/keys"
	deprecatedMachinesPrefix = "/v2/machines"
	adminMembersPrefix       = "/v2/admin/members/"
	raftPrefix               = "/raft"
	statsPrefix              = "/v2/stats"

	// defaultServerTimeout bounds how long a handler waits for a response
	// from an EtcdServer request.
	defaultServerTimeout = 500 * time.Millisecond

	// defaultWatchTimeout bounds how long a Watch request is kept open.
	defaultWatchTimeout = 5 * time.Minute
)

var errClosed = errors.New("etcdhttp: client closed connection")

// NewClientHandler builds the muxed http.Handler that serves etcd client
// requests (keys, stats, machines and admin member endpoints) on behalf of
// the given server. Any path that is not registered is answered with
// http.NotFound.
//
// NOTE(review): the storestats field of the serverHandler is not populated
// here although the store stats route is registered — confirm where it is
// expected to be set.
func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
	handler := &serverHandler{
		timeout:      defaultServerTimeout,
		server:       server,
		stats:        server,
		timer:        server,
		clusterStore: server.ClusterStore,
	}

	// TODO: dynamic configuration may make this outdated. take care of it.
	// TODO: dynamic configuration may introduce race also.
	// TODO: add serveMembers
	routes := []struct {
		pattern string
		serve   func(http.ResponseWriter, *http.Request)
	}{
		{keysPrefix, handler.serveKeys},
		{keysPrefix + "/", handler.serveKeys},
		{statsPrefix + "/store", handler.serveStoreStats},
		{statsPrefix + "/self", handler.serveSelfStats},
		{statsPrefix + "/leader", handler.serveLeaderStats},
		{deprecatedMachinesPrefix, handler.serveMachines},
		{adminMembersPrefix, handler.serveAdminMembers},
		{"/", http.NotFound},
	}

	router := http.NewServeMux()
	for _, route := range routes {
		router.HandleFunc(route.pattern, route.serve)
	}
	return router
}

// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { sh := &serverHandler{ server: server, stats: server, } mux := http.NewServeMux() mux.HandleFunc(raftPrefix, sh.serveRaft) mux.HandleFunc("/", http.NotFound) return mux } // serverHandler provides http.Handlers for etcd client and raft communication. type serverHandler struct { timeout time.Duration server etcdserver.Server stats etcdserver.ServerStats storestats etcdserver.StoreStats timer etcdserver.RaftTimer clusterStore etcdserver.ClusterStore } func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") { return } ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() rr, err := parseRequest(r, etcdserver.GenID()) if err != nil { writeError(w, err) return } resp, err := h.server.Do(ctx, rr) if err != nil { writeError(w, err) return } switch { case resp.Event != nil: if err := writeEvent(w, resp.Event, h.timer); err != nil { // Should never be reached log.Printf("error writing event: %v", err) } case resp.Watcher != nil: ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) defer cancel() handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer) default: writeError(w, errors.New("received response with no Event/Watcher!")) } } // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'. 
func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "HEAD") { return } endpoints := h.clusterStore.Get().ClientURLs() w.Write([]byte(strings.Join(endpoints, ", "))) } func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "PUT", "DELETE") { return } ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) defer cancel() idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix) id, err := strconv.ParseUint(idStr, 16, 64) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } switch r.Method { case "PUT": if err := r.ParseForm(); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } peerURLs := r.PostForm["PeerURLs"] log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs) m := etcdserver.Member{ ID: id, RaftAttributes: etcdserver.RaftAttributes{ PeerURLs: peerURLs, }, } if err := h.server.AddMember(ctx, m); err != nil { log.Printf("etcdhttp: error adding node %x: %v", id, err) writeError(w, err) return } w.WriteHeader(http.StatusCreated) case "DELETE": log.Printf("etcdhttp: remove node %x", id) if err := h.server.RemoveMember(ctx, id); err != nil { log.Printf("etcdhttp: error removing node %x: %v", id, err) writeError(w, err) return } w.WriteHeader(http.StatusNoContent) } } func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } w.Header().Set("Content-Type", "application/json") w.Write(h.storestats.JSON()) } func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } s := h.stats.SelfStats() b, err := json.Marshal(s) if err != nil { log.Printf("error marshalling stats: %v\n", err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(b) } func (h 
serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } s := h.stats.LeaderStats() b, err := json.Marshal(s) if err != nil { log.Printf("error marshalling stats: %v\n", err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(b) } func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "POST") { return } b, err := ioutil.ReadAll(r.Body) if err != nil { log.Println("etcdhttp: error reading raft message:", err) http.Error(w, "error reading raft message", http.StatusBadRequest) return } var m raftpb.Message if err := m.Unmarshal(b); err != nil { log.Println("etcdhttp: error unmarshaling raft message:", err) http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) return } log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m) if m.Type == raftpb.MsgApp { // TODO(jonboulle): standardize id uint-->string process: always base 16? h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength)) } if err := h.server.Process(context.TODO(), m); err != nil { log.Println("etcdhttp: error processing raft message:", err) writeError(w, err) return } w.WriteHeader(http.StatusNoContent) } // parseRequest converts a received http.Request to a server Request, // performing validation of supplied fields as appropriate. // If any validation fails, an empty Request and non-nil error is returned. 
func parseRequest(r *http.Request, id uint64) (etcdserverpb.Request, error) { emptyReq := etcdserverpb.Request{} err := r.ParseForm() if err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidForm, err.Error(), ) } if !strings.HasPrefix(r.URL.Path, keysPrefix) { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidForm, "incorrect key prefix", ) } p := r.URL.Path[len(keysPrefix):] var pIdx, wIdx uint64 if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeIndexNaN, `invalid value for "prevIndex"`, ) } if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeIndexNaN, `invalid value for "waitIndex"`, ) } var rec, sort, wait, dir, stream bool if rec, err = getBool(r.Form, "recursive"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `invalid value for "recursive"`, ) } if sort, err = getBool(r.Form, "sorted"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `invalid value for "sorted"`, ) } if wait, err = getBool(r.Form, "wait"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `invalid value for "wait"`, ) } // TODO(jonboulle): define what parameters dir is/isn't compatible with? 
if dir, err = getBool(r.Form, "dir"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `invalid value for "dir"`, ) } if stream, err = getBool(r.Form, "stream"); err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `invalid value for "stream"`, ) } if wait && r.Method != "GET" { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `"wait" can only be used with GET requests`, ) } pV := r.FormValue("prevValue") if _, ok := r.Form["prevValue"]; ok && pV == "" { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, `"prevValue" cannot be empty`, ) } // TTL is nullable, so leave it null if not specified // or an empty string var ttl *uint64 if len(r.FormValue("ttl")) > 0 { i, err := getUint64(r.Form, "ttl") if err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeTTLNaN, `invalid value for "ttl"`, ) } ttl = &i } // prevExist is nullable, so leave it null if not specified var pe *bool if _, ok := r.Form["prevExist"]; ok { bv, err := getBool(r.Form, "prevExist") if err != nil { return emptyReq, etcdErr.NewRequestError( etcdErr.EcodeInvalidField, "invalid value for prevExist", ) } pe = &bv } rr := etcdserverpb.Request{ ID: id, Method: r.Method, Path: p, Val: r.FormValue("value"), Dir: dir, PrevValue: pV, PrevIndex: pIdx, PrevExist: pe, Recursive: rec, Since: wIdx, Sorted: sort, Stream: stream, Wait: wait, } if pe != nil { rr.PrevExist = pe } // Null TTL is equivalent to unset Expiration // TODO(jonboulle): use fake clock instead of time module // https://github.com/coreos/etcd/issues/1021 if ttl != nil { expr := time.Duration(*ttl) * time.Second rr.Expiration = time.Now().Add(expr).UnixNano() } return rr, nil } // getUint64 extracts a uint64 by the given key from a Form. If the key does // not exist in the form, 0 is returned. If the key exists but the value is // badly formed, an error is returned. If multiple values are present only the // first is considered. 
func getUint64(form url.Values, key string) (i uint64, err error) { if vals, ok := form[key]; ok { i, err = strconv.ParseUint(vals[0], 10, 64) } return } // getBool extracts a bool by the given key from a Form. If the key does not // exist in the form, false is returned. If the key exists but the value is // badly formed, an error is returned. If multiple values are present only the // first is considered. func getBool(form url.Values, key string) (b bool, err error) { if vals, ok := form[key]; ok { b, err = strconv.ParseBool(vals[0]) } return } // writeError logs and writes the given Error to the ResponseWriter // If Error is an etcdErr, it is rendered to the ResponseWriter // Otherwise, it is assumed to be an InternalServerError func writeError(w http.ResponseWriter, err error) { if err == nil { return } log.Println(err) if e, ok := err.(*etcdErr.Error); ok { e.Write(w) } else { http.Error(w, "Internal Server Error", http.StatusInternalServerError) } } // writeEvent serializes a single Event and writes the resulting // JSON to the given ResponseWriter, along with the appropriate // headers func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error { if ev == nil { return errors.New("cannot write empty Event!") } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) if ev.IsCreated() { w.WriteHeader(http.StatusCreated) } return json.NewEncoder(w).Encode(ev) } func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { defer wa.Remove() ech := wa.EventChan() var nch <-chan bool if x, ok := w.(http.CloseNotifier); ok { nch = x.CloseNotify() } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) 
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) w.WriteHeader(http.StatusOK) // Ensure headers are flushed early, in case of long polling w.(http.Flusher).Flush() for { select { case <-nch: // Client closed connection. Nothing to do. return case <-ctx.Done(): // Timed out. net/http will close the connection for us, so nothing to do. return case ev, ok := <-ech: if !ok { // If the channel is closed this may be an indication of // that notifications are much more than we are able to // send to the client in time. Then we simply end streaming. return } if err := json.NewEncoder(w).Encode(ev); err != nil { // Should never be reached log.Println("error writing event: %v", err) return } if !stream { return } w.(http.Flusher).Flush() } } } // allowMethod verifies that the given method is one of the allowed methods, // and if not, it writes an error to w. A boolean is returned indicating // whether or not the method is allowed. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool { for _, meth := range ms { if m == meth { return true } } w.Header().Set("Allow", strings.Join(ms, ",")) http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return false }