Merge branch 'master' of https://github.com/coreos/etcd into logging

Conflicts:
	tests/functional/simple_snapshot_test.go
This commit is contained in:
Ben Johnson 2013-12-30 16:19:57 -07:00
commit cc10b1084d
25 changed files with 599 additions and 120 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
src/
pkg/
/etcd
/etcdbench
/server/release_version.go
/go-bindata
/machine*

View File

@ -32,7 +32,7 @@ Or feel free to just use curl, as in the examples below.
### Getting etcd
The latest release is available as a binary at [Github][github-release].
The latest release and setup instructions are available at [Github][github-release].
[github-release]: https://github.com/coreos/etcd/releases/
@ -162,16 +162,11 @@ curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello etcd"
"createdIndex": 3,
"key": "/message",
"modifiedIndex": 3,
"prevValue": "Hello world",
"value": "Hello etcd"
}
}
```
Notice that `node.prevValue` is set to the previous value of the key - `Hello world`.
It is useful when you want to atomically set a value to a key and get its old value.
### Deleting a key
You can remove the `/message` key with a `DELETE` request:
@ -186,8 +181,7 @@ curl -L http://127.0.0.1:4001/v2/keys/message -XDELETE
"node": {
"createdIndex": 3,
"key": "/message",
"modifiedIndex": 4,
"prevValue": "Hello etcd"
"modifiedIndex": 4
}
}
```
@ -330,6 +324,38 @@ curl -X POST http://127.0.0.1:4001/v2/keys/queue -d value=Job2
}
```
To enumerate the in-order keys as a sorted list, use the "sorted" parameter.
```sh
curl -s -X GET 'http://127.0.0.1:4001/v2/keys/queue?recursive=true&sorted=true'
```
```json
{
"action": "get",
"node": {
"createdIndex": 2,
"dir": true,
"key": "/queue",
"modifiedIndex": 2,
"nodes": [
{
"createdIndex": 2,
"key": "/queue/2",
"modifiedIndex": 2,
"value": "Job1"
},
{
"createdIndex": 3,
"key": "/queue/3",
"modifiedIndex": 3,
"value": "Job2"
}
]
}
}
```
[lockmod]: #lock
@ -383,7 +409,7 @@ curl -X GET http://127.0.0.1:4001/v2/keys/dir/asdf\?consistent\=true\&wait\=true
### Atomic Compare-and-Swap (CAS)
Etcd can be used as a centralized coordination service in a cluster and `CompareAndSwap` is the most basic operation to build distributed lock service.
Etcd can be used as a centralized coordination service in a cluster and `CompareAndSwap` is the most basic operation used to build a distributed lock service.
This command will set the value of a key only if the client-provided conditions are equal to the current conditions.
@ -454,7 +480,6 @@ The response should be
"createdIndex": 8,
"key": "/foo",
"modifiedIndex": 9,
"prevValue": "one",
"value": "two"
}
}
@ -734,7 +759,6 @@ And also the response from the etcd server:
"action": "set",
"key": "/foo",
"modifiedIndex": 3,
"prevValue": "bar",
"value": "bar"
}
```
@ -789,7 +813,6 @@ And also the response from the server:
"createdIndex": 12,
"key": "/foo",
"modifiedIndex": 12,
"prevValue": "two",
"value": "bar"
}
}
@ -966,27 +989,27 @@ These modules provide things like dashboards, locks and leader election.
### Dashboard
An HTML dashboard can be found at `http://127.0.0.1:4001/mod/dashboard/```
An HTML dashboard can be found at `http://127.0.0.1:4001/mod/dashboard/`
### Lock
The Lock module implements a fair lock that can be used when lots of clients want access to a single resource.
A lock can be associated with a name.
The name is unique so if a lock tries to request a name that is already queued for a lock then it will find it and watch until that name obtains the lock.
If you lock the same name on a key from two separate curl sessions they'll both return at the same time.
A lock can be associated with a value.
The value is unique so if a lock tries to request a value that is already queued for a lock then it will find it and watch until that value obtains the lock.
If you lock the same value on a key from two separate curl sessions they'll both return at the same time.
Here's the API:
**Acquire a lock (with no name) for "customer1"**
**Acquire a lock (with no value) for "customer1"**
```sh
curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60
```
**Acquire a lock for "customer1" that is associated with the name "bar"**
**Acquire a lock for "customer1" that is associated with the value "bar"**
```sh
curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d name=bar
curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d value=bar
```
**Renew the TTL on the "customer1" lock for index 2**
@ -995,13 +1018,13 @@ curl -X POST http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d name=bar
curl -X PUT http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d index=2
```
**Renew the TTL on the "customer1" lock for name "customer1"**
**Renew the TTL on the "customer1" lock for value "customer1"**
```sh
curl -X PUT http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d name=bar
curl -X PUT http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60 -d value=bar
```
**Retrieve the current name for the "customer1" lock.**
**Retrieve the current value for the "customer1" lock.**
```sh
curl http://127.0.0.1:4001/mod/v2/lock/customer1
@ -1016,13 +1039,13 @@ curl http://127.0.0.1:4001/mod/v2/lock/customer1?field=index
**Delete the "customer1" lock with the index 2**
```sh
curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=customer1
curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=2
```
**Delete the "customer1" lock with the name "bar"**
**Delete the "customer1" lock with the value "bar"**
```sh
curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?name=bar
curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?value=bar
```
@ -1037,7 +1060,7 @@ Here's the API:
**Attempt to set a value for the "order_processing" leader key:**
```sh
curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com
curl -X PUT http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com
```
**Retrieve the current value for the "order_processing" leader key:**
@ -1050,14 +1073,14 @@ myserver1.foo.com
**Remove a value from the "order_processing" leader key:**
```sh
curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com
curl -X DELETE http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com
```
If multiple clients attempt to set the value for a key then only one will succeed.
The other clients will hang until the current value is removed because of TTL or because of a `DELETE` operation.
Multiple clients can submit the same value and will all be notified when that value succeeds.
To update the TTL of a value simply reissue the same `POST` command that you used to set the value.
To update the TTL of a value simply reissue the same `PUT` command that you used to set the value.
## Contributing
@ -1118,6 +1141,11 @@ See [CONTRIBUTING](https://github.com/coreos/etcd/blob/master/CONTRIBUTING.md) f
- [spheromak/etcd-cookbook](https://github.com/spheromak/etcd-cookbook)
**BOSH Releases**
- [cloudfoundry-community/etcd-boshrelease](https://github.com/cloudfoundry-community/etcd-boshrelease)
- [cloudfoundry/cf-release](https://github.com/cloudfoundry/cf-release/tree/master/jobs/etcd)
**Projects using etcd**
- [binocarlos/yoda](https://github.com/binocarlos/yoda) - etcd + ZeroMQ
@ -1218,8 +1246,10 @@ The values are specified in milliseconds.
### Versioning
#### Service Versioning
etcd uses [semantic versioning][semver].
New minor versions may add additional features to the API however.
New minor versions may add additional features to the API.
You can get the version of etcd by issuing a request to /version:
@ -1227,10 +1257,15 @@ You can get the version of etcd by issuing a request to /version:
curl -L http://127.0.0.1:4001/version
```
During the pre-v1.0.0 series of releases we may break the API as we fix bugs and get feedback.
[semver]: http://semver.org/
#### API Versioning
Clients are encouraged to use the `v2` API. The `v1` API will not change.
The `v2` API responses should not change after the 0.2.0 release but new features will be added over time.
During the pre-v1.0.0 series of releases we may break the API as we fix bugs and get feedback.
### License

58
bench/bench.go Normal file
View File

@ -0,0 +1,58 @@
package main
import (
"flag"
"log"
"strconv"
"github.com/coreos/go-etcd/etcd"
)
func write(requests int, end chan int) {
client := etcd.NewClient(nil)
for i := 0; i < requests; i++ {
key := strconv.Itoa(i)
client.Set(key, key, 0)
}
end <- 1
}
func watch(key string) {
client := etcd.NewClient(nil)
receiver := make(chan *etcd.Response)
go client.Watch(key, 0, true, receiver, nil)
log.Printf("watching: %s", key)
received := 0
for {
<-receiver
received++
}
}
func main() {
rWrites := flag.Int("write-requests", 50000, "number of writes")
cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes")
watches := flag.Int("watches", 500, "number of writes")
flag.Parse()
for i := 0; i < *watches; i++ {
key := strconv.Itoa(i)
go watch(key)
}
wChan := make(chan int, *cWrites)
for i := 0; i < *cWrites; i++ {
go write((*rWrites / *cWrites), wChan)
}
for i := 0; i < *cWrites; i++ {
<-wChan
log.Printf("Completed %d writes", (*rWrites / *cWrites))
}
}

1
build
View File

@ -24,3 +24,4 @@ done
./scripts/release-version > server/release_version.go
go build "${ETCD_PACKAGE}"
go build -o etcdbench "${ETCD_PACKAGE}"/bench

View File

@ -111,10 +111,19 @@ func (e Error) toJsonString() string {
func (e Error) Write(w http.ResponseWriter) {
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
// 3xx is reft internal error
if e.ErrorCode/100 == 3 {
http.Error(w, e.toJsonString(), http.StatusInternalServerError)
} else {
http.Error(w, e.toJsonString(), http.StatusBadRequest)
// 3xx is raft internal error
status := http.StatusBadRequest
switch e.ErrorCode {
case EcodeKeyNotFound:
status = http.StatusNotFound
case EcodeNotFile, EcodeDirNotEmpty:
status = http.StatusForbidden
case EcodeTestFailed, EcodeNodeExist:
status = http.StatusPreconditionFailed
default:
if e.ErrorCode/100 == 3 {
status = http.StatusInternalServerError
}
}
http.Error(w, e.toJsonString(), status)
}

View File

@ -1,3 +0,0 @@
package main
const releaseVersion = "v0.1.2-33-g1a2a9d6"

View File

@ -2,7 +2,9 @@ package v2
import (
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/gorilla/mux"
)
@ -13,6 +15,35 @@ func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
recursive := (req.FormValue("recursive") == "true")
dir := (req.FormValue("dir") == "true")
c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive)
req.ParseForm()
_, valueOk := req.Form["prevValue"]
_, indexOk := req.Form["prevIndex"]
if !valueOk && !indexOk {
c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive)
return s.Dispatch(c, w, req)
}
var err error
prevIndex := uint64(0)
prevValue := req.Form.Get("prevValue")
if indexOk {
prevIndexStr := req.Form.Get("prevIndex")
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store().Index())
}
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store().Index())
}
}
c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, prevValue, prevIndex)
return s.Dispatch(c, w, req)
}

View File

@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// Start the watcher on the store.
eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
if err != nil {
return etcdErr.NewError(500, key, s.Store().Index())
return err
}
cn, _ := w.(http.CloseNotifier)

View File

@ -2,6 +2,7 @@ package v2
import (
"fmt"
"net/http"
"net/url"
"testing"
@ -22,6 +23,7 @@ func TestV2DeleteKey(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2}}`, "")
@ -31,7 +33,7 @@ func TestV2DeleteKey(t *testing.T) {
// Ensures that an empty directory is deleted when dir is set.
//
// $ curl -X PUT localhost:4001/v2/keys/foo?dir=true
// $ curl -X PUT localhost:4001/v2/keys/foo ->fail
// $ curl -X DELETE localhost:4001/v2/keys/foo ->fail
// $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true
//
func TestV2DeleteEmptyDirectory(t *testing.T) {
@ -39,9 +41,11 @@ func TestV2DeleteEmptyDirectory(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusForbidden)
bodyJson := tests.ReadBodyJSON(resp)
assert.Equal(t, bodyJson["errorCode"], 102, "")
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
@ -59,9 +63,11 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?dir=true"), url.Values{})
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusForbidden)
bodyJson := tests.ReadBodyJSON(resp)
assert.Equal(t, bodyJson["errorCode"], 108, "")
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true&recursive=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
@ -78,8 +84,120 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
})
}
// Ensures that a key is deleted if the previous index matches
//
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=2
//
func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
// Ensures that a key is not deleted if the previous index does not match
//
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=100
//
func TestV2DeleteKeyCADOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=100"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
}
// Ensures that an error is thrown if an invalid previous index is provided.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=bad_index
//
func TestV2DeleteKeyCADWithInvalidIndex(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=bad_index"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 203)
})
}
// Ensures that a key is deleted only if the previous value matches.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=XXX
//
func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=XXX"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
// Ensures that a key is not deleted if the previous value does not match.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=YYY
//
func TestV2DeleteKeyCADOnValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=YYY"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
}
// Ensures that an error is thrown if an invalid previous value is provided.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=
//
func TestV2DeleteKeyCADWithInvalidValue(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue="), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 201)
})
}

