Merge pull request #15 from xiangli-cmu/master

error handling
This commit is contained in:
Xiang Li 2013-07-12 16:00:18 -07:00
commit dac624bd05
7 changed files with 225 additions and 63 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"github.com/coreos/etcd/store"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -40,6 +41,14 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
command.Key = key command.Key = key
command.Value = req.FormValue("value") command.Value = req.FormValue("value")
if len(command.Value) == 0 {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(200, "Set"))
return
}
strDuration := req.FormValue("ttl") strDuration := req.FormValue("ttl")
var err error var err error
@ -47,8 +56,10 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
command.ExpireTime, err = durationToExpireTime(strDuration) command.ExpireTime, err = durationToExpireTime(strDuration)
if err != nil { if err != nil {
warn("The given duration is not a number: %v", err)
(*w).WriteHeader(http.StatusInternalServerError) (*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(202, "Set"))
} }
dispatch(command, w, req, true) dispatch(command, w, req, true)
@ -66,6 +77,22 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
command.PrevValue = req.FormValue("prevValue") command.PrevValue = req.FormValue("prevValue")
command.Value = req.FormValue("value") command.Value = req.FormValue("value")
if len(command.Value) == 0 {
w.WriteHeader(http.StatusBadRequest)
w.Write(newJsonError(200, "TestAndSet"))
return
}
if len(command.PrevValue) == 0 {
w.WriteHeader(http.StatusBadRequest)
w.Write(newJsonError(201, "TestAndSet"))
return
}
strDuration := req.FormValue("ttl") strDuration := req.FormValue("ttl")
var err error var err error
@ -73,8 +100,9 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
command.ExpireTime, err = durationToExpireTime(strDuration) command.ExpireTime, err = durationToExpireTime(strDuration)
if err != nil { if err != nil {
warn("The given duration is not a number: %v", err) w.WriteHeader(http.StatusBadRequest)
w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(202, "TestAndSet"))
} }
dispatch(command, &w, req, true) dispatch(command, &w, req, true)
@ -97,28 +125,39 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) { func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
if raftServer.State() == "leader" { if raftServer.State() == "leader" {
if body, err := raftServer.Do(c); err != nil { if body, err := raftServer.Do(c); err != nil {
warn("Commit failed %v", err) if _, ok := err.(store.NotFoundError); ok {
(*w).WriteHeader(http.StatusInternalServerError) http.NotFound((*w), req)
return
} else {
(*w).WriteHeader(http.StatusOK)
if body == nil {
return return
} }
if _, ok := err.(store.TestFail); ok {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(101, err.Error()))
return
}
(*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, "No Leader"))
return
} else {
body, ok := body.([]byte) body, ok := body.([]byte)
if !ok { if !ok {
panic("wrong type") panic("wrong type")
} }
if body == nil {
http.NotFound((*w), req)
} else {
(*w).WriteHeader(http.StatusOK)
(*w).Write(body) (*w).Write(body)
}
return return
} }
} else { } else {
// current no leader // current no leader
if raftServer.Leader() == "" { if raftServer.Leader() == "" {
(*w).WriteHeader(http.StatusInternalServerError) (*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, ""))
return return
} }
@ -145,9 +184,8 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
http.Redirect(*w, req, url, http.StatusTemporaryRedirect) http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
return return
} }
(*w).WriteHeader(http.StatusInternalServerError) (*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, ""))
return return
} }
@ -174,18 +212,24 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
command.Key = key command.Key = key
if body, err := command.Apply(raftServer); err != nil { if body, err := command.Apply(raftServer); err != nil {
warn("raftd: Unable to write file: %v", err)
if _, ok := err.(store.NotFoundError); ok {
http.NotFound((*w), req)
return
}
(*w).WriteHeader(http.StatusInternalServerError) (*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, ""))
return return
} else { } else {
(*w).WriteHeader(http.StatusOK)
body, ok := body.([]byte) body, ok := body.([]byte)
if !ok { if !ok {
panic("wrong type") panic("wrong type")
} }
(*w).WriteHeader(http.StatusOK)
(*w).Write(body) (*w).Write(body)
return return
} }
@ -201,8 +245,12 @@ func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
command.Prefix = prefix command.Prefix = prefix
if body, err := command.Apply(raftServer); err != nil { if body, err := command.Apply(raftServer); err != nil {
warn("Unable to write file: %v", err) if _, ok := err.(store.NotFoundError); ok {
http.NotFound(w, req)
return
}
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(300, ""))
return return
} else { } else {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@ -238,6 +286,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
sinceIndex, err := strconv.ParseUint(string(content), 10, 64) sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
if err != nil { if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
w.Write(newJsonError(203, "Watch From Index"))
} }
command.SinceIndex = sinceIndex command.SinceIndex = sinceIndex

View File

