From 72c1a6135d191b5b72a5e882c5cf2f3d51a6d33d Mon Sep 17 00:00:00 2001 From: Theo Hultberg Date: Thu, 1 Aug 2013 16:32:05 +0200 Subject: [PATCH 1/3] Fix typo in the readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bfa089908..2f05cea73 100644 --- a/README.md +++ b/README.md @@ -339,8 +339,8 @@ We use -s to specify server port and -c to specify client port and -d to specify Let the join two more nodes to this cluster using the -C argument: ```sh -./etcd -c 4002 -s 7002 -C 127.0.0.1:7001 -d nod/node2 -./etcd -c 4003 -s 7003 -C 127.0.0.1:7001 -d nod/node3 +./etcd -c 4002 -s 7002 -C 127.0.0.1:7001 -d nodes/node2 +./etcd -c 4003 -s 7003 -C 127.0.0.1:7001 -d nodes/node3 ``` Get the machines in the cluster From 93750397422effd15fdef8f378ceb6ca18f434ec Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Aug 2013 22:40:56 +0200 Subject: [PATCH 2/3] update fatal to the same stile as in line 410 --- etcd.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/etcd.go b/etcd.go index 0fb3d2e97..511d70da7 100644 --- a/etcd.go +++ b/etcd.go @@ -423,11 +423,7 @@ func startRaftTransport(port int, st int) { Addr: fmt.Sprintf(":%d", port), } fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port) - err := server.ListenAndServeTLS(serverCertFile, serverKeyFile) - - if err != nil { - log.Fatal(err) - } + log.Fatal(server.ListenAndServeTLS(serverCertFile, serverKeyFile), nil) } } @@ -460,11 +456,7 @@ func startClientTransport(port int, st int) { Addr: fmt.Sprintf(":%d", port), } fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort) - err := server.ListenAndServeTLS(clientCertFile, clientKeyFile) - - if err != nil { - fatal(fmt.Sprintln(err)) - } + log.Fatal(server.ListenAndServeTLS(clientCertFile, clientKeyFile), nil) } } From 13af54fdd06ddc5ad2d7e2ddccb199c31e271fd1 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Aug 2013 23:28:03 +0200 Subject: [PATCH 3/3] revise log handling All the log-related operations are calling functions in util.go. This makes the consistancy of the logs. --- client_handlers.go | 14 ++++++------- etcd.go | 51 +++++++++++++++++++++++----------------------- raft_handlers.go | 24 +++++++++++----------- store/keywords.go | 2 +- transporter.go | 12 +++++------ util.go | 23 +++++++++++++++++---- 6 files changed, 70 insertions(+), 56 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index bfa0c08d5..b2a893330 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -43,7 +43,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { return } - debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key) value := req.FormValue("value") @@ -90,7 +90,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key) command := &DeleteCommand{} command.Key = key @@ -170,7 +170,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) url = scheme + raftServer.Leader() + path } - debug("Redirect to %s", url) + debugf("Redirect to %s", url) http.Redirect(*w, req, url, http.StatusTemporaryRedirect) return @@ -229,7 +229,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key) command := &GetCommand{} command.Key = key @@ -266,13 +266,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { command.Key = key if req.Method == "GET" { - debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key) + debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key) command.SinceIndex = 0 } else if req.Method == "POST" { // watch from a specific index - debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key) + debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key) content := req.FormValue("index") sinceIndex, err := strconv.ParseUint(string(content), 10, 64) @@ -288,7 +288,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } if body, err := command.Apply(raftServer); err != nil { - warn("Unable to do watch command: %v", err) + warnf("Unable to do watch command: %v", err) w.WriteHeader(http.StatusInternalServerError) } else { w.WriteHeader(http.StatusOK) diff --git a/etcd.go b/etcd.go index 511d70da7..ea6f3ecaa 100644 --- a/etcd.go +++ b/etcd.go @@ -12,7 +12,6 @@ import ( "github.com/coreos/etcd/web" "github.com/coreos/go-raft" "io/ioutil" - "log" "net" "net/http" "os" @@ -169,7 +168,7 @@ func main() { if cpuprofile != "" { f, err := os.Create(cpuprofile) if err != nil { - log.Fatal(err) + fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() @@ -196,7 +195,7 @@ func main() { } else if machinesFile != "" { b, err := ioutil.ReadFile(machinesFile) if err != nil { - fatal("Unable to read the given machines file: %s", err) + fatalf("Unable to read the given machines file: %s", err) } cluster = strings.Split(string(b), ",") } @@ -206,7 +205,7 @@ func main() { // Read server info from file or grab it from user. if err := os.MkdirAll(dirPath, 0744); err != nil { - fatal("Unable to create path: %s", err) + fatalf("Unable to create path: %s", err) } info = getInfo(dirPath) @@ -249,7 +248,7 @@ func startRaft(securityType int) { raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil) if err != nil { - fatal(fmt.Sprintln(err)) + fatal(err) } // LoadSnapshot @@ -257,9 +256,9 @@ func startRaft(securityType int) { err = raftServer.LoadSnapshot() if err == nil { - debug("%s finished load snapshot", raftServer.Name()) + debugf("%s finished load snapshot", raftServer.Name()) } else { - debug(err.Error()) + debug(err) } } @@ -290,7 +289,7 @@ func startRaft(securityType int) { break } } - debug("%s start as a leader", raftServer.Name()) + debugf("%s start as a leader", raftServer.Name()) // start as a follower in a existing cluster } else { @@ -310,7 +309,7 @@ func startRaft(securityType int) { fmt.Println(err) os.Exit(1) } - debug("cannot join to cluster via machine %s %s", machine, err) + debugf("cannot join to cluster via machine %s %s", machine, err) } else { success = true break @@ -321,18 +320,18 @@ func startRaft(securityType int) { break } - warn("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL) + warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL) time.Sleep(time.Second * RETRYINTERVAL) } if err != nil { - fatal("Cannot join the cluster via given machines after %x retries", retryTimes) + fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) } - debug("%s success join to the cluster", raftServer.Name()) + debugf("%s success join to the cluster", raftServer.Name()) } } else { // rejoin the previous cluster - debug("%s restart as a follower", raftServer.Name()) + debugf("%s restart as a follower", raftServer.Name()) } // open the snapshot @@ -368,7 +367,7 @@ func createTransporter(st int) transporter { tlsCert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile) if err != nil { - fatal(fmt.Sprintln(err)) + fatal(err) } tr := &http.Transport{ @@ -407,11 +406,11 @@ func startRaftTransport(port int, st int) { case HTTP: fmt.Printf("raft server [%s] listen on http port %v\n", hostname, port) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) + fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) case HTTPS: fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port) - log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil)) + fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil)) case HTTPSANDVERIFY: @@ -423,7 +422,7 @@ func startRaftTransport(port int, st int) { Addr: fmt.Sprintf(":%d", port), } fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port) - log.Fatal(server.ListenAndServeTLS(serverCertFile, serverKeyFile), nil) + fatal(server.ListenAndServeTLS(serverCertFile, serverKeyFile)) } } @@ -440,7 +439,7 @@ func startClientTransport(port int, st int) { case HTTP: fmt.Printf("etcd [%s] listen on http port %v\n", hostname, clientPort) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) + fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) case HTTPS: fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort) @@ -456,7 +455,7 @@ func startClientTransport(port int, st int) { Addr: fmt.Sprintf(":%d", port), } fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort) - log.Fatal(server.ListenAndServeTLS(clientCertFile, clientKeyFile), nil) + fatal(server.ListenAndServeTLS(clientCertFile, clientKeyFile)) } } @@ -529,10 +528,10 @@ func getInfo(path string) *Info { if file, err := os.Open(infoPath); err == nil { if content, err := ioutil.ReadAll(file); err != nil { - fatal("Unable to read info: %v", err) + fatalf("Unable to read info: %v", err) } else { if err = json.Unmarshal(content, &info); err != nil { - fatal("Unable to parse info: %v", err) + fatalf("Unable to parse info: %v", err) } } file.Close() @@ -564,7 +563,7 @@ func getInfo(path string) *Info { content, _ := json.Marshal(info) content = []byte(string(content) + "\n") if err := ioutil.WriteFile(infoPath, content, 0644); err != nil { - fatal("Unable to write info to file: %v", err) + fatalf("Unable to write info to file: %v", err) } } @@ -580,7 +579,7 @@ func createCertPool(CAFile string) *x509.CertPool { cert, err := x509.ParseCertificate(block.Bytes) if err != nil { - fatal(fmt.Sprintln(err)) + fatal(err) } certPool := x509.NewCertPool() @@ -609,7 +608,7 @@ func joinCluster(s *raft.Server, serverName string) error { panic("wrong type") } - debug("Send Join Request to %s", serverName) + debugf("Send Join Request to %s", serverName) resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b) @@ -624,8 +623,8 @@ func joinCluster(s *raft.Server, serverName string) error { } if resp.StatusCode == http.StatusTemporaryRedirect { address := resp.Header.Get("Location") - debug("Leader is %s", address) - debug("Send Join Request to %s", address) + debugf("Leader is %s", address) + debugf("Send Join Request to %s", address) json.NewEncoder(&b).Encode(command) resp, err = t.Post(fmt.Sprintf("%s/join", address), &b) } else if resp.StatusCode == http.StatusBadRequest { diff --git a/raft_handlers.go b/raft_handlers.go index face1d955..dc8003147 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -13,7 +13,7 @@ import ( // Get all the current logs func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - debug("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name()) + debugf("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name()) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(raftServer.LogEntries()) @@ -24,14 +24,14 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} err := decodeJsonRequest(req, rvreq) if err == nil { - debug("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName) + debugf("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName) if resp := raftServer.RequestVote(rvreq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return } } - warn("[vote] ERROR: %v", err) + warnf("[vote] ERROR: %v", err) w.WriteHeader(http.StatusInternalServerError) } @@ -41,17 +41,17 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { - debug("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries)) + debugf("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries)) if resp := raftServer.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) if !resp.Success { - debug("[Append Entry] Step back") + debugf("[Append Entry] Step back") } return } } - warn("[Append Entry] ERROR: %v", err) + warnf("[Append Entry] ERROR: %v", err) w.WriteHeader(http.StatusInternalServerError) } @@ -60,14 +60,14 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - debug("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name()) + debugf("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name()) if resp := raftServer.RequestSnapshot(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return } } - warn("[Snapshot] ERROR: %v", err) + warnf("[Snapshot] ERROR: %v", err) w.WriteHeader(http.StatusInternalServerError) } @@ -76,20 +76,20 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRecoveryRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - debug("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name()) + debugf("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name()) if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) return } } - warn("[Snapshot] ERROR: %v", err) + warnf("[Snapshot] ERROR: %v", err) w.WriteHeader(http.StatusInternalServerError) } // Get the port that listening for client connecting of the server func ClientHttpHandler(w http.ResponseWriter, req *http.Request) { - debug("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name()) + debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name()) w.WriteHeader(http.StatusOK) client := hostname + ":" + strconv.Itoa(clientPort) w.Write([]byte(client)) @@ -101,7 +101,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { command := &JoinCommand{} if err := decodeJsonRequest(req, command); err == nil { - debug("Receive Join Request from %s", command.Name) + debugf("Receive Join Request from %s", command.Name) dispatch(command, &w, req, false) } else { w.WriteHeader(http.StatusInternalServerError) diff --git a/store/keywords.go b/store/keywords.go index c946a5e7d..2e4ceb75b 100644 --- a/store/keywords.go +++ b/store/keywords.go @@ -8,7 +8,7 @@ import ( // keywords for internal useage // Key for string keyword; Value for only checking prefix var keywords = map[string]bool{ - "/_etcd": true, + "/_etcd": true, "/ephemeralNodes": true, } diff --git a/transporter.go b/transporter.go index 460ce4de7..012f53171 100644 --- a/transporter.go +++ b/transporter.go @@ -23,12 +23,12 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debug("Send LogEntries to %s ", peer.Name()) + debugf("Send LogEntries to %s ", peer.Name()) resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b) if err != nil { - debug("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err) + debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err) } if resp != nil { @@ -48,12 +48,12 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debug("Send Vote to %s", peer.Name()) + debugf("Send Vote to %s", peer.Name()) resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b) if err != nil { - debug("Cannot send VoteRequest to %s : %s", peer.Name(), err) + debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err) } if resp != nil { @@ -73,7 +73,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(), + debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(), req.LastTerm, req.LastIndex) resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b) @@ -95,7 +95,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debug("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(), + debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(), req.LastTerm, req.LastIndex) resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b) diff --git a/util.go b/util.go index ced4b088d..d82dea498 100644 --- a/util.go +++ b/util.go @@ -31,7 +31,7 @@ func webHelper() { func decodeJsonRequest(req *http.Request, data interface{}) error { decoder := json.NewDecoder(req.Body) if err := decoder.Decode(&data); err != nil && err != io.EOF { - warn("Malformed json request: %v", err) + warnf("Malformed json request: %v", err) return fmt.Errorf("Malformed json request: %v", err) } return nil @@ -57,17 +57,32 @@ func init() { logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds) } -func debug(msg string, v ...interface{}) { +func debugf(msg string, v ...interface{}) { if verbose { logger.Printf("DEBUG "+msg+"\n", v...) } } -func warn(msg string, v ...interface{}) { +func debug(v ...interface{}) { + if verbose { + logger.Println("DEBUG " + fmt.Sprint(v...)) + } +} + +func warnf(msg string, v ...interface{}) { logger.Printf("WARN "+msg+"\n", v...) } -func fatal(msg string, v ...interface{}) { +func warn(v ...interface{}) { + logger.Println("WARN " + fmt.Sprint(v...)) +} + +func fatalf(msg string, v ...interface{}) { logger.Printf("FATAL "+msg+"\n", v...) os.Exit(1) } + +func fatal(v ...interface{}) { + logger.Println("FATAL " + fmt.Sprint(v...)) + os.Exit(1) +}