diff --git a/client_handlers.go b/client_handlers.go index b2a893330..6b1142b8e 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -131,7 +131,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) if body == nil { (*w).WriteHeader(http.StatusNotFound) - (*w).Write(newJsonError(100, err.Error())) + (*w).Write(newJsonError(300, "Empty result from raft")) } else { body, ok := body.([]byte) // this should not happen @@ -225,6 +225,18 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { } +// Handler to return the current version of etcd +func VersionHttpHandler(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(releaseVersion)) +} + +// Handler to return the basic stats of etcd +func StatsHttpHandler(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write(etcdStore.Stats()) +} + // Get Handler func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] diff --git a/command.go b/command.go index a0977ae83..aea0caddc 100644 --- a/command.go +++ b/command.go @@ -120,6 +120,13 @@ func (c *JoinCommand) CommandName() string { // Join a server to the cluster func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { + // check if the join command is from a previous machine, who lost all its previous log. + response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name)) + + if response != nil { + return []byte("join success"), nil + } + // check machine number in the cluster num := machineNum() if num == maxClusterSize { @@ -129,12 +136,8 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // add peer in raft err := raftServer.AddPeer(c.Name) - // add machine in etcd - addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort) - // add machine in etcd storage - nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex()) - key := path.Join("_etcd/machines", nodeName) + key := path.Join("_etcd/machines", c.Name) value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) diff --git a/error.go b/error.go index 6d774949f..86442311a 100644 --- a/error.go +++ b/error.go @@ -20,6 +20,7 @@ func init() { errors[201] = "PrevValue is Required in POST form" errors[202] = "The given TTL in POST form is not a number" errors[203] = "The given index in POST form is not a number" + // raft related errors errors[300] = "Raft Internal Error" errors[301] = "During Leader Election" diff --git a/etcd.go b/etcd.go index ea6f3ecaa..db8d8d45f 100644 --- a/etcd.go +++ b/etcd.go @@ -434,6 +434,8 @@ func startClientTransport(port int, st int) { http.HandleFunc("/"+version+"/watch/", WatchHttpHandler) http.HandleFunc("/leader", LeaderHttpHandler) http.HandleFunc("/machines", MachinesHttpHandler) + http.HandleFunc("/", VersionHttpHandler) + http.HandleFunc("/stats", StatsHttpHandler) switch st { diff --git a/machines.go b/machines.go index d3c0a855d..dc358a8e3 100644 --- a/machines.go +++ b/machines.go @@ -2,34 +2,26 @@ package main import ( "fmt" + "path" + "strings" ) -type machine struct { - hostname string - raftPort int - clientPort int -} - -var machinesMap = map[string]machine{} - -func addMachine(name string, hostname string, raftPort int, clientPort int) { - - machinesMap[name] = machine{hostname, raftPort, clientPort} - -} - func getClientAddr(name string) (string, bool) { - machine, ok := machinesMap[name] - if !ok { - return "", false - } + response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) - addr := fmt.Sprintf("%s:%v", machine.hostname, machine.clientPort) + values := strings.Split(response[0].Value, ",") + + hostname := values[0] + clientPort := values[2] + + addr := fmt.Sprintf("%s:%s", hostname, clientPort) return addr, true } // machineNum returns the number of machines in the cluster func machineNum() int { - return len(machinesMap) + response, _ := etcdStore.RawGet("_etcd/machines") + + return len(response) } diff --git a/store/stats.go b/store/stats.go new file mode 100644 index 000000000..15b71e06e --- /dev/null +++ b/store/stats.go @@ -0,0 +1,25 @@ +package store + +import ( + "encoding/json" +) + +type EtcdStats struct { + // Number of get requests + Gets uint64 `json:"gets"` + + // Number of sets requests + Sets uint64 `json:"sets"` + + // Number of delete requests + Deletes uint64 `json:"deletes"` + + // Number of testAndSet requests + TestAndSets uint64 `json:"testAndSets"` +} + +// Stats returns the basic statistics information of etcd storage +func (s *Store) Stats() []byte { + b, _ := json.Marshal(s.BasicStats) + return b +} diff --git a/store/store.go b/store/store.go index 649ec617c..4afbfb868 100644 --- a/store/store.go +++ b/store/store.go @@ -42,6 +42,9 @@ type Store struct { // Current index of the raft machine Index uint64 + + // Basic statistics information of etcd storage + BasicStats EtcdStats } // A Node represents a Value in the Key-Value pair in the store @@ -139,6 +142,9 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64 //Update index s.Index = index + //Update stats + s.BasicStats.Sets++ + key = path.Clean("/" + key) isExpire := !expireTime.Equal(PERMANENT) @@ -284,13 +290,29 @@ func (s *Store) internalGet(key string) *Response { // If key is a file return the file // If key is a directory reuturn an array of files func (s *Store) Get(key string) ([]byte, error) { + resps, err := s.RawGet(key) + + if err != nil { + return nil, err + } + + if len(resps) == 1 { + return json.Marshal(resps[0]) + } + + return json.Marshal(resps) +} + +func (s *Store) RawGet(key string) ([]*Response, error) { + // Update stats + s.BasicStats.Gets++ key = path.Clean("/" + key) nodes, keys, dirs, ok := s.Tree.list(key) if ok { - resps := make([]Response, len(nodes)) + resps := make([]*Response, len(nodes)) for i := 0; i < len(nodes); i++ { var TTL int64 @@ -298,7 +320,7 @@ func (s *Store) Get(key string) ([]byte, error) { isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) - resps[i] = Response{ + resps[i] = &Response{ Action: "GET", Index: s.Index, Key: path.Join(key, keys[i]), @@ -318,10 +340,8 @@ func (s *Store) Get(key string) ([]byte, error) { } } - if len(resps) == 1 { - return json.Marshal(resps[0]) - } - return json.Marshal(resps) + + return resps, nil } err := NotFoundError(key) @@ -331,9 +351,12 @@ func (s *Store) Get(key string) ([]byte, error) { // Delete the key func (s *Store) Delete(key string, index uint64) ([]byte, error) { + // Update stats + s.BasicStats.Deletes++ + key = path.Clean("/" + key) - //Update index + // Update index s.Index = index node, ok := s.Tree.get(key) @@ -381,6 +404,9 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) { // Set the value of the key to the value if the given prevValue is equal to the value of the key func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { + // Update stats + s.BasicStats.TestAndSets++ + resp := s.internalGet(key) if resp == nil { diff --git a/version.go b/version.go index b64a7814b..35e8fec6c 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,5 @@ package main var version = "v1" + +var releaseVersion = "etcd pre-0.1"