@ -2,6 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
//"errors"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
"time" "time"
@ -60,8 +61,7 @@ func (c *GetCommand) CommandName() string {
// Get the value of key // Get the value of key
func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
res := etcdStore.Get(c.Key) return etcdStore.Get(c.Key)
return json.Marshal(res)
} }
// List command // List command

37
error.go Normal file
View File

@ -0,0 +1,37 @@
package main
import (
"encoding/json"
)
var errors map[int]string
func init() {
errors = make(map[int]string)
// command related errors
errors[100] = "Key Not Found"
errors[101] = "The given PrevValue is not equal to the value of the key"
// Post form related errors
errors[200] = "Value is Required in POST form"
errors[201] = "PrevValue is Required in POST form"
errors[202] = "The given TTL in POST form is not a number"
errors[203] = "The given index in POST form is not a number"
// raft related errors
errors[300] = "Raft Internal Error"
}
type jsonError struct {
ErrorCode int `json:"errorCode"`
Message string `json:"message"`
Cause string `json:"cause,omitempty"`
}
func newJsonError(errorCode int, cause string) []byte {
b, _ := json.Marshal(jsonError{
ErrorCode: errorCode,
Message: errors[errorCode],
Cause: cause,
})
return b
}

16
etcd.go
View File

@ -29,6 +29,8 @@ import (
var verbose bool var verbose bool
var machines string var machines string
var machinesFile string
var cluster []string var cluster []string
var address string var address string
@ -53,7 +55,8 @@ var maxSize int
func init() { func init() {
flag.BoolVar(&verbose, "v", false, "verbose logging") flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in cluster, sepearate by comma") flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine") flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine")
flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients") flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
@ -90,6 +93,7 @@ const (
const ( const (
ELECTIONTIMTOUT = 200 * time.Millisecond ELECTIONTIMTOUT = 200 * time.Millisecond
HEARTBEATTIMEOUT = 50 * time.Millisecond HEARTBEATTIMEOUT = 50 * time.Millisecond
// Timeout for internal raft http connection // Timeout for internal raft http connection
// The original timeout for http is 45 seconds // The original timeout for http is 45 seconds
// which is too long for our usage. // which is too long for our usage.
@ -141,14 +145,22 @@ var info *Info
func main() { func main() {
flag.Parse() flag.Parse()
if machines != "" {
cluster = strings.Split(machines, ",") cluster = strings.Split(machines, ",")
} else if machinesFile != "" {
b, err := ioutil.ReadFile(machinesFile)
if err != nil {
fatal("Unable to read the given machines file: %s", err)
}
cluster = strings.Split(string(b), ",")
}
// Setup commands. // Setup commands.
registerCommands() registerCommands()
// Read server info from file or grab it from user. // Read server info from file or grab it from user.
if err := os.MkdirAll(dirPath, 0744); err != nil { if err := os.MkdirAll(dirPath, 0744); err != nil {
fatal("Unable to create path: %v", err) fatal("Unable to create path: %s", err)
} }
info = getInfo(dirPath) info = getInfo(dirPath)

19
store/error.go Normal file
View File

@ -0,0 +1,19 @@
package store
type NotFoundError string
func (e NotFoundError) Error() string {
return string(e)
}
type NotFile string
func (e NotFile) Error() string {
return string(e)
}
type TestFail string
func (e TestFail) Error() string {
return string(e)
}

View File

@ -64,17 +64,17 @@ type Node struct {
type Response struct { type Response struct {
Action string `json:"action"` Action string `json:"action"`
Key string `json:"key"` Key string `json:"key"`
PrevValue string `json:"prevValue"` PrevValue string `json:"prevValue,omitempty"`
Value string `json:"value"` Value string `json:"value,omitempty"`
// If the key existed before the action, this field should be true // If the key existed before the action, this field should be true
// If the key did not exist before the action, this field should be false // If the key did not exist before the action, this field should be false
Exist bool `json:"exist"` NewKey bool `json:"newKey,omitempty"`
Expiration time.Time `json:"expiration"` Expiration *time.Time `json:"expiration,omitempty"`
// Time to live in second // Time to live in second
TTL int64 `json:"ttl"` TTL int64 `json:"ttl,omitempty"`
// The command index of the raft machine when the command is executed // The command index of the raft machine when the command is executed
Index uint64 `json:"index"` Index uint64 `json:"index"`
@ -142,6 +142,14 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
isExpire := !expireTime.Equal(PERMANENT) isExpire := !expireTime.Equal(PERMANENT)
// base response
resp := Response{
Action: "SET",
Key: key,
Value: value,
Index: index,
}
// When the slow follower receive the set command // When the slow follower receive the set command
// the key may be expired, we should not add the node // the key may be expired, we should not add the node
// also if the node exist, we need to delete the node // also if the node exist, we need to delete the node
@ -154,9 +162,8 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
// Update ttl // Update ttl
if isExpire { if isExpire {
TTL = int64(expireTime.Sub(time.Now()) / time.Second) TTL = int64(expireTime.Sub(time.Now()) / time.Second)
} else { resp.Expiration = &expireTime
// For permanent value, we set ttl to -1 resp.TTL = TTL
TTL = -1
} }
// Get the node // Get the node
@ -186,7 +193,7 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
// Update the information of the node // Update the information of the node
s.Tree.set(key, Node{value, expireTime, node.update}) s.Tree.set(key, Node{value, expireTime, node.update})
resp := Response{"SET", key, node.Value, value, true, expireTime, TTL, index} resp.PrevValue = node.Value
s.watcher.notify(resp) s.watcher.notify(resp)
@ -207,13 +214,18 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
update := make(chan time.Time) update := make(chan time.Time)
s.Tree.set(key, Node{value, expireTime, update}) ok := s.Tree.set(key, Node{value, expireTime, update})
if !ok {
err := NotFile(key)
return nil, err
}
if isExpire { if isExpire {
go s.monitorExpiration(key, update, expireTime) go s.monitorExpiration(key, update, expireTime)
} }
resp := Response{"SET", key, "", value, false, expireTime, TTL, index} resp.NewKey = true
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
@ -232,7 +244,19 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
} }
// Get the value of the key // Get the value of the key
func (s *Store) Get(key string) Response { func (s *Store) Get(key string) ([]byte, error) {
resp := s.internalGet(key)
if resp != nil {
return json.Marshal(resp)
} else {
err := NotFoundError(key)
return nil, err
}
}
// Get the value of the key and return the raw response
func (s *Store) internalGet(key string) *Response {
key = path.Clean("/" + key) key = path.Clean("/" + key)
@ -244,21 +268,29 @@ func (s *Store) Get(key string) Response {
isExpire = !node.ExpireTime.Equal(PERMANENT) isExpire = !node.ExpireTime.Equal(PERMANENT)
resp := &Response{
Action: "GET",
Key: key,
Value: node.Value,
Index: s.Index,
}
// Update ttl // Update ttl
if isExpire { if isExpire {
TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second) TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
} else { resp.Expiration = &node.ExpireTime
TTL = -1 resp.TTL = TTL
} }
return Response{"GET", key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index} return resp
} else { } else {
// we do not found the key // we do not found the key
return Response{"GET", key, "", "", false, time.Unix(0, 0), 0, s.Index} return nil
} }
} }
// List all the item in the prefix // List all the item in the prefix
func (s *Store) List(prefix string) ([]byte, error) { func (s *Store) List(prefix string) ([]byte, error) {
@ -273,7 +305,8 @@ func (s *Store) List(prefix string) ([]byte, error) {
} }
} }
return json.Marshal(ln) err := NotFoundError(prefix)
return nil, err
} }
// Delete the key // Delete the key
@ -288,20 +321,25 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
if ok { if ok {
resp := Response{
Action: "DELETE",
Key: key,
PrevValue: node.Value,
Index: index,
}
if node.ExpireTime.Equal(PERMANENT) { if node.ExpireTime.Equal(PERMANENT) {
s.Tree.delete(key) s.Tree.delete(key)
} else { } else {
resp.Expiration = &node.ExpireTime
// Kill the expire go routine // Kill the expire go routine
node.update <- PERMANENT node.update <- PERMANENT
s.Tree.delete(key) s.Tree.delete(key)
} }
resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, index}
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)
s.watcher.notify(resp) s.watcher.notify(resp)
@ -317,28 +355,29 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
return msg, err return msg, err
} else { } else {
err := NotFoundError(key)
resp := Response{"DELETE", key, "", "", false, time.Unix(0, 0), 0, index} return nil, err
s.addToResponseMap(index, &resp)
return json.Marshal(resp)
} }
} }
// Set the value of the key to the value if the given prevValue is equal to the value of the key // Set the value of the key to the value if the given prevValue is equal to the value of the key
func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := s.Get(key) resp := s.internalGet(key)
if resp.PrevValue == prevValue { if resp == nil {
err := NotFoundError(key)
return nil, err
}
if resp.Value == prevValue {
// If test success, do set // If test success, do set
return s.Set(key, value, expireTime, index) return s.Set(key, value, expireTime, index)
} else { } else {
// If fails, return the result of get which contains the current // If fails, return err
// status of the key-value pair err := TestFail(fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue))
return json.Marshal(resp) return nil, err
} }
} }
@ -371,7 +410,13 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
s.Tree.delete(key) s.Tree.delete(key)
resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, s.Index} resp := Response{
Action: "DELETE",
Key: key,
PrevValue: node.Value,
Expiration: &node.ExpireTime,
Index: s.Index,
}
msg, err := json.Marshal(resp) msg, err := json.Marshal(resp)

View File

@ -102,7 +102,7 @@ func (t transporter) GetLeaderClientAddress() string {
// Send server side POST request // Send server side POST request
func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
resp, err := t.client.Post(t.scheme + path, "application/json", body) resp, err := t.client.Post(t.scheme+path, "application/json", body)
return resp, err return resp, err
} }