mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
fd8ce5d11a
35
README.md
35
README.md
@ -1025,6 +1025,41 @@ curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=customer1
|
||||
curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?name=bar
|
||||
```
|
||||
|
||||
|
||||
### Leader Election
|
||||
|
||||
The Leader Election module wraps the Lock module to allow clients to come to consensus on a single value.
|
||||
This is useful when you want one server to process at a time but allow other servers to fail over.
|
||||
The API is similar to the Lock module but is limited to simple strings values.
|
||||
|
||||
Here's the API:
|
||||
|
||||
**Attempt to set a value for the "order_processing" leader key:**
|
||||
|
||||
```sh
|
||||
curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com
|
||||
```
|
||||
|
||||
**Retrieve the current value for the "order_processing" leader key:**
|
||||
|
||||
```sh
|
||||
curl http://127.0.0.1:4001/mod/v2/leader/order_processing
|
||||
myserver1.foo.com
|
||||
```
|
||||
|
||||
**Remove a value from the "order_processing" leader key:**
|
||||
|
||||
```sh
|
||||
curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com
|
||||
```
|
||||
|
||||
If multiple clients attempt to set the value for a key then only one will succeed.
|
||||
The other clients will hang until the current value is removed because of TTL or because of a `DELETE` operation.
|
||||
Multiple clients can submit the same value and will all be notified when that value succeeds.
|
||||
|
||||
To update the TTL of a value simply reissue the same `POST` command that you used to set the value.
|
||||
|
||||
|
||||
## Contributing
|
||||
|
||||
See [CONTRIBUTING](https://github.com/coreos/etcd/blob/master/CONTRIBUTING.md) for details on submitting patches and contacting developers via IRC and mailing lists.
|
||||
|
49
mod/leader/v2/delete_handler.go
Normal file
49
mod/leader/v2/delete_handler.go
Normal file
@ -0,0 +1,49 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// deleteHandler remove a given leader leader.
|
||||
func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) {
|
||||
vars := mux.Vars(req)
|
||||
name := req.FormValue("name")
|
||||
if name == "" {
|
||||
http.Error(w, "leader name required", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Proxy the request to the the lock service.
|
||||
u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
q := u.Query()
|
||||
q.Set("value", name)
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
r, err := http.NewRequest("DELETE", u.String(), nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Read from the leader lock.
|
||||
resp, err := h.client.Do(r)
|
||||
if err != nil {
|
||||
http.Error(w, "delete leader http error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Write([]byte("delete leader error: "))
|
||||
}
|
||||
io.Copy(w, resp.Body)
|
||||
}
|
29
mod/leader/v2/get_handler.go
Normal file
29
mod/leader/v2/get_handler.go
Normal file
@ -0,0 +1,29 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// getHandler retrieves the current leader.
|
||||
func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) {
|
||||
vars := mux.Vars(req)
|
||||
|
||||
// Proxy the request to the lock service.
|
||||
url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
|
||||
resp, err := h.client.Get(url)
|
||||
if err != nil {
|
||||
http.Error(w, "read leader error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Write([]byte("get leader error: "))
|
||||
}
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
io.Copy(w, resp.Body)
|
||||
}
|
34
mod/leader/v2/handler.go
Normal file
34
mod/leader/v2/handler.go
Normal file
@ -0,0 +1,34 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// prefix is appended to the lock's prefix since the leader mod uses the lock mod.
|
||||
const prefix = "/_mod/leader"
|
||||
|
||||
// handler manages the leader HTTP request.
|
||||
type handler struct {
|
||||
*mux.Router
|
||||
client *http.Client
|
||||
transport *http.Transport
|
||||
addr string
|
||||
}
|
||||
|
||||
// NewHandler creates an HTTP handler that can be registered on a router.
|
||||
func NewHandler(addr string) (http.Handler) {
|
||||
transport := &http.Transport{DisableKeepAlives: false}
|
||||
h := &handler{
|
||||
Router: mux.NewRouter(),
|
||||
client: &http.Client{Transport: transport},
|
||||
transport: transport,
|
||||
addr: addr,
|
||||
}
|
||||
h.StrictSlash(false)
|
||||
h.HandleFunc("/{key:.*}", h.getHandler).Methods("GET")
|
||||
h.HandleFunc("/{key:.*}", h.setHandler).Methods("PUT")
|
||||
h.HandleFunc("/{key:.*}", h.deleteHandler).Methods("DELETE")
|
||||
return h
|
||||
}
|
63
mod/leader/v2/set_handler.go
Normal file
63
mod/leader/v2/set_handler.go
Normal file
@ -0,0 +1,63 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// setHandler attempts to set the current leader.
|
||||
func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) {
|
||||
vars := mux.Vars(req)
|
||||
name := req.FormValue("name")
|
||||
if name == "" {
|
||||
http.Error(w, "leader name required", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Proxy the request to the the lock service.
|
||||
u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
q := u.Query()
|
||||
q.Set("value", name)
|
||||
q.Set("ttl", req.FormValue("ttl"))
|
||||
q.Set("timeout", req.FormValue("timeout"))
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
r, err := http.NewRequest("POST", u.String(), nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Close request if this connection disconnects.
|
||||
closeNotifier, _ := w.(http.CloseNotifier)
|
||||
stopChan := make(chan bool)
|
||||
defer close(stopChan)
|
||||
go func() {
|
||||
select {
|
||||
case <-closeNotifier.CloseNotify():
|
||||
h.transport.CancelRequest(r)
|
||||
case <-stopChan:
|
||||
}
|
||||
}()
|
||||
|
||||
// Read from the leader lock.
|
||||
resp, err := h.client.Do(r)
|
||||
if err != nil {
|
||||
http.Error(w, "set leader http error: " + err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Write([]byte("set leader error: "))
|
||||
}
|
||||
io.Copy(w, resp.Body)
|
||||
}
|
80
mod/leader/v2/tests/mod_leader_test.go
Normal file
80
mod/leader/v2/tests/mod_leader_test.go
Normal file
@ -0,0 +1,80 @@
|
||||
package leader
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/server"
|
||||
"github.com/coreos/etcd/tests"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Ensure that a leader can be set and read.
|
||||
func TestModLeaderSet(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Set leader.
|
||||
body, err := testSetLeader(s, "foo", "xxx", 10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
// Check that the leader is set.
|
||||
body, err = testGetLeader(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "xxx")
|
||||
|
||||
// Delete leader.
|
||||
body, err = testDeleteLeader(s, "foo", "xxx")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "")
|
||||
|
||||
// Check that the leader is removed.
|
||||
body, err = testGetLeader(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "")
|
||||
})
|
||||
}
|
||||
|
||||
// Ensure that a leader can be renewed.
|
||||
func TestModLeaderRenew(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// Set leader.
|
||||
body, err := testSetLeader(s, "foo", "xxx", 2)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Renew leader.
|
||||
body, err = testSetLeader(s, "foo", "xxx", 3)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "2")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Check that the leader is set.
|
||||
body, err = testGetLeader(s, "foo")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, body, "xxx")
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
func testSetLeader(s *server.Server, key string, name string, ttl int) (string, error) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s&ttl=%d", s.URL(), key, name, ttl), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
}
|
||||
|
||||
func testGetLeader(s *server.Server, key string) (string, error) {
|
||||
resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/leader/%s", s.URL(), key))
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
}
|
||||
|
||||
func testDeleteLeader(s *server.Server, key string, name string) (string, error) {
|
||||
resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s", s.URL(), key, name), nil)
|
||||
ret := tests.ReadBody(resp)
|
||||
return string(ret), err
|
||||
}
|
@ -49,9 +49,15 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
|
||||
// If node exists then just watch it. Otherwise create the node and watch it.
|
||||
index := h.findExistingNode(keypath, value)
|
||||
node, index, pos := h.findExistingNode(keypath, value)
|
||||
if index > 0 {
|
||||
err = h.watch(keypath, index, nil)
|
||||
if pos == 0 {
|
||||
// If lock is already acquired then update the TTL.
|
||||
h.client.Update(node.Key, node.Value, uint64(ttl))
|
||||
} else {
|
||||
// Otherwise watch until it becomes acquired (or errors).
|
||||
err = h.watch(keypath, index, nil)
|
||||
}
|
||||
} else {
|
||||
index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)
|
||||
}
|
||||
@ -108,18 +114,18 @@ func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-
|
||||
}
|
||||
|
||||
// findExistingNode search for a node on the lock with the given value.
|
||||
func (h *handler) findExistingNode(keypath string, value string) int {
|
||||
func (h *handler) findExistingNode(keypath string, value string) (*etcd.Node, int, int) {
|
||||
if len(value) > 0 {
|
||||
resp, err := h.client.Get(keypath, true, true)
|
||||
if err == nil {
|
||||
nodes := lockNodes{resp.Node.Nodes}
|
||||
if node := nodes.FindByValue(value); node != nil {
|
||||
if node, pos := nodes.FindByValue(value); node != nil {
|
||||
index, _ := strconv.Atoi(path.Base(node.Key))
|
||||
return index
|
||||
return node, index, pos
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0
|
||||
return nil, 0, 0
|
||||
}
|
||||
|
||||
// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
|
||||
|
@ -30,15 +30,15 @@ func (s lockNodes) First() *etcd.Node {
|
||||
}
|
||||
|
||||
// Retrieves the first node with a given value.
|
||||
func (s lockNodes) FindByValue(value string) *etcd.Node {
|
||||
func (s lockNodes) FindByValue(value string) (*etcd.Node, int) {
|
||||
sort.Sort(s)
|
||||
|
||||
for _, node := range s.Nodes {
|
||||
for i, node := range s.Nodes {
|
||||
if node.Value == value {
|
||||
return &node
|
||||
return &node, i
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
// Retrieves the index that occurs before a given index.
|
||||
|
@ -33,7 +33,7 @@ func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
nodes := lockNodes{resp.Node.Nodes}
|
||||
node := nodes.FindByValue(value)
|
||||
node, _ := nodes.FindByValue(value)
|
||||
if node == nil {
|
||||
http.Error(w, "release lock error: cannot find: " + value, http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -41,7 +41,7 @@ func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
nodes := lockNodes{resp.Node.Nodes}
|
||||
node := nodes.FindByValue(value)
|
||||
node, _ := nodes.FindByValue(value)
|
||||
if node == nil {
|
||||
http.Error(w, "renew lock error: cannot find: " + value, http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/coreos/etcd/mod/dashboard"
|
||||
lock2 "github.com/coreos/etcd/mod/lock/v2"
|
||||
leader2 "github.com/coreos/etcd/mod/leader/v2"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
@ -22,7 +23,7 @@ func HttpHandler(addr string) http.Handler {
|
||||
r.HandleFunc("/dashboard", addSlash)
|
||||
r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
|
||||
|
||||
// TODO: Use correct addr.
|
||||
r.PathPrefix("/v2/lock").Handler(http.StripPrefix("/v2/lock", lock2.NewHandler(addr)))
|
||||
r.PathPrefix("/v2/leader").Handler(http.StripPrefix("/v2/leader", leader2.NewHandler(addr)))
|
||||
return r
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user