diff --git a/server/server.go b/server/server.go index d1b1abf0f..0aeb5dbc4 100644 --- a/server/server.go +++ b/server/server.go @@ -254,10 +254,11 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro for i := 0; i < count; i++ { go func() { for j := 0; j < 10; j++ { - c := &store.UpdateCommand{ + c := &store.CreateCommand{ Key: "foo", Value: "bar", ExpireTime: time.Unix(0, 0), + Force: true, } s.peerServer.RaftServer().Do(c) } diff --git a/server/v1/set_key_handler.go b/server/v1/set_key_handler.go index 03b6d7f9b..acd4037c1 100644 --- a/server/v1/set_key_handler.go +++ b/server/v1/set_key_handler.go @@ -31,7 +31,7 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { // If the "prevValue" is specified then test-and-set. Otherwise create a new key. var c raft.Command if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 { - c = &store.TestAndSetCommand{ + c = &store.CompareAndSwapCommand{ Key: key, Value: value, PrevValue: prevValueArr[0], diff --git a/server/v2/update_key_handler.go b/server/v2/update_key_handler.go index 841c8828d..60c260afc 100644 --- a/server/v2/update_key_handler.go +++ b/server/v2/update_key_handler.go @@ -29,35 +29,58 @@ func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error prevValue, valueOk := req.Form["prevValue"] prevIndexStr, indexOk := req.Form["prevIndex"] + prevExist, existOk := req.Form["prevExist"] var c raft.Command - if !valueOk && !indexOk { // update without test - c = &store.UpdateCommand{ + + // Set command: create a new node or replace the old one. + if !valueOk && !indexOk && !existOk { + c = &store.CreateCommand{ Key: key, Value: value, ExpireTime: expireTime, + Force: true, } + return s.Dispatch(c, w, req) + } - } else { // update with test - var prevIndex uint64 - - if indexOk { - prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) - - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) + // update with test + if existOk { + if prevExist[0] == "false" { + // Create command: create a new node. Fail, if a node already exists + // Ignore prevIndex and prevValue + c = &store.CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, } - } else { - prevIndex = 0 } + } - c = &store.TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue[0], - PrevIndex: prevIndex, + var prevIndex uint64 + + if indexOk { + prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) + + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", store.UndefIndex, store.UndefTerm) } + } else { + prevIndex = 0 + } + + if valueOk { + if prevValue[0] == "" { + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", store.UndefIndex, store.UndefTerm) + } + } + + c = &store.CompareAndSwapCommand{ + Key: key, + Value: value, + PrevValue: prevValue[0], + PrevIndex: prevIndex, } return s.Dispatch(c, w, req) diff --git a/store/test_and_set_command.go b/store/compare_and_swap_command.go similarity index 61% rename from store/test_and_set_command.go rename to store/compare_and_swap_command.go index 03cb1879a..c4bfee569 100644 --- a/store/test_and_set_command.go +++ b/store/compare_and_swap_command.go @@ -8,11 +8,11 @@ import ( ) func init() { - raft.RegisterCommand(&TestAndSetCommand{}) + raft.RegisterCommand(&CompareAndSwapCommand{}) } -// The TestAndSetCommand performs a conditional update on a key in the store. -type TestAndSetCommand struct { +// The CompareAndSwap performs a conditional update on a key in the store. +type CompareAndSwapCommand struct { Key string `json:"key"` Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` @@ -21,15 +21,15 @@ type TestAndSetCommand struct { } // The name of the testAndSet command in the log -func (c *TestAndSetCommand) CommandName() string { - return "etcd:testAndSet" +func (c *CompareAndSwapCommand) CommandName() string { + return "etcd:compareAndSwap" } // Set the key-value pair if the current value of the key equals to the given prevValue -func (c *TestAndSetCommand) Apply(server raft.Server) (interface{}, error) { +func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) { s, _ := server.StateMachine().(Store) - e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, + e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { diff --git a/store/stats_test.go b/store/stats_test.go index 44d1a8999..fa6483ac2 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -37,7 +37,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) + _, err := s.update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) if err != nil { UpdateFail++ } else { @@ -58,7 +58,7 @@ func TestBasicStats(t *testing.T) { for _, k := range keys { i++ - _, err := s.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1) + _, err := s.CompareAndSwap(k, "foo", 0, "bar", Permanent, i, 1) if err != nil { TestAndSetFail++ } else { diff --git a/store/store.go b/store/store.go index d063aa6ba..f25b4345e 100644 --- a/store/store.go +++ b/store/store.go @@ -17,8 +17,7 @@ type Store interface { Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) Create(nodePath string, value string, incrementalSuffix bool, force bool, expireTime time.Time, index uint64, term uint64) (*Event, error) - Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) - TestAndSet(nodePath string, prevValue string, prevIndex uint64, + CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) @@ -116,47 +115,7 @@ func (s *store) Create(nodePath string, value string, incrementalSuffix bool, fo return s.internalCreate(nodePath, value, incrementalSuffix, force, expireTime, index, term, Create) } -// Update function updates the value/ttl of the node. -// If the node is a file, the value and the ttl can be updated. -// If the node is a directory, only the ttl can be updated. -func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - s.worldLock.Lock() - defer s.worldLock.Unlock() - nodePath = path.Clean(path.Join("/", nodePath)) - - n, err := s.internalGet(nodePath, index, term) - - if err != nil { // if the node does not exist, return error - s.Stats.Inc(UpdateFail) - return nil, err - } - - e := newEvent(Update, nodePath, s.Index, s.Term) - - if len(newValue) != 0 { - if n.IsDir() { - // if the node is a directory, we cannot update value - s.Stats.Inc(UpdateFail) - return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) - } - - e.PrevValue = n.Value - n.Write(newValue, index, term) - } - - // update ttl - n.UpdateTTL(expireTime) - - e.Expiration, e.TTL = n.ExpirationAndTTL() - - s.WatcherHub.notify(e) - - s.Stats.Inc(UpdateSuccess) - - return e, nil -} - -func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, +func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) @@ -164,8 +123,8 @@ func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, s.worldLock.Lock() defer s.worldLock.Unlock() - if prevValue == "" && prevIndex == 0 { // try create - return s.internalCreate(nodePath, value, false, false, expireTime, index, term, TestAndSet) + if prevValue == "" && prevIndex == 0 { // try just update + return s.update(nodePath, value, expireTime, index, term) } n, err := s.internalGet(nodePath, index, term) @@ -293,6 +252,46 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string return curr, nil } +// Update function updates the value/ttl of the node. +// If the node is a file, the value and the ttl can be updated. +// If the node is a directory, only the ttl can be updated. +func (s *store) update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + s.worldLock.Lock() + defer s.worldLock.Unlock() + nodePath = path.Clean(path.Join("/", nodePath)) + + n, err := s.internalGet(nodePath, index, term) + + if err != nil { // if the node does not exist, return error + s.Stats.Inc(UpdateFail) + return nil, err + } + + e := newEvent(Update, nodePath, s.Index, s.Term) + + if len(newValue) != 0 { + if n.IsDir() { + // if the node is a directory, we cannot update value + s.Stats.Inc(UpdateFail) + return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term) + } + + e.PrevValue = n.Value + n.Write(newValue, index, term) + } + + // update ttl + n.UpdateTTL(expireTime) + + e.Expiration, e.TTL = n.ExpirationAndTTL() + + s.WatcherHub.notify(e) + + s.Stats.Inc(UpdateSuccess) + + return e, nil +} + func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { diff --git a/store/store_test.go b/store/store_test.go index 958d99007..457a9b5f5 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -74,7 +74,7 @@ func TestUpdateFile(t *testing.T) { t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error()) } - _, err = s.Update("/foo/bar", "barbar", Permanent, 2, 1) + _, err = s.update("/foo/bar", "barbar", Permanent, 2, 1) if err != nil { t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error()) @@ -114,7 +114,7 @@ func TestUpdateFile(t *testing.T) { } expire := time.Now().Add(time.Second * 2) - _, err = s.Update("/foo/foo", "", expire, 7, 1) + _, err = s.update("/foo/foo", "", expire, 7, 1) if err != nil { t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error()) } @@ -286,18 +286,18 @@ func TestExpire(t *testing.T) { } } -func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? +func TestCompareAndSwap(t *testing.T) { // TODO prevValue == nil ? s := newStore() s.Create("/foo", "bar", false, false, Permanent, 1, 1) // test on wrong previous value - _, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1) + _, err := s.CompareAndSwap("/foo", "barbar", 0, "car", Permanent, 2, 1) if err == nil { t.Fatal("test and set should fail barbar != bar") } // test on value - e, err := s.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1) + e, err := s.CompareAndSwap("/foo", "bar", 0, "car", Permanent, 3, 1) if err != nil { t.Fatal("test and set should succeed bar == bar") @@ -308,7 +308,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? } // test on index - e, err = s.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1) + e, err = s.CompareAndSwap("/foo", "", 3, "bar", Permanent, 4, 1) if err != nil { t.Fatal("test and set should succeed index 3 == 3") @@ -331,14 +331,14 @@ func TestWatch(t *testing.T) { } c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1) - s.Update("/foo/foo/foo", "car", Permanent, 2, 1) + s.update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != Update { t.Fatal("watch for Update node fails ", e) } c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1) - s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) + s.CompareAndSwap("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet node fails") @@ -360,14 +360,14 @@ func TestWatch(t *testing.T) { } c, _ = s.Watch("/foo", true, 0, 5, 1) - s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) + s.update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != Update { t.Fatal("watch for Update subdirectory fails") } c, _ = s.Watch("/foo", true, 0, 6, 1) - s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) + s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet subdirectory fails") @@ -390,7 +390,7 @@ func TestWatch(t *testing.T) { } s.Create("/foo/foo/boo", "foo", false, false, Permanent, 10, 1) - s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) + s.update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) @@ -399,7 +399,7 @@ func TestWatch(t *testing.T) { } s.Create("/foo/foo/boo", "foo", false, false, Permanent, 12, 1) - s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) + s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) c, _ = s.Watch("/foo", true, 0, 13, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) diff --git a/store/update_command.go b/store/update_command.go deleted file mode 100644 index 3152006bc..000000000 --- a/store/update_command.go +++ /dev/null @@ -1,38 +0,0 @@ -package store - -import ( - "time" - - "github.com/coreos/etcd/log" - "github.com/coreos/go-raft" -) - -func init() { - raft.RegisterCommand(&UpdateCommand{}) -} - -// The UpdateCommand updates the value of a key in the Store. -type UpdateCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` -} - -// The name of the update command in the log -func (c *UpdateCommand) CommandName() string { - return "etcd:update" -} - -// Update node -func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(Store) - - e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) - - if err != nil { - log.Debug(err) - return nil, err - } - - return e, nil -}