Lock testing.

This commit is contained in:
Ben Johnson 2013-11-29 16:33:49 -07:00
parent 22c2935ddb
commit d8e9838c38
6 changed files with 234 additions and 37 deletions

View File

@ -27,6 +27,7 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
indexpath := resp.Key
// Keep updating TTL to make sure lock request is not expired before acquisition.
stopChan := make(chan bool)
@ -44,7 +45,7 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
break
}
}
}(resp.Key)
}(indexpath)
// Extract the lock index.
index, _ := strconv.Atoi(path.Base(resp.Key))
@ -72,4 +73,7 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
return
}
}
// Write lock index to response body.
w.Write([]byte(strconv.Itoa(index)))
}

View File

@ -0,0 +1,30 @@
package lock
import (
"net/http"
"path"
"strconv"
"github.com/gorilla/mux"
)
// getIndexHandler retrieves the current lock index.
func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()
vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key"])
// Read all indices.
resp, err := h.client.GetAll(keypath, true)
if err != nil {
http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
return
}
// Write out the index of the last one to the response body.
indices := extractResponseIndices(resp)
if len(indices) > 0 {
w.Write([]byte(strconv.Itoa(indices[0])))
}
}

View File

@ -10,7 +10,7 @@ import (
"github.com/coreos/go-etcd/etcd"
)
const prefix = "/_etcd/locks"
const prefix = "/_etcd/mod/lock"
// handler manages the lock HTTP request.
type handler struct {
@ -20,12 +20,12 @@ type handler struct {
// NewHandler creates an HTTP handler that can be registered on a router.
func NewHandler(addr string) (http.Handler) {
etcd.OpenDebug()
h := &handler{
Router: mux.NewRouter(),
client: etcd.NewClient([]string{addr}),
}
h.StrictSlash(false)
h.HandleFunc("/{key:.*}", h.getIndexHandler).Methods("GET")
h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
h.HandleFunc("/{key_with_index:.*}", h.renewLockHandler).Methods("PUT")
h.HandleFunc("/{key_with_index:.*}", h.releaseLockHandler).Methods("DELETE")

View File

@ -1,11 +1,24 @@
package lock
import (
"path"
"net/http"
"github.com/gorilla/mux"
)
// releaseLockHandler deletes the lock.
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
// TODO: h.client.Delete(key_with_index)
h.client.SyncCluster()
vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key_with_index"])
// Delete the lock.
_, err := h.client.Delete(keypath)
if err != nil {
http.Error(w, "delete lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
}

View File

@ -1,18 +1,30 @@
package lock
import (
"path"
"net/http"
_ "path"
"strconv"
_ "github.com/gorilla/mux"
"github.com/gorilla/mux"
)
// renewLockHandler attempts to update the TTL on an existing lock.
// Returns a 200 OK if successful. Otherwie
// Returns a 200 OK if successful. Returns non-200 on error.
func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
/*
h.client.SyncCluster()
vars := mux.Vars(req)
key := path.Join(prefix, vars["key"])
ttl := vars["ttl"]
*/
keypath := path.Join(prefix, vars["key_with_index"])
ttl, err := strconv.Atoi(req.FormValue("ttl"))
if err != nil {
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
return
}
// Renew the lock, if it exists.
_, err = h.client.Update(keypath, "-", uint64(ttl))
if err != nil {
http.Error(w, "renew lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
}

View File

@ -2,7 +2,6 @@ package lock
import (
"fmt"
"net/url"
"testing"
"time"
@ -12,39 +11,178 @@ import (
)
// Ensure that a lock can be acquired and released.
func TestModLockAcquire(t *testing.T) {
v := url.Values{}
func TestModLockAcquireAndRelease(t *testing.T) {
tests.RunServer(func(s *server.Server) {
// Acquire lock.
url := fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock/foo?ttl=2")
resp, err := tests.PutForm(url, v)
body, err := testAcquireLock(s, "foo", 10)
assert.NoError(t, err)
ret := tests.ReadBody(resp)
assert.Equal(t, string(ret), "XXX")
assert.Equal(t, body, "2")
time.Sleep(60 * time.Second)
// TODO: Check that it has been acquired.
// TODO: Release lock.
// TODO: Check that it has been released.
// Check that we have the lock.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "2")
// Release lock.
body, err = testReleaseLock(s, "foo", 2)
assert.NoError(t, err)
assert.Equal(t, body, "")
// Check that we have the lock.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "")
})
}
// Ensure that a lock can be acquired and another process is blocked until released.
func TestModLockAcquireBlocked(t *testing.T) {
// TODO: Acquire lock with process #1.
// TODO: Acquire lock with process #2.
// TODO: Check that process #2 has not obtained lock.
// TODO: Release lock from process #1.
// TODO: Check that process #2 obtains the lock.
// TODO: Release lock from process #2.
// TODO: Check that no lock exists.
func TestModLockBlockUntilAcquire(t *testing.T) {
tests.RunServer(func(s *server.Server) {
c := make(chan bool)
// Acquire lock #1.
go func() {
body, err := testAcquireLock(s, "foo", 10)
assert.NoError(t, err)
assert.Equal(t, body, "2")
c <- true
}()
<- c
// Acquire lock #2.
go func() {
c <- true
body, err := testAcquireLock(s, "foo", 10)
assert.NoError(t, err)
assert.Equal(t, body, "4")
}()
<- c
time.Sleep(1 * time.Second)
// Check that we have the lock #1.
body, err := testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "2")
// Release lock #1.
body, err = testReleaseLock(s, "foo", 2)
assert.NoError(t, err)
// Check that we have lock #2.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "4")
// Release lock #2.
body, err = testReleaseLock(s, "foo", 4)
assert.NoError(t, err)
// Check that we have no lock.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "")
})
}
// Ensure that an unowned lock can be released by force.
func TestModLockForceRelease(t *testing.T) {
// TODO: Acquire lock.
// TODO: Check that it has been acquired.
// TODO: Force release lock.
// TODO: Check that it has been released.
// TODO: Check that acquiring goroutine is notified that their lock has been released.
// Ensure that a lock will be released after the TTL.
func TestModLockExpireAndRelease(t *testing.T) {
tests.RunServer(func(s *server.Server) {
c := make(chan bool)
// Acquire lock #1.
go func() {
body, err := testAcquireLock(s, "foo", 2)
assert.NoError(t, err)
assert.Equal(t, body, "2")
c <- true
}()
<- c
// Acquire lock #2.
go func() {
c <- true
body, err := testAcquireLock(s, "foo", 10)
assert.NoError(t, err)
assert.Equal(t, body, "4")
}()
<- c
time.Sleep(1 * time.Second)
// Check that we have the lock #1.
body, err := testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "2")
// Wait for lock #1 TTL.
time.Sleep(2 * time.Second)
// Check that we have lock #2.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "4")
})
}
// Ensure that a lock can be renewed.
func TestModLockRenew(t *testing.T) {
tests.RunServer(func(s *server.Server) {
// Acquire lock.
body, err := testAcquireLock(s, "foo", 3)
assert.NoError(t, err)
assert.Equal(t, body, "2")
time.Sleep(2 * time.Second)
// Check that we have the lock.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "2")
// Renew lock.
body, err = testRenewLock(s, "foo", 2, 3)
assert.NoError(t, err)
assert.Equal(t, body, "")
time.Sleep(2 * time.Second)
// Check that we still have the lock.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "2")
time.Sleep(2 * time.Second)
// Check that lock was released.
body, err = testGetLockIndex(s, "foo")
assert.NoError(t, err)
assert.Equal(t, body, "")
})
}
func testAcquireLock(s *server.Server, key string, ttl int) (string, error) {
resp, err := tests.PostForm(fmt.Sprintf("%s/mod/lock/%s?ttl=%d", s.URL(), key, ttl), nil)
ret := tests.ReadBody(resp)
return string(ret), err
}
func testGetLockIndex(s *server.Server, key string) (string, error) {
resp, err := tests.Get(fmt.Sprintf("%s/mod/lock/%s", s.URL(), key))
ret := tests.ReadBody(resp)
return string(ret), err
}
func testReleaseLock(s *server.Server, key string, index int) (string, error) {
resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/lock/%s/%d", s.URL(), key, index), nil)
ret := tests.ReadBody(resp)
return string(ret), err
}
func testRenewLock(s *server.Server, key string, index int, ttl int) (string, error) {
resp, err := tests.PutForm(fmt.Sprintf("%s/mod/lock/%s/%d?ttl=%d", s.URL(), key, index, ttl), nil)
ret := tests.ReadBody(resp)
return string(ret), err
}