mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #472 from benbjohnson/fix-error-codes
Fix mod/lock and mod/leader return error codes.
This commit is contained in:
commit
87113f985f
@ -658,6 +658,214 @@ curl -L http://127.0.0.1:4001/v2/keys/
|
||||
|
||||
Here we see the `/message` key but our hidden `/_message` key is not returned.
|
||||
|
||||
|
||||
## Lock Module
|
||||
|
||||
The lock module is used to serialize access to resources used by clients.
|
||||
Multiple clients can attempt to acquire a lock but only one can have it at a time.
|
||||
Once the lock is released, the next client waiting for the lock will receive it.
|
||||
|
||||
|
||||
### Acquiring a Lock
|
||||
|
||||
To acquire a lock, simply send a `POST` request to the lock module with they lock name and TTL:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/lock/mylock -XPOST -d ttl=20
|
||||
```
|
||||
|
||||
You will receive the lock index when you acquire the lock:
|
||||
|
||||
```
|
||||
2
|
||||
```
|
||||
|
||||
If the TTL is not specified or is not a number then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 202,
|
||||
"message": "The given TTL in POST form is not a number",
|
||||
"cause": "Acquire",
|
||||
}
|
||||
```
|
||||
|
||||
If you specify a timeout that is not a number then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 205,
|
||||
"message": "The given timeout in POST form is not a number",
|
||||
"cause": "Acquire",
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### Renewing a Lock
|
||||
|
||||
To extend the TTL of an already acquired lock, simply repeat your original request but with a `PUT` and the lock index instead:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/lock/mylock -XPUT -d index=5 -d ttl=20
|
||||
```
|
||||
|
||||
If the index or value is not specified then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 207,
|
||||
"message": "Index or value is required",
|
||||
"cause": "Renew",
|
||||
}
|
||||
```
|
||||
|
||||
If the index or value does not exist then you'll receive the following error with a `404 Not Found` HTTP code:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 100,
|
||||
"message": "Key not found",
|
||||
"index": 1
|
||||
}
|
||||
```
|
||||
|
||||
If the TTL is not specified or is not a number then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 202,
|
||||
"message": "The given TTL in POST form is not a number",
|
||||
"cause": "Renew",
|
||||
}
|
||||
```
|
||||
|
||||
### Releasing a Lock
|
||||
|
||||
When the client is finished with the lock, simply send a `DELETE` request to release the lock:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/lock/mylock -XDELETE -d index=5
|
||||
```
|
||||
|
||||
If the index or value is not specified then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 207,
|
||||
"message": "Index or value is required",
|
||||
"cause": "Release",
|
||||
}
|
||||
```
|
||||
|
||||
If the index and value are both specified then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 208,
|
||||
"message": "Index and value cannot both be specified",
|
||||
"cause": "Release",
|
||||
}
|
||||
```
|
||||
|
||||
If the index or value does not exist then you'll receive the following error with a `404 Not Found` HTTP code:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 100,
|
||||
"message": "Key not found",
|
||||
"index": 1
|
||||
}
|
||||
```
|
||||
|
||||
### Retrieving a Lock
|
||||
|
||||
To determine the current value or index of a lock, send a `GET` request to the lock.
|
||||
You can specify a `field` of `index` or `value`.
|
||||
The default is `value`.
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/lock/mylock?field=index
|
||||
```
|
||||
|
||||
Will return the current index:
|
||||
|
||||
```sh
|
||||
2
|
||||
```
|
||||
|
||||
If you specify a field other than `field` or `value` then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 209,
|
||||
"message": "Invalid field",
|
||||
"cause": "Get",
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Leader Module
|
||||
|
||||
The leader module wraps the lock module to provide a simple interface for electing a single leader in a cluster.
|
||||
|
||||
|
||||
### Setting the Leader
|
||||
|
||||
A client can attempt to become leader by sending a `PUT` request to the leader module with the name of the leader to elect:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/leader/myclustername -XPUT -d ttl=300 -d name=foo.mydomain.com
|
||||
```
|
||||
|
||||
You will receive a successful `200` HTTP response code when the leader is elected.
|
||||
|
||||
If the name is not specified then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 206,
|
||||
"message": "Name is required in POST form",
|
||||
"cause": "Set",
|
||||
}
|
||||
```
|
||||
|
||||
You can also receive any errors specified by the Lock module.
|
||||
|
||||
|
||||
### Retrieving the Current Leader
|
||||
|
||||
A client can check to determine if there is a current leader by sending a `GET` request to the leader module:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/leader/myclustername
|
||||
```
|
||||
|
||||
You will receive the name of the current leader:
|
||||
|
||||
```sh
|
||||
foo.mydomain.com
|
||||
```
|
||||
|
||||
|
||||
### Relinquishing Leadership
|
||||
|
||||
A client can give up leadership by sending a `DELETE` request with the leader name:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/mod/v2/leader/myclustername?name=foo.mydomain.com -XDELETE
|
||||
```
|
||||
|
||||
If the name is not specified then you'll receive the following error:
|
||||
|
||||
```json
|
||||
{
|
||||
"errorCode": 206,
|
||||
"message": "Name is required in POST form",
|
||||
"cause": "Set",
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Statistics
|
||||
|
||||
An etcd cluster keeps track of a number of stastics including latency, bandwidth and uptime.
|
||||
|
@ -35,11 +35,16 @@ const (
|
||||
EcodeRootROnly = 107
|
||||
EcodeDirNotEmpty = 108
|
||||
|
||||
EcodeValueRequired = 200
|
||||
EcodePrevValueRequired = 201
|
||||
EcodeTTLNaN = 202
|
||||
EcodeIndexNaN = 203
|
||||
EcodeValueOrTTLRequired = 204
|
||||
EcodeValueRequired = 200
|
||||
EcodePrevValueRequired = 201
|
||||
EcodeTTLNaN = 202
|
||||
EcodeIndexNaN = 203
|
||||
EcodeValueOrTTLRequired = 204
|
||||
EcodeTimeoutNaN = 205
|
||||
EcodeNameRequired = 206
|
||||
EcodeIndexOrValueRequired = 207
|
||||
EcodeIndexValueMutex = 208
|
||||
EcodeInvalidField = 209
|
||||
|
||||
EcodeRaftInternal = 300
|
||||
EcodeLeaderElect = 301
|
||||
@ -68,6 +73,11 @@ func init() {
|
||||
errors[EcodeTTLNaN] = "The given TTL in POST form is not a number"
|
||||
errors[EcodeIndexNaN] = "The given index in POST form is not a number"
|
||||
errors[EcodeValueOrTTLRequired] = "Value or TTL is required in POST form"
|
||||
errors[EcodeTimeoutNaN] = "The given timeout in POST form is not a number"
|
||||
errors[EcodeNameRequired] = "Name is required in POST form"
|
||||
errors[EcodeIndexOrValueRequired] = "Index or value is required"
|
||||
errors[EcodeIndexValueMutex] = "Index and value cannot both be specified"
|
||||
errors[EcodeInvalidField] = "Invalid field"
|
||||
|
||||
// raft related errors
|
||||
errors[EcodeRaftInternal] = "Raft Internal Error"
|
||||
|
@ -7,22 +7,21 @@ import (
|
||||
"net/url"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// deleteHandler remove a given leader leader.
|
||||
func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) {
|
||||
// deleteHandler remove a given leader.
|
||||
func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
vars := mux.Vars(req)
|
||||
name := req.FormValue("name")
|
||||
if name == "" {
|
||||
http.Error(w, "leader name required", http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeNameRequired, "Delete", 0)
|
||||
}
|
||||
|
||||
// Proxy the request to the the lock service.
|
||||
u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
q := u.Query()
|
||||
q.Set("value", name)
|
||||
@ -30,20 +29,17 @@ func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
r, err := http.NewRequest("DELETE", u.String(), nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Read from the leader lock.
|
||||
resp, err := h.client.Do(r)
|
||||
if err != nil {
|
||||
http.Error(w, "delete leader http error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Write([]byte("delete leader error: "))
|
||||
}
|
||||
io.Copy(w, resp.Body)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -9,21 +9,18 @@ import (
|
||||
)
|
||||
|
||||
// getHandler retrieves the current leader.
|
||||
func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
vars := mux.Vars(req)
|
||||
|
||||
// Proxy the request to the lock service.
|
||||
url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
|
||||
resp, err := h.client.Get(url)
|
||||
if err != nil {
|
||||
http.Error(w, "read leader error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Write([]byte("get leader error: "))
|
||||
}
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
io.Copy(w, resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// prefix is appended to the lock's prefix since the leader mod uses the lock mod.
|
||||
@ -27,8 +28,22 @@ func NewHandler(addr string) (http.Handler) {
|
||||
addr: addr,
|
||||
}
|
||||
h.StrictSlash(false)
|
||||
h.HandleFunc("/{key:.*}", h.getHandler).Methods("GET")
|
||||
h.HandleFunc("/{key:.*}", h.setHandler).Methods("PUT")
|
||||
h.HandleFunc("/{key:.*}", h.deleteHandler).Methods("DELETE")
|
||||
h.handleFunc("/{key:.*}", h.getHandler).Methods("GET")
|
||||
h.handleFunc("/{key:.*}", h.setHandler).Methods("PUT")
|
||||
h.handleFunc("/{key:.*}", h.deleteHandler).Methods("DELETE")
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *handler) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
|
||||
return h.Router.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
|
||||
if err := f(w, req); err != nil {
|
||||
switch err := err.(type) {
|
||||
case *etcdErr.Error:
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
err.Write(w)
|
||||
default:
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -7,22 +7,21 @@ import (
|
||||
"net/url"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// setHandler attempts to set the current leader.
|
||||
func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
vars := mux.Vars(req)
|
||||
name := req.FormValue("name")
|
||||
if name == "" {
|
||||
http.Error(w, "leader name required", http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeNameRequired, "Set", 0)
|
||||
}
|
||||
|
||||
// Proxy the request to the the lock service.
|
||||
u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
q := u.Query()
|
||||
q.Set("value", name)
|
||||
@ -32,8 +31,7 @@ func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
r, err := http.NewRequest("POST", u.String(), nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Close request if this connection disconnects.
|
||||
@ -51,13 +49,10 @@ func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) {
|
||||
// Read from the leader lock.
|
||||
resp, err := h.client.Do(r)
|
||||
if err != nil {
|
||||
http.Error(w, "set leader http error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Write([]byte("set leader error: "))
|
||||
}
|
||||
io.Copy(w, resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
@ -14,23 +14,27 @@ import (
|
||||
func TestModLeaderSet(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Set leader.
|
||||
body, err := testSetLeader(s, "foo", "xxx", 10)
|
||||
body, status, err := testSetLeader(s, "foo", "xxx", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Check that the leader is set.
|
||||
body, err = testGetLeader(s, "foo")
|
||||
body, status, err = testGetLeader(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "xxx")
|
||||
|
||||
// Delete leader.
|
||||
body, err = testDeleteLeader(s, "foo", "xxx")
|
||||
body, status, err = testDeleteLeader(s, "foo", "xxx")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
|
||||
// Check that the leader is removed.
|
||||
body, err = testGetLeader(s, "foo")
|
||||
body, status, err = testGetLeader(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
})
|
||||
}
|
||||
@ -39,42 +43,45 @@ func TestModLeaderSet(t *testing.T) {
|
||||
func TestModLeaderRenew(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Set leader.
|
||||
body, err := testSetLeader(s, "foo", "xxx", 2)
|
||||
body, status, err := testSetLeader(s, "foo", "xxx", 2)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Renew leader.
|
||||
body, err = testSetLeader(s, "foo", "xxx", 3)
|
||||
body, status, err = testSetLeader(s, "foo", "xxx", 3)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Check that the leader is set.
|
||||
body, err = testGetLeader(s, "foo")
|
||||
body, status, err = testGetLeader(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "xxx")
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
func testSetLeader(s *server.Server, key string, name string, ttl int) (string, error) {
|
||||
func testSetLeader(s *server.Server, key string, name string, ttl int) (string, int, error) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s&ttl=%d", s.URL(), key, name, ttl), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
||||
func testGetLeader(s *server.Server, key string) (string, error) {
|
||||
func testGetLeader(s *server.Server, key string) (string, int, error) {
|
||||
resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/leader/%s", s.URL(), key))
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
||||
func testDeleteLeader(s *server.Server, key string, name string) (string, error) {
|
||||
func testDeleteLeader(s *server.Server, key string, name string) (string, int, error) {
|
||||
resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s", s.URL(), key, name), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
@ -8,8 +8,9 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// acquireHandler attempts to acquire a lock on the given key.
|
||||
@ -17,7 +18,7 @@ import (
|
||||
// The "value" parameter specifies a value to associate with the lock.
|
||||
// The "ttl" parameter specifies how long the lock will persist for.
|
||||
// The "timeout" parameter specifies how long the request should wait for the lock.
|
||||
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
h.client.SyncCluster()
|
||||
|
||||
// Setup connection watcher.
|
||||
@ -36,16 +37,14 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
|
||||
if req.FormValue("timeout") == "" {
|
||||
timeout = -1
|
||||
} else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
|
||||
http.Error(w, "invalid timeout: " + req.FormValue("timeout"), http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeTimeoutNaN, "Acquire", 0)
|
||||
}
|
||||
timeout = timeout + 1
|
||||
|
||||
// Parse TTL.
|
||||
ttl, err := strconv.Atoi(req.FormValue("ttl"))
|
||||
if err != nil {
|
||||
http.Error(w, "invalid ttl: " + req.FormValue("ttl"), http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Acquire", 0)
|
||||
}
|
||||
|
||||
// If node exists then just watch it. Otherwise create the node and watch it.
|
||||
@ -65,12 +64,14 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
|
||||
// Stop all goroutines.
|
||||
close(stopChan)
|
||||
|
||||
// Write response.
|
||||
// Check for an error.
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
w.Write([]byte(strconv.Itoa(index)))
|
||||
return err
|
||||
}
|
||||
|
||||
// Write response.
|
||||
w.Write([]byte(strconv.Itoa(index)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// createNode creates a new lock node and watches it until it is acquired or acquisition fails.
|
||||
@ -83,7 +84,7 @@ func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-
|
||||
// Create an incrementing id for the lock.
|
||||
resp, err := h.client.AddChild(keypath, value, uint64(ttl))
|
||||
if err != nil {
|
||||
return 0, errors.New("acquire lock index error: " + err.Error())
|
||||
return 0, err
|
||||
}
|
||||
indexpath := resp.Node.Key
|
||||
index, _ := strconv.Atoi(path.Base(indexpath))
|
||||
@ -98,7 +99,7 @@ func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-
|
||||
if err != nil {
|
||||
select {
|
||||
case <-closeChan:
|
||||
err = errors.New("acquire lock error: user interrupted")
|
||||
err = errors.New("user interrupted")
|
||||
default:
|
||||
}
|
||||
}
|
||||
@ -174,7 +175,7 @@ func (h *handler) watch(keypath string, index int, closeChan <- chan bool) error
|
||||
if err == etcd.ErrWatchStoppedByUser {
|
||||
return fmt.Errorf("lock watch closed")
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("lock watch error:%s", err.Error())
|
||||
return fmt.Errorf("lock watch error: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,11 +5,12 @@ import (
|
||||
"path"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// getIndexHandler retrieves the current lock index.
|
||||
// The "field" parameter specifies to read either the lock "index" or lock "value".
|
||||
func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
h.client.SyncCluster()
|
||||
|
||||
vars := mux.Vars(req)
|
||||
@ -22,8 +23,7 @@ func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
|
||||
// Read all indices.
|
||||
resp, err := h.client.Get(keypath, true, true)
|
||||
if err != nil {
|
||||
http.Error(w, "read lock error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
nodes := lockNodes{resp.Node.Nodes}
|
||||
|
||||
@ -37,7 +37,9 @@ func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
|
||||
w.Write([]byte(node.Value))
|
||||
|
||||
default:
|
||||
http.Error(w, "read lock error: invalid field: " + field, http.StatusInternalServerError)
|
||||
return etcdErr.NewError(etcdErr.EcodeInvalidField, "Get", 0)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
const prefix = "/_etcd/mod/lock"
|
||||
@ -22,9 +23,26 @@ func NewHandler(addr string) (http.Handler) {
|
||||
client: etcd.NewClient([]string{addr}),
|
||||
}
|
||||
h.StrictSlash(false)
|
||||
h.HandleFunc("/{key:.*}", h.getIndexHandler).Methods("GET")
|
||||
h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
|
||||
h.HandleFunc("/{key:.*}", h.renewLockHandler).Methods("PUT")
|
||||
h.HandleFunc("/{key:.*}", h.releaseLockHandler).Methods("DELETE")
|
||||
h.handleFunc("/{key:.*}", h.getIndexHandler).Methods("GET")
|
||||
h.handleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
|
||||
h.handleFunc("/{key:.*}", h.renewLockHandler).Methods("PUT")
|
||||
h.handleFunc("/{key:.*}", h.releaseLockHandler).Methods("DELETE")
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *handler) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
|
||||
return h.Router.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
|
||||
if err := f(w, req); err != nil {
|
||||
switch err := err.(type) {
|
||||
case *etcdErr.Error:
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
err.Write(w)
|
||||
case etcd.EtcdError:
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
etcdErr.NewError(err.ErrorCode, err.Cause, err.Index).Write(w)
|
||||
default:
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -5,10 +5,11 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// releaseLockHandler deletes the lock.
|
||||
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
h.client.SyncCluster()
|
||||
|
||||
vars := mux.Vars(req)
|
||||
@ -18,34 +19,30 @@ func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
index := req.FormValue("index")
|
||||
value := req.FormValue("value")
|
||||
if len(index) == 0 && len(value) == 0 {
|
||||
http.Error(w, "release lock error: index or value required", http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeIndexOrValueRequired, "Release", 0)
|
||||
} else if len(index) != 0 && len(value) != 0 {
|
||||
http.Error(w, "release lock error: index and value cannot both be specified", http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeIndexValueMutex, "Release", 0)
|
||||
}
|
||||
|
||||
// Look up index by value if index is missing.
|
||||
if len(index) == 0 {
|
||||
resp, err := h.client.Get(keypath, true, true)
|
||||
if err != nil {
|
||||
http.Error(w, "release lock index error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
nodes := lockNodes{resp.Node.Nodes}
|
||||
node, _ := nodes.FindByValue(value)
|
||||
if node == nil {
|
||||
http.Error(w, "release lock error: cannot find: " + value, http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeKeyNotFound, "Release", 0)
|
||||
}
|
||||
index = path.Base(node.Key)
|
||||
}
|
||||
|
||||
// Delete the lock.
|
||||
_, err := h.client.Delete(path.Join(keypath, index), false)
|
||||
if err != nil {
|
||||
http.Error(w, "release lock error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
if _, err := h.client.Delete(path.Join(keypath, index), false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -6,11 +6,12 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
etcdErr "github.com/coreos/etcd/error"
|
||||
)
|
||||
|
||||
// renewLockHandler attempts to update the TTL on an existing lock.
|
||||
// Returns a 200 OK if successful. Returns non-200 on error.
|
||||
func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
h.client.SyncCluster()
|
||||
|
||||
// Read the lock path.
|
||||
@ -20,31 +21,26 @@ func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
// Parse new TTL parameter.
|
||||
ttl, err := strconv.Atoi(req.FormValue("ttl"))
|
||||
if err != nil {
|
||||
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Renew", 0)
|
||||
}
|
||||
|
||||
// Read and set defaults for index and value.
|
||||
index := req.FormValue("index")
|
||||
value := req.FormValue("value")
|
||||
if len(index) == 0 && len(value) == 0 {
|
||||
// The index or value is required.
|
||||
http.Error(w, "renew lock error: index or value required", http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeIndexOrValueRequired, "Renew", 0)
|
||||
}
|
||||
|
||||
if len(index) == 0 {
|
||||
// If index is not specified then look it up by value.
|
||||
resp, err := h.client.Get(keypath, true, true)
|
||||
if err != nil {
|
||||
http.Error(w, "renew lock index error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
nodes := lockNodes{resp.Node.Nodes}
|
||||
node, _ := nodes.FindByValue(value)
|
||||
if node == nil {
|
||||
http.Error(w, "renew lock error: cannot find: " + value, http.StatusInternalServerError)
|
||||
return
|
||||
return etcdErr.NewError(etcdErr.EcodeKeyNotFound, "Renew", 0)
|
||||
}
|
||||
index = path.Base(node.Key)
|
||||
|
||||
@ -52,16 +48,15 @@ func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
// If value is not specified then default it to the previous value.
|
||||
resp, err := h.client.Get(path.Join(keypath, index), true, false)
|
||||
if err != nil {
|
||||
http.Error(w, "renew lock value error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
return err
|
||||
}
|
||||
value = resp.Node.Value
|
||||
}
|
||||
|
||||
// Renew the lock, if it exists.
|
||||
_, err = h.client.Update(path.Join(keypath, index), value, uint64(ttl))
|
||||
if err != nil {
|
||||
http.Error(w, "renew lock error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
if _, err = h.client.Update(path.Join(keypath, index), value, uint64(ttl)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -14,23 +14,27 @@ import (
|
||||
func TestModLockAcquireAndRelease(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Acquire lock.
|
||||
body, err := testAcquireLock(s, "foo", "", 10)
|
||||
body, status, err := testAcquireLock(s, "foo", "", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Check that we have the lock.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Release lock.
|
||||
body, err = testReleaseLock(s, "foo", "2", "")
|
||||
body, status, err = testReleaseLock(s, "foo", "2", "")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
|
||||
// Check that we have the lock.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
})
|
||||
}
|
||||
@ -42,8 +46,9 @@ func TestModLockBlockUntilAcquire(t *testing.T) {
|
||||
|
||||
// Acquire lock #1.
|
||||
go func() {
|
||||
body, err := testAcquireLock(s, "foo", "", 10)
|
||||
body, status, err := testAcquireLock(s, "foo", "", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
c <- true
|
||||
}()
|
||||
@ -53,8 +58,9 @@ func TestModLockBlockUntilAcquire(t *testing.T) {
|
||||
waiting := true
|
||||
go func() {
|
||||
c <- true
|
||||
body, err := testAcquireLock(s, "foo", "", 10)
|
||||
body, status, err := testAcquireLock(s, "foo", "", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "4")
|
||||
waiting = false
|
||||
}()
|
||||
@ -63,29 +69,34 @@ func TestModLockBlockUntilAcquire(t *testing.T) {
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Check that we have the lock #1.
|
||||
body, err := testGetLockIndex(s, "foo")
|
||||
body, status, err := testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Check that we are still waiting for lock #2.
|
||||
assert.Equal(t, waiting, true)
|
||||
|
||||
// Release lock #1.
|
||||
body, err = testReleaseLock(s, "foo", "2", "")
|
||||
_, status, err = testReleaseLock(s, "foo", "2", "")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
|
||||
// Check that we have lock #2.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "4")
|
||||
|
||||
// Release lock #2.
|
||||
body, err = testReleaseLock(s, "foo", "4", "")
|
||||
_, status, err = testReleaseLock(s, "foo", "4", "")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
|
||||
// Check that we have no lock.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
})
|
||||
}
|
||||
@ -97,8 +108,9 @@ func TestModLockExpireAndRelease(t *testing.T) {
|
||||
|
||||
// Acquire lock #1.
|
||||
go func() {
|
||||
body, err := testAcquireLock(s, "foo", "", 2)
|
||||
body, status, err := testAcquireLock(s, "foo", "", 2)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
c <- true
|
||||
}()
|
||||
@ -107,8 +119,9 @@ func TestModLockExpireAndRelease(t *testing.T) {
|
||||
// Acquire lock #2.
|
||||
go func() {
|
||||
c <- true
|
||||
body, err := testAcquireLock(s, "foo", "", 10)
|
||||
body, status, err := testAcquireLock(s, "foo", "", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "4")
|
||||
}()
|
||||
<- c
|
||||
@ -116,16 +129,18 @@ func TestModLockExpireAndRelease(t *testing.T) {
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Check that we have the lock #1.
|
||||
body, err := testGetLockIndex(s, "foo")
|
||||
body, status, err := testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Wait for lock #1 TTL.
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Check that we have lock #2.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "4")
|
||||
})
|
||||
}
|
||||
@ -134,34 +149,39 @@ func TestModLockExpireAndRelease(t *testing.T) {
|
||||
func TestModLockRenew(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Acquire lock.
|
||||
body, err := testAcquireLock(s, "foo", "", 3)
|
||||
body, status, err := testAcquireLock(s, "foo", "", 3)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Check that we have the lock.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Renew lock.
|
||||
body, err = testRenewLock(s, "foo", "2", "", 3)
|
||||
body, status, err = testRenewLock(s, "foo", "2", "", 3)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Check that we still have the lock.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Check that lock was released.
|
||||
body, err = testGetLockIndex(s, "foo")
|
||||
body, status, err = testGetLockIndex(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
})
|
||||
}
|
||||
@ -170,55 +190,59 @@ func TestModLockRenew(t *testing.T) {
|
||||
func TestModLockAcquireAndReleaseByValue(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Acquire lock.
|
||||
body, err := testAcquireLock(s, "foo", "XXX", 10)
|
||||
body, status, err := testAcquireLock(s, "foo", "XXX", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Check that we have the lock.
|
||||
body, err = testGetLockValue(s, "foo")
|
||||
body, status, err = testGetLockValue(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "XXX")
|
||||
|
||||
// Release lock.
|
||||
body, err = testReleaseLock(s, "foo", "", "XXX")
|
||||
body, status, err = testReleaseLock(s, "foo", "", "XXX")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
|
||||
// Check that we released the lock.
|
||||
body, err = testGetLockValue(s, "foo")
|
||||
body, status, err = testGetLockValue(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, status, 200)
|
||||
assert.Equal(t, body, "")
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
func testAcquireLock(s *server.Server, key string, value string, ttl int) (string, error) {
|
||||
func testAcquireLock(s *server.Server, key string, value string, ttl int) (string, int, error) {
|
||||
resp, err := tests.PostForm(fmt.Sprintf("%s/mod/v2/lock/%s?value=%s&ttl=%d", s.URL(), key, value, ttl), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
||||
func testGetLockIndex(s *server.Server, key string) (string, error) {
|
||||
func testGetLockIndex(s *server.Server, key string) (string, int, error) {
|
||||
resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/lock/%s?field=index", s.URL(), key))
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
||||
func testGetLockValue(s *server.Server, key string) (string, error) {
|
||||
func testGetLockValue(s *server.Server, key string) (string, int, error) {
|
||||
resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/lock/%s", s.URL(), key))
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
||||
func testReleaseLock(s *server.Server, key string, index string, value string) (string, error) {
|
||||
func testReleaseLock(s *server.Server, key string, index string, value string) (string, int, error) {
|
||||
resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/lock/%s?index=%s&value=%s", s.URL(), key, index, value), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
||||
func testRenewLock(s *server.Server, key string, index string, value string, ttl int) (string, error) {
|
||||
func testRenewLock(s *server.Server, key string, index string, value string, ttl int) (string, int, error) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/lock/%s?index=%s&value=%s&ttl=%d", s.URL(), key, index, value, ttl), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
return string(ret), resp.StatusCode, err
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user