add testAndSet Command

This commit is contained in:
Xiang Li 2013-07-08 11:00:10 -07:00
parent e2f0420862
commit dc3844a1f8
4 changed files with 126 additions and 76 deletions

View File

@ -36,9 +36,22 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
}
// Get the path for http request
func (c *SetCommand) GeneratePath() string {
return "set/" + c.Key
// TestAndSet command
type TestAndSetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
PrevValue string `json: prevValue`
ExpireTime time.Time `json:"expireTime"`
}
// The name of the command in the log
func (c *TestAndSetCommand) CommandName() string {
return "testAndSet"
}
// Set the value of key to value
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
}
// Get command
@ -57,10 +70,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
return json.Marshal(res)
}
func (c *GetCommand) GeneratePath() string {
return "get/" + c.Key
}
// List command
type ListCommand struct {
Prefix string `json:"prefix"`

View File

@ -326,6 +326,7 @@ func startClientTransport(port int, st int) {
http.HandleFunc("/v1/keys/", Multiplexer)
http.HandleFunc("/v1/watch/", WatchHttpHandler)
http.HandleFunc("/v1/list/", ListHttpHandler)
http.HandleFunc("/v1/testAndSet/", TestAndSetHttpHandler)
http.HandleFunc("/master", MasterHttpHandler)
switch st {

View File

@ -134,6 +134,35 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
}
func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/testAndSet/"):]
debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key)
command := &TestAndSetCommand{}
command.Key = key
command.PrevValue = req.FormValue("prevValue")
command.Value = req.FormValue("value")
strDuration := req.FormValue("ttl")
if strDuration != "" {
duration, err := strconv.Atoi(strDuration)
if err != nil {
warn("raftd: Bad duration: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
} else {
command.ExpireTime = time.Unix(0, 0)
}
excute(command, &w, req)
}
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/keys/"):]

View File

@ -212,75 +212,6 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
}
}
// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())
for {
select {
// timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {
s.Tree.delete(key)
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
msg, err := json.Marshal(resp)
notify(resp)
// notify the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
return
}
case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
}
func updateMap(index uint64, resp *Response) {
if s.ResponseMaxSize == 0 {
return
}
strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp
// unlimited
if s.ResponseMaxSize < 0{
s.ResponseCurrSize++
return
}
if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}
// get the value of the key
func Get(key string) Response {
key = "/" + key
@ -375,6 +306,86 @@ func Delete(key string, index uint64) ([]byte, error) {
}
}
// set the value of the key to the value if the given prevValue is equal to the value of the key
func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := Get(key)
if resp.PrevValue == prevValue {
return Set(key, value, expireTime, index)
} else {
return json.Marshal(resp)
}
}
// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())
for {
select {
// timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {
s.Tree.delete(key)
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
msg, err := json.Marshal(resp)
notify(resp)
// notify the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
return
}
case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
}
func updateMap(index uint64, resp *Response) {
if s.ResponseMaxSize == 0 {
return
}
strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp
// unlimited
if s.ResponseMaxSize < 0{
s.ResponseCurrSize++
return
}
if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}
// save the current state of the storage system
func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s)