implement recursive for CompareAndDelete in the store

This commit is contained in:
rick 2013-12-01 13:38:09 -07:00
parent f8985d731f
commit d2d7e37990
5 changed files with 69 additions and 15 deletions

View File

@ -21,7 +21,7 @@ type CommandFactory interface {
CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
CreateDeleteCommand(key string, recursive bool) raft.Command CreateDeleteCommand(key string, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command CreateCompareAndDeleteCommand(key string, recursive bool, prevValue string, prevIndex uint64) raft.Command
CreateSyncCommand(now time.Time) raft.Command CreateSyncCommand(now time.Time) raft.Command
} }

View File

@ -50,7 +50,7 @@ type Store interface {
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error) value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive bool) (*Event, error) Delete(nodePath string, recursive bool) (*Event, error)
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) CompareAndDelete(nodePath string, recursive bool, prevValue string, prevIndex uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Save() ([]byte, error) Save() ([]byte, error)
Recovery(state []byte) error Recovery(state []byte) error
@ -281,9 +281,7 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
return e, nil return e, nil
} }
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64, func (s *store) CompareAndDelete(nodePath string, recursive bool, prevValue string, prevIndex uint64) (*Event, error) {
) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock() s.worldLock.Lock()
@ -296,24 +294,25 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
return nil, err return nil, err
} }
if n.IsDir() { // can only test and set file isDir := n.IsDir()
s.Stats.Inc(CompareAndDeleteFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
}
// If both of the prevValue and prevIndex are given, we will test both of them. // If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful. // Command will be executed, only if both of the tests are successful.
if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) { if (isDir || prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex+1) e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex+1)
if isDir {
e.Dir = true
} else {
e.PrevValue = n.Value e.PrevValue = n.Value
}
callback := func(path string) { // notify function callback := func(path string) { // notify function
// notify the watchers with deleted set true // notify the watchers with deleted set true
s.WatcherHub.notifyWatchers(e, path, true) s.WatcherHub.notifyWatchers(e, path, true)
} }
err = n.Remove(false, callback) err = n.Remove(recursive, callback)
if err != nil { if err != nil {
s.Stats.Inc(CompareAndDeleteFail) s.Stats.Inc(CompareAndDeleteFail)

View File

@ -230,6 +230,7 @@ func TestStoreCompareAndDeletePrevValue(t *testing.T) {
e, err := s.CompareAndDelete("/foo", false, "bar", 0) e, err := s.CompareAndDelete("/foo", false, "bar", 0)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "") assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Dir, false, "")
} }
func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) { func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
@ -255,7 +256,7 @@ func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) { func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", "bar", false, Permanent) s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "baz", 100) e, _err := s.CompareAndDelete("/foo", false, "", 100)
err := _err.(*etcdErr.Error) err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "") assert.Equal(t, err.Message, "Test Failed", "")
@ -264,6 +265,58 @@ func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
assert.Equal(t, e.Value, "bar", "") assert.Equal(t, e.Value, "bar", "")
} }
// Ensure that the store can delete a directory if recursive is specified.
func TestStoreCompareAndDeleteDiretory(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Dir, true, "")
}
// Ensure that the store can delete a directory if recursive is specified.
func TestStoreCompareAndDeleteDiretoryIgnoringPrevValue(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "baz", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
// Ensure that the store can delete a directory with a prev index.
func TestStoreCompareAndDeleteDirectoryPrevIndex(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
// Ensure that the store won't delete a directory if prevIndex does not match
func TestStoreCompareAndDeleteDirectoryPrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, _err := s.CompareAndDelete("/foo", true, "", 100)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "", "")
}
// Ensure that the store cannot delete a directory if recursive is not specified.
func TestStoreCompareAndDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "", 0)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
assert.Equal(t, err.Message, "Not A File", "")
assert.Nil(t, e, "")
}
// Ensure that the store can conditionally update a key if it has a previous value. // Ensure that the store can conditionally update a key if it has a previous value.
func TestStoreCompareAndSwapPrevValue(t *testing.T) { func TestStoreCompareAndSwapPrevValue(t *testing.T) {
s := newStore() s := newStore()

View File

@ -73,9 +73,10 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
} }
// CreateCompareAndDeleteCommand creates a version 2 command to conditionally delete a key from the store. // CreateCompareAndDeleteCommand creates a version 2 command to conditionally delete a key from the store.
func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command { func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, recursive bool, prevValue string, prevIndex uint64) raft.Command {
return &CompareAndDeleteCommand{ return &CompareAndDeleteCommand{
Key: key, Key: key,
Recursive: recursive,
PrevValue: prevValue, PrevValue: prevValue,
PrevIndex: prevIndex, PrevIndex: prevIndex,
} }

View File

@ -15,6 +15,7 @@ type CompareAndDeleteCommand struct {
Key string `json:"key"` Key string `json:"key"`
PrevValue string `json:"prevValue"` PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"` PrevIndex uint64 `json:"prevIndex"`
Recursive bool `json:"recursive"`
} }
// The name of the compareAndDelete command in the log // The name of the compareAndDelete command in the log
@ -26,7 +27,7 @@ func (c *CompareAndDeleteCommand) CommandName() string {
func (c *CompareAndDeleteCommand) Apply(server raft.Server) (interface{}, error) { func (c *CompareAndDeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store) s, _ := server.StateMachine().(store.Store)
e, err := s.CompareAndDelete(c.Key, c.PrevValue, c.PrevIndex) e, err := s.CompareAndDelete(c.Key, c.Recursive, c.PrevValue, c.PrevIndex)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)