From 22c2935ddb3ab7ce5e642a43abb114f3778cc866 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 27 Nov 2013 16:59:05 -0700 Subject: [PATCH] Initial mod_lock acquire. --- mod/lock/acquire_handler.go | 64 +++++++++++++------ mod/lock/handler.go | 7 +- mod/lock/renew_handler.go | 8 ++- mod/lock/tests/handler_test.go | 4 +- mod/mod.go | 5 +- server/registry.go | 1 + server/server.go | 4 +- test.sh | 16 ++--- tests/server_utils.go | 5 +- .../coreos/go-etcd/etcd/requests.go | 2 +- 10 files changed, 75 insertions(+), 41 deletions(-) diff --git a/mod/lock/acquire_handler.go b/mod/lock/acquire_handler.go index d142a3f2e..3e7f2e973 100644 --- a/mod/lock/acquire_handler.go +++ b/mod/lock/acquire_handler.go @@ -4,46 +4,72 @@ import ( "net/http" "path" "strconv" + "time" "github.com/gorilla/mux" ) // acquireHandler attempts to acquire a lock on the given key. func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) { + h.client.SyncCluster() + vars := mux.Vars(req) keypath := path.Join(prefix, vars["key"]) - ttl, err := strconv.Atoi(vars["ttl"]) + ttl, err := strconv.Atoi(req.FormValue("ttl")) if err != nil { http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError) return } // Create an incrementing id for the lock. - resp, err := h.client.AddChild(keypath, "X", ttl) + resp, err := h.client.AddChild(keypath, "-", uint64(ttl)) if err != nil { http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError) return } + // Keep updating TTL to make sure lock request is not expired before acquisition. + stopChan := make(chan bool) + defer close(stopChan) + go func(k string) { + stopped := false + for { + select { + case <-time.After(time.Duration(ttl / 2) * time.Second): + case <-stopChan: + stopped = true + } + h.client.Update(k, "-", uint64(ttl)) + if stopped { + break + } + } + }(resp.Key) + // Extract the lock index. index, _ := strconv.Atoi(path.Base(resp.Key)) - // Read all indices. - resp, err = h.client.GetAll(key) - if err != nil { - http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError) - return + for { + // Read all indices. + resp, err = h.client.GetAll(keypath, true) + if err != nil { + http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError) + return + } + indices := extractResponseIndices(resp) + waitIndex := resp.ModifiedIndex + prevIndex := findPrevIndex(indices, index) + + // If there is no previous index then we have the lock. + if prevIndex == 0 { + break + } + + // Otherwise watch previous index until it's gone. + _, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, nil, nil) + if err != nil { + http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError) + return + } } - indices := extractResponseIndices(resp) - - // TODO: child_keys := parse_and_sort_child_keys - // TODO: if index == min(child_keys) then return 200 - // TODO: else: - // TODO: h.client.WatchAll(key) - // TODO: if next_lowest_key is deleted - // TODO: get_all_keys - // TODO: if index == min(child_keys) then return 200 - // TODO: rinse_and_repeat until we're the lowest. - - // TODO: } diff --git a/mod/lock/handler.go b/mod/lock/handler.go index 66a62be4f..355a6339f 100644 --- a/mod/lock/handler.go +++ b/mod/lock/handler.go @@ -1,9 +1,10 @@ package lock import ( - "fmt" "net/http" "path" + "strconv" + "sort" "github.com/gorilla/mux" "github.com/coreos/go-etcd/etcd" @@ -19,6 +20,7 @@ type handler struct { // NewHandler creates an HTTP handler that can be registered on a router. func NewHandler(addr string) (http.Handler) { + etcd.OpenDebug() h := &handler{ Router: mux.NewRouter(), client: etcd.NewClient([]string{addr}), @@ -36,9 +38,10 @@ func extractResponseIndices(resp *etcd.Response) []int { var indices []int for _, kv := range resp.Kvs { if index, _ := strconv.Atoi(path.Base(kv.Key)); index > 0 { - indicies = append(indices, index) + indices = append(indices, index) } } + sort.Ints(indices) return indices } diff --git a/mod/lock/renew_handler.go b/mod/lock/renew_handler.go index da9c0b8c2..ba9fe31d2 100644 --- a/mod/lock/renew_handler.go +++ b/mod/lock/renew_handler.go @@ -2,15 +2,17 @@ package lock import ( "net/http" + _ "path" + + _ "github.com/gorilla/mux" ) // renewLockHandler attempts to update the TTL on an existing lock. // Returns a 200 OK if successful. Otherwie func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) { + /* vars := mux.Vars(req) key := path.Join(prefix, vars["key"]) ttl := vars["ttl"] - w.Write([]byte(fmt.Sprintf("%s-%s", key, ttl))) - - // TODO: + */ } diff --git a/mod/lock/tests/handler_test.go b/mod/lock/tests/handler_test.go index fbc36ea00..e3caafe25 100644 --- a/mod/lock/tests/handler_test.go +++ b/mod/lock/tests/handler_test.go @@ -16,12 +16,12 @@ func TestModLockAcquire(t *testing.T) { v := url.Values{} tests.RunServer(func(s *server.Server) { // Acquire lock. - resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock"), v) + url := fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock/foo?ttl=2") + resp, err := tests.PutForm(url, v) assert.NoError(t, err) ret := tests.ReadBody(resp) assert.Equal(t, string(ret), "XXX") - fmt.Println("URL:", fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock")) time.Sleep(60 * time.Second) // TODO: Check that it has been acquired. // TODO: Release lock. diff --git a/mod/mod.go b/mod/mod.go index d9b0ee01b..7c0194f56 100644 --- a/mod/mod.go +++ b/mod/mod.go @@ -17,13 +17,12 @@ func addSlash(w http.ResponseWriter, req *http.Request) { return } -func HttpHandler() (handler http.Handler) { +func HttpHandler(addr string) http.Handler { r := mux.NewRouter() r.HandleFunc("/dashboard", addSlash) r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler())) // TODO: Use correct addr. - r.HandleFunc("/lock", addSlash) - r.PathPrefix("/lock").Handler(http.StripPrefix("/lock", lock.NewHandler("127.0.0.1:4001"))) + r.PathPrefix("/lock").Handler(http.StripPrefix("/lock", lock.NewHandler(addr))) return r } diff --git a/server/registry.go b/server/registry.go index d1d98d9ed..27b0ce414 100644 --- a/server/registry.go +++ b/server/registry.go @@ -46,6 +46,7 @@ func (r *Registry) Register(name string, peerURL string, url string) error { key := path.Join(RegistryKey, name) value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url) _, err := r.store.Create(key, value, false, store.Permanent) + fmt.Println("register.1:", key, value, err) log.Debugf("Register: %s", name) return err } diff --git a/server/server.go b/server/server.go index 4f75df2e0..f0de64a67 100644 --- a/server/server.go +++ b/server/server.go @@ -130,7 +130,7 @@ func (s *Server) installV2() { func (s *Server) installMod() { r := s.router - r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler())) + r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.url))) } // Adds a v1 server handler to the router. @@ -320,12 +320,14 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err // Handler to return the current leader's raft address func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error { leader := s.peerServer.RaftServer().Leader() + fmt.Println("/leader.1?", leader) if leader == "" { return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index()) } w.WriteHeader(http.StatusOK) url, _ := s.registry.PeerURL(leader) w.Write([]byte(url)) + fmt.Println("/leader.2?", leader, url) return nil } diff --git a/test.sh b/test.sh index 5cc633975..690f3a932 100755 --- a/test.sh +++ b/test.sh @@ -1,6 +1,9 @@ #!/bin/sh set -e +PKGS="./mod/lock/tests" +# PKGS="./store ./server ./server/v2/tests" + # Get GOPATH, etc from build . ./build @@ -8,14 +11,11 @@ set -e export GOPATH="${PWD}" # Unit tests -go test -i ./server -go test -v ./server - -go test -i ./server/v2/tests -go test -v ./server/v2/tests - -go test -i ./store -go test -v ./store +for PKG in $PKGS +do + go test -i $PKG + go test -v $PKG +done # Functional tests go test -i ./tests/functional diff --git a/tests/server_utils.go b/tests/server_utils.go index e3e7d5323..b02eb6371 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -23,8 +23,9 @@ func RunServer(f func(*server.Server)) { store := store.New() registry := server.NewRegistry(store) - ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount) - s := server.New(testName, testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store) + ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount) + ps.MaxClusterSize = 9 + s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store) ps.SetServer(s) // Start up peer server. diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go index 83e3b519e..4db818f97 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/requests.go +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -207,7 +207,7 @@ func (c *Client) sendRequest(method string, _path string, values url.Values) (*R if err != nil { retry++ if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") + return nil, errors.New("Cannot reach servers" + err.Error()) } num := retry % len(c.cluster.Machines) logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")