From c39f7712f740aee0f4af905e73699a17b5eab378 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 6 Aug 2013 23:28:59 -0700 Subject: [PATCH 1/6] fix test --- client_handlers.go | 12 ++++++------ etcd.go | 2 +- test.go | 3 +++ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 7bf83be57..371934059 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -1,10 +1,10 @@ package main import ( + "fmt" "github.com/coreos/etcd/store" "net/http" "strconv" - "fmt" "time" ) @@ -72,9 +72,9 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { if len(prevValue) != 0 { command := &TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue, + Key: key, + Value: value, + PrevValue: prevValue, ExpireTime: expireTime, } @@ -82,8 +82,8 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { } else { command := &SetCommand{ - Key: key, - Value: value, + Key: key, + Value: value, ExpireTime: expireTime, } diff --git a/etcd.go b/etcd.go index 605db53a1..a9eea5510 100644 --- a/etcd.go +++ b/etcd.go @@ -555,7 +555,7 @@ func getInfo(path string) *Info { info := &Info{ Hostname: hostname, - RaftPort: raftPort, + RaftPort: raftPort, ClientPort: clientPort, WebPort: webPort, diff --git a/test.go b/test.go index 02ce50c5a..db9bedb5a 100644 --- a/test.go +++ b/test.go @@ -74,6 +74,9 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, if err != nil { return nil, nil, err } + // we only add machine one to the cluster list + // thus we need to make sure other node start after the first one has done set up + time.Sleep(time.Millisecond * 200) } return argGroup, etcds, nil From 06fab60dd67978dfdc1c35bbcd6aaa11b3864a02 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 9 Aug 2013 10:12:50 -0700 Subject: [PATCH 2/6] simplify createTrans --- etcd.go | 154 +++++++++++++++++++++++--------------------------------- 1 file changed, 64 insertions(+), 90 deletions(-) diff --git a/etcd.go b/etcd.go index cc75a8bea..9a34400ef 100644 --- a/etcd.go +++ b/etcd.go @@ -89,14 +89,8 @@ func init() { // CONSTANTS const ( - HTTP = iota - HTTPS - HTTPSANDVERIFY -) - -const ( - SERVER = iota - CLIENT + RaftServer = iota + EtcdServer ) const ( @@ -200,19 +194,20 @@ func main() { info = getInfo(dirPath) - // security type - st := securityType(SERVER) + raftTlsConfs, ok := tlsConf(RaftServer) + if !ok { + fatal("Please specify cert and key file or cert and key file and CAFile or none of the three") + } - clientSt := securityType(CLIENT) - - if st == -1 || clientSt == -1 { + etcdTlsConfs, ok := tlsConf(EtcdServer) + if !ok { fatal("Please specify cert and key file or cert and key file and CAFile or none of the three") } // Create etcd key-value store etcdStore = store.CreateStore(maxSize) - startRaft(st) + startRaft(raftTlsConfs) if argInfo.WebPort != -1 { // start web @@ -221,18 +216,18 @@ func main() { go web.Start(raftServer, argInfo.WebPort) } - startClientTransport(*info, clientSt) + startEtcdTransport(*info, etcdTlsConfs[0]) } // Start the raft server -func startRaft(securityType int) { +func startRaft(tlsConfs []*tls.Config) { var err error raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort) // Create transporter for raft - raftTransporter = createTransporter(securityType) + raftTransporter = newTransporter(tlsConfs[1]) // Create raft server raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil) @@ -328,44 +323,30 @@ func startRaft(securityType int) { } // start to response to raft requests - go startRaftTransport(*info, securityType) + go startRaftTransport(*info, tlsConfs[0]) } // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func createTransporter(st int) transporter { +func newTransporter(tlsConf *tls.Config) transporter { t := transporter{} - switch st { - case HTTP: + if tlsConf == nil { t.scheme = "http://" - tr := &http.Transport{ - Dial: dialTimeout, - } - t.client = &http.Client{ - Transport: tr, + Transport: &http.Transport{ + Dial: dialTimeout, + }, } - case HTTPS: - fallthrough - case HTTPSANDVERIFY: + } else { t.scheme = "https://" - tlsCert, err := tls.LoadX509KeyPair(argInfo.ServerCertFile, argInfo.ServerKeyFile) - - if err != nil { - fatal(err) - } - tr := &http.Transport{ - TLSClientConfig: &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - InsecureSkipVerify: true, - }, + TLSClientConfig: tlsConf, Dial: dialTimeout, DisableCompression: true, } @@ -382,7 +363,7 @@ func dialTimeout(network, addr string) (net.Conn, error) { } // Start to listen and response raft command -func startRaftTransport(info Info, st int) { +func startRaftTransport(info Info, tlsConf *tls.Config) { // internal commands http.HandleFunc("/join", JoinHttpHandler) @@ -393,24 +374,14 @@ func startRaftTransport(info Info, st int) { http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) http.HandleFunc("/client", ClientHttpHandler) - switch st { - - case HTTP: + if tlsConf == nil { fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort) fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil)) - case HTTPS: - fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort) - fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", info.RaftPort), info.ServerCertFile, argInfo.ServerKeyFile, nil)) - - case HTTPSANDVERIFY: - + } else { server := &http.Server{ - TLSConfig: &tls.Config{ - ClientAuth: tls.RequireAndVerifyClientCert, - ClientCAs: createCertPool(info.ServerCAFile), - }, - Addr: fmt.Sprintf(":%d", info.RaftPort), + TLSConfig: tlsConf, + Addr: fmt.Sprintf(":%d", info.RaftPort), } fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort) fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile)) @@ -419,7 +390,7 @@ func startRaftTransport(info Info, st int) { } // Start to listen and response client command -func startClientTransport(info Info, st int) { +func startEtcdTransport(info Info, tlsConf *tls.Config) { // external commands http.HandleFunc("/"+version+"/keys/", Multiplexer) http.HandleFunc("/"+version+"/watch/", WatchHttpHandler) @@ -429,24 +400,13 @@ func startClientTransport(info Info, st int) { http.HandleFunc("/stats", StatsHttpHandler) http.HandleFunc("/test/", TestHttpHandler) - switch st { - - case HTTP: + if tlsConf == nil { fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort) fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil)) - - case HTTPS: - fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort) - http.ListenAndServeTLS(fmt.Sprintf(":%d", info.ClientPort), info.ClientCertFile, info.ClientKeyFile, nil) - - case HTTPSANDVERIFY: - + } else { server := &http.Server{ - TLSConfig: &tls.Config{ - ClientAuth: tls.RequireAndVerifyClientCert, - ClientCAs: createCertPool(info.ClientCAFile), - }, - Addr: fmt.Sprintf(":%d", info.ClientPort), + TLSConfig: tlsConf, + Addr: fmt.Sprintf(":%d", info.ClientPort), } fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort) fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile)) @@ -456,20 +416,28 @@ func startClientTransport(info Info, st int) { //-------------------------------------- // Config //-------------------------------------- - -// Get the security type -func securityType(source int) int { - +func tlsConf(source int) ([]*tls.Config, bool) { var keyFile, certFile, CAFile string + var tlsCert tls.Certificate + var isAuth bool + var err error switch source { - case SERVER: + case RaftServer: keyFile = info.ServerKeyFile certFile = info.ServerCertFile CAFile = info.ServerCAFile - case CLIENT: + if keyFile != "" && certFile != "" { + tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile) + if err == nil { + fatal(err) + } + isAuth = true + } + + case EtcdServer: keyFile = info.ClientKeyFile certFile = info.ClientCertFile CAFile = info.ClientCAFile @@ -478,25 +446,28 @@ func securityType(source int) int { // If the user do not specify key file, cert file and // CA file, the type will be HTTP if keyFile == "" && certFile == "" && CAFile == "" { - - return HTTP - + return []*tls.Config{nil, nil}, true } if keyFile != "" && certFile != "" { - if CAFile != "" { - // If the user specify all the three file, the type - // will be HTTPS with client cert auth - return HTTPSANDVERIFY + serverConf := &tls.Config{} + serverConf.ClientAuth, serverConf.ClientCAs = newCertPool(CAFile) + + if isAuth { + raftTransConf := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + } + return []*tls.Config{serverConf, raftTransConf}, true } - // If the user specify key file and cert file but not - // CA file, the type will be HTTPS without client cert - // auth - return HTTPS + + return []*tls.Config{serverConf, nil}, true + } // bad specification - return -1 + return nil, false + } func parseInfo(path string) *Info { @@ -569,7 +540,10 @@ func getInfo(path string) *Info { } // Create client auth certpool -func createCertPool(CAFile string) *x509.CertPool { +func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) { + if CAFile == "" { + return tls.NoClientCert, nil + } pemByte, _ := ioutil.ReadFile(CAFile) block, pemByte := pem.Decode(pemByte) @@ -584,7 +558,7 @@ func createCertPool(CAFile string) *x509.CertPool { certPool.AddCert(cert) - return certPool + return tls.RequireAndVerifyClientCert, certPool } // Send join requests to the leader. From a19048841f160fdb2b28b7a406f372156f9dd306 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Fri, 9 Aug 2013 16:25:07 -0700 Subject: [PATCH 3/6] feat(trasnport): add an independent node name Don't let the raft algorithm know anything about the transport. Give it a nodename instead. This will allow us to support more complex networking setups in the future. --- client_handlers.go | 8 ++--- command.go | 9 ++--- etcd.go | 90 ++++++++++++++++++++++++++++------------------ etcd_long_test.go | 2 +- machines.go | 12 +++---- raft_handlers.go | 16 +++++++-- test.go | 30 ++++++++++++---- transporter.go | 41 ++++++++++++++------- web/web.go | 3 +- 9 files changed, 141 insertions(+), 70 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 7bf83be57..941484c74 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -45,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { return } - debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key) value := req.FormValue("value") @@ -96,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key) command := &DeleteCommand{ Key: key, @@ -172,9 +172,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) if client { clientAddr, _ := getClientAddr(raftServer.Leader()) - url = scheme + clientAddr + path + url = clientAddr + path } else { - url = scheme + raftServer.Leader() + path + url = raftServer.Leader() + path } debugf("Redirect to %s", url) diff --git a/command.go b/command.go index 8674ec9a9..29118c8bb 100644 --- a/command.go +++ b/command.go @@ -111,9 +111,8 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // JoinCommand type JoinCommand struct { Name string `json:"name"` - Hostname string `json:"hostName"` - RaftPort int `json:"raftPort"` - ClientPort int `json:"clientPort"` + RaftURL string `json:"raftURL"` + ClientURL string `json:"clientURL"` } // The name of the join command in the log @@ -137,12 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { return []byte("join fail"), fmt.Errorf(errors[103]) } + raftTransporter.AddPeer(c) + // add peer in raft err := raftServer.AddPeer(c.Name) // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) - value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort) + value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) return []byte("join success"), err diff --git a/etcd.go b/etcd.go index 9a34400ef..8e171d4e9 100644 --- a/etcd.go +++ b/etcd.go @@ -56,13 +56,14 @@ func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging") + flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma") flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") - flag.StringVar(&argInfo.Hostname, "h", "0.0.0.0", "the hostname of the local machine") - flag.IntVar(&argInfo.ClientPort, "c", 4001, "the port to communicate with clients") - flag.IntVar(&argInfo.RaftPort, "s", 7001, "the port to communicate with servers") - flag.IntVar(&argInfo.WebPort, "w", -1, "the port of web interface (-1 means do not start web interface)") + flag.StringVar(&argInfo.Name, "n", "", "the node name (required)") + flag.StringVar(&argInfo.ClientURL, "c", "127.0.0.1:4001", "the port to communicate with clients") + flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers") + flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface") flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile") flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server") @@ -111,10 +112,11 @@ const ( //------------------------------------------------------------------------------ type Info struct { - Hostname string `json:"hostname"` - RaftPort int `json:"raftPort"` - ClientPort int `json:"clientPort"` - WebPort int `json:"webPort"` + Name string `json:"name"` + + RaftURL string `json:"raftURL"` + ClientURL string `json:"clientURL"` + WebURL string `json:"webURL"` ServerCertFile string `json:"serverCertFile"` ServerKeyFile string `json:"serverKeyFile"` @@ -142,6 +144,21 @@ var info *Info // //------------------------------------------------------------------------------ +// Check a URL and clean it up if the user forgot the schema +func checkURL(u string, defaultSchema string) string { + p, err := url.Parse(u) + + if err != nil { + panic(err) + } + + if len(p.Host) == 0 && len(defaultSchema) != 0 { + return checkURL(fmt.Sprintf("%s://%s", defaultSchema, u), "") + } + + return p.String() +} + //-------------------------------------- // Main //-------------------------------------- @@ -184,6 +201,16 @@ func main() { cluster = strings.Split(string(b), ",") } + // Otherwise ask user for info and write it to file. + argInfo.Name = strings.TrimSpace(argInfo.Name) + + if argInfo.Name == "" { + fatal("Please give the name of the server") + } + + argInfo.RaftURL = checkURL(argInfo.RaftURL, "http") + argInfo.ClientURL = checkURL(argInfo.ClientURL, "http") + // Setup commands. registerCommands() @@ -209,11 +236,11 @@ func main() { startRaft(raftTlsConfs) - if argInfo.WebPort != -1 { + if argInfo.WebURL != "" { // start web etcdStore.SetMessager(storeMsg) go webHelper() - go web.Start(raftServer, argInfo.WebPort) + go web.Start(raftServer, argInfo.WebURL) } startEtcdTransport(*info, etcdTlsConfs[0]) @@ -224,7 +251,7 @@ func main() { func startRaft(tlsConfs []*tls.Config) { var err error - raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort) + raftName := info.Name // Create transporter for raft raftTransporter = newTransporter(tlsConfs[1]) @@ -262,10 +289,9 @@ func startRaft(tlsConfs []*tls.Config) { // leader need to join self as a peer for { command := &JoinCommand{ - Name: raftServer.Name(), - Hostname: argInfo.Hostname, - RaftPort: argInfo.RaftPort, - ClientPort: argInfo.ClientPort, + Name: raftServer.Name(), + RaftURL: argInfo.RaftURL, + ClientURL: argInfo.ClientURL, } _, err := raftServer.Do(command) if err == nil { @@ -333,6 +359,8 @@ func startRaft(tlsConfs []*tls.Config) { func newTransporter(tlsConf *tls.Config) transporter { t := transporter{} + t.names = make(map[string]*JoinCommand) + if tlsConf == nil { t.scheme = "http://" @@ -366,6 +394,7 @@ func dialTimeout(network, addr string) (net.Conn, error) { func startRaftTransport(info Info, tlsConf *tls.Config) { // internal commands + http.HandleFunc("/name", NameHttpHandler) http.HandleFunc("/join", JoinHttpHandler) http.HandleFunc("/vote", VoteHttpHandler) http.HandleFunc("/log", GetLogHttpHandler) @@ -374,16 +403,16 @@ func startRaftTransport(info Info, tlsConf *tls.Config) { http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) http.HandleFunc("/client", ClientHttpHandler) - if tlsConf == nil { - fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort) - fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil)) + u, _ := url.Parse(info.RaftURL) + fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) + if tlsConf == nil { + http.ListenAndServe(u.Host, nil) } else { server := &http.Server{ TLSConfig: tlsConf, - Addr: fmt.Sprintf(":%d", info.RaftPort), + Addr: u.Host, } - fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort) fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile)) } @@ -400,15 +429,16 @@ func startEtcdTransport(info Info, tlsConf *tls.Config) { http.HandleFunc("/stats", StatsHttpHandler) http.HandleFunc("/test/", TestHttpHandler) + u, _ := url.Parse(info.ClientURL) + fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) + if tlsConf == nil { - fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort) - fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil)) + fatal(http.ListenAndServe(u.Host, nil)) } else { server := &http.Server{ TLSConfig: tlsConf, - Addr: fmt.Sprintf(":%d", info.ClientPort), + Addr: u.Host, } - fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort) fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile)) } } @@ -518,13 +548,6 @@ func getInfo(path string) *Info { return info } - // Otherwise ask user for info and write it to file. - argInfo.Hostname = strings.TrimSpace(argInfo.Hostname) - - if argInfo.Hostname == "" { - fatal("Please give the address of the local machine") - } - info = &argInfo // Write to file. @@ -567,9 +590,8 @@ func joinCluster(s *raft.Server, serverName string) error { command := &JoinCommand{ Name: s.Name(), - Hostname: info.Hostname, - RaftPort: info.RaftPort, - ClientPort: info.ClientPort, + RaftURL: info.RaftURL, + ClientURL: info.ClientURL, } json.NewEncoder(&b).Encode(command) diff --git a/etcd_long_test.go b/etcd_long_test.go index e80643a37..0247332ad 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -36,7 +36,7 @@ func TestKillLeader(t *testing.T) { leader := "127.0.0.1:7001" - for i := 0; i < 10; i++ { + for i := 0; i < clusterSize; i++ { port, _ := strconv.Atoi(strings.Split(leader, ":")[1]) num := port - 7001 fmt.Println("kill server ", num) diff --git a/machines.go b/machines.go index dc358a8e3..bb2bdb3c6 100644 --- a/machines.go +++ b/machines.go @@ -1,20 +1,20 @@ package main import ( - "fmt" "path" - "strings" + "net/url" ) func getClientAddr(name string) (string, bool) { response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) - values := strings.Split(response[0].Value, ",") + m, err := url.ParseQuery(response[0].Value) - hostname := values[0] - clientPort := values[2] + if err != nil { + panic("Failed to parse machines entry") + } - addr := fmt.Sprintf("%s:%s", hostname, clientPort) + addr := m["client"][0] return addr, true } diff --git a/raft_handlers.go b/raft_handlers.go index e535d3871..dbe30e12c 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -4,7 +4,6 @@ import ( "encoding/json" "github.com/coreos/go-raft" "net/http" - "strconv" ) //------------------------------------------------------------- @@ -91,7 +90,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { func ClientHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name()) w.WriteHeader(http.StatusOK) - client := argInfo.Hostname + ":" + strconv.Itoa(argInfo.ClientPort) + client := argInfo.ClientURL w.Write([]byte(client)) } @@ -108,3 +107,16 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { return } } + +// Response to the join request +func NameHttpHandler(w http.ResponseWriter, req *http.Request) { + command := &JoinCommand{} + + if err := decodeJsonRequest(req, command); err == nil { + debugf("Receive Join Request from %s", command.Name) + dispatch(command, &w, req, false) + } else { + w.WriteHeader(http.StatusInternalServerError) + return + } +} diff --git a/test.go b/test.go index 279bccb97..e34f0b6bb 100644 --- a/test.go +++ b/test.go @@ -9,6 +9,7 @@ import ( "os" "strconv" "time" + "net/url" ) var client = http.Client{ @@ -59,10 +60,10 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, argGroup := make([][]string, size) for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"} + argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"} } else { strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} + argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"} } } @@ -103,7 +104,7 @@ func destroyCluster(etcds []*os.Process) error { // func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) { leaderMap := make(map[int]string) - baseAddrFormat := "http://0.0.0.0:400%d/leader" + baseAddrFormat := "http://0.0.0.0:400%d" for { knownLeader := "unknown" @@ -151,7 +152,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) { func getLeader(addr string) (string, error) { - resp, err := client.Get(addr) + resp, err := client.Get(addr + "/leader") if err != nil { return "", err @@ -163,14 +164,31 @@ func getLeader(addr string) (string, error) { } b, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() + c := etcd.NewClient() + path := "/_etcd/machines/" + string(b) + fmt.Println(path) + fmt.Println(addr) + response, err := c.GetFrom(path, addr) + fmt.Println(response) + if err != nil { + return "", err + } + + m, err := url.ParseQuery(response[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + addr = m["server"][0] + if err != nil { return "", err } - return string(b), nil + return addr, nil } diff --git a/transporter.go b/transporter.go index 012f53171..cc04ce8c6 100644 --- a/transporter.go +++ b/transporter.go @@ -15,6 +15,19 @@ type transporter struct { client *http.Client // scheme scheme string + names map[string]*JoinCommand +} + +func (t transporter) NameToRaftURL(name string) string { + return t.names[name].RaftURL +} + +func (t transporter) NameToClientURL(name string) string { + return t.names[name].ClientURL +} + +func (t transporter) AddPeer(jc *JoinCommand) { + t.names[jc.Name] = jc } // Sends AppendEntries RPCs to a peer when the server is the leader. @@ -23,12 +36,13 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send LogEntries to %s ", peer.Name()) + u := t.NameToRaftURL(peer.Name()) + debugf("Send LogEntries to %s ", u) - resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) if err != nil { - debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err) + debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) } if resp != nil { @@ -48,12 +62,13 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send Vote to %s", peer.Name()) + u := t.NameToRaftURL(peer.Name()) + debugf("Send Vote to %s", u) - resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b) if err != nil { - debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err) + debugf("Cannot send VoteRequest to %s : %s", u, err) } if resp != nil { @@ -73,10 +88,11 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(), + u := t.NameToRaftURL(peer.Name()) + debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) - resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) if resp != nil { defer resp.Body.Close() @@ -95,10 +111,11 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(), + u := t.NameToRaftURL(peer.Name()) + debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) - resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) if resp != nil { defer resp.Body.Close() @@ -123,12 +140,12 @@ func (t transporter) GetLeaderClientAddress() string { // Send server side POST request func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { - resp, err := t.client.Post(t.scheme+path, "application/json", body) + resp, err := t.client.Post(path, "application/json", body) return resp, err } // Send server side GET request func (t transporter) Get(path string) (*http.Response, error) { - resp, err := t.client.Get(t.scheme + path) + resp, err := t.client.Get(path) return resp, err } diff --git a/web/web.go b/web/web.go index bd7d74297..a9eb2adc8 100644 --- a/web/web.go +++ b/web/web.go @@ -24,7 +24,8 @@ func mainHandler(c http.ResponseWriter, req *http.Request) { mainTempl.Execute(c, p) } -func Start(server *raft.Server, port int) { +func Start(server *raft.Server, webURL string) { + port := "4002" mainTempl = template.Must(template.New("index.html").Parse(index_html)) s = server From 0bb9fe9f50a4c1163025cffc3b2aa048c9337ab7 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Fri, 9 Aug 2013 17:10:09 -0700 Subject: [PATCH 4/6] hack(go-etcd): stop appending schema to everything trying to get the new tests to pass. --- third_party/github.com/coreos/go-etcd/etcd/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index 703b4cec4..b6a66ccb4 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -139,7 +139,6 @@ func (c *Client) internalSyncCluster(machines []string) bool { // serverName should contain both hostName and port func (c *Client) createHttpPath(serverName string, _path string) string { httpPath := path.Join(serverName, _path) - httpPath = c.config.Scheme + "://" + httpPath return httpPath } From d3471eec7fc657d7be0ea2b672b8ca8bda5d51c9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 9 Aug 2013 21:06:16 -0700 Subject: [PATCH 5/6] separate_id --- client_handlers.go | 5 +-- command.go | 10 +++--- etcd.go | 23 ++++++------- etcd_long_test.go | 5 +-- etcd_test.go | 4 +-- machines.go | 8 ++--- name_url_map.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++ raft_handlers.go | 14 +++----- test.go | 27 +++------------- transporter.go | 26 +++++---------- 10 files changed, 124 insertions(+), 78 deletions(-) create mode 100644 name_url_map.go diff --git a/client_handlers.go b/client_handlers.go index 4833a9055..d5f9ea23b 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -194,13 +194,14 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) // command? //-------------------------------------- -// Handler to return the current leader name +// Handler to return the current leader's raft address func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) { leader := raftServer.Leader() if leader != "" { w.WriteHeader(http.StatusOK) - w.Write([]byte(raftServer.Leader())) + raftURL, _ := nameToRaftURL(leader) + w.Write([]byte(raftURL)) } else { // not likely, but it may happen diff --git a/command.go b/command.go index 29118c8bb..860b211d6 100644 --- a/command.go +++ b/command.go @@ -110,9 +110,9 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // JoinCommand type JoinCommand struct { - Name string `json:"name"` - RaftURL string `json:"raftURL"` - ClientURL string `json:"clientURL"` + Name string `json:"name"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` } // The name of the join command in the log @@ -136,14 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { return []byte("join fail"), fmt.Errorf(errors[103]) } - raftTransporter.AddPeer(c) + addNameToURL(c.Name, c.RaftURL, c.EtcdURL) // add peer in raft err := raftServer.AddPeer(c.Name) // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) - value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL) + value := fmt.Sprintf("raft=%s&etcd=%s", c.RaftURL, c.EtcdURL) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) return []byte("join success"), err diff --git a/etcd.go b/etcd.go index 8e171d4e9..95759bc16 100644 --- a/etcd.go +++ b/etcd.go @@ -56,7 +56,6 @@ func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging") - flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma") flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") @@ -112,11 +111,11 @@ const ( //------------------------------------------------------------------------------ type Info struct { - Name string `json:"name"` + Name string `json:"name"` - RaftURL string `json:"raftURL"` - ClientURL string `json:"clientURL"` - WebURL string `json:"webURL"` + RaftURL string `json:"raftURL"` + ClientURL string `json:"clientURL"` + WebURL string `json:"webURL"` ServerCertFile string `json:"serverCertFile"` ServerKeyFile string `json:"serverKeyFile"` @@ -289,9 +288,9 @@ func startRaft(tlsConfs []*tls.Config) { // leader need to join self as a peer for { command := &JoinCommand{ - Name: raftServer.Name(), - RaftURL: argInfo.RaftURL, - ClientURL: argInfo.ClientURL, + Name: raftServer.Name(), + RaftURL: argInfo.RaftURL, + EtcdURL: argInfo.ClientURL, } _, err := raftServer.Do(command) if err == nil { @@ -359,8 +358,6 @@ func startRaft(tlsConfs []*tls.Config) { func newTransporter(tlsConf *tls.Config) transporter { t := transporter{} - t.names = make(map[string]*JoinCommand) - if tlsConf == nil { t.scheme = "http://" @@ -589,9 +586,9 @@ func joinCluster(s *raft.Server, serverName string) error { var b bytes.Buffer command := &JoinCommand{ - Name: s.Name(), - RaftURL: info.RaftURL, - ClientURL: info.ClientURL, + Name: s.Name(), + RaftURL: info.RaftURL, + EtcdURL: info.ClientURL, } json.NewEncoder(&b).Encode(command) diff --git a/etcd_long_test.go b/etcd_long_test.go index 0247332ad..ff59caadc 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -34,10 +34,11 @@ func TestKillLeader(t *testing.T) { var totalTime time.Duration - leader := "127.0.0.1:7001" + leader := "http://127.0.0.1:7001" for i := 0; i < clusterSize; i++ { - port, _ := strconv.Atoi(strings.Split(leader, ":")[1]) + fmt.Println("leader is ", leader) + port, _ := strconv.Atoi(strings.Split(leader, ":")[2]) num := port - 7001 fmt.Println("kill server ", num) etcds[num].Kill() diff --git a/etcd_test.go b/etcd_test.go index 4d6381fc8..c6c68aac0 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -14,7 +14,7 @@ import ( func TestSingleNode(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - args := []string{"etcd", "-h=127.0.0.1", "-f", "-d=/tmp/node1"} + args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"} process, err := os.StartProcess("etcd", args, procAttr) if err != nil { @@ -56,7 +56,7 @@ func TestSingleNode(t *testing.T) { func TestSingleNodeRecovery(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - args := []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"} + args := []string{"etcd", "-n=node1", "-d=/tmp/node1"} process, err := os.StartProcess("etcd", append(args, "-f"), procAttr) if err != nil { diff --git a/machines.go b/machines.go index bb2bdb3c6..a104d68a8 100644 --- a/machines.go +++ b/machines.go @@ -1,20 +1,20 @@ package main import ( - "path" "net/url" + "path" ) func getClientAddr(name string) (string, bool) { - response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) + resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) - m, err := url.ParseQuery(response[0].Value) + m, err := url.ParseQuery(resps[0].Value) if err != nil { panic("Failed to parse machines entry") } - addr := m["client"][0] + addr := m["etcd"][0] return addr, true } diff --git a/name_url_map.go b/name_url_map.go new file mode 100644 index 000000000..6c90c75c6 --- /dev/null +++ b/name_url_map.go @@ -0,0 +1,80 @@ +package main + +import ( + "net/url" + "path" +) + +// we map node name to url +type nodeInfo struct { + raftURL string + etcdURL string +} + +var namesMap = make(map[string]*nodeInfo) + +// nameToEtcdURL maps node name to its etcd http address +func nameToEtcdURL(name string) (string, bool) { + + if info, ok := namesMap[name]; ok { + // first try to read from the map + return info.etcdURL, true + } else { + // if fails, try to recover from etcd storage + key := path.Join("/_etcd/machines", name) + + resps, err := etcdStore.RawGet(key) + + if err != nil { + return "", false + } + + m, err := url.ParseQuery(resps[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + etcdURL := m["etcd"][0] + + return etcdURL, true + + } +} + +// nameToRaftURL maps node name to its raft http address +func nameToRaftURL(name string) (string, bool) { + if info, ok := namesMap[name]; ok { + // first try to read from the map + return info.raftURL, true + + } else { + // if fails, try to recover from etcd storage + key := path.Join("/_etcd/machines", name) + + resps, err := etcdStore.RawGet(key) + + if err != nil { + return "", false + } + + m, err := url.ParseQuery(resps[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + raftURL := m["raft"][0] + + return raftURL, true + + } +} + +// addNameToURL add a name that maps to raftURL and etcdURL +func addNameToURL(name string, raftURL string, etcdURL string) { + namesMap[name] = &nodeInfo{ + raftURL: raftURL, + etcdURL: etcdURL, + } +} diff --git a/raft_handlers.go b/raft_handlers.go index dbe30e12c..0b986a0ba 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -108,15 +108,9 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { } } -// Response to the join request +// Response to the name request func NameHttpHandler(w http.ResponseWriter, req *http.Request) { - command := &JoinCommand{} - - if err := decodeJsonRequest(req, command); err == nil { - debugf("Receive Join Request from %s", command.Name) - dispatch(command, &w, req, false) - } else { - w.WriteHeader(http.StatusInternalServerError) - return - } + debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name()) + w.WriteHeader(http.StatusOK) + w.Write([]byte(raftServer.Name())) } diff --git a/test.go b/test.go index 4166d81af..b34464745 100644 --- a/test.go +++ b/test.go @@ -9,7 +9,7 @@ import ( "os" "strconv" "time" - "net/url" + //"net/url" ) var client = http.Client{ @@ -60,7 +60,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, argGroup := make([][]string, size) for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"} + argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"} } else { strI := strconv.Itoa(i + 1) argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"} @@ -75,7 +75,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, if err != nil { return nil, nil, err } - + // TODOBP: Change this sleep to wait until the master is up. // The problem is that if the master isn't up then the children // have to retry. This retry can take upwards of 15 seconds @@ -164,31 +164,14 @@ func getLeader(addr string) (string, error) { } b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() - c := etcd.NewClient() - path := "/_etcd/machines/" + string(b) - fmt.Println(path) - fmt.Println(addr) - response, err := c.GetFrom(path, addr) - fmt.Println(response) if err != nil { return "", err } - m, err := url.ParseQuery(response[0].Value) - - if err != nil { - panic("Failed to parse machines entry") - } - - addr = m["server"][0] - - if err != nil { - return "", err - } - - return addr, nil + return string(b), nil } diff --git a/transporter.go b/transporter.go index cc04ce8c6..e79c592ae 100644 --- a/transporter.go +++ b/transporter.go @@ -15,19 +15,6 @@ type transporter struct { client *http.Client // scheme scheme string - names map[string]*JoinCommand -} - -func (t transporter) NameToRaftURL(name string) string { - return t.names[name].RaftURL -} - -func (t transporter) NameToClientURL(name string) string { - return t.names[name].ClientURL -} - -func (t transporter) AddPeer(jc *JoinCommand) { - t.names[jc.Name] = jc } // Sends AppendEntries RPCs to a peer when the server is the leader. @@ -36,7 +23,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send LogEntries to %s ", u) resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) @@ -62,7 +49,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send Vote to %s", u) resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b) @@ -88,7 +75,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) @@ -111,7 +98,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u := t.NameToRaftURL(peer.Name()) + u, _ := nameToRaftURL(peer.Name()) debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) @@ -129,7 +116,10 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft // Get the client address of the leader in the cluster func (t transporter) GetLeaderClientAddress() string { - resp, _ := t.Get(raftServer.Leader() + "/client") + + u, _ := nameToRaftURL(raftServer.Leader()) + + resp, _ := t.Get(fmt.Sprintf("%s/client", u)) if resp != nil { body, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() From ce3c55ba3f66ece7af529057dae7141747f7c186 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 9 Aug 2013 23:03:49 -0700 Subject: [PATCH 6/6] refactor --- etcd.go | 20 ++++---- client_handlers.go => etcd_handlers.go | 8 +-- machines.go | 2 +- name_url_map.go | 68 +++++++++++--------------- raft_handlers.go | 9 ++-- test.go | 1 - transporter.go | 15 ------ web/web.go | 8 +-- 8 files changed, 52 insertions(+), 79 deletions(-) rename client_handlers.go => etcd_handlers.go (97%) diff --git a/etcd.go b/etcd.go index 95759bc16..0c76a8072 100644 --- a/etcd.go +++ b/etcd.go @@ -60,7 +60,7 @@ func init() { flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") flag.StringVar(&argInfo.Name, "n", "", "the node name (required)") - flag.StringVar(&argInfo.ClientURL, "c", "127.0.0.1:4001", "the port to communicate with clients") + flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the port to communicate with clients") flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers") flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface") @@ -113,9 +113,9 @@ const ( type Info struct { Name string `json:"name"` - RaftURL string `json:"raftURL"` - ClientURL string `json:"clientURL"` - WebURL string `json:"webURL"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` + WebURL string `json:"webURL"` ServerCertFile string `json:"serverCertFile"` ServerKeyFile string `json:"serverKeyFile"` @@ -208,7 +208,7 @@ func main() { } argInfo.RaftURL = checkURL(argInfo.RaftURL, "http") - argInfo.ClientURL = checkURL(argInfo.ClientURL, "http") + argInfo.EtcdURL = checkURL(argInfo.EtcdURL, "http") // Setup commands. registerCommands() @@ -290,7 +290,7 @@ func startRaft(tlsConfs []*tls.Config) { command := &JoinCommand{ Name: raftServer.Name(), RaftURL: argInfo.RaftURL, - EtcdURL: argInfo.ClientURL, + EtcdURL: argInfo.EtcdURL, } _, err := raftServer.Do(command) if err == nil { @@ -398,7 +398,7 @@ func startRaftTransport(info Info, tlsConf *tls.Config) { http.HandleFunc("/log/append", AppendEntriesHttpHandler) http.HandleFunc("/snapshot", SnapshotHttpHandler) http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) - http.HandleFunc("/client", ClientHttpHandler) + http.HandleFunc("/etcdURL", EtcdURLHttpHandler) u, _ := url.Parse(info.RaftURL) fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) @@ -426,8 +426,8 @@ func startEtcdTransport(info Info, tlsConf *tls.Config) { http.HandleFunc("/stats", StatsHttpHandler) http.HandleFunc("/test/", TestHttpHandler) - u, _ := url.Parse(info.ClientURL) - fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) + u, _ := url.Parse(info.EtcdURL) + fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u) if tlsConf == nil { fatal(http.ListenAndServe(u.Host, nil)) @@ -588,7 +588,7 @@ func joinCluster(s *raft.Server, serverName string) error { command := &JoinCommand{ Name: s.Name(), RaftURL: info.RaftURL, - EtcdURL: info.ClientURL, + EtcdURL: info.EtcdURL, } json.NewEncoder(&b).Encode(command) diff --git a/client_handlers.go b/etcd_handlers.go similarity index 97% rename from client_handlers.go rename to etcd_handlers.go index d5f9ea23b..e330ab926 100644 --- a/client_handlers.go +++ b/etcd_handlers.go @@ -9,7 +9,7 @@ import ( ) //------------------------------------------------------------------- -// Handlers to handle etcd-store related request via raft client port +// Handlers to handle etcd-store related request via etcd url //------------------------------------------------------------------- // Multiplex GET/POST/DELETE request to corresponding handlers @@ -171,7 +171,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) var url string if client { - clientAddr, _ := getClientAddr(raftServer.Leader()) + clientAddr, _ := getEtcdURL(raftServer.Leader()) url = clientAddr + path } else { url = raftServer.Leader() + path @@ -216,14 +216,14 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { // Add itself to the machine list first // Since peer map does not contain the server itself - machines, _ := getClientAddr(raftServer.Name()) + machines, _ := getEtcdURL(raftServer.Name()) // Add all peers to the list and separate by comma // We do not use json here since we accept machines list // in the command line separate by comma. for peerName, _ := range peers { - if addr, ok := getClientAddr(peerName); ok { + if addr, ok := getEtcdURL(peerName); ok { machines = machines + "," + addr } } diff --git a/machines.go b/machines.go index a104d68a8..0b6681f3f 100644 --- a/machines.go +++ b/machines.go @@ -5,7 +5,7 @@ import ( "path" ) -func getClientAddr(name string) (string, bool) { +func getEtcdURL(name string) (string, bool) { resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) m, err := url.ParseQuery(resps[0].Value) diff --git a/name_url_map.go b/name_url_map.go index 6c90c75c6..7acd1848b 100644 --- a/name_url_map.go +++ b/name_url_map.go @@ -19,27 +19,11 @@ func nameToEtcdURL(name string) (string, bool) { if info, ok := namesMap[name]; ok { // first try to read from the map return info.etcdURL, true - } else { - // if fails, try to recover from etcd storage - key := path.Join("/_etcd/machines", name) - - resps, err := etcdStore.RawGet(key) - - if err != nil { - return "", false - } - - m, err := url.ParseQuery(resps[0].Value) - - if err != nil { - panic("Failed to parse machines entry") - } - - etcdURL := m["etcd"][0] - - return etcdURL, true - } + + // if fails, try to recover from etcd storage + return readURL(name, "etcd") + } // nameToRaftURL maps node name to its raft http address @@ -48,27 +32,10 @@ func nameToRaftURL(name string) (string, bool) { // first try to read from the map return info.raftURL, true - } else { - // if fails, try to recover from etcd storage - key := path.Join("/_etcd/machines", name) - - resps, err := etcdStore.RawGet(key) - - if err != nil { - return "", false - } - - m, err := url.ParseQuery(resps[0].Value) - - if err != nil { - panic("Failed to parse machines entry") - } - - raftURL := m["raft"][0] - - return raftURL, true - } + + // if fails, try to recover from etcd storage + return readURL(name, "raft") } // addNameToURL add a name that maps to raftURL and etcdURL @@ -78,3 +45,24 @@ func addNameToURL(name string, raftURL string, etcdURL string) { etcdURL: etcdURL, } } + +func readURL(nodeName string, urlName string) (string, bool) { + // if fails, try to recover from etcd storage + key := path.Join("/_etcd/machines", nodeName) + + resps, err := etcdStore.RawGet(key) + + if err != nil { + return "", false + } + + m, err := url.ParseQuery(resps[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + url := m[urlName][0] + + return url, true +} diff --git a/raft_handlers.go b/raft_handlers.go index 0b986a0ba..8f52e4e30 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -86,12 +86,11 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) } -// Get the port that listening for client connecting of the server -func ClientHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name()) +// Get the port that listening for etcd connecting of the server +func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { + debugf("[recv] Get %s/etcdURL/ ", raftTransporter.scheme+raftServer.Name()) w.WriteHeader(http.StatusOK) - client := argInfo.ClientURL - w.Write([]byte(client)) + w.Write([]byte(argInfo.EtcdURL)) } // Response to the join request diff --git a/test.go b/test.go index b34464745..ae82d637a 100644 --- a/test.go +++ b/test.go @@ -9,7 +9,6 @@ import ( "os" "strconv" "time" - //"net/url" ) var client = http.Client{ diff --git a/transporter.go b/transporter.go index e79c592ae..9e12b9012 100644 --- a/transporter.go +++ b/transporter.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/coreos/go-raft" "io" - "io/ioutil" "net/http" ) @@ -114,20 +113,6 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft return aersp } -// Get the client address of the leader in the cluster -func (t transporter) GetLeaderClientAddress() string { - - u, _ := nameToRaftURL(raftServer.Leader()) - - resp, _ := t.Get(fmt.Sprintf("%s/client", u)) - if resp != nil { - body, _ := ioutil.ReadAll(resp.Body) - resp.Body.Close() - return string(body) - } - return "" -} - // Send server side POST request func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { resp, err := t.client.Post(path, "application/json", body) diff --git a/web/web.go b/web/web.go index a9eb2adc8..63086f6a6 100644 --- a/web/web.go +++ b/web/web.go @@ -6,6 +6,7 @@ import ( "github.com/coreos/go-raft" "html/template" "net/http" + "net/url" ) var s *raft.Server @@ -25,7 +26,8 @@ func mainHandler(c http.ResponseWriter, req *http.Request) { } func Start(server *raft.Server, webURL string) { - port := "4002" + u, _ := url.Parse(webURL) + mainTempl = template.Must(template.New("index.html").Parse(index_html)) s = server @@ -33,6 +35,6 @@ func Start(server *raft.Server, webURL string) { http.HandleFunc("/", mainHandler) http.Handle("/ws", websocket.Handler(wsHandler)) - fmt.Println("web listening at port ", port) - http.ListenAndServe(fmt.Sprintf(":%v", port), nil) + fmt.Printf("etcd web server listening on %s\n", u) + http.ListenAndServe(u.Host, nil) }