merge compareAndDelete

This commit is contained in:
Xiang Li 2013-12-19 22:19:49 +08:00
commit e2fa89d554
9 changed files with 409 additions and 11 deletions

View File

@ -2,7 +2,9 @@ package v2
import ( import (
"net/http" "net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -13,6 +15,35 @@ func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
recursive := (req.FormValue("recursive") == "true") recursive := (req.FormValue("recursive") == "true")
dir := (req.FormValue("dir") == "true") dir := (req.FormValue("dir") == "true")
req.ParseForm()
_, valueOk := req.Form["prevValue"]
_, indexOk := req.Form["prevIndex"]
if !valueOk && !indexOk {
c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive) c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive)
return s.Dispatch(c, w, req) return s.Dispatch(c, w, req)
}
var err error
prevIndex := uint64(0)
prevValue := req.Form.Get("prevValue")
if indexOk {
prevIndexStr := req.Form.Get("prevIndex")
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store().Index())
}
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store().Index())
}
}
c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, recursive, prevValue, prevIndex)
return s.Dispatch(c, w, req)
} }

View File

@ -83,3 +83,167 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "") assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
}) })
} }
// Ensures that a directory is deleted.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true
//
func TestV2DeleteDirectory(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "delete", "")
assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["modifiedIndex"], 2, "")
})
}
// Ensures that a directory is deleted if the previous index matches
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true&prevIndex=1
//
func TestV2DeleteDirectoryCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true&prevIndex=1"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["modifiedIndex"], 2, "")
})
}
// Ensures that a directory is not deleted if the previous index does not match
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true&prevIndex=100
//
func TestV2DeleteDirectoryCADOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true&prevIndex=100"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
}
// Ensures that a key is deleted only if the previous index matches.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=1
//
func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=1"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
})
}
// Ensures that a key is not deleted if the previous index does not matche.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=2
//
func TestV2DeleteKeyCADOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=100"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
}
// Ensures that an error is thrown if an invalid previous index is provided.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=bad_index
//
func TestV2DeleteKeyCADWithInvalidIndex(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=bad_index"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 203)
})
}
// Ensures that a key is deleted only if the previous value matches.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=XXX
//
func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=XXX"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
})
}
// Ensures that a key is not deleted if the previous value does not matche.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=YYY
//
func TestV2DeleteKeyCADOnValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=YYY"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
}
// Ensures that an error is thrown if an invalid previous value is provided.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=
//
func TestV2DeleteKeyCADWithInvalidValue(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevValue="), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 201)
})
}

View File

@ -22,6 +22,7 @@ type CommandFactory interface {
CreateDeleteCommand(key string, dir, recursive bool) raft.Command CreateDeleteCommand(key string, dir, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string, CreateCompareAndSwapCommand(key string, value string, prevValue string,
prevIndex uint64, expireTime time.Time) raft.Command prevIndex uint64, expireTime time.Time) raft.Command
CreateCompareAndDeleteCommand(key string, recursive bool, prevValue string, prevIndex uint64) raft.Command
CreateSyncCommand(now time.Time) raft.Command CreateSyncCommand(now time.Time) raft.Command
} }

View File

@ -7,6 +7,7 @@ const (
Update = "update" Update = "update"
Delete = "delete" Delete = "delete"
CompareAndSwap = "compareAndSwap" CompareAndSwap = "compareAndSwap"
CompareAndDelete = "compareAndDelete"
Expire = "expire" Expire = "expire"
) )

View File

