diff --git a/README.md b/README.md index 7fead80bd..b94e13190 100644 --- a/README.md +++ b/README.md @@ -205,19 +205,19 @@ We already have `/foo/foo=barbar` We create another one `/foo/foo_dir/foo=barbarbar` ```sh -http://127.0.0.1:4001/v1/keys/foo/foo_dir/bar -d value=barbarbar +curl http://127.0.0.1:4001/v1/keys/foo/foo_dir/bar -d value=barbarbar ``` Let us list them next. ```sh -curl http://127.0.0.1:4001/v1/list/foo/ +curl http://127.0.0.1:4001/v1/get/foo/ ``` -We should see the response as +We should see the response as an array of items ```json -{"Key":"foo","Value":"barbar","Type":"f"} {"Key":"foo_dir","Value":".","Type":"d"} +[{"action":"GET","key":"/foo/foo","value":"barbar","index":10},{"action":"GET","key":"/foo/foo_dir","dir":true,"index":10}] ``` which meas `foo=barbar` is a key-value pair under `/foo` and `foo_dir` is a directory. @@ -241,6 +241,29 @@ Let the join two more nodes to this cluster using the -C argument: ./etcd -c 4003 -s 7003 -C 127.0.0.1:7001 -d nod/node3 ``` +Get the machines in the cluster + +```sh +curl http://127.0.0.1:4001/machines +``` + +We should see there are three nodes in the cluster + +``` +0.0.0.0:7001,0.0.0.0:7002,0.0.0.0:7003 +``` + +Also try to get the current leader in the cluster + +``` +curl http://127.0.0.1:4001/leader +``` +The first server we set up should be the leader, if it has not dead during these commands. + +``` +0.0.0.0:7001 +``` + Now we can do normal SET and GET operations on keys as we explored earlier. ```sh @@ -259,6 +282,16 @@ Let's kill the leader of the cluster and get the value from the other machine: curl http://127.0.0.1:4002/v1/keys/foo ``` +A new leader should have been elected. + +``` +curl http://127.0.0.1:4001/leader +``` + +``` +0.0.0.0:7002 or 0.0.0.0:7003 +``` + You should be able to see this: ```json diff --git a/client_handlers.go b/client_handlers.go index 07e360db7..6418307ea 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -135,19 +135,24 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) (*w).Write(newJsonError(101, err.Error())) return } + + if _, ok := err.(store.NotFile); ok { + (*w).WriteHeader(http.StatusBadRequest) + (*w).Write(newJsonError(102, err.Error())) + return + } (*w).WriteHeader(http.StatusInternalServerError) (*w).Write(newJsonError(300, "No Leader")) return } else { - body, ok := body.([]byte) - if !ok { - panic("wrong type") - } - if body == nil { http.NotFound((*w), req) } else { + body, ok := body.([]byte) + if !ok { + panic("wrong type") + } (*w).WriteHeader(http.StatusOK) (*w).Write(body) } @@ -174,7 +179,8 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) var url string if client { - url = scheme + raftTransporter.GetLeaderClientAddress() + path + clientAddr, _ := getClientAddr(raftServer.Leader()) + url = scheme + clientAddr + path } else { url = scheme + raftServer.Leader() + path } @@ -198,8 +204,40 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) // Handler to return the current leader name func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) { + leader := raftServer.Leader() + + if leader != "" { + w.WriteHeader(http.StatusOK) + w.Write([]byte(raftServer.Leader())) + } else { + + // not likely, but it may happen + w.WriteHeader(http.StatusInternalServerError) + w.Write(newJsonError(301, "")) + } +} + +// Handler to return all the known machines in the current cluster +func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { + peers := raftServer.Peers() + + // Add itself to the machine list first + // Since peer map does not contain the server itself + machines, _ := getClientAddr(raftServer.Name()) + + // Add all peers to the list and sepearte by comma + // We do not use json here since we accept machines list + // in the command line seperate by comma. + + for peerName, _ := range peers { + if addr, ok := getClientAddr(peerName); ok { + machines = machines + "," + addr + } + } + w.WriteHeader(http.StatusOK) - w.Write([]byte(raftServer.Leader())) + w.Write([]byte(machines)) + } // Get Handler @@ -235,37 +273,6 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { } -// List Handler -func ListHttpHandler(w http.ResponseWriter, req *http.Request) { - prefix := req.URL.Path[len("/v1/list/"):] - - debug("[recv] GET http://%v/v1/list/%s", raftServer.Name(), prefix) - - command := &ListCommand{} - command.Prefix = prefix - - if body, err := command.Apply(raftServer); err != nil { - if _, ok := err.(store.NotFoundError); ok { - http.NotFound(w, req) - return - } - w.WriteHeader(http.StatusInternalServerError) - w.Write(newJsonError(300, "")) - return - } else { - w.WriteHeader(http.StatusOK) - - body, ok := body.([]byte) - if !ok { - panic("wrong type") - } - - w.Write(body) - return - } - -} - // Watch handler func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/watch/"):] diff --git a/command.go b/command.go index 865b308e3..4b92bab16 100644 --- a/command.go +++ b/command.go @@ -64,21 +64,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { return etcdStore.Get(c.Key) } -// List command -type ListCommand struct { - Prefix string `json:"prefix"` -} - -// The name of the list command in the log -func (c *ListCommand) CommandName() string { - return "list" -} - -// List all the keys have the given prefix path -func (c *ListCommand) Apply(server *raft.Server) (interface{}, error) { - return etcdStore.List(c.Prefix) -} - // Delete command type DeleteCommand struct { Key string `json:"key"` @@ -120,7 +105,10 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // JoinCommand type JoinCommand struct { - Name string `json:"name"` + Name string `json:"name"` + Hostname string `json:"hostName"` + RaftPort int `json:"raftPort"` + ClientPort int `json:"clientPort"` } // The name of the join command in the log @@ -129,8 +117,9 @@ func (c *JoinCommand) CommandName() string { } // Join a server to the cluster -func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { - err := server.AddPeer(c.Name) - // no result will be returned - return nil, err +func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { + err := raftServer.AddPeer(c.Name) + addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort) + + return []byte("join success"), err } diff --git a/error.go b/error.go index 4de0c7494..666187669 100644 --- a/error.go +++ b/error.go @@ -12,6 +12,7 @@ func init() { // command related errors errors[100] = "Key Not Found" errors[101] = "The given PrevValue is not equal to the value of the key" + errors[102] = "Not A File" // Post form related errors errors[200] = "Value is Required in POST form" errors[201] = "PrevValue is Required in POST form" @@ -19,6 +20,7 @@ func init() { errors[203] = "The given index in POST form is not a number" // raft related errors errors[300] = "Raft Internal Error" + errors[301] = "During Leader Election" } type jsonError struct { diff --git a/etcd.go b/etcd.go index 045f8c995..3e3ae4a51 100644 --- a/etcd.go +++ b/etcd.go @@ -33,9 +33,9 @@ var machinesFile string var cluster []string -var address string +var hostname string var clientPort int -var serverPort int +var raftPort int var webPort int var serverCertFile string @@ -58,9 +58,9 @@ func init() { flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma") flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") - flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine") + flag.StringVar(&hostname, "h", "0.0.0.0", "the hostname of the local machine") flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients") - flag.IntVar(&serverPort, "s", 7001, "the port to communicate with servers") + flag.IntVar(&raftPort, "s", 7001, "the port to communicate with servers") flag.IntVar(&webPort, "w", -1, "the port of web interface") flag.StringVar(&serverCAFile, "serverCAFile", "", "the path of the CAFile") @@ -97,7 +97,7 @@ const ( // Timeout for internal raft http connection // The original timeout for http is 45 seconds // which is too long for our usage. - HTTPTIMEOUT = time.Second + HTTPTIMEOUT = 10 * time.Second ) //------------------------------------------------------------------------------ @@ -107,8 +107,8 @@ const ( //------------------------------------------------------------------------------ type Info struct { - Address string `json:"address"` - ServerPort int `json:"serverPort"` + Hostname string `json:"hostname"` + RaftPort int `json:"raftPort"` ClientPort int `json:"clientPort"` WebPort int `json:"webPort"` @@ -194,7 +194,7 @@ func main() { func startRaft(securityType int) { var err error - raftName := fmt.Sprintf("%s:%d", info.Address, info.ServerPort) + raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort) // Create transporter for raft raftTransporter = createTransporter(securityType) @@ -223,7 +223,7 @@ func startRaft(securityType int) { if raftServer.IsLogEmpty() { // start as a leader in a new cluster - if len(cluster) == 1 && cluster[0] == "" { + if len(cluster) == 0 { raftServer.StartLeader() time.Sleep(time.Millisecond * 20) @@ -232,6 +232,9 @@ func startRaft(securityType int) { for { command := &JoinCommand{} command.Name = raftServer.Name() + command.Hostname = hostname + command.RaftPort = raftPort + command.ClientPort = clientPort _, err := raftServer.Do(command) if err == nil { break @@ -268,7 +271,7 @@ func startRaft(securityType int) { // go server.Snapshot() // start to response to raft requests - go startRaftTransport(info.ServerPort, securityType) + go startRaftTransport(info.RaftPort, securityType) } @@ -338,11 +341,11 @@ func startRaftTransport(port int, st int) { switch st { case HTTP: - fmt.Printf("raft server [%s] listen on http port %v\n", address, port) + fmt.Printf("raft server [%s] listen on http port %v\n", hostname, port) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) case HTTPS: - fmt.Printf("raft server [%s] listen on https port %v\n", address, port) + fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port) log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil)) case HTTPSANDVERIFY: @@ -354,7 +357,7 @@ func startRaftTransport(port int, st int) { }, Addr: fmt.Sprintf(":%d", port), } - fmt.Printf("raft server [%s] listen on https port %v\n", address, port) + fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port) err := server.ListenAndServeTLS(serverCertFile, serverKeyFile) if err != nil { @@ -369,18 +372,18 @@ func startClientTransport(port int, st int) { // external commands http.HandleFunc("/"+version+"/keys/", Multiplexer) http.HandleFunc("/"+version+"/watch/", WatchHttpHandler) - http.HandleFunc("/"+version+"/list/", ListHttpHandler) http.HandleFunc("/"+version+"/testAndSet/", TestAndSetHttpHandler) http.HandleFunc("/leader", LeaderHttpHandler) + http.HandleFunc("/machines", MachinesHttpHandler) switch st { case HTTP: - fmt.Printf("etcd [%s] listen on http port %v\n", address, clientPort) + fmt.Printf("etcd [%s] listen on http port %v\n", hostname, clientPort) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) case HTTPS: - fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort) + fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort) http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil) case HTTPSANDVERIFY: @@ -392,7 +395,7 @@ func startClientTransport(port int, st int) { }, Addr: fmt.Sprintf(":%d", port), } - fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort) + fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort) err := server.ListenAndServeTLS(clientCertFile, clientKeyFile) if err != nil { @@ -480,15 +483,15 @@ func getInfo(path string) *Info { } else { // Otherwise ask user for info and write it to file. - if address == "" { + if hostname == "" { fatal("Please give the address of the local machine") } - info.Address = address - info.Address = strings.TrimSpace(info.Address) - fmt.Println("address ", info.Address) + info.Hostname = hostname + info.Hostname = strings.TrimSpace(info.Hostname) + fmt.Println("address ", info.Hostname) - info.ServerPort = serverPort + info.RaftPort = raftPort info.ClientPort = clientPort info.WebPort = webPort @@ -537,6 +540,9 @@ func joinCluster(s *raft.Server, serverName string) error { command := &JoinCommand{} command.Name = s.Name() + command.Hostname = info.Hostname + command.RaftPort = info.RaftPort + command.ClientPort = info.ClientPort json.NewEncoder(&b).Encode(command) @@ -561,7 +567,7 @@ func joinCluster(s *raft.Server, serverName string) error { return nil } if resp.StatusCode == http.StatusTemporaryRedirect { - address = resp.Header.Get("Location") + address := resp.Header.Get("Location") debug("Leader is %s", address) debug("Send Join Request to %s", address) json.NewEncoder(&b).Encode(command) @@ -582,6 +588,5 @@ func registerCommands() { raft.RegisterCommand(&GetCommand{}) raft.RegisterCommand(&DeleteCommand{}) raft.RegisterCommand(&WatchCommand{}) - raft.RegisterCommand(&ListCommand{}) raft.RegisterCommand(&TestAndSetCommand{}) } diff --git a/machines.go b/machines.go new file mode 100644 index 000000000..1a368ee9f --- /dev/null +++ b/machines.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" +) + +type machine struct { + hostname string + raftPort int + clientPort int +} + +var machinesMap = map[string]machine{} + +func addMachine(name string, hostname string, raftPort int, clientPort int) { + + machinesMap[name] = machine{hostname, raftPort, clientPort} + +} + +func getClientAddr(name string) (string, bool) { + machine, ok := machinesMap[name] + if !ok { + return "", false + } + + addr := fmt.Sprintf("%s:%v", machine.hostname, machine.clientPort) + + return addr, true +} diff --git a/raft_handlers.go b/raft_handlers.go index 523c0193a..2a79dece4 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -75,7 +75,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { func ClientHttpHandler(w http.ResponseWriter, req *http.Request) { debug("[recv] Get http://%v/client/ ", raftServer.Name()) w.WriteHeader(http.StatusOK) - client := address + ":" + strconv.Itoa(clientPort) + client := hostname + ":" + strconv.Itoa(clientPort) w.Write([]byte(client)) } diff --git a/store/error.go b/store/error.go index 3f13f83a5..c350f64ba 100644 --- a/store/error.go +++ b/store/error.go @@ -16,4 +16,10 @@ type TestFail string func (e TestFail) Error() string { return string(e) +} + +type Keyword string + +func (e Keyword) Error() string { + return string(e) } \ No newline at end of file diff --git a/store/keywords.go b/store/keywords.go new file mode 100644 index 000000000..7861fba5e --- /dev/null +++ b/store/keywords.go @@ -0,0 +1,7 @@ +package store + +// keywords for internal useage +var keywords = map[string]bool{ + "/acoounts": true, + "/ephemeralNodes": true, +} \ No newline at end of file diff --git a/store/store.go b/store/store.go index 5289208fe..fe27b0b23 100644 --- a/store/store.go +++ b/store/store.go @@ -64,11 +64,12 @@ type Node struct { type Response struct { Action string `json:"action"` Key string `json:"key"` + Dir bool `json:"dir,omitempty"` PrevValue string `json:"prevValue,omitempty"` Value string `json:"value,omitempty"` - // If the key existed before the action, this field should be true - // If the key did not exist before the action, this field should be false + // If the key did not exist before the action, + // this field should be set to true NewKey bool `json:"newKey,omitempty"` Expiration *time.Time `json:"expiration,omitempty"` @@ -241,20 +242,9 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64 s.addToResponseMap(index, &resp) return msg, err } + } -// Get the value of the key -func (s *Store) Get(key string) ([]byte, error) { - resp := s.internalGet(key) - - if resp != nil { - return json.Marshal(resp) - } else { - err := NotFoundError(key) - return nil, err - } -} - // Get the value of the key and return the raw response func (s *Store) internalGet(key string) *Response { @@ -291,21 +281,51 @@ func (s *Store) internalGet(key string) *Response { } -// List all the item in the prefix -func (s *Store) List(prefix string) ([]byte, error) { +// Get all the items under key +// If key is a file return the file +// If key is a directory reuturn an array of files +func (s *Store) Get(key string) ([]byte, error) { - nodes, keys, dirs, ok := s.Tree.list(prefix) + key = path.Clean("/" + key) - var ln []ListNode + nodes, keys, dirs, ok := s.Tree.list(key) if ok { - ln = make([]ListNode, len(nodes)) + resps := make([]Response, len(nodes)) for i := 0; i < len(nodes); i++ { - ln[i] = ListNode{keys[i], nodes[i].Value, dirs[i]} + + var TTL int64 + var isExpire bool = false + + isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) + + resps[i] = Response{ + Action: "GET", + Index: s.Index, + Key: path.Join(key, keys[i]), + } + + if !dirs[i] { + resps[i].Value = nodes[i].Value + } else { + resps[i].Dir = true + } + + // Update ttl + if isExpire { + TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second) + resps[i].Expiration = &nodes[i].ExpireTime + resps[i].TTL = TTL + } + } + if len(resps) == 1 { + return json.Marshal(resps[0]) + } + return json.Marshal(resps) } - err := NotFoundError(prefix) + err := NotFoundError(key) return nil, err } @@ -332,12 +352,13 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) { s.Tree.delete(key) + } else { resp.Expiration = &node.ExpireTime // Kill the expire go routine node.update <- PERMANENT s.Tree.delete(key) - + } msg, err := json.Marshal(resp) diff --git a/store/tree.go b/store/tree.go index 4ff77eef2..5a3a8b0e0 100644 --- a/store/tree.go +++ b/store/tree.go @@ -104,6 +104,9 @@ func (t *tree) set(key string, value Node) bool { nodeMap[nodesName[i]] = tn } else { + if tn.Dir { + return false + } // we change the value of a old Treenode tn.InternalNode = value } @@ -140,32 +143,40 @@ func (t *tree) get(key string) (Node, bool) { tn, ok := t.internalGet(key) if ok { + if tn.Dir { + return emptyNode, false + } return tn.InternalNode, ok } else { return emptyNode, ok } } -// return the nodes information under the directory -func (t *tree) list(directory string) ([]Node, []string, []string, bool) { +// get the internalNode of the key +func (t *tree) list(directory string) ([]Node, []string, []bool, bool) { treeNode, ok := t.internalGet(directory) if !ok { return nil, nil, nil, ok } else { + if !treeNode.Dir { + nodes := make([]Node, 1) + nodes[0] = treeNode.InternalNode + return nodes, make([]string, 1), make([]bool, 1), true + } length := len(treeNode.NodeMap) nodes := make([]Node, length) keys := make([]string, length) - dirs := make([]string, length) + dirs := make([]bool, length) i := 0 for key, node := range treeNode.NodeMap { nodes[i] = node.InternalNode keys[i] = key if node.Dir { - dirs[i] = "d" + dirs[i] = true } else { - dirs[i] = "f" + dirs[i] = false } i++ }