From a97590ff50858f64cce77f94d7fcc1952f80017b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 17:19:45 -0700 Subject: [PATCH 01/22] basic stats --- command.go | 9 ++++++--- etcd.go | 5 +++-- etcd_handlers.go | 8 +++++--- etcd_test.go | 5 +++-- raft_handlers.go | 3 ++- raft_server.go | 31 +++++++++++++++++-------------- store/store.go | 3 ++- transporter.go | 15 ++++++++++++++- util.go | 3 ++- 9 files changed, 54 insertions(+), 28 deletions(-) diff --git a/command.go b/command.go index b9c3a83f6..5259311fc 100644 --- a/command.go +++ b/command.go @@ -4,12 +4,13 @@ import ( "encoding/binary" "encoding/json" "fmt" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" "os" "path" "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) const commandPrefix = "etcd:" @@ -168,6 +169,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) + r.peersStats[c.Name] = &peerStats{} return b, err } @@ -193,6 +195,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) _, err := etcdStore.Delete(key, raftServer.CommitIndex()) + r.peersStats[c.Name] = nil if err != nil { return []byte{0}, err diff --git a/etcd.go b/etcd.go index 39743d5ab..6b6c0dc35 100644 --- a/etcd.go +++ b/etcd.go @@ -3,12 +3,13 @@ package main import ( "crypto/tls" "flag" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" "io/ioutil" "os" "strings" "time" + + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) //------------------------------------------------------------------------------ diff --git a/etcd_handlers.go b/etcd_handlers.go index 4c4673983..dcd9486fe 100644 --- 
a/etcd_handlers.go +++ b/etcd_handlers.go @@ -2,12 +2,13 @@ package main import ( "fmt" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" "net/http" "strconv" "strings" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) //------------------------------------------------------------------- @@ -207,6 +208,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) w.Write(etcdStore.Stats()) + w.Write(r.Stats()) return nil } diff --git a/etcd_test.go b/etcd_test.go index e61e7e4a8..caa6af84d 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "github.com/coreos/etcd/test" - "github.com/coreos/go-etcd/etcd" "math/rand" "net/http" "net/http/httptest" @@ -13,6 +11,9 @@ import ( "strings" "testing" "time" + + "github.com/coreos/etcd/test" + "github.com/coreos/go-etcd/etcd" ) // Create a single node and try to set value diff --git a/raft_handlers.go b/raft_handlers.go index 8ae9d2f87..1a92560ec 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -2,8 +2,9 @@ package main import ( "encoding/json" - "github.com/coreos/go-raft" "net/http" + + "github.com/coreos/go-raft" ) //------------------------------------------------------------- diff --git a/raft_server.go b/raft_server.go index a90c65f93..2cf5a1735 100644 --- a/raft_server.go +++ b/raft_server.go @@ -6,22 +6,24 @@ import ( "encoding/binary" "encoding/json" "fmt" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/go-raft" "io/ioutil" "net/http" "net/url" "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/go-raft" ) type raftServer struct { *raft.Server - version string - joinIndex uint64 - name string - url string - tlsConf *TLSConfig - tlsInfo *TLSInfo + version string + joinIndex uint64 + name string + url string + 
tlsConf *TLSConfig + tlsInfo *TLSInfo + peersStats map[string]*peerStats } var r *raftServer @@ -37,12 +39,13 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo check(err) return &raftServer{ - Server: server, - version: raftVersion, - name: name, - url: url, - tlsConf: tlsConf, - tlsInfo: tlsInfo, + Server: server, + version: raftVersion, + name: name, + url: url, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + peersStats: make(map[string]*peerStats), } } diff --git a/store/store.go b/store/store.go index d37345f4d..e3d62b9d1 100644 --- a/store/store.go +++ b/store/store.go @@ -3,11 +3,12 @@ package store import ( "encoding/json" "fmt" - etcdErr "github.com/coreos/etcd/error" "path" "strconv" "sync" "time" + + etcdErr "github.com/coreos/etcd/error" ) //------------------------------------------------------------------------------ diff --git a/transporter.go b/transporter.go index c49479bc8..7e67a69eb 100644 --- a/transporter.go +++ b/transporter.go @@ -5,10 +5,12 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/coreos/go-raft" "io" "net" "net/http" + "time" + + "github.com/coreos/go-raft" ) // Transporter layer for communication between raft nodes @@ -50,12 +52,23 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe u, _ := nameToRaftURL(peer.Name) debugf("Send LogEntries to %s ", u) + thisPeerStats := r.peersStats[peer.Name] + + start := time.Now() + resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) + end := time.Now() + if err != nil { debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) + thisPeerStats.Failcnt++ + } else { + thisPeerStats.Latency = float64(end.Sub(start)) / (1000000.0) } + r.peersStats[peer.Name] = thisPeerStats + if resp != nil { defer resp.Body.Close() aersp = &raft.AppendEntriesResponse{} diff --git a/util.go b/util.go index 5f86cbaa9..a7745b0d5 100644 --- a/util.go +++ b/util.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "fmt" - 
"github.com/coreos/etcd/web" "io" "log" "net" @@ -14,6 +13,8 @@ import ( "runtime/pprof" "strconv" "time" + + "github.com/coreos/etcd/web" ) //-------------------------------------- From 7a9fae95307d400468b8360e826753e3861b96f5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 17:22:33 -0700 Subject: [PATCH 02/22] better warn on restart the entire cluster --- raft_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft_server.go b/raft_server.go index 2cf5a1735..9009f64e0 100644 --- a/raft_server.go +++ b/raft_server.go @@ -94,7 +94,7 @@ func (r *raftServer) ListenAndServe() { } ok := joinCluster(cluster) if !ok { - warn("the whole cluster dies! restart the cluster") + warn("the entire cluster is down! this machine will restart the cluster.") } debugf("%s restart as a follower", r.name) From e4b164c32482291482394dc059d00b7d5e9518fb Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 17:40:13 -0700 Subject: [PATCH 03/22] add avg --- raft_stats.go | 17 +++++++++++++++++ transporter.go | 5 ++++- 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 raft_stats.go diff --git a/raft_stats.go b/raft_stats.go new file mode 100644 index 000000000..2f15a7a80 --- /dev/null +++ b/raft_stats.go @@ -0,0 +1,17 @@ +package main + +import ( + "encoding/json" +) + +type peerStats struct { + Latency float64 `json:"latency"` + AvgLatency float64 `json:"averageLatency"` + FailCnt uint64 `json:"failsCount"` + SuccCnt uint64 `json:"successCount"` +} + +func (r *raftServer) Stats() []byte { + b, _ := json.Marshal(r.peersStats) + return b +} diff --git a/transporter.go b/transporter.go index 7e67a69eb..914902d5f 100644 --- a/transporter.go +++ b/transporter.go @@ -62,9 +62,12 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe if err != nil { debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) - thisPeerStats.Failcnt++ + thisPeerStats.FailCnt++ } else { + total := 
float64(thisPeerStats.SuccCnt) * thisPeerStats.AvgLatency + thisPeerStats.SuccCnt++ thisPeerStats.Latency = float64(end.Sub(start)) / (1000000.0) + thisPeerStats.AvgLatency = (total + thisPeerStats.Latency) / float64(thisPeerStats.SuccCnt) } r.peersStats[peer.Name] = thisPeerStats From d8cd744f2fb983f8cde9fb449cbc394fc53aa874 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 19:07:18 -0700 Subject: [PATCH 04/22] fix remove node --- command.go | 2 +- raft_server.go | 5 +++++ raft_stats.go | 14 ++++++++++---- transporter.go | 13 +++++++------ 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/command.go b/command.go index 5259311fc..060457d0c 100644 --- a/command.go +++ b/command.go @@ -195,7 +195,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) _, err := etcdStore.Delete(key, raftServer.CommitIndex()) - r.peersStats[c.Name] = nil + delete(r.peersStats, c.Name) if err != nil { return []byte{0}, err diff --git a/raft_server.go b/raft_server.go index 9009f64e0..6355ea86b 100644 --- a/raft_server.go +++ b/raft_server.go @@ -269,6 +269,11 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { return fmt.Errorf("Unable to join: %v", err) } +func (r *raftServer) Stats() []byte { + b, _ := json.Marshal(r.peersStats) + return b +} + // Register commands to raft server func registerCommands() { raft.RegisterCommand(&JoinCommand{}) diff --git a/raft_stats.go b/raft_stats.go index 2f15a7a80..7e2643a3d 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -1,7 +1,7 @@ package main import ( - "encoding/json" + "time" ) type peerStats struct { @@ -11,7 +11,13 @@ type peerStats struct { SuccCnt uint64 `json:"successCount"` } -func (r *raftServer) Stats() []byte { - b, _ := json.Marshal(r.peersStats) - return b +func (ps *peerStats) Fail() { + ps.FailCnt++ +} + +func (ps *peerStats) Succ(d time.Duration) { + total := float64(ps.SuccCnt) * ps.AvgLatency + 
ps.SuccCnt++ + ps.Latency = float64(d) / (1000000.0) + ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt) } diff --git a/transporter.go b/transporter.go index 914902d5f..909b6ea68 100644 --- a/transporter.go +++ b/transporter.go @@ -52,7 +52,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe u, _ := nameToRaftURL(peer.Name) debugf("Send LogEntries to %s ", u) - thisPeerStats := r.peersStats[peer.Name] + thisPeerStats, ok := r.peersStats[peer.Name] start := time.Now() @@ -62,12 +62,13 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe if err != nil { debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) - thisPeerStats.FailCnt++ + if ok { + thisPeerStats.Fail() + } } else { - total := float64(thisPeerStats.SuccCnt) * thisPeerStats.AvgLatency - thisPeerStats.SuccCnt++ - thisPeerStats.Latency = float64(end.Sub(start)) / (1000000.0) - thisPeerStats.AvgLatency = (total + thisPeerStats.Latency) / float64(thisPeerStats.SuccCnt) + if ok { + thisPeerStats.Succ(end.Sub(start)) + } } r.peersStats[peer.Name] = thisPeerStats From 9a63723bbe552a6206b0cd86b1e564822e446c03 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 20:35:56 -0700 Subject: [PATCH 05/22] add sdv --- command.go | 5 ++++- raft_stats.go | 30 ++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/command.go b/command.go index 060457d0c..6dc46bfc0 100644 --- a/command.go +++ b/command.go @@ -169,7 +169,10 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) - r.peersStats[c.Name] = &peerStats{} + + if c.Name != r.Name() { + r.peersStats[c.Name] = &peerStats{MinLatency: 1 << 63} + } return b, err } diff --git a/raft_stats.go b/raft_stats.go index 
7e2643a3d..e66f72bd3 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -1,14 +1,19 @@ package main import ( + "math" "time" ) type peerStats struct { - Latency float64 `json:"latency"` - AvgLatency float64 `json:"averageLatency"` - FailCnt uint64 `json:"failsCount"` - SuccCnt uint64 `json:"successCount"` + Latency float64 `json:"latency"` + AvgLatency float64 `json:"averageLatency"` + avgLatencySquare float64 + SdvLatency float64 `json:"sdvLatency"` + MinLatency float64 `json:"minLatency"` + MaxLatency float64 `json:"maxLatency"` + FailCnt uint64 `json:"failsCount"` + SuccCnt uint64 `json:"successCount"` } func (ps *peerStats) Fail() { @@ -16,8 +21,25 @@ func (ps *peerStats) Fail() { } func (ps *peerStats) Succ(d time.Duration) { + total := float64(ps.SuccCnt) * ps.AvgLatency + totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare + ps.SuccCnt++ + ps.Latency = float64(d) / (1000000.0) + + if ps.Latency > ps.MaxLatency { + ps.MaxLatency = ps.Latency + } + + if ps.Latency < ps.MinLatency { + ps.MinLatency = ps.Latency + } + ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt) + ps.avgLatencySquare = (totalSquare + ps.Latency*ps.Latency) / float64(ps.SuccCnt) + + // sdv = sqrt(avg(x^2) - avg(x)^2) + ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency) } From 2b66641b558485232617383588d6dc584ff6618a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 19 Aug 2013 20:36:13 -0700 Subject: [PATCH 06/22] bump deps --- .../protoc-gen-go/generator/generator.go | 2 +- .../github.com/coreos/go-raft/server.go | 55 +++++++++---------- .../github.com/coreos/go-raft/snapshot.go | 3 +- 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go index 5097ea57e..413f3614e 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go +++ 
b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go @@ -1781,7 +1781,7 @@ func isASCIIDigit(c byte) bool { // but it's so remote we're prepared to pretend it's nonexistent - since the // C++ generator lowercases names, it's extremely unlikely to have two fields // with different capitalizations. -// In short, _my_field_name_2 becomes XMyFieldName2. +// In short, _my_field_name_2 becomes XMyFieldName_2. func CamelCase(s string) string { if s == "" { return "" diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index c3d3fbd46..b4dab92ae 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -927,26 +927,25 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot // Adds a peer to the server. func (s *Server) AddPeer(name string, connectiongString string) error { s.debugln("server.peer.add: ", name, len(s.peers)) - defer s.writeConf() + // Do not allow peers to be added twice. if s.peers[name] != nil { return nil } // Skip the Peer if it has the same name as the Server - if s.name == name { - return nil + if s.name != name { + peer := newPeer(s, name, connectiongString, s.heartbeatTimeout) + + if s.State() == Leader { + peer.startHeartbeat() + } + + s.peers[peer.Name] = peer } - peer := newPeer(s, name, connectiongString, s.heartbeatTimeout) - - if s.State() == Leader { - peer.startHeartbeat() - } - - s.peers[peer.Name] = peer - - s.debugln("server.peer.conf.write: ", name) + // Write the configuration to file. + s.writeConf() return nil } @@ -955,26 +954,24 @@ func (s *Server) AddPeer(name string, connectiongString string) error { func (s *Server) RemovePeer(name string) error { s.debugln("server.peer.remove: ", name, len(s.peers)) - defer s.writeConf() + // Skip the Peer if it has the same name as the Server + if name != s.Name() { + // Return error if peer doesn't exist. 
+ peer := s.peers[name] + if peer == nil { + return fmt.Errorf("raft: Peer not found: %s", name) + } - if name == s.Name() { - // when the removed node restart, it should be able - // to know it has been removed before. So we need - // to update knownCommitIndex - return nil - } - // Return error if peer doesn't exist. - peer := s.peers[name] - if peer == nil { - return fmt.Errorf("raft: Peer not found: %s", name) + // Stop peer and remove it. + if s.State() == Leader { + peer.stopHeartbeat(true) + } + + delete(s.peers, name) } - // Stop peer and remove it. - if s.State() == Leader { - peer.stopHeartbeat(true) - } - - delete(s.peers, name) + // Write the configuration to file. + s.writeConf() return nil } diff --git a/third_party/github.com/coreos/go-raft/snapshot.go b/third_party/github.com/coreos/go-raft/snapshot.go index fd41c08f0..93b1a97cd 100644 --- a/third_party/github.com/coreos/go-raft/snapshot.go +++ b/third_party/github.com/coreos/go-raft/snapshot.go @@ -6,7 +6,6 @@ import ( "fmt" "hash/crc32" "os" - "syscall" ) //------------------------------------------------------------------------------ @@ -54,7 +53,7 @@ func (ss *Snapshot) save() error { } // force the change writting to disk - syscall.Fsync(int(file.Fd())) + file.Sync() return err } From 896c944c7ea0704bb0b59ea20081cb8129c80e4a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 20 Aug 2013 14:05:23 -0700 Subject: [PATCH 07/22] add serverStats --- command.go | 2 +- raft_handlers.go | 3 +++ raft_server.go | 37 ++++++++++++++++++++++--------------- raft_stats.go | 38 +++++++++++++++++++++++++++++++++++--- transporter.go | 3 +++ 5 files changed, 64 insertions(+), 19 deletions(-) diff --git a/command.go b/command.go index 6dc46bfc0..0147811da 100644 --- a/command.go +++ b/command.go @@ -171,7 +171,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) if c.Name != r.Name() { - r.peersStats[c.Name] = 
&peerStats{MinLatency: 1 << 63} + r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63} } return b, err diff --git a/raft_handlers.go b/raft_handlers.go index 1a92560ec..f56a3aa0b 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -42,6 +42,9 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { if err == nil { debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries)) + + r.serverStats.RecvAppendReq(aereq.LeaderName) + if resp := r.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) diff --git a/raft_server.go b/raft_server.go index 6355ea86b..91b304ecb 100644 --- a/raft_server.go +++ b/raft_server.go @@ -17,13 +17,14 @@ import ( type raftServer struct { *raft.Server - version string - joinIndex uint64 - name string - url string - tlsConf *TLSConfig - tlsInfo *TLSInfo - peersStats map[string]*peerStats + version string + joinIndex uint64 + name string + url string + tlsConf *TLSConfig + tlsInfo *TLSInfo + peersStats map[string]*raftPeerStats + serverStats *raftServerStats } var r *raftServer @@ -39,13 +40,14 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo check(err) return &raftServer{ - Server: server, - version: raftVersion, - name: name, - url: url, - tlsConf: tlsConf, - tlsInfo: tlsInfo, - peersStats: make(map[string]*peerStats), + Server: server, + version: raftVersion, + name: name, + url: url, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + peersStats: make(map[string]*raftPeerStats), + serverStats: &raftServerStats{}, } } @@ -270,7 +272,12 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { } func (r *raftServer) Stats() []byte { - b, _ := json.Marshal(r.peersStats) + sBytes, _ := json.Marshal(r.serverStats) + + pBytes, _ := json.Marshal(r.peersStats) + + b := append(sBytes, pBytes...) 
+ return b } diff --git a/raft_stats.go b/raft_stats.go index e66f72bd3..2b1686bb6 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -3,9 +3,41 @@ package main import ( "math" "time" + + "github.com/coreos/go-raft" ) -type peerStats struct { +type raftServerStats struct { + State string + StartTime time.Time + Leader string + leaderStartTime time.Time + LeaderUptime time.Duration + RecvAppendRequestCnt uint64 + SendAppendRequestCnt uint64 +} + +func (ss *raftServerStats) RecvAppendReq(leaderName string) { + ss.State = raft.Follower + if leaderName != ss.Leader { + ss.Leader = leaderName + ss.leaderStartTime = time.Now() + } + + ss.RecvAppendRequestCnt++ +} + +func (ss *raftServerStats) SendAppendReq() { + if ss.State != raft.Leader { + ss.State = raft.Leader + ss.Leader = r.Name() + ss.leaderStartTime = time.Now() + } + + ss.SendAppendRequestCnt++ +} + +type raftPeerStats struct { Latency float64 `json:"latency"` AvgLatency float64 `json:"averageLatency"` avgLatencySquare float64 @@ -16,11 +48,11 @@ type peerStats struct { SuccCnt uint64 `json:"successCount"` } -func (ps *peerStats) Fail() { +func (ps *raftPeerStats) Fail() { ps.FailCnt++ } -func (ps *peerStats) Succ(d time.Duration) { +func (ps *raftPeerStats) Succ(d time.Duration) { total := float64(ps.SuccCnt) * ps.AvgLatency totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare diff --git a/transporter.go b/transporter.go index 909b6ea68..e476fdf05 100644 --- a/transporter.go +++ b/transporter.go @@ -47,6 +47,9 @@ func dialTimeout(network, addr string) (net.Conn, error) { func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse var b bytes.Buffer + + r.serverStats.SendAppendReq() + json.NewEncoder(&b).Encode(req) u, _ := nameToRaftURL(peer.Name) From f75c309d268663ac793c0f6a8d6cf8a5a2b7e187 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 20 Aug 2013 16:33:54 -0700 Subject: [PATCH 
08/22] sampling sending rate --- raft_server.go | 32 ++++++++++++++++++++++++-------- raft_stats.go | 31 +++++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/raft_server.go b/raft_server.go index 91b304ecb..651e5d032 100644 --- a/raft_server.go +++ b/raft_server.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "container/list" "crypto/tls" "encoding/binary" "encoding/json" @@ -40,14 +41,17 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo check(err) return &raftServer{ - Server: server, - version: raftVersion, - name: name, - url: url, - tlsConf: tlsConf, - tlsInfo: tlsInfo, - peersStats: make(map[string]*raftPeerStats), - serverStats: &raftServerStats{}, + Server: server, + version: raftVersion, + name: name, + url: url, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + peersStats: make(map[string]*raftPeerStats), + serverStats: &raftServerStats{ + StartTime: time.Now(), + sendRateQueue: list.New(), + }, } } @@ -272,6 +276,18 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { } func (r *raftServer) Stats() []byte { + + r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String() + + queue := r.serverStats.sendRateQueue + + frontValue, _ := queue.Front().Value.(time.Time) + backValue, _ := queue.Back().Value.(time.Time) + + sampleDuration := backValue.Sub(frontValue) + + r.serverStats.SendingRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second) + sBytes, _ := json.Marshal(r.serverStats) pBytes, _ := json.Marshal(r.peersStats) diff --git a/raft_stats.go b/raft_stats.go index 2b1686bb6..7c447bae6 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -1,20 +1,27 @@ package main import ( + "container/list" "math" "time" "github.com/coreos/go-raft" ) +type runtimeStats struct { +} + type raftServerStats struct { - State string - StartTime time.Time - Leader string - leaderStartTime time.Time - LeaderUptime time.Duration - 
RecvAppendRequestCnt uint64 - SendAppendRequestCnt uint64 + State string + StartTime time.Time + Leader string + leaderStartTime time.Time + LeaderUptime string + RecvAppendRequestCnt uint64 + SendAppendRequestCnt uint64 + SendAppendReqeustRate uint64 + sendRateQueue *list.List + SendingRate float64 } func (ss *raftServerStats) RecvAppendReq(leaderName string) { @@ -28,10 +35,18 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string) { } func (ss *raftServerStats) SendAppendReq() { + now := time.Now() if ss.State != raft.Leader { ss.State = raft.Leader ss.Leader = r.Name() - ss.leaderStartTime = time.Now() + ss.leaderStartTime = now + } + + if ss.sendRateQueue.Len() < 200 { + ss.sendRateQueue.PushBack(now) + } else { + ss.sendRateQueue.PushBack(now) + ss.sendRateQueue.Remove(ss.sendRateQueue.Front()) } ss.SendAppendRequestCnt++ From 10cdaea0593669d18c92c04b6bc9f1d0c33cae38 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 20 Aug 2013 16:48:41 -0700 Subject: [PATCH 09/22] make a better array cyc-queue --- raft_stats.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/raft_stats.go b/raft_stats.go index 7c447bae6..9c572afc9 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -8,9 +8,18 @@ import ( "github.com/coreos/go-raft" ) +const ( + queueCapacity = 200 +) + type runtimeStats struct { } +type packageStats struct { + sendingTime time.Time + size uint64 +} + type raftServerStats struct { State string StartTime time.Time @@ -21,6 +30,7 @@ type raftServerStats struct { SendAppendRequestCnt uint64 SendAppendReqeustRate uint64 sendRateQueue *list.List + recvRateQueue *list.List SendingRate float64 } @@ -90,3 +100,38 @@ func (ps *raftPeerStats) Succ(d time.Duration) { // sdv = sqrt(avg(x^2) - avg(x)^2) ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency) } + +type statsQueue struct { + items [queueCapacity]*packageStats + size int + front int + back int +} + +func (q *statsQueue) Len() int { + 
return q.size +} + +func (q *statsQueue) Front() *packageStats { + if q.size != 0 { + return q.items[q.front] + } + return nil +} + +func (q *statsQueue) Back() *packageStats { + if q.size != 0 { + return q.items[q.back] + } + return nil +} + +func (q *statsQueue) Insert(p *packageStats) { + q.back = (q.back + 1) % queueCapacity + q.items[q.back] = p + if q.size == queueCapacity { + q.front = (q.back + 1) % queueCapacity + } else { + q.size++ + } +} From 6ef18b1ae3cf3c06c11ca73d670faef1900f2a67 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 21 Aug 2013 13:35:15 -0700 Subject: [PATCH 10/22] thread-safe queue --- raft_server.go | 22 +++++++++------ raft_stats.go | 72 ++++++++++++++++++++++++++++++++------------------ transporter.go | 7 +++-- 3 files changed, 65 insertions(+), 36 deletions(-) diff --git a/raft_server.go b/raft_server.go index 651e5d032..d447b16aa 100644 --- a/raft_server.go +++ b/raft_server.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "container/list" "crypto/tls" "encoding/binary" "encoding/json" @@ -49,8 +48,10 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo tlsInfo: tlsInfo, peersStats: make(map[string]*raftPeerStats), serverStats: &raftServerStats{ - StartTime: time.Now(), - sendRateQueue: list.New(), + StartTime: time.Now(), + sendRateQueue: &statsQueue{ + back: -1, + }, }, } } @@ -281,14 +282,19 @@ func (r *raftServer) Stats() []byte { queue := r.serverStats.sendRateQueue - frontValue, _ := queue.Front().Value.(time.Time) - backValue, _ := queue.Back().Value.(time.Time) + front, back := queue.FrontAndBack() - sampleDuration := backValue.Sub(frontValue) + sampleDuration := back.Time().Sub(front.Time()) - r.serverStats.SendingRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second) + r.serverStats.SendingPkgRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second) - sBytes, _ := json.Marshal(r.serverStats) + r.serverStats.SendingBandwidthRate = 
float64(queue.Size()) / float64(sampleDuration) * float64(time.Second) + + sBytes, err := json.Marshal(r.serverStats) + + if err != nil { + warn(err) + } pBytes, _ := json.Marshal(r.peersStats) diff --git a/raft_stats.go b/raft_stats.go index 9c572afc9..3e125e9a2 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -2,7 +2,9 @@ package main import ( "container/list" + "fmt" "math" + "sync" "time" "github.com/coreos/go-raft" @@ -17,7 +19,18 @@ type runtimeStats struct { type packageStats struct { sendingTime time.Time - size uint64 + size int +} + +func NewPackageStats(now time.Time, size int) *packageStats { + return &packageStats{ + sendingTime: now, + size: size, + } +} + +func (ps *packageStats) Time() time.Time { + return ps.sendingTime } type raftServerStats struct { @@ -29,9 +42,10 @@ type raftServerStats struct { RecvAppendRequestCnt uint64 SendAppendRequestCnt uint64 SendAppendReqeustRate uint64 - sendRateQueue *list.List + sendRateQueue *statsQueue recvRateQueue *list.List - SendingRate float64 + SendingPkgRate float64 + SendingBandwidthRate float64 } func (ss *raftServerStats) RecvAppendReq(leaderName string) { @@ -44,7 +58,7 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string) { ss.RecvAppendRequestCnt++ } -func (ss *raftServerStats) SendAppendReq() { +func (ss *raftServerStats) SendAppendReq(pkgSize int) { now := time.Now() if ss.State != raft.Leader { ss.State = raft.Leader @@ -52,12 +66,7 @@ func (ss *raftServerStats) SendAppendReq() { ss.leaderStartTime = now } - if ss.sendRateQueue.Len() < 200 { - ss.sendRateQueue.PushBack(now) - } else { - ss.sendRateQueue.PushBack(now) - ss.sendRateQueue.Remove(ss.sendRateQueue.Front()) - } + ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) ss.SendAppendRequestCnt++ } @@ -102,36 +111,47 @@ func (ps *raftPeerStats) Succ(d time.Duration) { } type statsQueue struct { - items [queueCapacity]*packageStats - size int - front int - back int + items [queueCapacity]*packageStats + size int + front int + 
back int + totalPkgSize int + rwl sync.RWMutex } func (q *statsQueue) Len() int { return q.size } -func (q *statsQueue) Front() *packageStats { - if q.size != 0 { - return q.items[q.front] - } - return nil +func (q *statsQueue) Size() int { + return q.totalPkgSize } -func (q *statsQueue) Back() *packageStats { +// FrontAndBack gets the front and back elements in the queue +// We must grab front and back together with the protection of the lock +func (q *statsQueue) FrontAndBack() (*packageStats, *packageStats) { + q.rwl.RLock() + defer q.rwl.RUnlock() if q.size != 0 { - return q.items[q.back] + return q.items[q.front], q.items[q.back] } - return nil + return nil, nil } func (q *statsQueue) Insert(p *packageStats) { - q.back = (q.back + 1) % queueCapacity - q.items[q.back] = p - if q.size == queueCapacity { - q.front = (q.back + 1) % queueCapacity + q.rwl.Lock() + defer q.rwl.Unlock() + + if q.size == queueCapacity { //dequeue + q.totalPkgSize -= q.items[q.front].size + q.front = (q.back + 2) % queueCapacity } else { q.size++ } + + q.back = (q.back + 1) % queueCapacity + q.items[q.back] = p + q.totalPkgSize += q.items[q.back].size + + fmt.Println(q.front, q.back, q.size) } diff --git a/transporter.go b/transporter.go index e476fdf05..e82d3668c 100644 --- a/transporter.go +++ b/transporter.go @@ -48,11 +48,14 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var aersp *raft.AppendEntriesResponse var b bytes.Buffer - r.serverStats.SendAppendReq() - json.NewEncoder(&b).Encode(req) + size := b.Len() + + r.serverStats.SendAppendReq(size) + u, _ := nameToRaftURL(peer.Name) + debugf("Send LogEntries to %s ", u) thisPeerStats, ok := r.peersStats[peer.Name] From 23995ffc591aab059a66803d0ec57b5ecc910098 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 21 Aug 2013 14:34:52 -0700 Subject: [PATCH 11/22] add recvQueue --- raft_handlers.go | 2 +- raft_server.go | 11 ++++++----- raft_stats.go | 26 ++++++++++++++++++++++---- 3 files changed, 29 
insertions(+), 10 deletions(-) diff --git a/raft_handlers.go b/raft_handlers.go index f56a3aa0b..4fff0780d 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -43,7 +43,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { if err == nil { debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries)) - r.serverStats.RecvAppendReq(aereq.LeaderName) + r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) if resp := r.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) diff --git a/raft_server.go b/raft_server.go index d447b16aa..3d11d0899 100644 --- a/raft_server.go +++ b/raft_server.go @@ -52,6 +52,9 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo sendRateQueue: &statsQueue{ back: -1, }, + recvRateQueue: &statsQueue{ + back: -1, + }, }, } } @@ -282,13 +285,11 @@ func (r *raftServer) Stats() []byte { queue := r.serverStats.sendRateQueue - front, back := queue.FrontAndBack() + r.serverStats.SendingPkgRate, r.serverStats.SendingBandwidthRate = queue.Rate() - sampleDuration := back.Time().Sub(front.Time()) + queue = r.serverStats.recvRateQueue - r.serverStats.SendingPkgRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second) - - r.serverStats.SendingBandwidthRate = float64(queue.Size()) / float64(sampleDuration) * float64(time.Second) + r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate() sBytes, err := json.Marshal(r.serverStats) diff --git a/raft_stats.go b/raft_stats.go index 3e125e9a2..5a8458edd 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -1,7 +1,6 @@ package main import ( - "container/list" "fmt" "math" "sync" @@ -43,18 +42,21 @@ type raftServerStats struct { SendAppendRequestCnt uint64 SendAppendReqeustRate uint64 sendRateQueue *statsQueue - recvRateQueue *list.List + recvRateQueue *statsQueue SendingPkgRate float64 SendingBandwidthRate float64 + RecvingPkgRate float64 + RecvingBandwidthRate float64 } -func (ss 
*raftServerStats) RecvAppendReq(leaderName string) { +func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { ss.State = raft.Follower if leaderName != ss.Leader { ss.Leader = leaderName ss.leaderStartTime = time.Now() } + ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) ss.RecvAppendRequestCnt++ } @@ -129,7 +131,7 @@ func (q *statsQueue) Size() int { // FrontAndBack gets the front and back elements in the queue // We must grab front and back together with the protection of the lock -func (q *statsQueue) FrontAndBack() (*packageStats, *packageStats) { +func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) { q.rwl.RLock() defer q.rwl.RUnlock() if q.size != 0 { @@ -155,3 +157,19 @@ func (q *statsQueue) Insert(p *packageStats) { fmt.Println(q.front, q.back, q.size) } + +func (q *statsQueue) Rate() (float64, float64) { + front, back := q.frontAndBack() + + if front == nil || back == nil { + return 0, 0 + } + + sampleDuration := back.Time().Sub(front.Time()) + + pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) + + br := float64(q.Size()) / float64(sampleDuration) * float64(time.Second) + + return pr, br +} From 808eb64bd73f75051818ca4b458d7604552db78c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 23 Aug 2013 01:25:35 -0400 Subject: [PATCH 12/22] tmp commit --- raft_stats.go | 49 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/raft_stats.go b/raft_stats.go index 5a8458edd..3794ef52c 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -33,20 +33,22 @@ func (ps *packageStats) Time() time.Time { } type raftServerStats struct { - State string - StartTime time.Time - Leader string - leaderStartTime time.Time - LeaderUptime string - RecvAppendRequestCnt uint64 - SendAppendRequestCnt uint64 - SendAppendReqeustRate uint64 - sendRateQueue *statsQueue - recvRateQueue *statsQueue - SendingPkgRate float64 - SendingBandwidthRate float64 - 
RecvingPkgRate float64 - RecvingBandwidthRate float64 + State string `json:"state"` + StartTime time.Time `json:"startTime"` + Leader string `json:"leader"` + LeaderUptime string `json:"leaderUptime"` + + RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt"` + RecvingPkgRate float64 `json:"recvPkgRate"` + RecvingBandwidthRate float64 `json:"recvBandwidthRate"` + + SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"` + SendingPkgRate float64 `json:"sendPkgRate"` + SendingBandwidthRate float64 `json:"sendBandwidthRate"` + + leaderStartTime time.Time + sendRateQueue *statsQueue + recvRateQueue *statsQueue } func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { @@ -144,14 +146,15 @@ func (q *statsQueue) Insert(p *packageStats) { q.rwl.Lock() defer q.rwl.Unlock() + q.back = (q.back + 1) % queueCapacity + if q.size == queueCapacity { //dequeue q.totalPkgSize -= q.items[q.front].size - q.front = (q.back + 2) % queueCapacity + q.front = (q.back + 1) % queueCapacity } else { q.size++ } - q.back = (q.back + 1) % queueCapacity q.items[q.back] = p q.totalPkgSize += q.items[q.back].size @@ -165,6 +168,11 @@ func (q *statsQueue) Rate() (float64, float64) { return 0, 0 } + if time.Now.Sub(back.Time()) > time.Second { + q.Clear() + return 0, 0 + } + sampleDuration := back.Time().Sub(front.Time()) pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) @@ -173,3 +181,12 @@ func (q *statsQueue) Rate() (float64, float64) { return pr, br } + +func (q *statsQueue) Clear() { + q.rwl.Lock() + defer q.rwl.Unlock() + q.back = -1 + q.front = 0 + q.size = 0 + q.totalPkgSize = 0 +} From 2f5015552e8db84d04cb75b0adb3f7b9d3be5c56 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Mon, 2 Sep 2013 22:17:39 -0700 Subject: [PATCH 13/22] feat(etcd_handlers): enable CORS When developing or using web frontends for etcd it will be necessary to enable Cross-Origin Resource Sharing. Add a flag that lets the user enable this feature via a whitelist. 
--- etcd.go | 27 +++++++++++++++++++++++++++ etcd_handlers.go | 19 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/etcd.go b/etcd.go index 46546e8cc..5a0b4dee6 100644 --- a/etcd.go +++ b/etcd.go @@ -3,9 +3,11 @@ package main import ( "crypto/tls" "flag" + "fmt" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "io/ioutil" + "net/url" "os" "strings" "time" @@ -40,6 +42,9 @@ var ( maxClusterSize int cpuprofile string + + cors string + corsList map[string]bool ) func init() { @@ -77,6 +82,8 @@ func init() { flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster") flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file") + + flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')") } const ( @@ -152,6 +159,8 @@ func main() { raft.SetLogLevel(raft.Debug) } + parseCorsFlag() + if machines != "" { cluster = strings.Split(machines, ",") } else if machinesFile != "" { @@ -206,3 +215,21 @@ func main() { e.ListenAndServe() } + +// parseCorsFlag gathers up the cors whitelist and puts it into the corsList. +func parseCorsFlag() { + if cors != "" { + corsList = make(map[string]bool) + list := strings.Split(cors, ",") + for _, v := range list { + fmt.Println(v) + if v != "*" { + _, err := url.Parse(v) + if err != nil { + panic(fmt.Sprintf("bad cors url: %s", err)) + } + } + corsList[v] = true + } + } +} diff --git a/etcd_handlers.go b/etcd_handlers.go index 60e7b35b5..02cb2316a 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -29,7 +29,26 @@ func NewEtcdMuxer() *http.ServeMux { type errorHandler func(http.ResponseWriter, *http.Request) error +// addCorsHeader parses the request Origin header and loops through the user +// provided allowed origins and sets the Access-Control-Allow-Origin header if +// there is a match. 
+func addCorsHeader(w http.ResponseWriter, r *http.Request) { + val, ok := corsList["*"] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", "*") + return + } + + requestOrigin := r.Header.Get("Origin") + val, ok = corsList[requestOrigin] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", requestOrigin) + return + } +} + func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + addCorsHeader(w, r) if e := fn(w, r); e != nil { if etcdErr, ok := e.(etcdErr.Error); ok { debug("Return error: ", etcdErr.Error()) From b300d2877ee97467a89769fd3e63829035bc404a Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 4 Sep 2013 17:31:27 -0700 Subject: [PATCH 14/22] README: add justinsb/jetcd project --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index cf883b0d1..4d4a0d765 100644 --- a/README.md +++ b/README.md @@ -465,6 +465,10 @@ If you are using SSL for server to server communication, you must use it on all - [go-etcd](https://github.com/coreos/go-etcd) +**Java libraries** + +- [justinsb/jetcd](https://github.com/justinsb/jetcd) + **Node libraries** - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd) From adbcbefe92087c30736652bef3036a1e81ac5a13 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Thu, 5 Sep 2013 08:42:30 -0700 Subject: [PATCH 15/22] feat(README): add python library transitorykris/etcd-py --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 4d4a0d765..a4ad5f1d4 100644 --- a/README.md +++ b/README.md @@ -469,6 +469,10 @@ If you are using SSL for server to server communication, you must use it on all - [justinsb/jetcd](https://github.com/justinsb/jetcd) +**Python libraries** + +- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py) + **Node libraries** - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd) From b366f1044688e3ee8cacc09c4fd1b9ffeb399167 Mon Sep 17 00:00:00 2001 
From: Geoff Hayes Date: Thu, 5 Sep 2013 21:08:43 -0700 Subject: [PATCH 16/22] Blank prevValue in POST should be interpreted as a blank test-and-set, not a normal set --- etcd_handlers.go | 12 ++++++------ etcd_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/etcd_handlers.go b/etcd_handlers.go index 02cb2316a..ac523cc89 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -92,15 +92,15 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) - value := req.FormValue("value") + req.ParseForm() + + value := req.Form.Get("value") if len(value) == 0 { return etcdErr.NewError(200, "Set") } - prevValue := req.FormValue("prevValue") - - strDuration := req.FormValue("ttl") + strDuration := req.Form.Get("ttl") expireTime, err := durationToExpireTime(strDuration) @@ -108,11 +108,11 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { return etcdErr.NewError(202, "Set") } - if len(prevValue) != 0 { + if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 { command := &TestAndSetCommand{ Key: key, Value: value, - PrevValue: prevValue, + PrevValue: prevValueArr[0], ExpireTime: expireTime, } diff --git a/etcd_test.go b/etcd_test.go index e61e7e4a8..61db07023 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -54,6 +54,32 @@ func TestSingleNode(t *testing.T) { } t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) } + + // Add a test-and-set test + + // First, we'll test we can change the value if we get it write + result, match, err := c.TestAndSet("foo", "bar", "foobar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + // Next, we'll make sure we can't set it without the 
correct prior value + _, _, err = c.TestAndSet("foo", "bar", "foofoo", 100) + + if err == nil { + t.Fatalf("Set 4 expecting error when setting key with incorrect previous value") + } + + // Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match + _, _, err = c.TestAndSet("foo", "", "barbar", 100) + + if err == nil { + t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value") + } } // TestInternalVersionFail will ensure that etcd does not come up if the internal raft From a623effaf14bb3a035f749df0ec180e0eb6ebb4e Mon Sep 17 00:00:00 2001 From: Diwaker Gupta Date: Fri, 6 Sep 2013 19:34:22 -0700 Subject: [PATCH 17/22] Add another Java library for etcd https://github.com/diwakergupta/jetcd --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index a4ad5f1d4..4514c4edf 100644 --- a/README.md +++ b/README.md @@ -468,6 +468,8 @@ If you are using SSL for server to server communication, you must use it on all **Java libraries** - [justinsb/jetcd](https://github.com/justinsb/jetcd) +- [diwakergupta/jetcd](https://github.com/diwakergupta/jetcd) + **Python libraries** From 380326b5d1ba26a9c5943a8c09072ab45e1b9d1e Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sat, 7 Sep 2013 09:05:24 -0700 Subject: [PATCH 18/22] feat(README): add note about cluster size Cluster size keeps coming up on IRC and the mailing list. Add an FAQ section. --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index a4ad5f1d4..a9df80200 100644 --- a/README.md +++ b/README.md @@ -495,6 +495,19 @@ If you are using SSL for server to server communication, you must use it on all - [mattn/etcd-vim](https://github.com/mattn/etcd-vim) - SET and GET keys from inside vim - [mattn/etcdenv](https://github.com/mattn/etcdenv) - "env" shebang with etcd integration +## FAQ + +### What size cluster should I use? 
+ +Every command the client sends to the master is broadcast it to all of the followers. +But, the command is not be committed until the majority of the cluster machines receive that command. + +Because of this majority voting property the ideal cluster should be kept small to keep speed up and be made up of an odd number of machines. + +Odd numbers are good because if you have 8 machines the majority will be 5 and if you have 9 machines the majority with be 5. +The result is that an 8 machine cluster can tolerate 3 machine failures and a 9 machine cluster can tolerate 4 nodes failures. +And in the best case when all 9 machines are responding the cluster will perform at the speed of the fastest 5 nodes. + ## Project Details ### Versioning From 43cb2a353f250f4124f727da1860e56c3dd87e33 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 8 Sep 2013 20:48:33 -0400 Subject: [PATCH 19/22] only leader will return peer stats --- etcd_server.go | 2 +- raft_server.go | 12 ++++++++---- raft_stats.go | 19 +++++++++---------- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/etcd_server.go b/etcd_server.go index 2cef01558..0139b03c2 100644 --- a/etcd_server.go +++ b/etcd_server.go @@ -31,7 +31,7 @@ func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSCo // Start to listen and response etcd client command func (e *etcdServer) ListenAndServe() { - infof("etcd server [%s:%s]", e.name, e.url) + infof("etcd server [name %s, listen on %s, advertised url %s]", e.name, e.Server.Addr, e.url) if e.tlsConf.Scheme == "http" { fatal(e.Server.ListenAndServe()) diff --git a/raft_server.go b/raft_server.go index 262ffc972..a159e9020 100644 --- a/raft_server.go +++ b/raft_server.go @@ -45,6 +45,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi version: raftVersion, name: name, url: url, + listenHost: listenHost, tlsConf: tlsConf, tlsInfo: tlsInfo, peersStats: make(map[string]*raftPeerStats), @@ -148,7 +149,7 @@ func 
startAsFollower() { // Start to listen and response raft command func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { - infof("raft server [%s:%s]", r.name, r.listenHost) + infof("raft server [name %s, listen on %s, advertised url %s]", r.name, r.listenHost, r.url) raftMux := http.NewServeMux() @@ -298,11 +299,14 @@ func (r *raftServer) Stats() []byte { warn(err) } - pBytes, _ := json.Marshal(r.peersStats) + if r.State() == raft.Leader { + pBytes, _ := json.Marshal(r.peersStats) - b := append(sBytes, pBytes...) + b := append(sBytes, pBytes...) + return b + } - return b + return sBytes } // Register commands to raft server diff --git a/raft_stats.go b/raft_stats.go index 06687d89f..cf8fb1d6c 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "math" "sync" "time" @@ -13,14 +12,14 @@ const ( queueCapacity = 200 ) -type runtimeStats struct { -} - +// packageStats represent the stats we need for a package. +// It has sending time and the size of the package. type packageStats struct { sendingTime time.Time size int } +// NewPackageStats creates a pacakgeStats and return the pointer to it. func NewPackageStats(now time.Time, size int) *packageStats { return &packageStats{ sendingTime: now, @@ -28,6 +27,7 @@ func NewPackageStats(now time.Time, size int) *packageStats { } } +// Time return the sending time of the package. 
func (ps *packageStats) Time() time.Time { return ps.sendingTime } @@ -38,13 +38,13 @@ type raftServerStats struct { Leader string `json:"leader"` LeaderUptime string `json:"leaderUptime"` - RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt"` - RecvingPkgRate float64 `json:"recvPkgRate"` - RecvingBandwidthRate float64 `json:"recvBandwidthRate"` + RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` + RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` + RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"` SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"` - SendingPkgRate float64 `json:"sendPkgRate"` - SendingBandwidthRate float64 `json:"sendBandwidthRate"` + SendingPkgRate float64 `json:"sendPkgRate,omitempty"` + SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` leaderStartTime time.Time sendRateQueue *statsQueue @@ -158,7 +158,6 @@ func (q *statsQueue) Insert(p *packageStats) { q.items[q.back] = p q.totalPkgSize += q.items[q.back].size - fmt.Println(q.front, q.back, q.size) } func (q *statsQueue) Rate() (float64, float64) { From 31aa3dfe8216c566570dfff11af20ff037c72226 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 8 Sep 2013 21:12:26 -0400 Subject: [PATCH 20/22] clear up raft_stats --- raft_stats.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/raft_stats.go b/raft_stats.go index cf8fb1d6c..973976487 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -86,10 +86,7 @@ type raftPeerStats struct { SuccCnt uint64 `json:"successCount"` } -func (ps *raftPeerStats) Fail() { - ps.FailCnt++ -} - +// Succ function update the raftPeerStats with a successful send func (ps *raftPeerStats) Succ(d time.Duration) { total := float64(ps.SuccCnt) * ps.AvgLatency @@ -114,6 +111,11 @@ func (ps *raftPeerStats) Succ(d time.Duration) { ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency) } +// Fail function update the raftPeerStats with a unsuccessful send +func (ps 
*raftPeerStats) Fail() { + ps.FailCnt++ +} + type statsQueue struct { items [queueCapacity]*packageStats size int @@ -127,7 +129,7 @@ func (q *statsQueue) Len() int { return q.size } -func (q *statsQueue) Size() int { +func (q *statsQueue) PkgSize() int { return q.totalPkgSize } @@ -142,6 +144,7 @@ func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) { return nil, nil } +// Insert function insert a packageStats into the queue and update the records func (q *statsQueue) Insert(p *packageStats) { q.rwl.Lock() defer q.rwl.Unlock() @@ -160,6 +163,7 @@ func (q *statsQueue) Insert(p *packageStats) { } +// Rate function returns the package rate and byte rate func (q *statsQueue) Rate() (float64, float64) { front, back := q.frontAndBack() @@ -176,11 +180,12 @@ func (q *statsQueue) Rate() (float64, float64) { pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) - br := float64(q.Size()) / float64(sampleDuration) * float64(time.Second) + br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second) return pr, br } +// Clear function clear up the statsQueue func (q *statsQueue) Clear() { q.rwl.Lock() defer q.rwl.Unlock() From 86e03d22982ab382d59e726d86f3ea3b14eea29a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 10 Sep 2013 12:28:58 -0400 Subject: [PATCH 21/22] format --- raft_server.go | 1 - raft_stats.go | 1 - 2 files changed, 2 deletions(-) diff --git a/raft_server.go b/raft_server.go index a159e9020..9342e2997 100644 --- a/raft_server.go +++ b/raft_server.go @@ -282,7 +282,6 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { } func (r *raftServer) Stats() []byte { - r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String() queue := r.serverStats.sendRateQueue diff --git a/raft_stats.go b/raft_stats.go index 973976487..175a1be55 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -88,7 +88,6 @@ type raftPeerStats struct { // Succ function update the raftPeerStats with a 
successful send func (ps *raftPeerStats) Succ(d time.Duration) { - total := float64(ps.SuccCnt) * ps.AvgLatency totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare From f9235481829a53fbefb5788971afa807468cba1e Mon Sep 17 00:00:00 2001 From: Michael Stillwell Date: Fri, 13 Sep 2013 09:33:44 +0100 Subject: [PATCH 22/22] Change key "testAndSet" -> "foo" for clarity --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index a495b97f6..7dbd55ae7 100644 --- a/README.md +++ b/README.md @@ -187,17 +187,17 @@ The watch command returns immediately with the same response as previous. Etcd can be used as a centralized coordination service in a cluster and `TestAndSet` is the most basic operation to build distributed lock service. This command will set the value only if the client provided `prevValue` is equal the current key value. -Here is a simple example. Let's create a key-value pair first: `testAndSet=one`. +Here is a simple example. Let's create a key-value pair first: `foo=one`. ```sh -curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d value=one +curl -L http://127.0.0.1:4001/v1/keys/foo -d value=one ``` -Let's try an invaild `TestAndSet` command. +Let's try an invalid `TestAndSet` command. We can give another parameter prevValue to set command to make it a TestAndSet command. ```sh -curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d prevValue=two -d value=three +curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=two -d value=three ``` This will try to test if the previous of the key is two, it is change it to three. @@ -208,16 +208,16 @@ This will try to test if the previous of the key is two, it is change it to thre which means `testAndSet` failed. -Let us try a vaild one. +Let us try a valid one. 
```sh -curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d prevValue=one -d value=two +curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=one -d value=two ``` The response should be ```json -{"action":"SET","key":"/testAndSet","prevValue":"one","value":"two","index":10} +{"action":"SET","key":"/foo","prevValue":"one","value":"two","index":10} ``` We successfully changed the value from “one” to “two”, since we give the correct previous value.