This commit is contained in:
Ben Johnson 2013-11-27 14:36:14 -07:00
parent 228754a99c
commit 32861246b9
6 changed files with 129 additions and 27 deletions

View File

@ -0,0 +1,49 @@
package lock
import (
"net/http"
"path"
"strconv"
"github.com/gorilla/mux"
)
// acquireHandler attempts to acquire a lock on the given key.
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key"])
ttl, err := strconv.Atoi(vars["ttl"])
if err != nil {
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
return
}
// Create an incrementing id for the lock.
resp, err := h.client.AddChild(keypath, "X", ttl)
if err != nil {
http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
// Extract the lock index.
index, _ := strconv.Atoi(path.Base(resp.Key))
// Read all indices.
resp, err = h.client.GetAll(key)
if err != nil {
http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
return
}
indices := extractResponseIndices(resp)
// TODO: child_keys := parse_and_sort_child_keys
// TODO: if index == min(child_keys) then return 200
// TODO: else:
// TODO: h.client.WatchAll(key)
// TODO: if next_lowest_key is deleted
// TODO: get_all_keys
// TODO: if index == min(child_keys) then return 200
// TODO: rinse_and_repeat until we're the lowest.
// TODO:
}

View File

@ -1,19 +1,20 @@
package lock
import (
"bytes"
"fmt"
"net/http"
"os"
"path"
"time"
"github.com/gorilla/mux"
"github.com/coreos/go-etcd/etcd"
)
const prefix = "/_etcd/locks"
// handler manages the lock HTTP request.
type handler struct {
*mux.Router
client string
client *etcd.Client
}
// NewHandler creates an HTTP handler that can be registered on a router.
@ -22,23 +23,33 @@ func NewHandler(addr string) (http.Handler) {
Router: mux.NewRouter(),
client: etcd.NewClient([]string{addr}),
}
h.HandleFunc("/{key:.+}", h.getLockHandler).Methods("GET")
h.HandleFunc("/{key:.+}", h.acquireLockHandler).Methods("PUT")
h.HandleFunc("/{key:.+}", h.releaseLockHandler).Methods("DELETE")
h.StrictSlash(false)
h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
h.HandleFunc("/{key_with_index:.*}", h.renewLockHandler).Methods("PUT")
h.HandleFunc("/{key_with_index:.*}", h.releaseLockHandler).Methods("DELETE")
return h
}
// getLockHandler retrieves whether a lock has been obtained for a given key.
func (h *handler) getLockHandler(w http.ResponseWriter, req *http.Request) {
// TODO
// extractResponseIndices extracts a sorted list of indicies from a response.
func extractResponseIndices(resp *etcd.Response) []int {
var indices []int
for _, kv := range resp.Kvs {
if index, _ := strconv.Atoi(path.Base(kv.Key)); index > 0 {
indicies = append(indices, index)
}
}
return indices
}
// acquireLockHandler attempts to acquire a lock on the given key.
// The lock is released when the connection is disconnected.
func (h *handler) acquireLockHandler(w http.ResponseWriter, req *http.Request) {
// TODO
// findPrevIndex retrieves the previous index before the given index.
func findPrevIndex(indices []int, idx int) int {
var prevIndex int
for _, index := range indices {
if index == idx {
break
}
// releaseLockHandler forces the release of a lock on the given key.
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
// TODO
prevIndex = index
}
return prevIndex
}

View File

@ -0,0 +1,11 @@
package lock
import (
"net/http"
)
// releaseLockHandler deletes the lock.
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
// TODO: h.client.Delete(key_with_index)
}

16
mod/lock/renew_handler.go Normal file
View File

@ -0,0 +1,16 @@
package lock
import (
"net/http"
)
// renewLockHandler attempts to update the TTL on an existing lock.
// Returns a 200 OK if successful. Otherwie
func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
key := path.Join(prefix, vars["key"])
ttl := vars["ttl"]
w.Write([]byte(fmt.Sprintf("%s-%s", key, ttl)))
// TODO:
}

View File

@ -1,17 +1,32 @@
package lock
import (
"fmt"
"net/url"
"testing"
"time"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/tests"
"github.com/stretchr/testify/assert"
)
// Ensure that a lock can be acquired and released.
func TestModLockAcquire(t *testing.T) {
// TODO: Acquire lock.
v := url.Values{}
tests.RunServer(func(s *server.Server) {
// Acquire lock.
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock"), v)
assert.NoError(t, err)
ret := tests.ReadBody(resp)
assert.Equal(t, string(ret), "XXX")
fmt.Println("URL:", fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock"))
time.Sleep(60 * time.Second)
// TODO: Check that it has been acquired.
// TODO: Release lock.
// TODO: Check that it has been released.
})
}
// Ensure that a lock can be acquired and another process is blocked until released.

View File

@ -13,9 +13,9 @@ var ServeMux *http.Handler
func HttpHandler() (handler http.Handler) {
r := mux.NewRouter()
r.PathPrefix("/dashboard/").
Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
r.PathPrefix("/lock/").
Handler(http.StripPrefix("/lock", lock.NewHandler()))
r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
// TODO: Use correct addr.
r.PathPrefix("/lock").Handler(http.StripPrefix("/lock", lock.NewHandler("127.0.0.1:4001")))
return r
}