diff --git a/client_handlers.go b/client_handlers.go index 4833a9055..d5f9ea23b 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -194,13 +194,14 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) // command? //-------------------------------------- -// Handler to return the current leader name +// Handler to return the current leader's raft address func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) { leader := raftServer.Leader() if leader != "" { w.WriteHeader(http.StatusOK) - w.Write([]byte(raftServer.Leader())) + raftURL, _ := nameToRaftURL(leader) + w.Write([]byte(raftURL)) } else { // not likely, but it may happen diff --git a/command.go b/command.go index 29118c8bb..860b211d6 100644 --- a/command.go +++ b/command.go @@ -110,9 +110,9 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // JoinCommand type JoinCommand struct { - Name string `json:"name"` - RaftURL string `json:"raftURL"` - ClientURL string `json:"clientURL"` + Name string `json:"name"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` } // The name of the join command in the log @@ -136,14 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { return []byte("join fail"), fmt.Errorf(errors[103]) } - raftTransporter.AddPeer(c) + addNameToURL(c.Name, c.RaftURL, c.EtcdURL) // add peer in raft err := raftServer.AddPeer(c.Name) // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) - value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL) + value := fmt.Sprintf("raft=%s&etcd=%s", c.RaftURL, c.EtcdURL) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) return []byte("join success"), err diff --git a/etcd.go b/etcd.go index 8e171d4e9..95759bc16 100644 --- a/etcd.go +++ b/etcd.go @@ -56,7 +56,6 @@ func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging") - flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma") flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") @@ -112,11 +111,11 @@ const ( //------------------------------------------------------------------------------ type Info struct { - Name string `json:"name"` + Name string `json:"name"` - RaftURL string `json:"raftURL"` - ClientURL string `json:"clientURL"` - WebURL string `json:"webURL"` + RaftURL string `json:"raftURL"` + ClientURL string `json:"clientURL"` + WebURL string `json:"webURL"` ServerCertFile string `json:"serverCertFile"` ServerKeyFile string `json:"serverKeyFile"` @@ -289,9 +288,9 @@ func startRaft(tlsConfs []*tls.Config) { // leader need to join self as a peer for { command := &JoinCommand{ - Name: raftServer.Name(), - RaftURL: argInfo.RaftURL, - ClientURL: argInfo.ClientURL, + Name: raftServer.Name(), + RaftURL: argInfo.RaftURL, + EtcdURL: argInfo.ClientURL, } _, err := raftServer.Do(command) if err == nil { @@ -359,8 +358,6 @@ func startRaft(tlsConfs []*tls.Config) { func newTransporter(tlsConf *tls.Config) transporter { t := transporter{} - t.names = make(map[string]*JoinCommand) - if tlsConf == nil { t.scheme = "http://" @@ -589,9 +586,9 @@ func joinCluster(s *raft.Server, serverName string) error { var b bytes.Buffer command := &JoinCommand{ - Name: s.Name(), - RaftURL: info.RaftURL, - ClientURL: info.ClientURL, + Name: s.Name(), + RaftURL: info.RaftURL, + EtcdURL: info.ClientURL, } json.NewEncoder(&b).Encode(command) diff --git a/etcd_long_test.go b/etcd_long_test.go index 0247332ad..ff59caadc 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -34,10 +34,11 @@ func TestKillLeader(t *testing.T) { var totalTime time.Duration - leader := "127.0.0.1:7001" + leader := "http://127.0.0.1:7001" for i := 0; i < clusterSize; i++ { - port, _ := strconv.Atoi(strings.Split(leader, ":")[1]) + fmt.Println("leader is ", leader) + port, _ := strconv.Atoi(strings.Split(leader, ":")[2]) num := port - 7001 fmt.Println("kill server ", num) etcds[num].Kill() diff --git a/etcd_test.go b/etcd_test.go index 4d6381fc8..c6c68aac0 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -14,7 +14,7 @@ import ( func TestSingleNode(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - args := []string{"etcd", "-h=127.0.0.1", "-f", "-d=/tmp/node1"} + args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"} process, err := os.StartProcess("etcd", args, procAttr) if err != nil { @@ -56,7 +56,7 @@ func TestSingleNode(t *testing.T) { func TestSingleNodeRecovery(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - args := []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"} + args := []string{"etcd", "-n=node1", "-d=/tmp/node1"} process, err := os.StartProcess("etcd", append(args, "-f"), procAttr) if err != nil { diff --git a/machines.go b/machines.go index bb2bdb3c6..a104d68a8 100644 --- a/machines.go +++ b/machines.go @@ -1,20 +1,20 @@ package main import ( - "path" "net/url" + "path" ) func getClientAddr(name string) (string, bool) { - response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) + resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) - m, err := url.ParseQuery(response[0].Value) + m, err := url.ParseQuery(resps[0].Value) if err != nil { panic("Failed to parse machines entry") } - addr := m["client"][0] + addr := m["etcd"][0] return addr, true } diff --git a/name_url_map.go b/name_url_map.go new file mode 100644 index 000000000..6c90c75c6 --- /dev/null +++ b/name_url_map.go @@ -0,0 +1,80 @@ +package main + +import ( + "net/url" + "path" +) + +// we map node name to url +type nodeInfo struct { + raftURL string + etcdURL string +} + +var namesMap = make(map[string]*nodeInfo) + +// nameToEtcdURL maps node name to its etcd http address +func nameToEtcdURL(name string) (string, bool) { + + if info, ok := namesMap[name]; ok { + // first try to read from the map + return info.etcdURL, true + } else { + // if fails, try to recover from etcd storage + key := path.Join("/_etcd/machines", name) + + resps, err := etcdStore.RawGet(key) + + if err != nil { + return "", false + } + + m, err := url.ParseQuery(resps[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + etcdURL := m["etcd"][0] + + return etcdURL, true + + } +} + +// nameToRaftURL maps node name to its raft http address +func nameToRaftURL(name string) (string, bool) { + if info, ok := namesMap[name]; ok { + // first try to read from the map + return info.raftURL, true + + } else { + // if fails, try to recover from etcd storage + key := path.Join("/_etcd/machines", name) + + resps, err := etcdStore.RawGet(key) + + if err != nil { + return "", false + } + + m, err := url.ParseQuery(resps[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + raftURL := m["raft"][0] + + return raftURL, true + + } +} + +// addNameToURL add a name that maps to raftURL and etcdURL +func addNameToURL(name string, raftURL string, etcdURL string) { + namesMap[name] = &nodeInfo{ + raftURL: raftURL, + etcdURL: etcdURL, + } +} diff --git a/raft_handlers.go b/raft_handlers.go index dbe30e12c..0b986a0ba 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -108,15 +108,9 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { } } -// Response to the join request +// Response to the name request func NameHttpHandler(w http.ResponseWriter, req *http.Request) { - command := &JoinCommand{} - - if err := decodeJsonRequest(req, command); err == nil { - debugf("Receive Join Request from %s", command.Name) - dispatch(command, &w, req, false) - } else { - w.WriteHeader(http.StatusInternalServerError) - return - } + debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name()) + w.WriteHeader(http.StatusOK) + w.Write([]byte(raftServer.Name())) } diff --git a/test.go b/test.go index 4166d81af..b34464745 100644 --- a/test.go +++ b/test.go @@ -9,7 +9,7 @@ import ( "os" "strconv" "time" - "net/url" + //"net/url" ) var client = http.Client{ @@ -60,7 +60,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, argGroup := make([][]string, size) for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"} + argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"} } else { strI := strconv.Itoa(i + 1) argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"} @@ -75,7 +75,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, if err != nil { return nil, nil, err } - + // TODOBP: Change this sleep to wait until the master is up. // The problem is that if the master isn't up then the children // have to retry. This retry can take upwards of 15 seconds @@ -164,31 +164,14 @@ func getLeader(addr string) (string, error) { } b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() - c := etcd.NewClient() - path := "/_etcd/machines/" + string(b) - fmt.Println(path) - fmt.Println(addr) - response, err := c.GetFrom(path, addr) - fmt.Println(response) if err != nil { return "", err } - m, err := url.ParseQuery(response[0].Value) - - if err != nil { - panic("Failed to parse machines entry") - } - - addr = m["server"][0] - - if err != nil { - return "", err - } - - return addr, nil + return string(b), nil } diff --git a/transporter.go b/transporter.go index cc04ce8c6..e79c592ae 100644 --- a/transporter.go +++ b/transporter.go @@ -15,19 +15,6 @@ type transporter struct { client *http.Client // scheme scheme string - names map[string]*JoinCommand -} - -func (t transporter) NameToRaftURL(name string) string { - return t.names[name].RaftURL -} - -func (t transporter) NameToClientURL(name string) string { - return t.names[name].ClientURL -} - -func (t transporter) AddPeer(jc *JoinCommand) { - t.names[jc.Name] = jc } // Sends AppendEntries RPCs to a peer when the server is the leader. @@ -36,7 +23,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send LogEntries to %s ", u) resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) @@ -62,7 +49,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send Vote to %s", u) resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b) @@ -88,7 +75,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) @@ -111,7 +98,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) @@ -129,7 +116,10 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft // Get the client address of the leader in the cluster func (t transporter) GetLeaderClientAddress() string { - resp, _ := t.Get(raftServer.Leader() + "/client") + + u, _ := nameToRaftURL(raftServer.Leader()) + + resp, _ := t.Get(fmt.Sprintf("%s/client", u)) if resp != nil { body, _ := ioutil.ReadAll(resp.Body) resp.Body.Close()