From 32861246b974f2c1678c1f10b2b10f7f580a5df7 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 27 Nov 2013 14:36:14 -0700 Subject: [PATCH] mod/lock --- mod/lock/acquire_handler.go | 49 ++++++++++++++++++++++++++++ mod/lock/handler.go | 49 +++++++++++++++++----------- mod/lock/release_handler.go | 11 +++++++ mod/lock/renew_handler.go | 16 +++++++++ mod/lock/{ => tests}/handler_test.go | 23 ++++++++++--- mod/mod.go | 8 ++--- 6 files changed, 129 insertions(+), 27 deletions(-) create mode 100644 mod/lock/acquire_handler.go create mode 100644 mod/lock/release_handler.go create mode 100644 mod/lock/renew_handler.go rename mod/lock/{ => tests}/handler_test.go (62%) diff --git a/mod/lock/acquire_handler.go b/mod/lock/acquire_handler.go new file mode 100644 index 000000000..d142a3f2e --- /dev/null +++ b/mod/lock/acquire_handler.go @@ -0,0 +1,49 @@ +package lock + +import ( + "net/http" + "path" + "strconv" + + "github.com/gorilla/mux" +) + +// acquireHandler attempts to acquire a lock on the given key. +func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + keypath := path.Join(prefix, vars["key"]) + ttl, err := strconv.Atoi(vars["ttl"]) + if err != nil { + http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError) + return + } + + // Create an incrementing id for the lock. + resp, err := h.client.AddChild(keypath, "X", ttl) + if err != nil { + http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError) + return + } + + // Extract the lock index. + index, _ := strconv.Atoi(path.Base(resp.Key)) + + // Read all indices. + resp, err = h.client.GetAll(key) + if err != nil { + http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError) + return + } + indices := extractResponseIndices(resp) + + // TODO: child_keys := parse_and_sort_child_keys + // TODO: if index == min(child_keys) then return 200 + // TODO: else: + // TODO: h.client.WatchAll(key) + // TODO: if next_lowest_key is deleted + // TODO: get_all_keys + // TODO: if index == min(child_keys) then return 200 + // TODO: rinse_and_repeat until we're the lowest. + + // TODO: +} diff --git a/mod/lock/handler.go b/mod/lock/handler.go index ccb2a3c5d..66a62be4f 100644 --- a/mod/lock/handler.go +++ b/mod/lock/handler.go @@ -1,19 +1,20 @@ package lock import ( - "bytes" + "fmt" "net/http" - "os" "path" - "time" + "github.com/gorilla/mux" "github.com/coreos/go-etcd/etcd" ) +const prefix = "/_etcd/locks" + // handler manages the lock HTTP request. type handler struct { *mux.Router - client string + client *etcd.Client } // NewHandler creates an HTTP handler that can be registered on a router. @@ -22,23 +23,33 @@ func NewHandler(addr string) (http.Handler) { Router: mux.NewRouter(), client: etcd.NewClient([]string{addr}), } - h.HandleFunc("/{key:.+}", h.getLockHandler).Methods("GET") - h.HandleFunc("/{key:.+}", h.acquireLockHandler).Methods("PUT") - h.HandleFunc("/{key:.+}", h.releaseLockHandler).Methods("DELETE") + h.StrictSlash(false) + h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST") + h.HandleFunc("/{key_with_index:.*}", h.renewLockHandler).Methods("PUT") + h.HandleFunc("/{key_with_index:.*}", h.releaseLockHandler).Methods("DELETE") + return h } -// getLockHandler retrieves whether a lock has been obtained for a given key. -func (h *handler) getLockHandler(w http.ResponseWriter, req *http.Request) { - // TODO + +// extractResponseIndices extracts a sorted list of indicies from a response. +func extractResponseIndices(resp *etcd.Response) []int { + var indices []int + for _, kv := range resp.Kvs { + if index, _ := strconv.Atoi(path.Base(kv.Key)); index > 0 { + indicies = append(indices, index) + } + } + return indices } -// acquireLockHandler attempts to acquire a lock on the given key. -// The lock is released when the connection is disconnected. -func (h *handler) acquireLockHandler(w http.ResponseWriter, req *http.Request) { - // TODO -} - -// releaseLockHandler forces the release of a lock on the given key. -func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) { - // TODO +// findPrevIndex retrieves the previous index before the given index. +func findPrevIndex(indices []int, idx int) int { + var prevIndex int + for _, index := range indices { + if index == idx { + break + } + prevIndex = index + } + return prevIndex } diff --git a/mod/lock/release_handler.go b/mod/lock/release_handler.go new file mode 100644 index 000000000..09b875183 --- /dev/null +++ b/mod/lock/release_handler.go @@ -0,0 +1,11 @@ +package lock + +import ( + "net/http" +) + +// releaseLockHandler deletes the lock. +func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) { + // TODO: h.client.Delete(key_with_index) +} + diff --git a/mod/lock/renew_handler.go b/mod/lock/renew_handler.go new file mode 100644 index 000000000..da9c0b8c2 --- /dev/null +++ b/mod/lock/renew_handler.go @@ -0,0 +1,16 @@ +package lock + +import ( + "net/http" +) + +// renewLockHandler attempts to update the TTL on an existing lock. +// Returns a 200 OK if successful. Otherwie +func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + key := path.Join(prefix, vars["key"]) + ttl := vars["ttl"] + w.Write([]byte(fmt.Sprintf("%s-%s", key, ttl))) + + // TODO: +} diff --git a/mod/lock/handler_test.go b/mod/lock/tests/handler_test.go similarity index 62% rename from mod/lock/handler_test.go rename to mod/lock/tests/handler_test.go index 91ecc310d..fbc36ea00 100644 --- a/mod/lock/handler_test.go +++ b/mod/lock/tests/handler_test.go @@ -1,17 +1,32 @@ package lock import ( + "fmt" + "net/url" "testing" + "time" + "github.com/coreos/etcd/server" + "github.com/coreos/etcd/tests" "github.com/stretchr/testify/assert" ) // Ensure that a lock can be acquired and released. func TestModLockAcquire(t *testing.T) { - // TODO: Acquire lock. - // TODO: Check that it has been acquired. - // TODO: Release lock. - // TODO: Check that it has been released. + v := url.Values{} + tests.RunServer(func(s *server.Server) { + // Acquire lock. + resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock"), v) + assert.NoError(t, err) + ret := tests.ReadBody(resp) + assert.Equal(t, string(ret), "XXX") + + fmt.Println("URL:", fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock")) + time.Sleep(60 * time.Second) + // TODO: Check that it has been acquired. + // TODO: Release lock. + // TODO: Check that it has been released. + }) } // Ensure that a lock can be acquired and another process is blocked until released. diff --git a/mod/mod.go b/mod/mod.go index 7964c683f..9d3b54d69 100644 --- a/mod/mod.go +++ b/mod/mod.go @@ -13,9 +13,9 @@ var ServeMux *http.Handler func HttpHandler() (handler http.Handler) { r := mux.NewRouter() - r.PathPrefix("/dashboard/"). - Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler())) - r.PathPrefix("/lock/"). - Handler(http.StripPrefix("/lock", lock.NewHandler())) + r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler())) + + // TODO: Use correct addr. + r.PathPrefix("/lock").Handler(http.StripPrefix("/lock", lock.NewHandler("127.0.0.1:4001"))) return r }