This commit is contained in:
Xiang Li 2013-08-09 23:03:49 -07:00
parent d3471eec7f
commit ce3c55ba3f
8 changed files with 52 additions and 79 deletions

20
etcd.go
View File

@ -60,7 +60,7 @@ func init() {
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
flag.StringVar(&argInfo.Name, "n", "", "the node name (required)")
flag.StringVar(&argInfo.ClientURL, "c", "127.0.0.1:4001", "the port to communicate with clients")
flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the port to communicate with clients")
flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers")
flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface")
@ -113,9 +113,9 @@ const (
type Info struct {
Name string `json:"name"`
RaftURL string `json:"raftURL"`
ClientURL string `json:"clientURL"`
WebURL string `json:"webURL"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
WebURL string `json:"webURL"`
ServerCertFile string `json:"serverCertFile"`
ServerKeyFile string `json:"serverKeyFile"`
@ -208,7 +208,7 @@ func main() {
}
argInfo.RaftURL = checkURL(argInfo.RaftURL, "http")
argInfo.ClientURL = checkURL(argInfo.ClientURL, "http")
argInfo.EtcdURL = checkURL(argInfo.EtcdURL, "http")
// Setup commands.
registerCommands()
@ -290,7 +290,7 @@ func startRaft(tlsConfs []*tls.Config) {
command := &JoinCommand{
Name: raftServer.Name(),
RaftURL: argInfo.RaftURL,
EtcdURL: argInfo.ClientURL,
EtcdURL: argInfo.EtcdURL,
}
_, err := raftServer.Do(command)
if err == nil {
@ -398,7 +398,7 @@ func startRaftTransport(info Info, tlsConf *tls.Config) {
http.HandleFunc("/log/append", AppendEntriesHttpHandler)
http.HandleFunc("/snapshot", SnapshotHttpHandler)
http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
http.HandleFunc("/client", ClientHttpHandler)
http.HandleFunc("/etcdURL", EtcdURLHttpHandler)
u, _ := url.Parse(info.RaftURL)
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
@ -426,8 +426,8 @@ func startEtcdTransport(info Info, tlsConf *tls.Config) {
http.HandleFunc("/stats", StatsHttpHandler)
http.HandleFunc("/test/", TestHttpHandler)
u, _ := url.Parse(info.ClientURL)
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
u, _ := url.Parse(info.EtcdURL)
fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
if tlsConf == nil {
fatal(http.ListenAndServe(u.Host, nil))
@ -588,7 +588,7 @@ func joinCluster(s *raft.Server, serverName string) error {
command := &JoinCommand{
Name: s.Name(),
RaftURL: info.RaftURL,
EtcdURL: info.ClientURL,
EtcdURL: info.EtcdURL,
}
json.NewEncoder(&b).Encode(command)

View File

@ -9,7 +9,7 @@ import (
)
//-------------------------------------------------------------------
// Handlers to handle etcd-store related request via raft client port
// Handlers to handle etcd-store related request via etcd url
//-------------------------------------------------------------------
// Multiplex GET/POST/DELETE request to corresponding handlers
@ -171,7 +171,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
var url string
if client {
clientAddr, _ := getClientAddr(raftServer.Leader())
clientAddr, _ := getEtcdURL(raftServer.Leader())
url = clientAddr + path
} else {
url = raftServer.Leader() + path
@ -216,14 +216,14 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
// Add itself to the machine list first
// Since peer map does not contain the server itself
machines, _ := getClientAddr(raftServer.Name())
machines, _ := getEtcdURL(raftServer.Name())
// Add all peers to the list and separate by comma
// We do not use json here since we accept machines list
// in the command line separate by comma.
for peerName, _ := range peers {
if addr, ok := getClientAddr(peerName); ok {
if addr, ok := getEtcdURL(peerName); ok {
machines = machines + "," + addr
}
}

View File

@ -5,7 +5,7 @@ import (
"path"
)
func getClientAddr(name string) (string, bool) {
func getEtcdURL(name string) (string, bool) {
resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
m, err := url.ParseQuery(resps[0].Value)

View File

@ -19,27 +19,11 @@ func nameToEtcdURL(name string) (string, bool) {
if info, ok := namesMap[name]; ok {
// first try to read from the map
return info.etcdURL, true
} else {
// if fails, try to recover from etcd storage
key := path.Join("/_etcd/machines", name)
resps, err := etcdStore.RawGet(key)
if err != nil {
return "", false
}
m, err := url.ParseQuery(resps[0].Value)
if err != nil {
panic("Failed to parse machines entry")
}
etcdURL := m["etcd"][0]
return etcdURL, true
}
// if fails, try to recover from etcd storage
return readURL(name, "etcd")
}
// nameToRaftURL maps node name to its raft http address
@ -48,27 +32,10 @@ func nameToRaftURL(name string) (string, bool) {
// first try to read from the map
return info.raftURL, true
} else {
// if fails, try to recover from etcd storage
key := path.Join("/_etcd/machines", name)
resps, err := etcdStore.RawGet(key)
if err != nil {
return "", false
}
m, err := url.ParseQuery(resps[0].Value)
if err != nil {
panic("Failed to parse machines entry")
}
raftURL := m["raft"][0]
return raftURL, true
}
// if fails, try to recover from etcd storage
return readURL(name, "raft")
}
// addNameToURL add a name that maps to raftURL and etcdURL
@ -78,3 +45,24 @@ func addNameToURL(name string, raftURL string, etcdURL string) {
etcdURL: etcdURL,
}
}
func readURL(nodeName string, urlName string) (string, bool) {
// if fails, try to recover from etcd storage
key := path.Join("/_etcd/machines", nodeName)
resps, err := etcdStore.RawGet(key)
if err != nil {
return "", false
}
m, err := url.ParseQuery(resps[0].Value)
if err != nil {
panic("Failed to parse machines entry")
}
url := m[urlName][0]
return url, true
}

View File

@ -86,12 +86,11 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
// Get the port that listening for client connecting of the server
func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
// Get the port that listening for etcd connecting of the server
func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/etcdURL/ ", raftTransporter.scheme+raftServer.Name())
w.WriteHeader(http.StatusOK)
client := argInfo.ClientURL
w.Write([]byte(client))
w.Write([]byte(argInfo.EtcdURL))
}
// Response to the join request

View File

@ -9,7 +9,6 @@ import (
"os"
"strconv"
"time"
//"net/url"
)
var client = http.Client{

View File

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/coreos/go-raft"
"io"
"io/ioutil"
"net/http"
)
@ -114,20 +113,6 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
return aersp
}
// Get the client address of the leader in the cluster
func (t transporter) GetLeaderClientAddress() string {
u, _ := nameToRaftURL(raftServer.Leader())
resp, _ := t.Get(fmt.Sprintf("%s/client", u))
if resp != nil {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
return string(body)
}
return ""
}
// Send server side POST request
func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
resp, err := t.client.Post(path, "application/json", body)

View File

@ -6,6 +6,7 @@ import (
"github.com/coreos/go-raft"
"html/template"
"net/http"
"net/url"
)
var s *raft.Server
@ -25,7 +26,8 @@ func mainHandler(c http.ResponseWriter, req *http.Request) {
}
func Start(server *raft.Server, webURL string) {
port := "4002"
u, _ := url.Parse(webURL)
mainTempl = template.Must(template.New("index.html").Parse(index_html))
s = server
@ -33,6 +35,6 @@ func Start(server *raft.Server, webURL string) {
http.HandleFunc("/", mainHandler)
http.Handle("/ws", websocket.Handler(wsHandler))
fmt.Println("web listening at port ", port)
http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
fmt.Printf("etcd web server listening on %s\n", u)
http.ListenAndServe(u.Host, nil)
}