From 64eeca39412d7d6fd0fbc08eeead23d14286cccc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 10 Aug 2013 23:38:35 -0700 Subject: [PATCH 01/12] add snpshot --- snapshot.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 snapshot.go diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 000000000..16a3d8d19 --- /dev/null +++ b/snapshot.go @@ -0,0 +1,34 @@ +package main + +import ( + "time" + "fmt" +) + +type snapshotConf struct { + // basic + checkingInterval time.Duration + lastWrites uint64 + writesThr uint64 +} + +var snapConf *snapshotConf + +func newSnapshotConf() *snapshotConf { + return &snapshotConf {time.Second*3, etcdStore.TotalWrites(), 20*1000} +} + +func monitorSnapshot() { + for { + time.Sleep(snapConf.checkingInterval) + currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites + + if currentWrites > snapConf.writesThr { + raftServer.TakeSnapshot() + snapConf.lastWrites = etcdStore.TotalWrites() + + } else { + fmt.Println(currentWrites) + } + } +} \ No newline at end of file From 434b0045db750a47373bad7e61b1d6ce0fbe61df Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 10 Aug 2013 23:37:26 -0700 Subject: [PATCH 02/12] add snapshot --- etcd.go | 3 ++- etcd_handlers.go | 1 + store/stats.go | 10 +++++++++- third_party/github.com/coreos/go-raft/server.go | 11 +---------- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/etcd.go b/etcd.go index 9367def86..50acc282a 100644 --- a/etcd.go +++ b/etcd.go @@ -234,6 +234,7 @@ func main() { // Create etcd key-value store etcdStore = store.CreateStore(maxSize) + snapConf = newSnapshotConf() startRaft(raftTLSConfig) @@ -346,7 +347,7 @@ func startRaft(tlsConfig TLSConfig) { // open the snapshot if snapshot { - go raftServer.Snapshot() + go monitorSnapshot() } // start to response to raft requests diff --git a/etcd_handlers.go b/etcd_handlers.go index 704cb811a..3936bf493 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -109,6 +109,7 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { if raftServer.State() == "leader" { if body, err := raftServer.Do(c); err != nil { + if _, ok := err.(store.NotFoundError); ok { (*w).WriteHeader(http.StatusNotFound) (*w).Write(newJsonError(100, err.Error())) diff --git a/store/stats.go b/store/stats.go index 15b71e06e..3270ae84a 100644 --- a/store/stats.go +++ b/store/stats.go @@ -18,8 +18,16 @@ type EtcdStats struct { TestAndSets uint64 `json:"testAndSets"` } -// Stats returns the basic statistics information of etcd storage +// Stats returns the basic statistics information of etcd storage since its recent start func (s *Store) Stats() []byte { b, _ := json.Marshal(s.BasicStats) return b } + +// TotalWrites returns the total write operations +// It helps with snapshot +func (s *Store) TotalWrites() uint64 { + bs := s.BasicStats + + return bs.Deletes + bs.Sets + bs.TestAndSets +} \ No newline at end of file diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index ab6aaba9f..fbf5c94b2 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -1025,16 +1025,7 @@ func (s *Server) RemovePeer(name string) error { // Log compaction //-------------------------------------- -// The background snapshot function -func (s *Server) Snapshot() { - for { - // TODO: change this... to something reasonable - time.Sleep(1 * time.Second) - s.takeSnapshot() - } -} - -func (s *Server) takeSnapshot() error { +func (s *Server) TakeSnapshot() error { //TODO put a snapshot mutex s.debugln("take Snapshot") if s.currentSnapshot != nil { From d3649d3254e37907f4e564938dcc8cccf5ab662c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 09:48:12 -0700 Subject: [PATCH 03/12] gofmt --- etcd.go | 4 ++-- etcd_handlers.go | 2 +- snapshot.go | 14 +++++++------- store/stats.go | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/etcd.go b/etcd.go index 50acc282a..042663629 100644 --- a/etcd.go +++ b/etcd.go @@ -364,7 +364,7 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter { t.scheme = scheme tr := &http.Transport{ - Dial: dialTimeout, + Dial: dialTimeout, } if scheme == "https" { @@ -594,7 +594,7 @@ func joinCluster(s *raft.Server, raftURL string) error { joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"} debugf("Send Join Request to %s", raftURL) - + resp, err := t.Post(joinURL.String(), &b) for { diff --git a/etcd_handlers.go b/etcd_handlers.go index 3936bf493..b391f2ada 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -109,7 +109,7 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { if raftServer.State() == "leader" { if body, err := raftServer.Do(c); err != nil { - + if _, ok := err.(store.NotFoundError); ok { (*w).WriteHeader(http.StatusNotFound) (*w).Write(newJsonError(100, err.Error())) diff --git a/snapshot.go b/snapshot.go index 16a3d8d19..6697b0203 100644 --- a/snapshot.go +++ b/snapshot.go @@ -1,25 +1,25 @@ package main import ( - "time" "fmt" + "time" ) type snapshotConf struct { - // basic + // basic checkingInterval time.Duration - lastWrites uint64 - writesThr uint64 + lastWrites uint64 + writesThr uint64 } var snapConf *snapshotConf func newSnapshotConf() *snapshotConf { - return &snapshotConf {time.Second*3, etcdStore.TotalWrites(), 20*1000} + return &snapshotConf{time.Second * 3, etcdStore.TotalWrites(), 20 * 1000} } func monitorSnapshot() { - for { + for { time.Sleep(snapConf.checkingInterval) currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites @@ -31,4 +31,4 @@ func monitorSnapshot() { fmt.Println(currentWrites) } } -} \ No newline at end of file +} diff --git a/store/stats.go b/store/stats.go index 3270ae84a..b57f4db3d 100644 --- a/store/stats.go +++ b/store/stats.go @@ -30,4 +30,4 @@ func (s *Store) TotalWrites() uint64 { bs := s.BasicStats return bs.Deletes + bs.Sets + bs.TestAndSets -} \ No newline at end of file +} From e3dae8fcf977e8e346e59b8e193231aad246bfa7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 09:53:02 -0700 Subject: [PATCH 04/12] do not print out debug info when testing --- test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.go b/test.go index 59e505559..157e33303 100644 --- a/test.go +++ b/test.go @@ -70,7 +70,7 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"} + argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"} if ssl { argGroup[i] = append(argGroup[i], sslServer1...) } From 1124fe21a0fea76e782f8ed4e5747d90146f4881 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 10:18:40 -0700 Subject: [PATCH 05/12] cleaning up --- etcd.go | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/etcd.go b/etcd.go index 042663629..1ad853215 100644 --- a/etcd.go +++ b/etcd.go @@ -87,14 +87,14 @@ func init() { } const ( - ELECTIONTIMEOUT = 200 * time.Millisecond - HEARTBEATTIMEOUT = 50 * time.Millisecond + ElectionTimeout = 200 * time.Millisecond + HeartbeatTimeout = 50 * time.Millisecond // Timeout for internal raft http connection // The original timeout for http is 45 seconds // which is too long for our usage. - HTTPTIMEOUT = 10 * time.Second - RETRYINTERVAL = 10 + HTTPTimeout = 10 * time.Second + RetryInterval = 10 ) //------------------------------------------------------------------------------ @@ -120,6 +120,12 @@ type Info struct { EtcdTLS TLSInfo `json:"etcdTLS"` } +type TLSConfig struct { + Scheme string + Server tls.Config + Client tls.Config +} + //------------------------------------------------------------------------------ // // Variables @@ -276,8 +282,8 @@ func startRaft(tlsConfig TLSConfig) { } } - raftServer.SetElectionTimeout(ELECTIONTIMEOUT) - raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT) + raftServer.SetElectionTimeout(ElectionTimeout) + raftServer.SetHeartbeatTimeout(HeartbeatTimeout) raftServer.Start() @@ -331,8 +337,8 @@ func startRaft(tlsConfig TLSConfig) { break } - warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL) - time.Sleep(time.Second * RETRYINTERVAL) + warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) + time.Sleep(time.Second * RetryInterval) } if err != nil { fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) @@ -379,7 +385,7 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter { // Dial with timeout func dialTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, HTTPTIMEOUT) + return net.DialTimeout(network, addr, HTTPTimeout) } // Start to listen and response raft command @@ -446,12 +452,6 @@ func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) { // Config //-------------------------------------- -type TLSConfig struct { - Scheme string - Server tls.Config - Client tls.Config -} - func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) { var keyFile, certFile, CAFile string var tlsCert tls.Certificate @@ -550,7 +550,11 @@ func getInfo(path string) *Info { return info } -// Create client auth certpool +// newCertPool creates x509 certPool and corresponding Auth Type. +// If the given CAfile is valid, add the cert into the pool and verify the clients' +// certs against the cert in the pool. +// If the given CAfile is empty, do not verify the clients' cert. +// If the given CAfile is not valid, fatal. func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) { if CAFile == "" { return tls.NoClientCert, nil From fa6c8f4f1829437e9697cafd44deb486bd0adcbe Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 11:04:15 -0700 Subject: [PATCH 06/12] fix naming in long_test.go --- etcd_long_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etcd_long_test.go b/etcd_long_test.go index 1589bd153..15dbcc185 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -57,8 +57,8 @@ func TestKillLeader(t *testing.T) { totalTime += take avgTime := totalTime / (time.Duration)(i+1) - fmt.Println("Leader election time is ", take, "with election timeout", ELECTIONTIMEOUT) - fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMEOUT) + fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout) + fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout) etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr) } } From 6120fa634ee9f8b15d2e915e0d64d50ec3c23fed Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 11:40:45 -0700 Subject: [PATCH 07/12] remove duplicate codes --- etcd_handlers.go | 4 ++-- machines.go | 14 -------------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/etcd_handlers.go b/etcd_handlers.go index b391f2ada..dec24031e 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -218,14 +218,14 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { // Add itself to the machine list first // Since peer map does not contain the server itself - machines, _ := getEtcdURL(raftServer.Name()) + machines, _ := nameToEtcdURL(raftServer.Name()) // Add all peers to the list and separate by comma // We do not use json here since we accept machines list // in the command line separate by comma. for peerName, _ := range peers { - if addr, ok := getEtcdURL(peerName); ok { + if addr, ok := nameToEtcdURL(peerName); ok { machines = machines + "," + addr } } diff --git a/machines.go b/machines.go index 0b6681f3f..3b4fc39db 100644 --- a/machines.go +++ b/machines.go @@ -5,20 +5,6 @@ import ( "path" ) -func getEtcdURL(name string) (string, bool) { - resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) - - m, err := url.ParseQuery(resps[0].Value) - - if err != nil { - panic("Failed to parse machines entry") - } - - addr := m["etcd"][0] - - return addr, true -} - // machineNum returns the number of machines in the cluster func machineNum() int { response, _ := etcdStore.RawGet("_etcd/machines") From 8f3e6f340f5f607416aee51968f818cb410e6ba9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 11:42:38 -0700 Subject: [PATCH 08/12] remove duplicate codes --- machines.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/machines.go b/machines.go index 3b4fc39db..f19ed31db 100644 --- a/machines.go +++ b/machines.go @@ -1,10 +1,5 @@ package main -import ( - "net/url" - "path" -) - // machineNum returns the number of machines in the cluster func machineNum() int { response, _ := etcdStore.RawGet("_etcd/machines") From 8e48a20c85dc9c496f1300ea928f4dcf623215a6 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 11 Aug 2013 11:56:18 -0700 Subject: [PATCH 09/12] clean up trans.go --- etcd.go | 8 +++----- raft_handlers.go | 22 +++++++++++++++------- transporter.go | 2 -- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/etcd.go b/etcd.go index 1ad853215..c05b357e8 100644 --- a/etcd.go +++ b/etcd.go @@ -320,7 +320,7 @@ func startRaft(tlsConfig TLSConfig) { if len(machine) == 0 { continue } - err = joinCluster(raftServer, machine) + err = joinCluster(raftServer, machine, tlsConfig.Scheme) if err != nil { if err.Error() == errors[103] { fmt.Println(err) @@ -367,8 +367,6 @@ func startRaft(tlsConfig TLSConfig) { func newTransporter(scheme string, tlsConf tls.Config) transporter { t := transporter{} - t.scheme = scheme - tr := &http.Transport{ Dial: dialTimeout, } @@ -577,7 +575,7 @@ func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) { } // Send join requests to the leader. -func joinCluster(s *raft.Server, raftURL string) error { +func joinCluster(s *raft.Server, raftURL string, scheme string) error { var b bytes.Buffer command := &JoinCommand{ @@ -595,7 +593,7 @@ func joinCluster(s *raft.Server, raftURL string) error { panic("wrong type") } - joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"} + joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} debugf("Send Join Request to %s", raftURL) diff --git a/raft_handlers.go b/raft_handlers.go index 8f52e4e30..d452b3e53 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -12,7 +12,8 @@ import ( // Get all the current logs func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name()) + u, _ := nameToRaftURL(raftServer.Name()) + debugf("[recv] GET %s/log", u) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(raftServer.LogEntries()) @@ -23,7 +24,8 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} err := decodeJsonRequest(req, rvreq) if err == nil { - debugf("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName) + u, _ := nameToRaftURL(raftServer.Name()) + debugf("[recv] POST %s/vote [%s]", u, rvreq.CandidateName) if resp := raftServer.RequestVote(rvreq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -40,7 +42,8 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { - debugf("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries)) + u, _ := nameToRaftURL(raftServer.Name()) + debugf("[recv] POST %s/log/append [%d]", u, len(aereq.Entries)) if resp := raftServer.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -59,7 +62,8 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - debugf("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name()) + u, _ := nameToRaftURL(raftServer.Name()) + debugf("[recv] POST %s/snapshot/ ", u) if resp := raftServer.RequestSnapshot(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -75,7 +79,8 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRecoveryRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - debugf("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name()) + u, _ := nameToRaftURL(raftServer.Name()) + debugf("[recv] POST %s/snapshotRecovery/ ", u) if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -88,7 +93,8 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { // Get the port that listening for etcd connecting of the server func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] Get %s/etcdURL/ ", raftTransporter.scheme+raftServer.Name()) + u, _ := nameToRaftURL(raftServer.Name()) + debugf("[recv] Get %s/etcdURL/ ", u) w.WriteHeader(http.StatusOK) w.Write([]byte(argInfo.EtcdURL)) } @@ -109,7 +115,9 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { // Response to the name request func NameHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name()) + u, _ := nameToRaftURL(raftServer.Name()) + + debugf("[recv] Get %s/name/ ", u) w.WriteHeader(http.StatusOK) w.Write([]byte(raftServer.Name())) } diff --git a/transporter.go b/transporter.go index 211774e5a..59a385bb3 100644 --- a/transporter.go +++ b/transporter.go @@ -12,8 +12,6 @@ import ( // Transporter layer for communication between raft nodes type transporter struct { client *http.Client - // scheme - scheme string } // Sends AppendEntries RPCs to a peer when the server is the leader. From 32d1681b6bf9c076387bd0527005f4ad5f30fe65 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 12 Aug 2013 10:16:30 -0700 Subject: [PATCH 10/12] change name to url in http handlers --- etcd_handlers.go | 25 +++++-------------------- raft_handlers.go | 22 +++++++--------------- util.go | 20 ++++++++++++++++++++ 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/etcd_handlers.go b/etcd_handlers.go index dec24031e..bdac83309 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -5,7 +5,6 @@ import ( "github.com/coreos/etcd/store" "net/http" "strconv" - "time" ) //------------------------------------------------------------------- @@ -45,7 +44,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { return } - debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] POST %v/v1/keys/%s", info.EtcdURL, key) value := req.FormValue("value") @@ -96,7 +95,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] DELETE %v/v1/keys/%s", info.EtcdURL, key) command := &DeleteCommand{ Key: key, @@ -251,7 +250,7 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) { func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] GET %s/v1/keys/%s", info.EtcdURL, key) command := &GetCommand{ Key: key, @@ -290,13 +289,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } if req.Method == "GET" { - debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key) + debugf("[recv] GET %s/watch/%s", info.EtcdURL, key) command.SinceIndex = 0 } else if req.Method == "POST" { // watch from a specific index - debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key) + debugf("[recv] POST %s/watch/%s", info.EtcdURL, key) content := req.FormValue("index") sinceIndex, err := strconv.ParseUint(string(content), 10, 64) @@ -340,17 +339,3 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusBadRequest) } - -// Convert string duration to time format -func durationToExpireTime(strDuration string) (time.Time, error) { - if strDuration != "" { - duration, err := strconv.Atoi(strDuration) - - if err != nil { - return time.Unix(0, 0), err - } - return time.Now().Add(time.Second * (time.Duration)(duration)), nil - } else { - return time.Unix(0, 0), nil - } -} diff --git a/raft_handlers.go b/raft_handlers.go index d452b3e53..ecdbb0172 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -12,8 +12,7 @@ import ( // Get all the current logs func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - u, _ := nameToRaftURL(raftServer.Name()) - debugf("[recv] GET %s/log", u) + debugf("[recv] GET %s/log", info.RaftURL) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(raftServer.LogEntries()) @@ -24,8 +23,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} err := decodeJsonRequest(req, rvreq) if err == nil { - u, _ := nameToRaftURL(raftServer.Name()) - debugf("[recv] POST %s/vote [%s]", u, rvreq.CandidateName) + debugf("[recv] POST %s/vote [%s]", info.RaftURL, rvreq.CandidateName) if resp := raftServer.RequestVote(rvreq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -42,8 +40,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { - u, _ := nameToRaftURL(raftServer.Name()) - debugf("[recv] POST %s/log/append [%d]", u, len(aereq.Entries)) + debugf("[recv] POST %s/log/append [%d]", info.RaftURL, len(aereq.Entries)) if resp := raftServer.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -62,8 +59,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - u, _ := nameToRaftURL(raftServer.Name()) - debugf("[recv] POST %s/snapshot/ ", u) + debugf("[recv] POST %s/snapshot/ ", info.RaftURL) if resp := raftServer.RequestSnapshot(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -79,8 +75,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRecoveryRequest{} err := decodeJsonRequest(req, aereq) if err == nil { - u, _ := nameToRaftURL(raftServer.Name()) - debugf("[recv] POST %s/snapshotRecovery/ ", u) + debugf("[recv] POST %s/snapshotRecovery/ ", info.RaftURL) if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) @@ -93,8 +88,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { // Get the port that listening for etcd connecting of the server func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { - u, _ := nameToRaftURL(raftServer.Name()) - debugf("[recv] Get %s/etcdURL/ ", u) + debugf("[recv] Get %s/etcdURL/ ", info.RaftURL) w.WriteHeader(http.StatusOK) w.Write([]byte(argInfo.EtcdURL)) } @@ -115,9 +109,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { // Response to the name request func NameHttpHandler(w http.ResponseWriter, req *http.Request) { - u, _ := nameToRaftURL(raftServer.Name()) - - debugf("[recv] Get %s/name/ ", u) + debugf("[recv] Get %s/name/ ", info.RaftURL) w.WriteHeader(http.StatusOK) w.Write([]byte(raftServer.Name())) } diff --git a/util.go b/util.go index 716a0854c..29b452126 100644 --- a/util.go +++ b/util.go @@ -8,8 +8,28 @@ import ( "log" "net/http" "os" + "strconv" + "time" ) +//-------------------------------------- +// etcd http Helper +//-------------------------------------- + +// Convert string duration to time format +func durationToExpireTime(strDuration string) (time.Time, error) { + if strDuration != "" { + duration, err := strconv.Atoi(strDuration) + + if err != nil { + return time.Unix(0, 0), err + } + return time.Now().Add(time.Second * (time.Duration)(duration)), nil + } else { + return time.Unix(0, 0), nil + } +} + //-------------------------------------- // Web Helper //-------------------------------------- From 969c8ba8ca58169730dc1ebcb779a71562327b12 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 12 Aug 2013 10:29:50 -0700 Subject: [PATCH 11/12] log remoteAddr in etcdHttpHandler --- etcd_handlers.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/etcd_handlers.go b/etcd_handlers.go index bdac83309..1c45d3487 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" "net/http" "strconv" ) @@ -44,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { return } - debugf("[recv] POST %v/v1/keys/%s", info.EtcdURL, key) + debugf("[recv] POST %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr) value := req.FormValue("value") @@ -95,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debugf("[recv] DELETE %v/v1/keys/%s", info.EtcdURL, key) + debugf("[recv] DELETE %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr) command := &DeleteCommand{ Key: key, @@ -106,7 +107,7 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { // Dispatch the command to leader func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { - if raftServer.State() == "leader" { + if raftServer.State() == raft.Leader { if body, err := raftServer.Do(c); err != nil { if _, ok := err.(store.NotFoundError); ok { @@ -162,12 +163,6 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { path := req.URL.Path - var scheme string - - if scheme = req.URL.Scheme; scheme == "" { - scheme = "http://" - } - var url string if etcd { @@ -217,7 +212,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { // Add itself to the machine list first // Since peer map does not contain the server itself - machines, _ := nameToEtcdURL(raftServer.Name()) + machines := info.EtcdURL // Add all peers to the list and separate by comma // We do not use json here since we accept machines list @@ -250,7 +245,7 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) { func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debugf("[recv] GET %s/v1/keys/%s", info.EtcdURL, key) + debugf("[recv] GET %s/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr) command := &GetCommand{ Key: key, @@ -289,13 +284,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } if req.Method == "GET" { - debugf("[recv] GET %s/watch/%s", info.EtcdURL, key) + debugf("[recv] GET %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr) command.SinceIndex = 0 } else if req.Method == "POST" { // watch from a specific index - debugf("[recv] POST %s/watch/%s", info.EtcdURL, key) + debugf("[recv] POST %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr) content := req.FormValue("index") sinceIndex, err := strconv.ParseUint(string(content), 10, 64) From 58e9e0c5578a1919bf1f055aab561657ac857496 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 12 Aug 2013 10:41:44 -0700 Subject: [PATCH 12/12] add comments in snapshot.go --- snapshot.go | 16 +++++++++------- util.go | 1 + 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/snapshot.go b/snapshot.go index 6697b0203..559e1bc64 100644 --- a/snapshot.go +++ b/snapshot.go @@ -1,20 +1,25 @@ package main import ( - "fmt" "time" ) +// basic conf. +// TODO: find a good policy to do snapshot type snapshotConf struct { - // basic + // Etcd will check if snapshot is need every checkingInterval checkingInterval time.Duration - lastWrites uint64 - writesThr uint64 + // The number of writes when the last snapshot happened + lastWrites uint64 + // If the incremental number of writes since the last snapshot + // exceeds the write Threshold, etcd will do a snapshot + writesThr uint64 } var snapConf *snapshotConf func newSnapshotConf() *snapshotConf { + // check snapshot every 3 seconds and the threshold is 20K return &snapshotConf{time.Second * 3, etcdStore.TotalWrites(), 20 * 1000} } @@ -26,9 +31,6 @@ func monitorSnapshot() { if currentWrites > snapConf.writesThr { raftServer.TakeSnapshot() snapConf.lastWrites = etcdStore.TotalWrites() - - } else { - fmt.Println(currentWrites) } } } diff --git a/util.go b/util.go index 29b452126..e57dfca59 100644 --- a/util.go +++ b/util.go @@ -25,6 +25,7 @@ func durationToExpireTime(strDuration string) (time.Time, error) { return time.Unix(0, 0), err } return time.Now().Add(time.Second * (time.Duration)(duration)), nil + } else { return time.Unix(0, 0), nil }