mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
redirect join to server port
This commit is contained in:
parent
8af746ef6d
commit
45af72c941
@ -51,7 +51,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
|||||||
(*w).WriteHeader(http.StatusInternalServerError)
|
(*w).WriteHeader(http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatch(command, w, req)
|
dispatch(command, w, req, true)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,7 +77,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
|
|||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatch(command, &w, req)
|
dispatch(command, &w, req, true)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,11 +90,11 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
|||||||
command := &DeleteCommand{}
|
command := &DeleteCommand{}
|
||||||
command.Key = key
|
command.Key = key
|
||||||
|
|
||||||
dispatch(command, w, req)
|
dispatch(command, w, req, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dispatch the command to leader
|
// Dispatch the command to leader
|
||||||
func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
|
func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
|
||||||
if raftServer.State() == "leader" {
|
if raftServer.State() == "leader" {
|
||||||
if body, err := raftServer.Do(c); err != nil {
|
if body, err := raftServer.Do(c); err != nil {
|
||||||
warn("Commit failed %v", err)
|
warn("Commit failed %v", err)
|
||||||
@ -132,7 +132,13 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
|
|||||||
scheme = "http://"
|
scheme = "http://"
|
||||||
}
|
}
|
||||||
|
|
||||||
url := scheme + raftTransporter.GetLeaderClientAddress() + path
|
var url string
|
||||||
|
|
||||||
|
if client {
|
||||||
|
url = scheme + raftTransporter.GetLeaderClientAddress() + path
|
||||||
|
} else {
|
||||||
|
url = scheme + raftServer.Leader() + path
|
||||||
|
}
|
||||||
|
|
||||||
debug("Redirect to %s", url)
|
debug("Redirect to %s", url)
|
||||||
|
|
||||||
|
22
etcd.go
22
etcd.go
@ -425,6 +425,7 @@ func getInfo(path string) *Info {
|
|||||||
|
|
||||||
// Delete the old configuration if exist
|
// Delete the old configuration if exist
|
||||||
if ignore {
|
if ignore {
|
||||||
|
|
||||||
logPath := fmt.Sprintf("%s/log", path)
|
logPath := fmt.Sprintf("%s/log", path)
|
||||||
snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
|
snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
|
||||||
os.Remove(infoPath)
|
os.Remove(infoPath)
|
||||||
@ -507,11 +508,20 @@ func joinCluster(s *raft.Server, serverName string) error {
|
|||||||
json.NewEncoder(&b).Encode(command)
|
json.NewEncoder(&b).Encode(command)
|
||||||
|
|
||||||
// t must be ok
|
// t must be ok
|
||||||
t, _ := raftServer.Transporter().(transporter)
|
t, ok := raftServer.Transporter().(transporter)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
panic("wrong type")
|
||||||
|
}
|
||||||
|
|
||||||
debug("Send Join Request to %s", serverName)
|
debug("Send Join Request to %s", serverName)
|
||||||
|
|
||||||
resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
|
resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
|
||||||
|
|
||||||
|
debug("Finish Join Request to %s", serverName)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
fmt.Println(err, resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to join: %v", err)
|
return fmt.Errorf("Unable to join: %v", err)
|
||||||
}
|
}
|
||||||
@ -520,17 +530,17 @@ func joinCluster(s *raft.Server, serverName string) error {
|
|||||||
if resp.StatusCode == http.StatusOK {
|
if resp.StatusCode == http.StatusOK {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if resp.StatusCode == http.StatusServiceUnavailable {
|
|
||||||
address, err := ioutil.ReadAll(resp.Body)
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||||
if err != nil {
|
fmt.Println("redirect")
|
||||||
warn("Cannot Read Leader info: %v", err)
|
address = resp.Header.Get("Location")
|
||||||
}
|
|
||||||
debug("Leader is %s", address)
|
debug("Leader is %s", address)
|
||||||
debug("Send Join Request to %s", address)
|
debug("Send Join Request to %s", address)
|
||||||
json.NewEncoder(&b).Encode(command)
|
json.NewEncoder(&b).Encode(command)
|
||||||
resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
|
resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return fmt.Errorf("Unable to join: %v", err)
|
return fmt.Errorf("Unable to join: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
|
|||||||
|
|
||||||
if err := decodeJsonRequest(req, command); err == nil {
|
if err := decodeJsonRequest(req, command); err == nil {
|
||||||
debug("Receive Join Request from %s", command.Name)
|
debug("Receive Join Request from %s", command.Name)
|
||||||
dispatch(command, &w, req)
|
dispatch(command, &w, req, false)
|
||||||
} else {
|
} else {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
Loading…
x
Reference in New Issue
Block a user