View File

@ -2,6 +2,7 @@ package v2
import (
"fmt"
"net/http"
"net/url"
"testing"
"time"
@ -13,6 +14,7 @@ import (
// Ensures that a value can be retrieve for a given key.
//
// $ curl localhost:4001/v2/keys/foo/bar -> fail
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl localhost:4001/v2/keys/foo/bar
//
@ -20,9 +22,15 @@ func TestV2GetKey(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.Get(fullURL)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
resp, _ = tests.PutForm(fullURL, v)
tests.ReadBody(resp)
resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"))
resp, _ = tests.Get(fullURL)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "get", "")
node := body["node"].(map[string]interface{})
@ -51,6 +59,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
tests.ReadBody(resp)
resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"))
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "get", "")
node := body["node"].(map[string]interface{})
@ -205,7 +214,7 @@ func TestV2WatchKeyInDir(t *testing.T) {
}()
// wait for expiration, we do have a up to 500 millisecond delay
time.Sleep(1500 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)
select {
case <-c:

View File

@ -3,6 +3,7 @@ package v2
import (
"fmt"
"testing"
"net/http"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/tests"
@ -18,7 +19,9 @@ import (
func TestV2CreateUnique(t *testing.T) {
tests.RunServer(func(s *server.Server) {
// POST should add index to list.
resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PostForm(fullURL, nil)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "create", "")
@ -28,7 +31,8 @@ func TestV2CreateUnique(t *testing.T) {
assert.Equal(t, node["modifiedIndex"], 2, "")
// Second POST should add next index to list.
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
resp, _ = tests.PostForm(fullURL, nil)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})
@ -36,6 +40,7 @@ func TestV2CreateUnique(t *testing.T) {
// POST to a different key should add index to that list.
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})

View File

@ -2,6 +2,7 @@ package v2
import (
"fmt"
"net/http"
"net/url"
"testing"
"time"
@ -20,6 +21,7 @@ func TestV2SetKey(t *testing.T) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
@ -33,6 +35,7 @@ func TestV2SetKey(t *testing.T) {
func TestV2SetDirectory(t *testing.T) {
tests.RunServer(func(s *server.Server) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
@ -50,6 +53,7 @@ func TestV2SetKeyWithTTL(t *testing.T) {
v.Set("value", "XXX")
v.Set("ttl", "20")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBodyJSON(resp)
node := body["node"].(map[string]interface{})
assert.Equal(t, node["ttl"], 20, "")
@ -70,6 +74,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
v.Set("value", "XXX")
v.Set("ttl", "bad_ttl")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 202, "")
assert.Equal(t, body["message"], "The given TTL in POST form is not a number", "")
@ -77,7 +82,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
})
}
// Ensures that a key is conditionally set only if it previously did not exist.
// Ensures that a key is conditionally set if it previously did not exist.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false
//
@ -87,25 +92,29 @@ func TestV2CreateKeySuccess(t *testing.T) {
v.Set("value", "XXX")
v.Set("prevExist", "false")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBodyJSON(resp)
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "XXX", "")
})
}
// Ensures that a key is not conditionally because it previously existed.
// Ensures that a key is not conditionally set because it previously existed.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false -> fail
//
func TestV2CreateKeyFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
v.Set("prevExist", "false")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 105, "")
assert.Equal(t, body["message"], "Key already exists", "")
@ -123,12 +132,15 @@ func TestV2UpdateKeySuccess(t *testing.T) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevExist", "true")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "update", "")
})
@ -144,9 +156,11 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) {
v := url.Values{}
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
v.Set("value", "YYY")
v.Set("prevExist", "true")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 100, "")
assert.Equal(t, body["message"], "Key not found", "")
@ -156,19 +170,27 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) {
// Ensures that a key is not conditionally set if it previously did not exist.
//
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -d prevExist=true
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=true
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=YYY -d prevExist=true -> fail
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevExist=true -> fail
//
func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "YYY")
v.Set("prevExist", "true")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 100, "")
assert.Equal(t, body["message"], "Key not found", "")
assert.Equal(t, body["cause"], "/foo", "")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
body = tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 100, "")
assert.Equal(t, body["message"], "Key not found", "")
assert.Equal(t, body["cause"], "/foo", "")
})
}
@ -181,11 +203,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevIndex", "2")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
@ -203,11 +228,14 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevIndex", "10")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
@ -226,6 +254,7 @@ func TestV2SetKeyCASWithInvalidIndex(t *testing.T) {
v.Set("value", "YYY")
v.Set("prevIndex", "bad_index")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 203, "")
assert.Equal(t, body["message"], "The given index in POST form is not a number", "")
@ -242,11 +271,14 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "XXX")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
@ -264,11 +296,14 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "AAA")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
@ -287,6 +322,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) {
v.Set("value", "XXX")
v.Set("prevValue", "")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 201, "")
assert.Equal(t, body["message"], "PrevValue is Required in POST form", "")

View File

@ -22,6 +22,7 @@ type CommandFactory interface {
CreateDeleteCommand(key string, dir, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string,
prevIndex uint64, expireTime time.Time) raft.Command
CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command
CreateSyncCommand(now time.Time) raft.Command
}

View File

@ -1,13 +1,14 @@
package store
const (
Get = "get"
Create = "create"
Set = "set"
Update = "update"
Delete = "delete"
CompareAndSwap = "compareAndSwap"
Expire = "expire"
Get = "get"
Create = "create"
Set = "set"
Update = "update"
Delete = "delete"
CompareAndSwap = "compareAndSwap"
CompareAndDelete = "compareAndDelete"
Expire = "expire"
)
type Event struct {

View File

@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
return e
}
// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified key
// scan enumerates events from the index history and stops at the first point
// where the key matches.
func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock()
defer eh.rwl.RUnlock()
// the index should locate after the event history's StartIndex
if index-eh.StartIndex < 0 {
// index should be after the event history's StartIndex
if index < eh.StartIndex {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), 0)
}
// the index should locate before the size of the queue minus the duplicate count
// the index should come before the size of the queue minus the duplicate count
if index > eh.LastIndex { // future index
return nil, nil
}
i := eh.Queue.Front
offset := index - eh.StartIndex
i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
for {
e := eh.Queue.Events[i]
@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
ok = ok || strings.HasPrefix(e.Node.Key, key)
}
if ok && index <= e.Index() { // make sure we bypass the smaller one
if ok {
return e, nil
}
i = (i + 1) % eh.Queue.Capacity
if i > eh.Queue.back() {
if i == eh.Queue.Back {
return nil, nil
}
}
@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory {
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
Back: eh.Queue.Back,
}
for i, e := range eh.Queue.Events {

View File

@ -4,22 +4,17 @@ type eventQueue struct {
Events []*Event
Size int
Front int
Back int
Capacity int
}
func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
}
func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity
eq.Events[index] = e
eq.Events[eq.Back] = e
eq.Back = (eq.Back + 1) % eq.Capacity
if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
eq.Front = (eq.Front + 1) % eq.Capacity
} else {
eq.Size++
}
}

View File

@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) {
t.Fatalf("bad index shoud reuturn nil")
}
}
// TestFullEventQueue tests a queue with capacity = 10
// Add 1000 events into that queue, and test if scanning
// works still for previous events.
func TestFullEventQueue(t *testing.T) {
eh := newEventHistory(10)
// Add
for i := 0; i < 1000; i++ {
e := newEvent(Create, "/foo", uint64(i), uint64(i))
eh.addEvent(e)
e, err := eh.scan("/foo", true, uint64(i-1))
if i > 0 {
if e == nil || err != nil {
t.Fatalf("scan error [/foo] [%v] %v", i-1, i)
}
}
}
}

View File

@ -306,6 +306,13 @@ func (n *node) UpdateTTL(expireTime time.Time) {
}
}
func (n *node) Compare(prevValue string, prevIndex uint64) bool {
compareValue := (prevValue == "" || n.Value == prevValue)
compareIndex := (prevIndex == 0 || n.ModifiedIndex == prevIndex)
return compareValue && compareIndex
}
// Clone function clone the node recursively and return the new node.
// If the node is a directory, it will clone all the content under this directory.
// If the node is a key-value pair, it will clone the pair.

View File

@ -35,6 +35,8 @@ const (
GetSuccess
GetFail
ExpireCount
CompareAndDeleteSuccess
CompareAndDeleteFail
)
type Stats struct {
@ -63,6 +65,10 @@ type Stats struct {
CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"`
CompareAndSwapFail uint64 `json:"compareAndSwapFail"`
// Number of compareAndDelete requests
CompareAndDeleteSuccess uint64 `json:"compareAndDeleteSuccess"`
CompareAndDeleteFail uint64 `json:"compareAndDeleteFail"`
ExpireCount uint64 `json:"expireCount"`
Watchers uint64 `json:"watchers"`
@ -76,7 +82,8 @@ func newStats() *Stats {
func (s *Stats) clone() *Stats {
return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess,
s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount}
s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail,
s.CompareAndDeleteSuccess, s.CompareAndDeleteFail, s.Watchers, s.ExpireCount}
}
// Status() return the statistics info of etcd storage its recent start
@ -93,6 +100,7 @@ func (s *Stats) TotalTranscations() uint64 {
return s.SetSuccess + s.SetFail +
s.DeleteSuccess + s.DeleteFail +
s.CompareAndSwapSuccess + s.CompareAndSwapFail +
s.CompareAndDeleteSuccess + s.CompareAndDeleteFail +
s.UpdateSuccess + s.UpdateFail
}
@ -122,6 +130,10 @@ func (s *Stats) Inc(field int) {
atomic.AddUint64(&s.CompareAndSwapSuccess, 1)
case CompareAndSwapFail:
atomic.AddUint64(&s.CompareAndSwapFail, 1)
case CompareAndDeleteSuccess:
atomic.AddUint64(&s.CompareAndDeleteSuccess, 1)
case CompareAndDeleteFail:
atomic.AddUint64(&s.CompareAndDeleteFail, 1)
case ExpireCount:
atomic.AddUint64(&s.ExpireCount, 1)
}

View File

@ -51,6 +51,7 @@ type Store interface {
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive, dir bool) (*Event, error)
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Save() ([]byte, error)
@ -207,37 +208,37 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
return nil, err
}
if n.IsDir() { // can only test and set file
if n.IsDir() { // can only compare and swap file
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
}
// If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful.
if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
// update etcd index
s.CurrentIndex++
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
eNode := e.Node
eNode.PrevValue = n.Value
// if test succeed, write the value
n.Write(value, s.CurrentIndex)
n.UpdateTTL(expireTime)
eNode.Value = value
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndSwapSuccess)
return e, nil
if !n.Compare(prevValue, prevIndex) {
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
}
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
// update etcd index
s.CurrentIndex++
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
eNode := e.Node
eNode.PrevValue = n.Value
// if test succeed, write the value
n.Write(value, s.CurrentIndex)
n.UpdateTTL(expireTime)
eNode.Value = value
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndSwapSuccess)
return e, nil
}
// Delete function deletes the node at the given path.
@ -257,8 +258,6 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
dir = true
}
nextIndex := s.CurrentIndex + 1
n, err := s.internalGet(nodePath)
if err != nil { // if the node does not exist, return error
@ -266,6 +265,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
return nil, err
}
nextIndex := s.CurrentIndex + 1
e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
eNode := e.Node
@ -276,7 +276,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
}
callback := func(path string) { // notify function
// notify the watchers with delted set true
// notify the watchers with deleted set true
s.WatcherHub.notifyWatchers(e, path, true)
}
@ -296,9 +296,52 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
return e, nil
}
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock()
defer s.worldLock.Unlock()
n, err := s.internalGet(nodePath)
if err != nil { // if the node does not exist, return error
s.Stats.Inc(CompareAndDeleteFail)
return nil, err
}
if n.IsDir() { // can only compare and delete file
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
}
// If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful.
if !n.Compare(prevValue, prevIndex) {
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
s.Stats.Inc(CompareAndDeleteFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
}
// update etcd index
s.CurrentIndex++
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
callback := func(path string) { // notify function
// notify the watchers with deleted set true
s.WatcherHub.notifyWatchers(e, path, true)
}
// delete a key-value pair, no error should happen
n.Remove(false, false, callback)
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndDeleteSuccess)
return e, nil
}
func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
key = path.Clean(path.Join("/", key))
nextIndex := s.CurrentIndex + 1
s.worldLock.RLock()

