From 71c0ffec3ad952440769d85aa4564cd1f69c4444 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Jun 2013 15:29:25 -0700 Subject: [PATCH] election new leader and rejoin works --- handlers.go | 36 ++++++++++++++++++++---------------- raftd.go | 24 ++++++++++++++---------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/handlers.go b/handlers.go index 871bacb1c..286f8e97c 100644 --- a/handlers.go +++ b/handlers.go @@ -22,22 +22,6 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { json.NewEncoder(w).Encode(server.LogEntries()) } -func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - debug("[recv] POST http://%v/join", server.Name()) - command := &JoinCommand{} - if err := decodeJsonRequest(req, command); err == nil { - if _, err= server.Do(command); err != nil { - warn("raftd: Unable to join: %v", err) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - } else { - warn("[join] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) - } -} - func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} err := decodeJsonRequest(req, rvreq) @@ -59,6 +43,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries)) debug("My role is %s", server.State()) if resp, _ := server.AppendEntries(aereq); resp != nil { + debug("write back success") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) if !resp.Success { @@ -68,6 +53,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { } } warn("[append] ERROR: %v", err) + debug("write back") w.WriteHeader(http.StatusInternalServerError) } @@ -86,6 +72,24 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) } + +func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { + debug("[recv] POST http://%v/join", server.Name()) + command := &JoinCommand{} + if err := decodeJsonRequest(req, command); err == nil { + if _, err= server.Do(command); err != nil { + warn("raftd: Unable to join: %v", err) + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + } else { + warn("[join] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) + } +} + + func SetHttpHandler(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/set/"):] diff --git a/raftd.go b/raftd.go index befc774a3..f12104521 100644 --- a/raftd.go +++ b/raftd.go @@ -109,23 +109,31 @@ func main() { fmt.Println("3 join as ", server.State(), " term ", server.Term()) if leaderHost == "" { fmt.Println("init") - server.SetElectionTimeout(1 * time.Second) + server.SetElectionTimeout(10 * time.Second) server.SetHeartbeatTimeout(1 * time.Second) server.StartLeader() + // join self + + command := &JoinCommand{} + command.Name = server.Name() + + server.Do(command) } else { - server.SetElectionTimeout(1 * time.Second) + server.SetElectionTimeout(10 * time.Second) server.SetHeartbeatTimeout(1 * time.Second) server.StartFollower() fmt.Println("4 join as ", server.State(), " term ", server.Term()) Join(server, leaderHost) fmt.Println("success join") } + } else { + server.SetElectionTimeout(10 * time.Second) + server.SetHeartbeatTimeout(1 * time.Second) + server.StartFollower() } + // open snapshot //go server.Snapshot() - - // Create HTTP interface. - //r := mux.NewRouter() // internal commands http.HandleFunc("/join", JoinHttpHandler) @@ -136,14 +144,10 @@ func main() { // external commands http.HandleFunc("/set/", SetHttpHandler) - //r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET") + http.HandleFunc("/get/", GetHttpHandler) http.HandleFunc("/delete/", DeleteHttpHandler) http.HandleFunc("/watch/", WatchHttpHandler) - //http.Handle("/", r) - - http.HandleFunc("/get/", GetHttpHandler) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil)) }