Merge pull request #17 from xiangli-cmu/master

merge get and list operation
This commit is contained in:
Xiang Li 2013-07-14 15:30:50 -07:00
commit 82d0ad007f
11 changed files with 225 additions and 114 deletions

View File

@ -205,19 +205,19 @@ We already have `/foo/foo=barbar`
We create another one `/foo/foo_dir/foo=barbarbar`
```sh
http://127.0.0.1:4001/v1/keys/foo/foo_dir/bar -d value=barbarbar
curl http://127.0.0.1:4001/v1/keys/foo/foo_dir/bar -d value=barbarbar
```
Let us list them next.
```sh
curl http://127.0.0.1:4001/v1/list/foo/
curl http://127.0.0.1:4001/v1/get/foo/
```
We should see the response as
We should see the response as an array of items
```json
{"Key":"foo","Value":"barbar","Type":"f"} {"Key":"foo_dir","Value":".","Type":"d"}
[{"action":"GET","key":"/foo/foo","value":"barbar","index":10},{"action":"GET","key":"/foo/foo_dir","dir":true,"index":10}]
```
which meas `foo=barbar` is a key-value pair under `/foo` and `foo_dir` is a directory.
@ -241,6 +241,29 @@ Let the join two more nodes to this cluster using the -C argument:
./etcd -c 4003 -s 7003 -C 127.0.0.1:7001 -d nod/node3
```
Get the machines in the cluster
```sh
curl http://127.0.0.1:4001/machines
```
We should see there are three nodes in the cluster
```
0.0.0.0:7001,0.0.0.0:7002,0.0.0.0:7003
```
Also try to get the current leader in the cluster
```
curl http://127.0.0.1:4001/leader
```
The first server we set up should be the leader, if it has not dead during these commands.
```
0.0.0.0:7001
```
Now we can do normal SET and GET operations on keys as we explored earlier.
```sh
@ -259,6 +282,16 @@ Let's kill the leader of the cluster and get the value from the other machine:
curl http://127.0.0.1:4002/v1/keys/foo
```
A new leader should have been elected.
```
curl http://127.0.0.1:4001/leader
```
```
0.0.0.0:7002 or 0.0.0.0:7003
```
You should be able to see this:
```json

View File

@ -135,19 +135,24 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
(*w).Write(newJsonError(101, err.Error()))
return
}
if _, ok := err.(store.NotFile); ok {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(102, err.Error()))
return
}
(*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, "No Leader"))
return
} else {
body, ok := body.([]byte)
if !ok {
panic("wrong type")
}
if body == nil {
http.NotFound((*w), req)
} else {
body, ok := body.([]byte)
if !ok {
panic("wrong type")
}
(*w).WriteHeader(http.StatusOK)
(*w).Write(body)
}
@ -174,7 +179,8 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
var url string
if client {
url = scheme + raftTransporter.GetLeaderClientAddress() + path
clientAddr, _ := getClientAddr(raftServer.Leader())
url = scheme + clientAddr + path
} else {
url = scheme + raftServer.Leader() + path
}
@ -198,8 +204,40 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
// Handler to return the current leader name
func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
leader := raftServer.Leader()
if leader != "" {
w.WriteHeader(http.StatusOK)
w.Write([]byte(raftServer.Leader()))
} else {
// not likely, but it may happen
w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(301, ""))
}
}
// Handler to return all the known machines in the current cluster
func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
peers := raftServer.Peers()
// Add itself to the machine list first
// Since peer map does not contain the server itself
machines, _ := getClientAddr(raftServer.Name())
// Add all peers to the list and sepearte by comma
// We do not use json here since we accept machines list
// in the command line seperate by comma.
for peerName, _ := range peers {
if addr, ok := getClientAddr(peerName); ok {
machines = machines + "," + addr
}
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(raftServer.Leader()))
w.Write([]byte(machines))
}
// Get Handler
@ -235,37 +273,6 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
}
// List Handler
func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
prefix := req.URL.Path[len("/v1/list/"):]
debug("[recv] GET http://%v/v1/list/%s", raftServer.Name(), prefix)
command := &ListCommand{}
command.Prefix = prefix
if body, err := command.Apply(raftServer); err != nil {
if _, ok := err.(store.NotFoundError); ok {
http.NotFound(w, req)
return
}
w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(300, ""))
return
} else {
w.WriteHeader(http.StatusOK)
body, ok := body.([]byte)
if !ok {
panic("wrong type")
}
w.Write(body)
return
}
}
// Watch handler
func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/watch/"):]

View File

@ -64,21 +64,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
return etcdStore.Get(c.Key)
}
// List command
type ListCommand struct {
Prefix string `json:"prefix"`
}
// The name of the list command in the log
func (c *ListCommand) CommandName() string {
return "list"
}
// List all the keys have the given prefix path
func (c *ListCommand) Apply(server *raft.Server) (interface{}, error) {
return etcdStore.List(c.Prefix)
}
// Delete command
type DeleteCommand struct {
Key string `json:"key"`
@ -120,7 +105,10 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
// JoinCommand
type JoinCommand struct {
Name string `json:"name"`
Name string `json:"name"`
Hostname string `json:"hostName"`
RaftPort int `json:"raftPort"`
ClientPort int `json:"clientPort"`
}
// The name of the join command in the log
@ -129,8 +117,9 @@ func (c *JoinCommand) CommandName() string {
}
// Join a server to the cluster
func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
err := server.AddPeer(c.Name)
// no result will be returned
return nil, err
func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
err := raftServer.AddPeer(c.Name)
addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
return []byte("join success"), err
}

View File

@ -12,6 +12,7 @@ func init() {
// command related errors
errors[100] = "Key Not Found"
errors[101] = "The given PrevValue is not equal to the value of the key"
errors[102] = "Not A File"
// Post form related errors
errors[200] = "Value is Required in POST form"
errors[201] = "PrevValue is Required in POST form"
@ -19,6 +20,7 @@ func init() {
errors[203] = "The given index in POST form is not a number"
// raft related errors
errors[300] = "Raft Internal Error"
errors[301] = "During Leader Election"
}
type jsonError struct {

53
etcd.go
View File

@ -33,9 +33,9 @@ var machinesFile string
var cluster []string
var address string
var hostname string
var clientPort int
var serverPort int
var raftPort int
var webPort int
var serverCertFile string
@ -58,9 +58,9 @@ func init() {
flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine")
flag.StringVar(&hostname, "h", "0.0.0.0", "the hostname of the local machine")
flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
flag.IntVar(&serverPort, "s", 7001, "the port to communicate with servers")
flag.IntVar(&raftPort, "s", 7001, "the port to communicate with servers")
flag.IntVar(&webPort, "w", -1, "the port of web interface")
flag.StringVar(&serverCAFile, "serverCAFile", "", "the path of the CAFile")
@ -97,7 +97,7 @@ const (
// Timeout for internal raft http connection
// The original timeout for http is 45 seconds
// which is too long for our usage.
HTTPTIMEOUT = time.Second
HTTPTIMEOUT = 10 * time.Second
)
//------------------------------------------------------------------------------
@ -107,8 +107,8 @@ const (
//------------------------------------------------------------------------------
type Info struct {
Address string `json:"address"`
ServerPort int `json:"serverPort"`
Hostname string `json:"hostname"`
RaftPort int `json:"raftPort"`
ClientPort int `json:"clientPort"`
WebPort int `json:"webPort"`
@ -194,7 +194,7 @@ func main() {
func startRaft(securityType int) {
var err error
raftName := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort)
// Create transporter for raft
raftTransporter = createTransporter(securityType)
@ -223,7 +223,7 @@ func startRaft(securityType int) {
if raftServer.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 1 && cluster[0] == "" {
if len(cluster) == 0 {
raftServer.StartLeader()
time.Sleep(time.Millisecond * 20)
@ -232,6 +232,9 @@ func startRaft(securityType int) {
for {
command := &JoinCommand{}
command.Name = raftServer.Name()
command.Hostname = hostname
command.RaftPort = raftPort
command.ClientPort = clientPort
_, err := raftServer.Do(command)
if err == nil {
break
@ -268,7 +271,7 @@ func startRaft(securityType int) {
// go server.Snapshot()
// start to response to raft requests
go startRaftTransport(info.ServerPort, securityType)
go startRaftTransport(info.RaftPort, securityType)
}
@ -338,11 +341,11 @@ func startRaftTransport(port int, st int) {
switch st {
case HTTP:
fmt.Printf("raft server [%s] listen on http port %v\n", address, port)
fmt.Printf("raft server [%s] listen on http port %v\n", hostname, port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
case HTTPSANDVERIFY:
@ -354,7 +357,7 @@ func startRaftTransport(port int, st int) {
},
Addr: fmt.Sprintf(":%d", port),
}
fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
err := server.ListenAndServeTLS(serverCertFile, serverKeyFile)
if err != nil {
@ -369,18 +372,18 @@ func startClientTransport(port int, st int) {
// external commands
http.HandleFunc("/"+version+"/keys/", Multiplexer)
http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
http.HandleFunc("/"+version+"/list/", ListHttpHandler)
http.HandleFunc("/"+version+"/testAndSet/", TestAndSetHttpHandler)
http.HandleFunc("/leader", LeaderHttpHandler)
http.HandleFunc("/machines", MachinesHttpHandler)
switch st {
case HTTP:
fmt.Printf("etcd [%s] listen on http port %v\n", address, clientPort)
fmt.Printf("etcd [%s] listen on http port %v\n", hostname, clientPort)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
case HTTPSANDVERIFY:
@ -392,7 +395,7 @@ func startClientTransport(port int, st int) {
},
Addr: fmt.Sprintf(":%d", port),
}
fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
err := server.ListenAndServeTLS(clientCertFile, clientKeyFile)
if err != nil {
@ -480,15 +483,15 @@ func getInfo(path string) *Info {
} else {
// Otherwise ask user for info and write it to file.
if address == "" {
if hostname == "" {
fatal("Please give the address of the local machine")
}
info.Address = address
info.Address = strings.TrimSpace(info.Address)
fmt.Println("address ", info.Address)
info.Hostname = hostname
info.Hostname = strings.TrimSpace(info.Hostname)
fmt.Println("address ", info.Hostname)
info.ServerPort = serverPort
info.RaftPort = raftPort
info.ClientPort = clientPort
info.WebPort = webPort
@ -537,6 +540,9 @@ func joinCluster(s *raft.Server, serverName string) error {
command := &JoinCommand{}
command.Name = s.Name()
command.Hostname = info.Hostname
command.RaftPort = info.RaftPort
command.ClientPort = info.ClientPort
json.NewEncoder(&b).Encode(command)
@ -561,7 +567,7 @@ func joinCluster(s *raft.Server, serverName string) error {
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
address = resp.Header.Get("Location")
address := resp.Header.Get("Location")
debug("Leader is %s", address)
debug("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(command)
@ -582,6 +588,5 @@ func registerCommands() {
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})
raft.RegisterCommand(&WatchCommand{})
raft.RegisterCommand(&ListCommand{})
raft.RegisterCommand(&TestAndSetCommand{})
}

30
machines.go Normal file
View File

@ -0,0 +1,30 @@
package main
import (
"fmt"
)
type machine struct {
hostname string
raftPort int
clientPort int
}
var machinesMap = map[string]machine{}
func addMachine(name string, hostname string, raftPort int, clientPort int) {
machinesMap[name] = machine{hostname, raftPort, clientPort}
}
func getClientAddr(name string) (string, bool) {
machine, ok := machinesMap[name]
if !ok {
return "", false
}
addr := fmt.Sprintf("%s:%v", machine.hostname, machine.clientPort)
return addr, true
}

View File

@ -75,7 +75,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] Get http://%v/client/ ", raftServer.Name())
w.WriteHeader(http.StatusOK)
client := address + ":" + strconv.Itoa(clientPort)
client := hostname + ":" + strconv.Itoa(clientPort)
w.Write([]byte(client))
}

View File

@ -16,4 +16,10 @@ type TestFail string
func (e TestFail) Error() string {
return string(e)
}
type Keyword string
func (e Keyword) Error() string {
return string(e)
}

7
store/keywords.go Normal file
View File

@ -0,0 +1,7 @@
package store
// keywords for internal useage
var keywords = map[string]bool{
"/acoounts": true,
"/ephemeralNodes": true,
}

View File

@ -64,11 +64,12 @@ type Node struct {
type Response struct {
Action string `json:"action"`
Key string `json:"key"`
Dir bool `json:"dir,omitempty"`
PrevValue string `json:"prevValue,omitempty"`
Value string `json:"value,omitempty"`
// If the key existed before the action, this field should be true
// If the key did not exist before the action, this field should be false
// If the key did not exist before the action,
// this field should be set to true
NewKey bool `json:"newKey,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
@ -241,20 +242,9 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
s.addToResponseMap(index, &resp)
return msg, err
}
}
// Get the value of the key
func (s *Store) Get(key string) ([]byte, error) {
resp := s.internalGet(key)
if resp != nil {
return json.Marshal(resp)
} else {
err := NotFoundError(key)
return nil, err
}
}
// Get the value of the key and return the raw response
func (s *Store) internalGet(key string) *Response {
@ -291,21 +281,51 @@ func (s *Store) internalGet(key string) *Response {
}
// List all the item in the prefix
func (s *Store) List(prefix string) ([]byte, error) {
// Get all the items under key
// If key is a file return the file
// If key is a directory reuturn an array of files
func (s *Store) Get(key string) ([]byte, error) {
nodes, keys, dirs, ok := s.Tree.list(prefix)
key = path.Clean("/" + key)
var ln []ListNode
nodes, keys, dirs, ok := s.Tree.list(key)
if ok {
ln = make([]ListNode, len(nodes))
resps := make([]Response, len(nodes))
for i := 0; i < len(nodes); i++ {
ln[i] = ListNode{keys[i], nodes[i].Value, dirs[i]}
var TTL int64
var isExpire bool = false
isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
resps[i] = Response{
Action: "GET",
Index: s.Index,
Key: path.Join(key, keys[i]),
}
if !dirs[i] {
resps[i].Value = nodes[i].Value
} else {
resps[i].Dir = true
}
// Update ttl
if isExpire {
TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
resps[i].Expiration = &nodes[i].ExpireTime
resps[i].TTL = TTL
}
}
if len(resps) == 1 {
return json.Marshal(resps[0])
}
return json.Marshal(resps)
}
err := NotFoundError(prefix)
err := NotFoundError(key)
return nil, err
}
@ -332,12 +352,13 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
s.Tree.delete(key)
} else {
resp.Expiration = &node.ExpireTime
// Kill the expire go routine
node.update <- PERMANENT
s.Tree.delete(key)
}
msg, err := json.Marshal(resp)

View File

@ -104,6 +104,9 @@ func (t *tree) set(key string, value Node) bool {
nodeMap[nodesName[i]] = tn
} else {
if tn.Dir {
return false
}
// we change the value of a old Treenode
tn.InternalNode = value
}
@ -140,32 +143,40 @@ func (t *tree) get(key string) (Node, bool) {
tn, ok := t.internalGet(key)
if ok {
if tn.Dir {
return emptyNode, false
}
return tn.InternalNode, ok
} else {
return emptyNode, ok
}
}
// return the nodes information under the directory
func (t *tree) list(directory string) ([]Node, []string, []string, bool) {
// get the internalNode of the key
func (t *tree) list(directory string) ([]Node, []string, []bool, bool) {
treeNode, ok := t.internalGet(directory)
if !ok {
return nil, nil, nil, ok
} else {
if !treeNode.Dir {
nodes := make([]Node, 1)
nodes[0] = treeNode.InternalNode
return nodes, make([]string, 1), make([]bool, 1), true
}
length := len(treeNode.NodeMap)
nodes := make([]Node, length)
keys := make([]string, length)
dirs := make([]string, length)
dirs := make([]bool, length)
i := 0
for key, node := range treeNode.NodeMap {
nodes[i] = node.InternalNode
keys[i] = key
if node.Dir {
dirs[i] = "d"
dirs[i] = true
} else {
dirs[i] = "f"
dirs[i] = false
}
i++
}