View File

@ -337,7 +337,58 @@ func TestRootRdOnly(t *testing.T) {
_, err = s.CompareAndSwap("/", "", 0, "", Permanent)
assert.NotNil(t, err, "")
}
func TestStoreCompareAndDeletePrevValue(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", "bar", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Node.Key, "/foo", "")
}
func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", "baz", 0)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Compare failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Node.Value, "bar", "")
}
func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", false, "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", "", 100)
assert.NotNil(t, _err, "")
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Compare failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Node.Value, "bar", "")
}
// Ensure that the store cannot delete a directory.
func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) {
s := newStore()
s.Create("/foo", true, "", false, Permanent)
_, _err := s.CompareAndDelete("/foo", "", 0)
assert.NotNil(t, _err, "")
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
}
// Ensure that the store can conditionally update a key if it has a previous value.

View File

@ -75,6 +75,15 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
}
}
// CreateCompareAndDeleteCommand creates a version 2 command to conditionally delete a key from the store.
func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command {
return &CompareAndDeleteCommand{
Key: key,
PrevValue: prevValue,
PrevIndex: prevIndex,
}
}
func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command {
return &SyncCommand{
Time: time.Now(),

View File

@ -0,0 +1,37 @@
package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/raft"
)
func init() {
raft.RegisterCommand(&CompareAndDeleteCommand{})
}
// The CompareAndDelete performs a conditional delete on a key in the store.
type CompareAndDeleteCommand struct {
Key string `json:"key"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
}
// The name of the compareAndDelete command in the log
func (c *CompareAndDeleteCommand) CommandName() string {
return "etcd:compareAndDelete"
}
// Set the key-value pair if the current value of the key equals to the given prevValue
func (c *CompareAndDeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
e, err := s.CompareAndDelete(c.Key, c.PrevValue, c.PrevIndex)
if err != nil {
log.Debug(err)
return nil, err
}
return e, nil
}

View File

@ -56,7 +56,7 @@ func TestSimpleSnapshot(t *testing.T) {
index, _ := strconv.Atoi(snapshots[0].Name()[2:5])
if index <= 507 || index >= 510 {
if index < 507 || index > 510 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name())
}
@ -89,7 +89,7 @@ func TestSimpleSnapshot(t *testing.T) {
index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
if index <= 1014 || index > 1017 {
if index < 1014 || index > 1017 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name())
}
}

View File

@ -3,6 +3,7 @@ package test
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
@ -91,7 +92,7 @@ func TestV1ClusterMigration(t *testing.T) {
resp, err := tests.Get("http://localhost:4001/v2/keys/message")
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, resp.StatusCode, 400)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n")
// Ensure TTL'd message is removed.