@ -35,6 +35,8 @@ const (
GetSuccess GetSuccess
GetFail GetFail
ExpireCount ExpireCount
CompareAndDeleteSuccess
CompareAndDeleteFail
) )
type Stats struct { type Stats struct {
@ -63,6 +65,10 @@ type Stats struct {
CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"` CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"`
CompareAndSwapFail uint64 `json:"compareAndSwapFail"` CompareAndSwapFail uint64 `json:"compareAndSwapFail"`
// Number of compareAndDelete requests
CompareAndDeleteSuccess uint64 `json:"compareAndDeleteSuccess"`
CompareAndDeleteFail uint64 `json:"compareAndDeleteFail"`
ExpireCount uint64 `json:"expireCount"` ExpireCount uint64 `json:"expireCount"`
Watchers uint64 `json:"watchers"` Watchers uint64 `json:"watchers"`
@ -76,7 +82,8 @@ func newStats() *Stats {
func (s *Stats) clone() *Stats { func (s *Stats) clone() *Stats {
return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail, return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess, s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess,
s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount} s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail,
s.CompareAndDeleteSuccess, s.CompareAndDeleteFail, s.Watchers, s.ExpireCount}
} }
// Status() return the statistics info of etcd storage its recent start // Status() return the statistics info of etcd storage its recent start
@ -93,6 +100,7 @@ func (s *Stats) TotalTranscations() uint64 {
return s.SetSuccess + s.SetFail + return s.SetSuccess + s.SetFail +
s.DeleteSuccess + s.DeleteFail + s.DeleteSuccess + s.DeleteFail +
s.CompareAndSwapSuccess + s.CompareAndSwapFail + s.CompareAndSwapSuccess + s.CompareAndSwapFail +
s.CompareAndDeleteSuccess + s.CompareAndDeleteFail +
s.UpdateSuccess + s.UpdateFail s.UpdateSuccess + s.UpdateFail
} }
@ -122,6 +130,10 @@ func (s *Stats) Inc(field int) {
atomic.AddUint64(&s.CompareAndSwapSuccess, 1) atomic.AddUint64(&s.CompareAndSwapSuccess, 1)
case CompareAndSwapFail: case CompareAndSwapFail:
atomic.AddUint64(&s.CompareAndSwapFail, 1) atomic.AddUint64(&s.CompareAndSwapFail, 1)
case CompareAndDeleteSuccess:
atomic.AddUint64(&s.CompareAndDeleteSuccess, 1)
case CompareAndDeleteFail:
atomic.AddUint64(&s.CompareAndDeleteFail, 1)
case ExpireCount: case ExpireCount:
atomic.AddUint64(&s.ExpireCount, 1) atomic.AddUint64(&s.ExpireCount, 1)
} }

View File

@ -51,6 +51,7 @@ type Store interface {
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error) value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive, dir bool) (*Event, error) Delete(nodePath string, recursive, dir bool) (*Event, error)
CompareAndDelete(nodePath string, recursive bool, prevValue string, prevIndex uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Save() ([]byte, error) Save() ([]byte, error)
@ -276,7 +277,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
} }
callback := func(path string) { // notify function callback := func(path string) { // notify function
// notify the watchers with delted set true // notify the watchers with deleted set true
s.WatcherHub.notifyWatchers(e, path, true) s.WatcherHub.notifyWatchers(e, path, true)
} }
@ -296,9 +297,57 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
return e, nil return e, nil
} }
func (s *store) CompareAndDelete(nodePath string, recursive bool, prevValue string, prevIndex uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock()
defer s.worldLock.Unlock()
n, err := s.internalGet(nodePath)
if err != nil { // if the node does not exist, return error
s.Stats.Inc(CompareAndDeleteFail)
return nil, err
}
isDir := n.IsDir()
// If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful.
if (isDir || prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex+1, n.CreatedIndex)
if isDir {
e.Node.Dir = true
}
callback := func(path string) { // notify function
// notify the watchers with deleted set true
s.WatcherHub.notifyWatchers(e, path, true)
}
err = n.Remove(true, recursive, callback)
if err != nil {
s.Stats.Inc(CompareAndDeleteFail)
return nil, err
}
// update etcd index
s.CurrentIndex++
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndDeleteSuccess)
return e, nil
}
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
s.Stats.Inc(CompareAndDeleteFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
}
func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) { func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
key = path.Clean(path.Join("/", key)) key = path.Clean(path.Join("/", key))
nextIndex := s.CurrentIndex + 1 nextIndex := s.CurrentIndex + 1
s.worldLock.RLock() s.worldLock.RLock()

View File

@ -337,7 +337,99 @@ func TestRootRdOnly(t *testing.T) {
_, err = s.CompareAndSwap("/", "", 0, "", Permanent) _, err = s.CompareAndSwap("/", "", 0, "", Permanent)
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
}
func TestStoreCompareAndDeletePrevValue(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", false, "bar", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Dir, false, "")
}
func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "baz", 0)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "")
}
func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", false, "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "", 100)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "")
}
// Ensure that the store can delete a directory if recursive is specified.
func TestStoreCompareAndDeleteDiretory(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Dir, true, "")
}
// Ensure that the store can delete a directory if recursive is specified.
func TestStoreCompareAndDeleteDiretoryIgnoringPrevValue(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "baz", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
// Ensure that the store can delete a directory with a prev index.
func TestStoreCompareAndDeleteDirectoryPrevIndex(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
// Ensure that the store won't delete a directory if prevIndex does not match
func TestStoreCompareAndDeleteDirectoryPrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, _err := s.CompareAndDelete("/foo", true, "", 100)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "", "")
}
// Ensure that the store cannot delete a directory if recursive is not specified.
func TestStoreCompareAndDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "", 0)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
assert.Equal(t, err.Message, "Not A File", "")
assert.Nil(t, e, "")
} }
// Ensure that the store can conditionally update a key if it has a previous value. // Ensure that the store can conditionally update a key if it has a previous value.

View File

@ -75,6 +75,16 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
} }
} }
// CreateCompareAndDeleteCommand creates a version 2 command to conditionally delete a key from the store.
func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, recursive bool, prevValue string, prevIndex uint64) raft.Command {
return &CompareAndDeleteCommand{
Key: key,
Recursive: recursive,
PrevValue: prevValue,
PrevIndex: prevIndex,
}
}
func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command { func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command {
return &SyncCommand{ return &SyncCommand{
Time: time.Now(), Time: time.Now(),

View File

@ -0,0 +1,38 @@
package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/raft"
)
func init() {
raft.RegisterCommand(&CompareAndDeleteCommand{})
}
// The CompareAndDelete performs a conditional delete on a key in the store.
type CompareAndDeleteCommand struct {
Key string `json:"key"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
Recursive bool `json:"recursive"`
}
// The name of the compareAndDelete command in the log
func (c *CompareAndDeleteCommand) CommandName() string {
return "etcd:compareAndDelete"
}
// Set the key-value pair if the current value of the key equals to the given prevValue
func (c *CompareAndDeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
e, err := s.CompareAndDelete(c.Key, c.Recursive, c.PrevValue, c.PrevIndex)
if err != nil {
log.Debug(err)
return nil, err
}
return e, nil
}