From cf2d6888c2b247280d06b3f456555a1de14e21ca Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 17 Aug 2013 20:41:15 -0700 Subject: [PATCH] add error package --- command.go | 4 +- error.go => error/error.go | 35 +++++-- etcd_handlers.go | 96 +++++++------------ raft_handlers.go | 6 +- raft_server.go | 19 ++-- store/error.go | 25 ----- store/store.go | 17 ++-- .../coreos/go-etcd/examples/mutex/mutex.go | 1 - .../coreos/go-etcd/examples/speed/speed.go | 14 +-- web/web.go | 6 +- 10 files changed, 95 insertions(+), 128 deletions(-) rename error.go => error/error.go (65%) delete mode 100644 store/error.go diff --git a/command.go b/command.go index 46da7329d..480d9db70 100644 --- a/command.go +++ b/command.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "fmt" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "path" @@ -147,7 +148,8 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // check machine number in the cluster num := machineNum() if num == maxClusterSize { - return []byte("join fail"), fmt.Errorf(errors[103]) + debug("Reject join request from ", c.Name) + return []byte("join fail"), etcdErr.NewError(103, "") } addNameToURL(c.Name, c.RaftURL, c.EtcdURL) diff --git a/error.go b/error/error.go similarity index 65% rename from error.go rename to error/error.go index f4150676e..dc209f208 100644 --- a/error.go +++ b/error/error.go @@ -1,11 +1,14 @@ -package main +package error import ( "encoding/json" + "net/http" ) var errors map[int]string +const () + func init() { errors = make(map[int]string) @@ -33,21 +36,39 @@ func init() { } -type etcdError struct { +type Error struct { ErrorCode int `json:"errorCode"` Message string `json:"message"` Cause string `json:"cause,omitempty"` } -func newEtcdError(errorCode int, cause string) *etcdError { - return &etcdError{ +func NewError(errorCode int, cause string) Error { + return Error{ ErrorCode: errorCode, Message: errors[errorCode], Cause: cause, } } -func (e *etcdError) toJson() []byte { - b, _ := json.Marshal(e) - return b +func Message(code int) string { + return errors[code] +} + +// Only for error interface +func (e Error) Error() string { + return e.Message +} + +func (e Error) toJsonString() string { + b, _ := json.Marshal(e) + return string(b) +} + +func (e Error) Write(w http.ResponseWriter) { + // 3xx is reft internal error + if e.ErrorCode/100 == 3 { + http.Error(w, e.toJsonString(), http.StatusInternalServerError) + } else { + http.Error(w, e.toJsonString(), http.StatusBadRequest) + } } diff --git a/etcd_handlers.go b/etcd_handlers.go index 703db4a4c..951328841 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -2,6 +2,7 @@ package main import ( "fmt" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "net/http" @@ -16,31 +17,32 @@ import ( func NewEtcdMuxer() *http.ServeMux { // external commands etcdMux := http.NewServeMux() - etcdMux.Handle("/"+version+"/keys/", etcdHandler(Multiplexer)) - etcdMux.Handle("/"+version+"/watch/", etcdHandler(WatchHttpHandler)) - etcdMux.Handle("/leader", etcdHandler(LeaderHttpHandler)) - etcdMux.Handle("/machines", etcdHandler(MachinesHttpHandler)) - etcdMux.Handle("/version", etcdHandler(VersionHttpHandler)) - etcdMux.Handle("/stats", etcdHandler(StatsHttpHandler)) + etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer)) + etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler)) + etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) + etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) + etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler)) + etcdMux.Handle("/version", errorHandler(VersionHttpHandler)) etcdMux.HandleFunc("/test/", TestHttpHandler) return etcdMux } -type etcdHandler func(http.ResponseWriter, *http.Request) *etcdError +type errorHandler func(http.ResponseWriter, *http.Request) error -func (fn etcdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if e := fn(w, r); e != nil { - // 3xx is reft internal error - if e.ErrorCode/100 == 3 { - http.Error(w, string(e.toJson()), http.StatusInternalServerError) + fmt.Println(e) + if etcdErr, ok := e.(etcdErr.Error); ok { + debug("Return error: ", etcdErr.Error()) + etcdErr.Write(w) } else { - http.Error(w, string(e.toJson()), http.StatusBadRequest) + http.Error(w, e.Error(), http.StatusInternalServerError) } } } // Multiplex GET/POST/DELETE request to corresponding handlers -func Multiplexer(w http.ResponseWriter, req *http.Request) *etcdError { +func Multiplexer(w http.ResponseWriter, req *http.Request) error { switch req.Method { case "GET": @@ -61,11 +63,11 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) *etcdError { //-------------------------------------- // Set Command Handler -func SetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] if store.CheckKeyword(key) { - return newEtcdError(400, "Set") + return etcdErr.NewError(400, "Set") } debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -73,7 +75,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { value := req.FormValue("value") if len(value) == 0 { - return newEtcdError(200, "Set") + return etcdErr.NewError(200, "Set") } prevValue := req.FormValue("prevValue") @@ -83,7 +85,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { expireTime, err := durationToExpireTime(strDuration) if err != nil { - return newEtcdError(202, "Set") + return etcdErr.NewError(202, "Set") } if len(prevValue) != 0 { @@ -108,7 +110,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { } // Delete Handler -func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -121,35 +123,14 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { } // Dispatch the command to leader -func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) *etcdError { +func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error { if r.State() == raft.Leader { if body, err := r.Do(c); err != nil { - - // store error - if _, ok := err.(store.NotFoundError); ok { - return newEtcdError(100, err.Error()) - } - - if _, ok := err.(store.TestFail); ok { - return newEtcdError(101, err.Error()) - } - - if _, ok := err.(store.NotFile); ok { - return newEtcdError(102, err.Error()) - } - - // join error - if err.Error() == errors[103] { - return newEtcdError(103, "") - } - - // raft internal error - return newEtcdError(300, err.Error()) - + return err } else { if body == nil { - return newEtcdError(300, "Empty result from raft") + return etcdErr.NewError(300, "Empty result from raft") } else { body, _ := body.([]byte) w.WriteHeader(http.StatusOK) @@ -162,7 +143,7 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) *e leader := r.Leader() // current no leader if leader == "" { - return newEtcdError(300, "") + return etcdErr.NewError(300, "") } // tell the client where is the leader @@ -183,7 +164,7 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) *e http.Redirect(w, req, url, http.StatusTemporaryRedirect) return nil } - return newEtcdError(300, "") + return etcdErr.NewError(300, "") } //-------------------------------------- @@ -194,7 +175,7 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) *e //-------------------------------------- // Handler to return the current leader's raft address -func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { leader := r.Leader() if leader != "" { @@ -203,12 +184,12 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { w.Write([]byte(raftURL)) return nil } else { - return newEtcdError(301, "") + return etcdErr.NewError(301, "") } } // Handler to return all the known machines in the current cluster -func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { machines := getMachines() w.WriteHeader(http.StatusOK) @@ -217,22 +198,21 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { } // Handler to return the current version of etcd -func VersionHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "etcd %s", releaseVersion) - fmt.Fprintf(w, "etcd API %s", version) return nil } // Handler to return the basic stats of etcd -func StatsHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) w.Write(etcdStore.Stats()) return nil } // Get Handler -func GetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -242,13 +222,7 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { } if body, err := command.Apply(r.Server); err != nil { - - if _, ok := err.(store.NotFoundError); ok { - return newEtcdError(100, err.Error()) - } - - return newEtcdError(300, "") - + return err } else { body, _ := body.([]byte) w.WriteHeader(http.StatusOK) @@ -260,7 +234,7 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { } // Watch handler -func WatchHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { +func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/watch/"):] command := &WatchCommand{ @@ -279,7 +253,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { sinceIndex, err := strconv.ParseUint(string(content), 10, 64) if err != nil { - return newEtcdError(203, "Watch From Index") + return etcdErr.NewError(203, "Watch From Index") } command.SinceIndex = sinceIndex @@ -289,7 +263,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) *etcdError { } if body, err := command.Apply(r.Server); err != nil { - return newEtcdError(500, key) + return etcdErr.NewError(500, key) } else { w.WriteHeader(http.StatusOK) diff --git a/raft_handlers.go b/raft_handlers.go index 606afb1bb..30272d420 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -94,16 +94,16 @@ func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { } // Response to the join request -func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { +func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { command := &JoinCommand{} if err := decodeJsonRequest(req, command); err == nil { debugf("Receive Join Request from %s", command.Name) - dispatch(command, w, req, false) + return dispatch(command, w, req, false) } else { w.WriteHeader(http.StatusInternalServerError) - return + return nil } } diff --git a/raft_server.go b/raft_server.go index 00d07f59b..bad3732c8 100644 --- a/raft_server.go +++ b/raft_server.go @@ -5,11 +5,11 @@ import ( "crypto/tls" "encoding/json" "fmt" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/go-raft" "net/http" "net/url" "time" - - "github.com/coreos/go-raft" ) type raftServer struct { @@ -95,7 +95,7 @@ func (r *raftServer) ListenAndServe() { } err = joinCluster(r.Server, machine, r.tlsConf.Scheme) if err != nil { - if err.Error() == errors[103] { + if _, ok := err.(etcdErr.Error); ok { fatal(err) } debugf("cannot join to cluster via machine %s %s", machine, err) @@ -148,7 +148,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { // internal commands raftMux.HandleFunc("/name", NameHttpHandler) - raftMux.HandleFunc("/join", JoinHttpHandler) + raftMux.Handle("/join", errorHandler(JoinHttpHandler)) raftMux.HandleFunc("/vote", VoteHttpHandler) raftMux.HandleFunc("/log", GetLogHttpHandler) raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) @@ -171,11 +171,7 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { json.NewEncoder(&b).Encode(newJoinCommand()) // t must be ok - t, ok := r.Transporter().(transporter) - - if !ok { - panic("wrong type") - } + t, _ := r.Transporter().(transporter) joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} @@ -203,7 +199,10 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { } else if resp.StatusCode == http.StatusBadRequest { debug("Reach max number machines in the cluster") - return fmt.Errorf(errors[103]) + decoder := json.NewDecoder(resp.Body) + err := &etcdErr.Error{} + decoder.Decode(err) + return *err } else { return fmt.Errorf("Unable to join") } diff --git a/store/error.go b/store/error.go deleted file mode 100644 index 2ad4b0aef..000000000 --- a/store/error.go +++ /dev/null @@ -1,25 +0,0 @@ -package store - -type NotFoundError string - -func (e NotFoundError) Error() string { - return string(e) -} - -type NotFile string - -func (e NotFile) Error() string { - return string(e) -} - -type TestFail string - -func (e TestFail) Error() string { - return string(e) -} - -type Keyword string - -func (e Keyword) Error() string { - return string(e) -} diff --git a/store/store.go b/store/store.go index 5447649c0..d37345f4d 100644 --- a/store/store.go +++ b/store/store.go @@ -3,6 +3,7 @@ package store import ( "encoding/json" "fmt" + etcdErr "github.com/coreos/etcd/error" "path" "strconv" "sync" @@ -239,8 +240,7 @@ func (s *Store) internalSet(key string, value string, expireTime time.Time, inde ok := s.Tree.set(key, Node{value, expireTime, update}) if !ok { - err := NotFile(key) - return nil, err + return nil, etcdErr.NewError(102, "set: "+key) } if isExpire { @@ -393,8 +393,7 @@ func (s *Store) RawGet(key string) ([]*Response, error) { return resps, nil } - err := NotFoundError(key) - return nil, err + return nil, etcdErr.NewError(100, "get: "+key) } func (s *Store) Delete(key string, index uint64) ([]byte, error) { @@ -451,8 +450,7 @@ func (s *Store) internalDelete(key string, index uint64) ([]byte, error) { return msg, err } else { - err := NotFoundError(key) - return nil, err + return nil, etcdErr.NewError(100, "delete: "+key) } } @@ -467,8 +465,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim resp := s.internalGet(key) if resp == nil { - err := NotFoundError(key) - return nil, err + return nil, etcdErr.NewError(100, "testandset: "+key) } if resp.Value == prevValue { @@ -478,8 +475,8 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim } else { // If fails, return err - err := TestFail(fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue)) - return nil, err + return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s", + resp.Value, prevValue)) } } diff --git a/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go b/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go index 6b9b24c3e..45131195c 100644 --- a/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go +++ b/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go @@ -17,7 +17,6 @@ func main() { c := etcd.NewClient() c.Set("lock", "unlock", 0) - for i := 0; i < 10; i++ { go t(i, ch, etcd.NewClient()) } diff --git a/third_party/github.com/coreos/go-etcd/examples/speed/speed.go b/third_party/github.com/coreos/go-etcd/examples/speed/speed.go index 97a6e9d02..e643e02b7 100644 --- a/third_party/github.com/coreos/go-etcd/examples/speed/speed.go +++ b/third_party/github.com/coreos/go-etcd/examples/speed/speed.go @@ -1,8 +1,8 @@ -package main +package main import ( - "github.com/coreos/go-etcd/etcd" "fmt" + "github.com/coreos/go-etcd/etcd" "time" ) @@ -11,21 +11,21 @@ var count = 0 func main() { ch := make(chan bool, 10) // set up a lock - for i:=0; i < 100; i++ { + for i := 0; i < 100; i++ { go t(i, ch, etcd.NewClient()) } start := time.Now() - for i:=0; i< 100; i++ { + for i := 0; i < 100; i++ { <-ch } - fmt.Println(time.Now().Sub(start), ": ", 100 * 50, "commands") + fmt.Println(time.Now().Sub(start), ": ", 100*50, "commands") } func t(num int, ch chan bool, c *etcd.Client) { c.SyncCluster() for i := 0; i < 50; i++ { - str := fmt.Sprintf("foo_%d",num * i) + str := fmt.Sprintf("foo_%d", num*i) c.Set(str, "10", 0) } - ch<-true + ch <- true } diff --git a/web/web.go b/web/web.go index 0cd2463c0..1ce9d3fe5 100644 --- a/web/web.go +++ b/web/web.go @@ -29,12 +29,12 @@ func Start(raftServer *raft.Server, webURL string) { webMux := http.NewServeMux() server := &http.Server{ - Handler: webMux, - Addr: u.Host, + Handler: webMux, + Addr: u.Host, } mainPage = &MainPage{ - Leader: raftServer.Leader(), + Leader: raftServer.Leader(), Address: u.Host, }