From 58501151da1ac2dcbc804f6166801ed828892c4d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 16 Jul 2013 18:03:21 -0700 Subject: [PATCH 1/4] change snapshot api due to the change in go-raft --- client_handlers.go | 4 +++- command.go | 1 - etcd.go | 8 ++++++-- raft_handlers.go | 18 +++++++++++++++++- transporter.go | 22 ++++++++++++++++++++++ 5 files changed, 48 insertions(+), 5 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index c6bc61fc0..522046e37 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "github.com/coreos/etcd/store" "net/http" "strconv" @@ -110,11 +111,12 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) return } (*w).WriteHeader(http.StatusInternalServerError) - (*w).Write(newJsonError(300, "No Leader")) + (*w).Write(newJsonError(300, err.Error())) return } else { if body == nil { + fmt.Println("empty but not err!") http.NotFound((*w), req) } else { body, ok := body.([]byte) diff --git a/command.go b/command.go index 4b92bab16..5d01e257a 100644 --- a/command.go +++ b/command.go @@ -120,6 +120,5 @@ func (c *JoinCommand) CommandName() string { func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { err := raftServer.AddPeer(c.Name) addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort) - return []byte("join success"), err } diff --git a/etcd.go b/etcd.go index d8f2d5a7c..83a0178b8 100644 --- a/etcd.go +++ b/etcd.go @@ -249,7 +249,7 @@ func startRaft(securityType int) { err = joinCluster(raftServer, machine) if err != nil { - debug("cannot join to cluster via machine %s", machine) + debug("cannot join to cluster via machine %s %s", machine, err) } else { break } @@ -267,7 +267,7 @@ func startRaft(securityType int) { } // open the snapshot - // go server.Snapshot() + go raftServer.Snapshot() // start to response to raft requests go startRaftTransport(info.RaftPort, securityType) @@ -332,6 +332,7 @@ func startRaftTransport(port int, st int) { http.HandleFunc("/log", GetLogHttpHandler) http.HandleFunc("/log/append", AppendEntriesHttpHandler) http.HandleFunc("/snapshot", SnapshotHttpHandler) + http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) http.HandleFunc("/client", ClientHttpHandler) switch st { @@ -566,6 +567,9 @@ func joinCluster(s *raft.Server, serverName string) error { json.NewEncoder(&b).Encode(command) resp, err = t.Post(fmt.Sprintf("%s/join", address), &b) } else { + b, _ := ioutil.ReadAll(resp.Body) + fmt.Println(string(b)) + resp.Body.Close() return fmt.Errorf("Unable to join") } } diff --git a/raft_handlers.go b/raft_handlers.go index 2a79dece4..cfc207f85 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -61,7 +61,23 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { debug("[recv] POST http://%s/snapshot/ ", raftServer.Name()) - if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil { + if resp := raftServer.SnapshotRequest(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + warn("[Snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +// Response to recover from snapshot request +func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.SnapshotRecoveryRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + debug("[recv] POST http://%s/snapshotRecovery/ ", raftServer.Name()) + if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return diff --git a/transporter.go b/transporter.go index 7e1ac56d8..c4cd0bfc2 100644 --- a/transporter.go +++ b/transporter.go @@ -89,6 +89,28 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r return aersp } +// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. +func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { + var aersp *raft.SnapshotRecoveryResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + + debug("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(), + req.LastTerm, req.LastIndex) + + resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b) + + if resp != nil { + defer resp.Body.Close() + aersp = &raft.SnapshotRecoveryResponse{} + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + + return aersp + } + } + return aersp +} + // Get the client address of the leader in the cluster func (t transporter) GetLeaderClientAddress() string { resp, _ := t.Get(raftServer.Leader() + "/client") From 732f80cedad9bb399d14c2d2e1d13a1811a7c025 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 19 Jul 2013 08:55:40 -0700 Subject: [PATCH 2/4] close experimental snapshot before merge to the master --- etcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etcd.go b/etcd.go index 83a0178b8..3a2776b43 100644 --- a/etcd.go +++ b/etcd.go @@ -267,7 +267,7 @@ func startRaft(securityType int) { } // open the snapshot - go raftServer.Snapshot() + //go raftServer.Snapshot() // start to response to raft requests go startRaftTransport(info.RaftPort, securityType) From 749a89d5a5e47b5a1df594360abc1d242fc5adbc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 19 Jul 2013 11:56:34 -0700 Subject: [PATCH 3/4] change SnapshotRequest to RequestSnapshot --- raft_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft_handlers.go b/raft_handlers.go index cfc207f85..e3acb1c41 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -61,7 +61,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { debug("[recv] POST http://%s/snapshot/ ", raftServer.Name()) - if resp := raftServer.SnapshotRequest(aereq); resp != nil { + if resp := raftServer.RequestSnapshot(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return From d2382c232e60689af4bb4c632d809f8dec5ba7fd Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 20 Jul 2013 16:29:27 -0700 Subject: [PATCH 4/4] not hardcode debug scheme --- client_handlers.go | 3 +-- etcd.go | 25 +++++++++++++++---------- raft_handlers.go | 12 ++++++------ transporter.go | 1 - 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 522046e37..6c1330cc4 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "github.com/coreos/etcd/store" "net/http" "strconv" @@ -116,10 +115,10 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) } else { if body == nil { - fmt.Println("empty but not err!") http.NotFound((*w), req) } else { body, ok := body.([]byte) + // this should not happen if !ok { panic("wrong type") } diff --git a/etcd.go b/etcd.go index 3a2776b43..20701f954 100644 --- a/etcd.go +++ b/etcd.go @@ -52,6 +52,8 @@ var ignore bool var maxSize int +var snapshot bool + func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") @@ -75,6 +77,8 @@ func init() { flag.BoolVar(&ignore, "i", false, "ignore the old configuration, create a new node") + flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot") + flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer") } @@ -207,13 +211,15 @@ func startRaft(securityType int) { } // LoadSnapshot - // err = raftServer.LoadSnapshot() + if snapshot { + err = raftServer.LoadSnapshot() - // if err == nil { - // debug("%s finished load snapshot", raftServer.Name()) - // } else { - // debug(err) - // } + if err == nil { + debug("%s finished load snapshot", raftServer.Name()) + } else { + debug(err.Error()) + } + } raftServer.Initialize() raftServer.SetElectionTimeout(ELECTIONTIMTOUT) @@ -267,7 +273,9 @@ func startRaft(securityType int) { } // open the snapshot - //go raftServer.Snapshot() + if snapshot { + go raftServer.Snapshot() + } // start to response to raft requests go startRaftTransport(info.RaftPort, securityType) @@ -567,9 +575,6 @@ func joinCluster(s *raft.Server, serverName string) error { json.NewEncoder(&b).Encode(command) resp, err = t.Post(fmt.Sprintf("%s/join", address), &b) } else { - b, _ := ioutil.ReadAll(resp.Body) - fmt.Println(string(b)) - resp.Body.Close() return fmt.Errorf("Unable to join") } } diff --git a/raft_handlers.go b/raft_handlers.go index e3acb1c41..face1d955 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -13,7 +13,7 @@ import ( // Get all the current logs func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - debug("[recv] GET http://%v/log", raftServer.Name()) + debug("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name()) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(raftServer.LogEntries()) @@ -24,7 +24,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} err := decodeJsonRequest(req, rvreq) if err == nil { - debug("[recv] POST http://%v/vote [%s]", raftServer.Name(), rvreq.CandidateName) + debug("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName) if resp := raftServer.RequestVote(rvreq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -41,7 +41,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { - debug("[recv] POST http://%s/log/append [%d]", raftServer.Name(), len(aereq.Entries)) + debug("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries)) if resp := raftServer.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -60,7 +60,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - debug("[recv] POST http://%s/snapshot/ ", raftServer.Name()) + debug("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name()) if resp := raftServer.RequestSnapshot(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -76,7 +76,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRecoveryRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - debug("[recv] POST http://%s/snapshotRecovery/ ", raftServer.Name()) + debug("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name()) if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -89,7 +89,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { // Get the port that listening for client connecting of the server func ClientHttpHandler(w http.ResponseWriter, req *http.Request) { - debug("[recv] Get http://%v/client/ ", raftServer.Name()) + debug("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name()) w.WriteHeader(http.StatusOK) client := hostname + ":" + strconv.Itoa(clientPort) w.Write([]byte(client)) diff --git a/transporter.go b/transporter.go index c4cd0bfc2..460ce4de7 100644 --- a/transporter.go +++ b/transporter.go @@ -104,7 +104,6 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft defer resp.Body.Close() aersp = &raft.SnapshotRecoveryResponse{} if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp } }