diff --git a/.gitignore b/.gitignore index a080789ef..e1f4d0fee 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ src/ pkg/ /etcd +/etcdbench /server/release_version.go /go-bindata /machine* diff --git a/README.md b/README.md index 78a32a60a..2f525a27a 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Or feel free to just use curl, as in the examples below. ### Getting etcd -The latest release is available as a binary at [Github][github-release]. +The latest release and setup instructions are available at [Github][github-release]. [github-release]: https://github.com/coreos/etcd/releases/ @@ -162,16 +162,11 @@ curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello etcd" "createdIndex": 3, "key": "/message", "modifiedIndex": 3, - "prevValue": "Hello world", "value": "Hello etcd" } } ``` -Notice that `node.prevValue` is set to the previous value of the key - `Hello world`. -It is useful when you want to atomically set a value to a key and get its old value. - - ### Deleting a key You can remove the `/message` key with a `DELETE` request: @@ -186,8 +181,7 @@ curl -L http://127.0.0.1:4001/v2/keys/message -XDELETE "node": { "createdIndex": 3, "key": "/message", - "modifiedIndex": 4, - "prevValue": "Hello etcd" + "modifiedIndex": 4 } } ``` @@ -330,6 +324,38 @@ curl -X POST http://127.0.0.1:4001/v2/keys/queue -d value=Job2 } ``` +To enumerate the in-order keys as a sorted list, use the "sorted" parameter. + +```sh +curl -s -X GET 'http://127.0.0.1:4001/v2/keys/queue?recursive=true&sorted=true' +``` + +```json +{ + "action": "get", + "node": { + "createdIndex": 2, + "dir": true, + "key": "/queue", + "modifiedIndex": 2, + "nodes": [ + { + "createdIndex": 2, + "key": "/queue/2", + "modifiedIndex": 2, + "value": "Job1" + }, + { + "createdIndex": 3, + "key": "/queue/3", + "modifiedIndex": 3, + "value": "Job2" + } + ] + } +} +``` + [lockmod]: #lock @@ -383,7 +409,7 @@ curl -X GET http://127.0.0.1:4001/v2/keys/dir/asdf\?consistent\=true\&wait\=true ### Atomic Compare-and-Swap (CAS) -Etcd can be used as a centralized coordination service in a cluster and `CompareAndSwap` is the most basic operation to build distributed lock service. +Etcd can be used as a centralized coordination service in a cluster and `CompareAndSwap` is the most basic operation used to build a distributed lock service. This command will set the value of a key only if the client-provided conditions are equal to the current conditions. @@ -454,7 +480,6 @@ The response should be "createdIndex": 8, "key": "/foo", "modifiedIndex": 9, - "prevValue": "one", "value": "two" } } @@ -734,7 +759,6 @@ And also the response from the etcd server: "action": "set", "key": "/foo", "modifiedIndex": 3, - "prevValue": "bar", "value": "bar" } ``` @@ -789,7 +813,6 @@ And also the response from the server: "createdIndex": 12, "key": "/foo", "modifiedIndex": 12, - "prevValue": "two", "value": "bar" } } @@ -966,27 +989,27 @@ These modules provide things like dashboards, locks and leader election. ### Dashboard -An HTML dashboard can be found at `http://127.0.0.1:4001/mod/dashboard/``` +An HTML dashboard can be found at `http://127.0.0.1:4001/mod/dashboard/` ### Lock The Lock module implements a fair lock that can be used when lots of clients want access to a single resource. -A lock can be associated with a name. -The name is unique so if a lock tries to request a name that is already queued for a lock then it will find it and watch until that name obtains the lock. -If you lock the same name on a key from two separate curl sessions they'll both return at the same time. +A lock can be associated with a value. +The value is unique so if a lock tries to request a value that is already queued for a lock then it will find it and watch until that value obtains the lock. +If you lock the same value on a key from two separate curl sessions they'll both return at the same time. Here's the API: -**Acquire a lock (with no name) for "customer1"** +**Acquire a lock (with no value) for "customer1"** ```sh curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 ``` -**Acquire a lock for "customer1" that is associated with the name "bar"** +**Acquire a lock for "customer1" that is associated with the value "bar"** ```sh -curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d name=bar +curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d value=bar ``` **Renew the TTL on the "customer1" lock for index 2** @@ -995,13 +1018,13 @@ curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d name=bar curl -X PUT http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d index=2 ``` -**Renew the TTL on the "customer1" lock for name "customer1"** +**Renew the TTL on the "customer1" lock for value "customer1"** ```sh -curl -X PUT http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d name=bar +curl -X PUT http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d value=bar ``` -**Retrieve the current name for the "customer1" lock.** +**Retrieve the current value for the "customer1" lock.** ```sh curl http://127.0.0.1:4001/mod/v2/lock/customer1 @@ -1016,13 +1039,13 @@ curl http://127.0.0.1:4001/mod/v2/lock/customer1?field=index **Delete the "customer1" lock with the index 2** ```sh -curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=customer1 +curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=2 ``` -**Delete the "customer1" lock with the name "bar"** +**Delete the "customer1" lock with the value "bar"** ```sh -curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?name=bar +curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?value=bar ``` @@ -1037,7 +1060,7 @@ Here's the API: **Attempt to set a value for the "order_processing" leader key:** ```sh -curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com +curl -X PUT http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com ``` **Retrieve the current value for the "order_processing" leader key:** @@ -1050,14 +1073,14 @@ myserver1.foo.com **Remove a value from the "order_processing" leader key:** ```sh -curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com +curl -X DELETE http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com ``` If multiple clients attempt to set the value for a key then only one will succeed. The other clients will hang until the current value is removed because of TTL or because of a `DELETE` operation. Multiple clients can submit the same value and will all be notified when that value succeeds. -To update the TTL of a value simply reissue the same `POST` command that you used to set the value. +To update the TTL of a value simply reissue the same `PUT` command that you used to set the value. ## Contributing @@ -1118,6 +1141,11 @@ See [CONTRIBUTING](https://github.com/coreos/etcd/blob/master/CONTRIBUTING.md) f - [spheromak/etcd-cookbook](https://github.com/spheromak/etcd-cookbook) +**BOSH Releases** + +- [cloudfoundry-community/etcd-boshrelease](https://github.com/cloudfoundry-community/etcd-boshrelease) +- [cloudfoundry/cf-release](https://github.com/cloudfoundry/cf-release/tree/master/jobs/etcd) + **Projects using etcd** - [binocarlos/yoda](https://github.com/binocarlos/yoda) - etcd + ZeroMQ @@ -1218,8 +1246,10 @@ The values are specified in milliseconds. ### Versioning +#### Service Versioning + etcd uses [semantic versioning][semver]. -New minor versions may add additional features to the API however. +New minor versions may add additional features to the API. You can get the version of etcd by issuing a request to /version: @@ -1227,10 +1257,15 @@ You can get the version of etcd by issuing a request to /version: curl -L http://127.0.0.1:4001/version ``` -During the pre-v1.0.0 series of releases we may break the API as we fix bugs and get feedback. - [semver]: http://semver.org/ +#### API Versioning + +Clients are encouraged to use the `v2` API. The `v1` API will not change. + +The `v2` API responses should not change after the 0.2.0 release but new features will be added over time. + +During the pre-v1.0.0 series of releases we may break the API as we fix bugs and get feedback. ### License diff --git a/bench/bench.go b/bench/bench.go new file mode 100644 index 000000000..c1fab70f5 --- /dev/null +++ b/bench/bench.go @@ -0,0 +1,58 @@ +package main + +import ( + "flag" + "log" + "strconv" + + "github.com/coreos/go-etcd/etcd" +) + +func write(requests int, end chan int) { + client := etcd.NewClient(nil) + + for i := 0; i < requests; i++ { + key := strconv.Itoa(i) + client.Set(key, key, 0) + } + end <- 1 +} + +func watch(key string) { + client := etcd.NewClient(nil) + + receiver := make(chan *etcd.Response) + go client.Watch(key, 0, true, receiver, nil) + + log.Printf("watching: %s", key) + + received := 0 + for { + <-receiver + received++ + } +} + +func main() { + rWrites := flag.Int("write-requests", 50000, "number of writes") + cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes") + + watches := flag.Int("watches", 500, "number of writes") + + flag.Parse() + + for i := 0; i < *watches; i++ { + key := strconv.Itoa(i) + go watch(key) + } + + wChan := make(chan int, *cWrites) + for i := 0; i < *cWrites; i++ { + go write((*rWrites / *cWrites), wChan) + } + + for i := 0; i < *cWrites; i++ { + <-wChan + log.Printf("Completed %d writes", (*rWrites / *cWrites)) + } +} diff --git a/build b/build index 8e76b4c86..eff0f84d5 100755 --- a/build +++ b/build @@ -24,3 +24,4 @@ done ./scripts/release-version > server/release_version.go go build "${ETCD_PACKAGE}" +go build -o etcdbench "${ETCD_PACKAGE}"/bench diff --git a/error/error.go b/error/error.go index 3eb6c4561..dd06067ff 100644 --- a/error/error.go +++ b/error/error.go @@ -111,10 +111,19 @@ func (e Error) toJsonString() string { func (e Error) Write(w http.ResponseWriter) { w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index)) - // 3xx is reft internal error - if e.ErrorCode/100 == 3 { - http.Error(w, e.toJsonString(), http.StatusInternalServerError) - } else { - http.Error(w, e.toJsonString(), http.StatusBadRequest) + // 3xx is raft internal error + status := http.StatusBadRequest + switch e.ErrorCode { + case EcodeKeyNotFound: + status = http.StatusNotFound + case EcodeNotFile, EcodeDirNotEmpty: + status = http.StatusForbidden + case EcodeTestFailed, EcodeNodeExist: + status = http.StatusPreconditionFailed + default: + if e.ErrorCode/100 == 3 { + status = http.StatusInternalServerError + } } + http.Error(w, e.toJsonString(), status) } diff --git a/release_version.go b/release_version.go deleted file mode 100644 index 23d8bce95..000000000 --- a/release_version.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -const releaseVersion = "v0.1.2-33-g1a2a9d6" diff --git a/server/v2/delete_handler.go b/server/v2/delete_handler.go index a9046985b..a9f2a4122 100644 --- a/server/v2/delete_handler.go +++ b/server/v2/delete_handler.go @@ -2,7 +2,9 @@ package v2 import ( "net/http" + "strconv" + etcdErr "github.com/coreos/etcd/error" "github.com/gorilla/mux" ) @@ -13,6 +15,35 @@ func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error { recursive := (req.FormValue("recursive") == "true") dir := (req.FormValue("dir") == "true") - c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive) + req.ParseForm() + _, valueOk := req.Form["prevValue"] + _, indexOk := req.Form["prevIndex"] + + if !valueOk && !indexOk { + c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive) + return s.Dispatch(c, w, req) + } + + var err error + prevIndex := uint64(0) + prevValue := req.Form.Get("prevValue") + + if indexOk { + prevIndexStr := req.Form.Get("prevIndex") + prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64) + + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store().Index()) + } + } + + if valueOk { + if prevValue == "" { + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store().Index()) + } + } + + c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, prevValue, prevIndex) return s.Dispatch(c, w, req) } diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index 9a67ea2ae..abdb94f7e 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { // Start the watcher on the store. eventChan, err := s.Store().Watch(key, recursive, sinceIndex) if err != nil { - return etcdErr.NewError(500, key, s.Store().Index()) + return err } cn, _ := w.(http.CloseNotifier) diff --git a/server/v2/tests/delete_handler_test.go b/server/v2/tests/delete_handler_test.go index b6f7fa042..d5e1c8206 100644 --- a/server/v2/tests/delete_handler_test.go +++ b/server/v2/tests/delete_handler_test.go @@ -2,6 +2,7 @@ package v2 import ( "fmt" + "net/http" "net/url" "testing" @@ -22,6 +23,7 @@ func TestV2DeleteKey(t *testing.T) { resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) tests.ReadBody(resp) resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2}}`, "") @@ -31,7 +33,7 @@ func TestV2DeleteKey(t *testing.T) { // Ensures that an empty directory is deleted when dir is set. // // $ curl -X PUT localhost:4001/v2/keys/foo?dir=true -// $ curl -X PUT localhost:4001/v2/keys/foo ->fail +// $ curl -X DELETE localhost:4001/v2/keys/foo ->fail // $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true // func TestV2DeleteEmptyDirectory(t *testing.T) { @@ -39,9 +41,11 @@ func TestV2DeleteEmptyDirectory(t *testing.T) { resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{}) tests.ReadBody(resp) resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusForbidden) bodyJson := tests.ReadBodyJSON(resp) assert.Equal(t, bodyJson["errorCode"], 102, "") resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "") @@ -59,9 +63,11 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) { resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?dir=true"), url.Values{}) tests.ReadBody(resp) resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusForbidden) bodyJson := tests.ReadBodyJSON(resp) assert.Equal(t, bodyJson["errorCode"], 108, "") resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true&recursive=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "") @@ -78,8 +84,120 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) { resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{}) tests.ReadBody(resp) resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "") }) } + +// Ensures that a key is deleted if the previous index matches +// +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=2 +// +func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) { + tests.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "XXX") + resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v) + tests.ReadBody(resp) + resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{}) + assert.Nil(t, err, "") + body := tests.ReadBodyJSON(resp) + assert.Equal(t, body["action"], "compareAndDelete", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo", "") + assert.Equal(t, node["modifiedIndex"], 3, "") + }) +} + +// Ensures that a key is not deleted if the previous index does not match +// +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=100 +// +func TestV2DeleteKeyCADOnIndexFail(t *testing.T) { + tests.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "XXX") + resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v) + tests.ReadBody(resp) + resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=100"), url.Values{}) + assert.Nil(t, err, "") + body := tests.ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101) + }) +} + +// Ensures that an error is thrown if an invalid previous index is provided. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=bad_index +// +func TestV2DeleteKeyCADWithInvalidIndex(t *testing.T) { + tests.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "XXX") + resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + tests.ReadBody(resp) + resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=bad_index"), v) + body := tests.ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 203) + }) +} + +// Ensures that a key is deleted only if the previous value matches. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=XXX +// +func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) { + tests.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "XXX") + resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + tests.ReadBody(resp) + resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=XXX"), v) + body := tests.ReadBodyJSON(resp) + assert.Equal(t, body["action"], "compareAndDelete", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["modifiedIndex"], 3, "") + }) +} + +// Ensures that a key is not deleted if the previous value does not match. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=YYY +// +func TestV2DeleteKeyCADOnValueFail(t *testing.T) { + tests.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "XXX") + resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + tests.ReadBody(resp) + resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=YYY"), v) + body := tests.ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101) + }) +} + +// Ensures that an error is thrown if an invalid previous value is provided. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex= +// +func TestV2DeleteKeyCADWithInvalidValue(t *testing.T) { + tests.RunServer(func(s *server.Server) { + v := url.Values{} + v.Set("value", "XXX") + resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + tests.ReadBody(resp) + resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue="), v) + body := tests.ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 201) + }) +} diff --git a/server/v2/tests/get_handler_test.go b/server/v2/tests/get_handler_test.go index a8dd0e1d3..19c6dbf22 100644 --- a/server/v2/tests/get_handler_test.go +++ b/server/v2/tests/get_handler_test.go @@ -2,6 +2,7 @@ package v2 import ( "fmt" + "net/http" "net/url" "testing" "time" @@ -13,6 +14,7 @@ import ( // Ensures that a value can be retrieve for a given key. // +// $ curl localhost:4001/v2/keys/foo/bar -> fail // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX // $ curl localhost:4001/v2/keys/foo/bar // @@ -20,9 +22,15 @@ func TestV2GetKey(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "XXX") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.Get(fullURL) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + + resp, _ = tests.PutForm(fullURL, v) tests.ReadBody(resp) - resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")) + + resp, _ = tests.Get(fullURL) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "get", "") node := body["node"].(map[string]interface{}) @@ -51,6 +59,7 @@ func TestV2GetKeyRecursively(t *testing.T) { tests.ReadBody(resp) resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true")) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "get", "") node := body["node"].(map[string]interface{}) @@ -205,7 +214,7 @@ func TestV2WatchKeyInDir(t *testing.T) { }() // wait for expiration, we do have a up to 500 millisecond delay - time.Sleep(1500 * time.Millisecond) + time.Sleep(2000 * time.Millisecond) select { case <-c: diff --git a/server/v2/tests/post_handler_test.go b/server/v2/tests/post_handler_test.go index a048d5cb5..92acfc90a 100644 --- a/server/v2/tests/post_handler_test.go +++ b/server/v2/tests/post_handler_test.go @@ -3,6 +3,7 @@ package v2 import ( "fmt" "testing" + "net/http" "github.com/coreos/etcd/server" "github.com/coreos/etcd/tests" @@ -18,7 +19,9 @@ import ( func TestV2CreateUnique(t *testing.T) { tests.RunServer(func(s *server.Server) { // POST should add index to list. - resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PostForm(fullURL, nil) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "create", "") @@ -28,7 +31,8 @@ func TestV2CreateUnique(t *testing.T) { assert.Equal(t, node["modifiedIndex"], 2, "") // Second POST should add next index to list. - resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil) + resp, _ = tests.PostForm(fullURL, nil) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body = tests.ReadBodyJSON(resp) node = body["node"].(map[string]interface{}) @@ -36,6 +40,7 @@ func TestV2CreateUnique(t *testing.T) { // POST to a different key should add index to that list. resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body = tests.ReadBodyJSON(resp) node = body["node"].(map[string]interface{}) diff --git a/server/v2/tests/put_handler_test.go b/server/v2/tests/put_handler_test.go index 7b51da4c2..f507c0d3d 100644 --- a/server/v2/tests/put_handler_test.go +++ b/server/v2/tests/put_handler_test.go @@ -2,6 +2,7 @@ package v2 import ( "fmt" + "net/http" "net/url" "testing" "time" @@ -20,6 +21,7 @@ func TestV2SetKey(t *testing.T) { v := url.Values{} v.Set("value", "XXX") resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") @@ -33,6 +35,7 @@ func TestV2SetKey(t *testing.T) { func TestV2SetDirectory(t *testing.T) { tests.RunServer(func(s *server.Server) { resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) assert.Nil(t, err, "") assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") @@ -50,6 +53,7 @@ func TestV2SetKeyWithTTL(t *testing.T) { v.Set("value", "XXX") v.Set("ttl", "20") resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBodyJSON(resp) node := body["node"].(map[string]interface{}) assert.Equal(t, node["ttl"], 20, "") @@ -70,6 +74,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) { v.Set("value", "XXX") v.Set("ttl", "bad_ttl") resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 202, "") assert.Equal(t, body["message"], "The given TTL in POST form is not a number", "") @@ -77,7 +82,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) { }) } -// Ensures that a key is conditionally set only if it previously did not exist. +// Ensures that a key is conditionally set if it previously did not exist. // // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false // @@ -87,25 +92,29 @@ func TestV2CreateKeySuccess(t *testing.T) { v.Set("value", "XXX") v.Set("prevExist", "false") resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBodyJSON(resp) node := body["node"].(map[string]interface{}) assert.Equal(t, node["value"], "XXX", "") }) } -// Ensures that a key is not conditionally because it previously existed. +// Ensures that a key is not conditionally set because it previously existed. // -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false -> fail // func TestV2CreateKeyFail(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "XXX") v.Set("prevExist", "false") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) - resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ = tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 105, "") assert.Equal(t, body["message"], "Key already exists", "") @@ -123,12 +132,15 @@ func TestV2UpdateKeySuccess(t *testing.T) { v := url.Values{} v.Set("value", "XXX") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevExist", "true") - resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ = tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "update", "") }) @@ -144,9 +156,11 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) { v := url.Values{} resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) v.Set("value", "YYY") v.Set("prevExist", "true") resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 100, "") assert.Equal(t, body["message"], "Key not found", "") @@ -156,19 +170,27 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) { // Ensures that a key is not conditionally set if it previously did not exist. // -// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -d prevExist=true -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=true +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=YYY -d prevExist=true -> fail +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevExist=true -> fail // func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "YYY") v.Set("prevExist", "true") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 100, "") assert.Equal(t, body["message"], "Key not found", "") assert.Equal(t, body["cause"], "/foo", "") + + resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + body = tests.ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 100, "") + assert.Equal(t, body["message"], "Key not found", "") + assert.Equal(t, body["cause"], "/foo", "") }) } @@ -181,11 +203,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "XXX") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevIndex", "2") - resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ = tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "compareAndSwap", "") node := body["node"].(map[string]interface{}) @@ -203,11 +228,14 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "XXX") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevIndex", "10") - resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ = tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") @@ -226,6 +254,7 @@ func TestV2SetKeyCASWithInvalidIndex(t *testing.T) { v.Set("value", "YYY") v.Set("prevIndex", "bad_index") resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 203, "") assert.Equal(t, body["message"], "The given index in POST form is not a number", "") @@ -242,11 +271,14 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "XXX") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "XXX") - resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ = tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "compareAndSwap", "") node := body["node"].(map[string]interface{}) @@ -264,11 +296,14 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) { tests.RunServer(func(s *server.Server) { v := url.Values{} v.Set("value", "XXX") - resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar") + resp, _ := tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "AAA") - resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + resp, _ = tests.PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") @@ -287,6 +322,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) { v.Set("value", "XXX") v.Set("prevValue", "") resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 201, "") assert.Equal(t, body["message"], "PrevValue is Required in POST form", "") diff --git a/store/command_factory.go b/store/command_factory.go index 65cf18851..9eabffe27 100644 --- a/store/command_factory.go +++ b/store/command_factory.go @@ -22,6 +22,7 @@ type CommandFactory interface { CreateDeleteCommand(key string, dir, recursive bool) raft.Command CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command + CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command CreateSyncCommand(now time.Time) raft.Command } diff --git a/store/event.go b/store/event.go index 120f01e71..00885af80 100644 --- a/store/event.go +++ b/store/event.go @@ -1,13 +1,14 @@ package store const ( - Get = "get" - Create = "create" - Set = "set" - Update = "update" - Delete = "delete" - CompareAndSwap = "compareAndSwap" - Expire = "expire" + Get = "get" + Create = "create" + Set = "set" + Update = "update" + Delete = "delete" + CompareAndSwap = "compareAndSwap" + CompareAndDelete = "compareAndDelete" + Expire = "expire" ) type Event struct { diff --git a/store/event_history.go b/store/event_history.go index 19d781def..308932e03 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event { return e } -// scan function is enumerating events from the index in history and -// stops till the first point where the key has identified key +// scan enumerates events from the index history and stops at the first point +// where the key matches. func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - // the index should locate after the event history's StartIndex - if index-eh.StartIndex < 0 { + // index should be after the event history's StartIndex + if index < eh.StartIndex { return nil, etcdErr.NewError(etcdErr.EcodeEventIndexCleared, fmt.Sprintf("the requested history has been cleared [%v/%v]", eh.StartIndex, index), 0) } - // the index should locate before the size of the queue minus the duplicate count + // the index should come before the size of the queue minus the duplicate count if index > eh.LastIndex { // future index return nil, nil } - i := eh.Queue.Front + offset := index - eh.StartIndex + i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity for { e := eh.Queue.Events[i] @@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, ok = ok || strings.HasPrefix(e.Node.Key, key) } - if ok && index <= e.Index() { // make sure we bypass the smaller one + if ok { return e, nil } i = (i + 1) % eh.Queue.Capacity - if i > eh.Queue.back() { + if i == eh.Queue.Back { return nil, nil } } @@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory { Events: make([]*Event, eh.Queue.Capacity), Size: eh.Queue.Size, Front: eh.Queue.Front, + Back: eh.Queue.Back, } for i, e := range eh.Queue.Events { diff --git a/store/event_queue.go b/store/event_queue.go index 0852956b1..e32bf4cc3 100644 --- a/store/event_queue.go +++ b/store/event_queue.go @@ -4,22 +4,17 @@ type eventQueue struct { Events []*Event Size int Front int + Back int Capacity int } -func (eq *eventQueue) back() int { - return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity -} - func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity - - eq.Events[index] = e + eq.Events[eq.Back] = e + eq.Back = (eq.Back + 1) % eq.Capacity if eq.Size == eq.Capacity { //dequeue - eq.Front = (index + 1) % eq.Capacity + eq.Front = (eq.Front + 1) % eq.Capacity } else { eq.Size++ } - } diff --git a/store/event_test.go b/store/event_test.go index a4579b72a..8515cf767 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) { t.Fatalf("bad index shoud reuturn nil") } } + +// TestFullEventQueue tests a queue with capacity = 10 +// Add 1000 events into that queue, and test if scanning +// works still for previous events. +func TestFullEventQueue(t *testing.T) { + + eh := newEventHistory(10) + + // Add + for i := 0; i < 1000; i++ { + e := newEvent(Create, "/foo", uint64(i), uint64(i)) + eh.addEvent(e) + e, err := eh.scan("/foo", true, uint64(i-1)) + if i > 0 { + if e == nil || err != nil { + t.Fatalf("scan error [/foo] [%v] %v", i-1, i) + } + } + } +} diff --git a/store/node.go b/store/node.go index f036a594e..71c0a64d7 100644 --- a/store/node.go +++ b/store/node.go @@ -306,6 +306,13 @@ func (n *node) UpdateTTL(expireTime time.Time) { } } +func (n *node) Compare(prevValue string, prevIndex uint64) bool { + compareValue := (prevValue == "" || n.Value == prevValue) + compareIndex := (prevIndex == 0 || n.ModifiedIndex == prevIndex) + + return compareValue && compareIndex +} + // Clone function clone the node recursively and return the new node. // If the node is a directory, it will clone all the content under this directory. // If the node is a key-value pair, it will clone the pair. diff --git a/store/stats.go b/store/stats.go index 2eb271d73..5f3cd62a3 100644 --- a/store/stats.go +++ b/store/stats.go @@ -35,6 +35,8 @@ const ( GetSuccess GetFail ExpireCount + CompareAndDeleteSuccess + CompareAndDeleteFail ) type Stats struct { @@ -63,6 +65,10 @@ type Stats struct { CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"` CompareAndSwapFail uint64 `json:"compareAndSwapFail"` + // Number of compareAndDelete requests + CompareAndDeleteSuccess uint64 `json:"compareAndDeleteSuccess"` + CompareAndDeleteFail uint64 `json:"compareAndDeleteFail"` + ExpireCount uint64 `json:"expireCount"` Watchers uint64 `json:"watchers"` @@ -76,7 +82,8 @@ func newStats() *Stats { func (s *Stats) clone() *Stats { return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail, s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess, - s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount} + s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, + s.CompareAndDeleteSuccess, s.CompareAndDeleteFail, s.Watchers, s.ExpireCount} } // Status() return the statistics info of etcd storage its recent start @@ -93,6 +100,7 @@ func (s *Stats) TotalTranscations() uint64 { return s.SetSuccess + s.SetFail + s.DeleteSuccess + s.DeleteFail + s.CompareAndSwapSuccess + s.CompareAndSwapFail + + s.CompareAndDeleteSuccess + s.CompareAndDeleteFail + s.UpdateSuccess + s.UpdateFail } @@ -122,6 +130,10 @@ func (s *Stats) Inc(field int) { atomic.AddUint64(&s.CompareAndSwapSuccess, 1) case CompareAndSwapFail: atomic.AddUint64(&s.CompareAndSwapFail, 1) + case CompareAndDeleteSuccess: + atomic.AddUint64(&s.CompareAndDeleteSuccess, 1) + case CompareAndDeleteFail: + atomic.AddUint64(&s.CompareAndDeleteFail, 1) case ExpireCount: atomic.AddUint64(&s.ExpireCount, 1) } diff --git a/store/store.go b/store/store.go index 01023e308..08d585056 100644 --- a/store/store.go +++ b/store/store.go @@ -51,6 +51,7 @@ type Store interface { CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time) (*Event, error) Delete(nodePath string, recursive, dir bool) (*Event, error) + CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) Save() ([]byte, error) @@ -207,37 +208,37 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint return nil, err } - if n.IsDir() { // can only test and set file + if n.IsDir() { // can only compare and swap file s.Stats.Inc(CompareAndSwapFail) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex) } // If both of the prevValue and prevIndex are given, we will test both of them. // Command will be executed, only if both of the tests are successful. - if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) { - // update etcd index - s.CurrentIndex++ - - e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex) - eNode := e.Node - - eNode.PrevValue = n.Value - - // if test succeed, write the value - n.Write(value, s.CurrentIndex) - n.UpdateTTL(expireTime) - - eNode.Value = value - eNode.Expiration, eNode.TTL = n.ExpirationAndTTL() - - s.WatcherHub.notify(e) - s.Stats.Inc(CompareAndSwapSuccess) - return e, nil + if !n.Compare(prevValue, prevIndex) { + cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) + s.Stats.Inc(CompareAndSwapFail) + return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex) } - cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) - s.Stats.Inc(CompareAndSwapFail) - return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex) + // update etcd index + s.CurrentIndex++ + + e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex) + eNode := e.Node + + eNode.PrevValue = n.Value + + // if test succeed, write the value + n.Write(value, s.CurrentIndex) + n.UpdateTTL(expireTime) + + eNode.Value = value + eNode.Expiration, eNode.TTL = n.ExpirationAndTTL() + + s.WatcherHub.notify(e) + s.Stats.Inc(CompareAndSwapSuccess) + return e, nil } // Delete function deletes the node at the given path. @@ -257,8 +258,6 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { dir = true } - nextIndex := s.CurrentIndex + 1 - n, err := s.internalGet(nodePath) if err != nil { // if the node does not exist, return error @@ -266,6 +265,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { return nil, err } + nextIndex := s.CurrentIndex + 1 e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex) eNode := e.Node @@ -276,7 +276,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { } callback := func(path string) { // notify function - // notify the watchers with delted set true + // notify the watchers with deleted set true s.WatcherHub.notifyWatchers(e, path, true) } @@ -296,9 +296,52 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { return e, nil } +func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) { + nodePath = path.Clean(path.Join("/", nodePath)) + + s.worldLock.Lock() + defer s.worldLock.Unlock() + + n, err := s.internalGet(nodePath) + + if err != nil { // if the node does not exist, return error + s.Stats.Inc(CompareAndDeleteFail) + return nil, err + } + + if n.IsDir() { // can only compare and delete file + s.Stats.Inc(CompareAndSwapFail) + return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex) + } + + // If both of the prevValue and prevIndex are given, we will test both of them. + // Command will be executed, only if both of the tests are successful. + if !n.Compare(prevValue, prevIndex) { + cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) + s.Stats.Inc(CompareAndDeleteFail) + return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex) + } + + // update etcd index + s.CurrentIndex++ + + e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex) + + callback := func(path string) { // notify function + // notify the watchers with deleted set true + s.WatcherHub.notifyWatchers(e, path, true) + } + + // delete a key-value pair, no error should happen + n.Remove(false, false, callback) + + s.WatcherHub.notify(e) + s.Stats.Inc(CompareAndDeleteSuccess) + return e, nil +} + func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) { key = path.Clean(path.Join("/", key)) - nextIndex := s.CurrentIndex + 1 s.worldLock.RLock() diff --git a/store/store_test.go b/store/store_test.go index 103b4e9f6..b29fea627 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -337,7 +337,58 @@ func TestRootRdOnly(t *testing.T) { _, err = s.CompareAndSwap("/", "", 0, "", Permanent) assert.NotNil(t, err, "") +} +func TestStoreCompareAndDeletePrevValue(t *testing.T) { + s := newStore() + s.Create("/foo", false, "bar", false, Permanent) + e, err := s.CompareAndDelete("/foo", "bar", 0) + assert.Nil(t, err, "") + assert.Equal(t, e.Action, "compareAndDelete", "") + assert.Equal(t, e.Node.Key, "/foo", "") +} + +func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) { + s := newStore() + s.Create("/foo", false, "bar", false, Permanent) + e, _err := s.CompareAndDelete("/foo", "baz", 0) + err := _err.(*etcdErr.Error) + assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") + assert.Equal(t, err.Message, "Compare failed", "") + assert.Nil(t, e, "") + e, _ = s.Get("/foo", false, false) + assert.Equal(t, e.Node.Value, "bar", "") +} + +func TestStoreCompareAndDeletePrevIndex(t *testing.T) { + s := newStore() + s.Create("/foo", false, "bar", false, Permanent) + e, err := s.CompareAndDelete("/foo", "", 1) + assert.Nil(t, err, "") + assert.Equal(t, e.Action, "compareAndDelete", "") +} + +func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) { + s := newStore() + s.Create("/foo", false, "bar", false, Permanent) + e, _err := s.CompareAndDelete("/foo", "", 100) + assert.NotNil(t, _err, "") + err := _err.(*etcdErr.Error) + assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "") + assert.Equal(t, err.Message, "Compare failed", "") + assert.Nil(t, e, "") + e, _ = s.Get("/foo", false, false) + assert.Equal(t, e.Node.Value, "bar", "") +} + +// Ensure that the store cannot delete a directory. +func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) { + s := newStore() + s.Create("/foo", true, "", false, Permanent) + _, _err := s.CompareAndDelete("/foo", "", 0) + assert.NotNil(t, _err, "") + err := _err.(*etcdErr.Error) + assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "") } // Ensure that the store can conditionally update a key if it has a previous value. diff --git a/store/v2/command_factory.go b/store/v2/command_factory.go index 1833bfcc1..bc06661e7 100644 --- a/store/v2/command_factory.go +++ b/store/v2/command_factory.go @@ -75,6 +75,15 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p } } +// CreateCompareAndDeleteCommand creates a version 2 command to conditionally delete a key from the store. +func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command { + return &CompareAndDeleteCommand{ + Key: key, + PrevValue: prevValue, + PrevIndex: prevIndex, + } +} + func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command { return &SyncCommand{ Time: time.Now(), diff --git a/store/v2/compare_and_delete_command.go b/store/v2/compare_and_delete_command.go new file mode 100644 index 000000000..8a518c5a6 --- /dev/null +++ b/store/v2/compare_and_delete_command.go @@ -0,0 +1,37 @@ +package v2 + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/raft" +) + +func init() { + raft.RegisterCommand(&CompareAndDeleteCommand{}) +} + +// The CompareAndDelete performs a conditional delete on a key in the store. +type CompareAndDeleteCommand struct { + Key string `json:"key"` + PrevValue string `json:"prevValue"` + PrevIndex uint64 `json:"prevIndex"` +} + +// The name of the compareAndDelete command in the log +func (c *CompareAndDeleteCommand) CommandName() string { + return "etcd:compareAndDelete" +} + +// Set the key-value pair if the current value of the key equals to the given prevValue +func (c *CompareAndDeleteCommand) Apply(server raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(store.Store) + + e, err := s.CompareAndDelete(c.Key, c.PrevValue, c.PrevIndex) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} diff --git a/tests/functional/simple_snapshot_test.go b/tests/functional/simple_snapshot_test.go index a93ecb39d..b47be3e65 100644 --- a/tests/functional/simple_snapshot_test.go +++ b/tests/functional/simple_snapshot_test.go @@ -56,7 +56,7 @@ func TestSimpleSnapshot(t *testing.T) { index, _ := strconv.Atoi(snapshots[0].Name()[2:5]) - if index <= 507 || index >= 510 { + if index < 507 || index > 510 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } @@ -89,7 +89,7 @@ func TestSimpleSnapshot(t *testing.T) { index, _ = strconv.Atoi(snapshots[0].Name()[2:6]) - if index <= 1014 || index > 1017 { + if index < 1014 || index > 1017 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } } diff --git a/tests/functional/v1_migration_test.go b/tests/functional/v1_migration_test.go index c8906a915..3a0512101 100644 --- a/tests/functional/v1_migration_test.go +++ b/tests/functional/v1_migration_test.go @@ -3,6 +3,7 @@ package test import ( "fmt" "io/ioutil" + "net/http" "os" "os/exec" "path/filepath" @@ -91,7 +92,7 @@ func TestV1ClusterMigration(t *testing.T) { resp, err := tests.Get("http://localhost:4001/v2/keys/message") body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, resp.StatusCode, 400) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n") // Ensure TTL'd message is removed.