From 013d07bc2a8b662bd48925f3a14f549956c7b015 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 13 Oct 2013 16:58:36 -0600 Subject: [PATCH] Fix server dispatch redirection. --- etcd_test.go | 9 +++------ server/peer_server.go | 6 ++---- server/registry.go | 10 ++++++++-- server/server.go | 8 +++++++- test/test.go | 4 ++-- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/etcd_test.go b/etcd_test.go index 95e4c3c52..4ba0f5fa9 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -21,7 +21,7 @@ import ( func TestSingleNode(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - args := []string{"etcd", "-vv", "-n=node1", "-f", "-d=/tmp/node1"} + args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"} process, err := os.StartProcess("etcd", args, procAttr) if err != nil { @@ -249,6 +249,7 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) { clusterSize := 5 argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false) + defer test.DestroyCluster(etcds) if err != nil { t.Fatal("cannot create cluster") @@ -300,9 +301,6 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) { if result.Index != 18 { t.Fatalf("recovery failed! [%d/18]", result.Index) } - - // kill all - test.DestroyCluster(etcds) } // Create a five nodes @@ -479,6 +477,7 @@ func TestRemoveNode(t *testing.T) { clusterSize := 3 argGroup, etcds, _ := test.CreateCluster(clusterSize, procAttr, false) + defer test.DestroyCluster(etcds) time.Sleep(time.Second) @@ -572,8 +571,6 @@ func TestRemoveNode(t *testing.T) { } } } - test.DestroyCluster(etcds) - } func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) { diff --git a/server/peer_server.go b/server/peer_server.go index 0b16f98e8..62ec06c41 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -410,7 +410,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} version, err := getVersion(t, versionURL) if err != nil { - return fmt.Errorf("Unable to join: %v", err) + return fmt.Errorf("Error during join version check: %v", err) } // TODO: versioning of the internal protocol. See: @@ -442,12 +442,9 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s return nil } if resp.StatusCode == http.StatusTemporaryRedirect { - address := resp.Header.Get("Location") log.Debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) - resp, req, err = t.Post(address, &b) } else if resp.StatusCode == http.StatusBadRequest { @@ -538,6 +535,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R } url, _ := s.registry.PeerURL(leader) + log.Debugf("Not leader; Current leader: %s; redirect: %s", leader, url) redirect(url, w, req) return nil diff --git a/server/registry.go b/server/registry.go index 9f30d7854..d2fd67ec4 100644 --- a/server/registry.go +++ b/server/registry.go @@ -38,14 +38,16 @@ func NewRegistry(s *store.Store) *Registry { } // Adds a node to the registry. -func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) { +func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) error { r.Lock() defer r.Unlock() // Write data to store. key := path.Join(RegistryKey, name) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion) - r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) + _, err := r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) + log.Debugf("Register: %s (%v)", name, err) + return err } // Removes a node from the registry. @@ -53,8 +55,12 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro r.Lock() defer r.Unlock() + // Remove from cache. + delete(r.nodes, name) + // Remove the key from the store. _, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term) + log.Debugf("Unregister: %s (%v)", name, err) return err } diff --git a/server/server.go b/server/server.go index 76a62fdce..8cf837420 100644 --- a/server/server.go +++ b/server/server.go @@ -193,7 +193,13 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) } - url, _ := s.registry.PeerURL(leader) + var url string + switch c.(type) { + case *JoinCommand, *RemoveCommand: + url, _ = s.registry.PeerURL(leader) + default: + url, _ = s.registry.URL(leader) + } redirect(url, w, req) return nil diff --git a/test/test.go b/test/test.go index acb212d48..f4e5fad2e 100644 --- a/test/test.go +++ b/test/test.go @@ -69,13 +69,13 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"} + argGroup[i] = []string{"etcd", "-v", "-d=/tmp/node1", "-n=node1"} if ssl { argGroup[i] = append(argGroup[i], sslServer1...) } } else { strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} + argGroup[i] = []string{"etcd", "-v", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} if ssl { argGroup[i] = append(argGroup[i], sslServer2...) }