diff --git a/.gitignore b/.gitignore index c16223f09..d00d899e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ src/ pkg/ -./etcd +/etcd release_version.go +/machine* diff --git a/build b/build index a4fe58737..b121ba30d 100755 --- a/build +++ b/build @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh ETCD_PACKAGE=github.com/coreos/etcd export GOPATH="${PWD}" diff --git a/command.go b/command.go index b9dbb0919..c3be92cee 100644 --- a/command.go +++ b/command.go @@ -170,6 +170,12 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) + // add peer stats + if c.Name != r.Name() { + r.followersStats.Followers[c.Name] = &raftFollowerStats{} + r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 + } + return b, err } @@ -194,7 +200,9 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) _, err := etcdStore.Delete(key, raftServer.CommitIndex()) - delete(r.peersStats, c.Name) + + // delete from stats + delete(r.followersStats.Followers, c.Name) if err != nil { return []byte{0}, err diff --git a/etcd_handlers.go b/etcd_handlers.go index b7cf0e791..24d7bb8aa 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -22,7 +22,7 @@ func NewEtcdMuxer() *http.ServeMux { etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler)) etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) - etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler)) + etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler)) etcdMux.Handle("/version", errorHandler(VersionHttpHandler)) etcdMux.HandleFunc("/test/", TestHttpHandler) return etcdMux @@ -167,22 +167,8 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er return etcdErr.NewError(300, "") } - // tell the client where is the leader - path := req.URL.Path + redirect(leader, etcd, w, req) - var url string - - if etcd { - etcdAddr, _ := nameToEtcdURL(leader) - url = etcdAddr + path - } else { - raftAddr, _ := nameToRaftURL(leader) - url = raftAddr + path - } - - debugf("Redirect to %s", url) - - http.Redirect(w, req, url, http.StatusTemporaryRedirect) return nil } return etcdErr.NewError(300, "") @@ -227,9 +213,28 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return the basic stats of etcd func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { - w.WriteHeader(http.StatusOK) - w.Write(etcdStore.Stats()) - w.Write(r.Stats()) + option := req.URL.Path[len("/v1/stats/"):] + + switch option { + case "self": + w.WriteHeader(http.StatusOK) + w.Write(r.Stats()) + case "leader": + if r.State() == raft.Leader { + w.Write(r.PeerStats()) + } else { + leader := r.Leader() + // current no leader + if leader == "" { + return etcdErr.NewError(300, "") + } + redirect(leader, true, w, req) + } + case "store": + w.WriteHeader(http.StatusOK) + w.Write(etcdStore.Stats()) + } + return nil } diff --git a/raft_server.go b/raft_server.go index ca853c789..fc204546a 100644 --- a/raft_server.go +++ b/raft_server.go @@ -17,15 +17,15 @@ import ( type raftServer struct { *raft.Server - version string - joinIndex uint64 - name string - url string - listenHost string - tlsConf *TLSConfig - tlsInfo *TLSInfo - peersStats map[string]*raftPeerStats - serverStats *raftServerStats + version string + joinIndex uint64 + name string + url string + listenHost string + tlsConf *TLSConfig + tlsInfo *TLSInfo + followersStats *raftFollowersStats + serverStats *raftServerStats } var r *raftServer @@ -48,7 +48,10 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi listenHost: listenHost, tlsConf: tlsConf, tlsInfo: tlsInfo, - peersStats: make(map[string]*raftPeerStats), + followersStats: &raftFollowersStats{ + Leader: name, + Followers: make(map[string]*raftFollowerStats), + }, serverStats: &raftServerStats{ StartTime: time.Now(), sendRateQueue: &statsQueue{ @@ -63,7 +66,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi // Start the raft server func (r *raftServer) ListenAndServe() { - // Setup commands. registerCommands() @@ -282,7 +284,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { } func (r *raftServer) Stats() []byte { - r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String() + r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String() queue := r.serverStats.sendRateQueue @@ -292,20 +294,17 @@ func (r *raftServer) Stats() []byte { r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate() - sBytes, err := json.Marshal(r.serverStats) + b, _ := json.Marshal(r.serverStats) - if err != nil { - warn(err) - } + return b +} +func (r *raftServer) PeerStats() []byte { if r.State() == raft.Leader { - pBytes, _ := json.Marshal(r.peersStats) - - b := append(sBytes, pBytes...) + b, _ := json.Marshal(r.followersStats) return b } - - return sBytes + return nil } // Register commands to raft server diff --git a/raft_stats.go b/raft_stats.go index 175a1be55..b5b622317 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -33,10 +33,14 @@ func (ps *packageStats) Time() time.Time { } type raftServerStats struct { - State string `json:"state"` - StartTime time.Time `json:"startTime"` - Leader string `json:"leader"` - LeaderUptime string `json:"leaderUptime"` + State string `json:"state"` + StartTime time.Time `json:"startTime"` + + LeaderInfo struct { + Name string `json:"leader"` + Uptime string `json:"uptime"` + startTime time.Time + } `json:"leaderInfo"` RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` @@ -46,16 +50,15 @@ type raftServerStats struct { SendingPkgRate float64 `json:"sendPkgRate,omitempty"` SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` - leaderStartTime time.Time - sendRateQueue *statsQueue - recvRateQueue *statsQueue + sendRateQueue *statsQueue + recvRateQueue *statsQueue } func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { ss.State = raft.Follower - if leaderName != ss.Leader { - ss.Leader = leaderName - ss.leaderStartTime = time.Now() + if leaderName != ss.LeaderInfo.Name { + ss.LeaderInfo.Name = leaderName + ss.LeaderInfo.startTime = time.Now() } ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) @@ -64,55 +67,66 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { func (ss *raftServerStats) SendAppendReq(pkgSize int) { now := time.Now() + if ss.State != raft.Leader { ss.State = raft.Leader - ss.Leader = r.Name() - ss.leaderStartTime = now + ss.LeaderInfo.Name = r.Name() + ss.LeaderInfo.startTime = now } - ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) + ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize)) ss.SendAppendRequestCnt++ } -type raftPeerStats struct { - Latency float64 `json:"latency"` - AvgLatency float64 `json:"averageLatency"` - avgLatencySquare float64 - SdvLatency float64 `json:"sdvLatency"` - MinLatency float64 `json:"minLatency"` - MaxLatency float64 `json:"maxLatency"` - FailCnt uint64 `json:"failsCount"` - SuccCnt uint64 `json:"successCount"` +type raftFollowersStats struct { + Leader string `json:"leader"` + Followers map[string]*raftFollowerStats `json:"peers"` } -// Succ function update the raftPeerStats with a successful send -func (ps *raftPeerStats) Succ(d time.Duration) { - total := float64(ps.SuccCnt) * ps.AvgLatency - totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare +type raftFollowerStats struct { + Latency struct { + Current float64 `json:"current"` + Average float64 `json:"average"` + averageSquare float64 + StandardDeviation float64 `json:"standardDeviation"` + Minimum float64 `json:"minimum"` + Maximum float64 `json:"maximum"` + } `json:"latency"` - ps.SuccCnt++ + Counts struct { + Fail uint64 `json:"fail"` + Success uint64 `json:"success"` + } `json:"counts"` +} - ps.Latency = float64(d) / (1000000.0) +// Succ function update the raftFollowerStats with a successful send +func (ps *raftFollowerStats) Succ(d time.Duration) { + total := float64(ps.Counts.Success) * ps.Latency.Average + totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare - if ps.Latency > ps.MaxLatency { - ps.MaxLatency = ps.Latency + ps.Counts.Success++ + + ps.Latency.Current = float64(d) / (1000000.0) + + if ps.Latency.Current > ps.Latency.Maximum { + ps.Latency.Maximum = ps.Latency.Current } - if ps.Latency < ps.MinLatency { - ps.MinLatency = ps.Latency + if ps.Latency.Current < ps.Latency.Minimum { + ps.Latency.Minimum = ps.Latency.Current } - ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt) - ps.avgLatencySquare = (totalSquare + ps.Latency*ps.Latency) / float64(ps.SuccCnt) + ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success) + ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success) // sdv = sqrt(avg(x^2) - avg(x)^2) - ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency) + ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average) } -// Fail function update the raftPeerStats with a unsuccessful send -func (ps *raftPeerStats) Fail() { - ps.FailCnt++ +// Fail function update the raftFollowerStats with a unsuccessful send +func (ps *raftFollowerStats) Fail() { + ps.Counts.Fail++ } type statsQueue struct { diff --git a/scripts/test-cluster b/scripts/test-cluster new file mode 100755 index 000000000..ccdedd1b7 --- /dev/null +++ b/scripts/test-cluster @@ -0,0 +1,19 @@ +#!/bin/bash +SESSION=etcd-cluster + +tmux new-session -d -s $SESSION + +# Setup a window for tailing log files +tmux new-window -t $SESSION:1 -n 'machines' +tmux split-window -h +tmux select-pane -t 0 +tmux send-keys "./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d machine1 -n machine1" C-m + +for i in 2 3; do + tmux select-pane -t 0 + tmux split-window -v + tmux send-keys "./etcd -cors='*' -s 127.0.0.1:700${i} -c 127.0.0.1:400${i} -C 127.0.0.1:7001 -d machine${i} -n machine${i}" C-m +done + +# Attach to session +tmux attach-session -t $SESSION diff --git a/transporter.go b/transporter.go index 517bb7dc4..c17c9d35f 100644 --- a/transporter.go +++ b/transporter.go @@ -66,11 +66,12 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P debugf("Send LogEntries to %s ", u) - thisPeerStats, ok := r.peersStats[peer.Name] + thisFollowerStats, ok := r.followersStats.Followers[peer.Name] - if !ok { // we first see this peer - thisPeerStats = &raftPeerStats{MinLatency: 1 << 63} - r.peersStats[peer.Name] = thisPeerStats + if !ok { //this is the first time this follower has been seen + thisFollowerStats = &raftFollowerStats{} + thisFollowerStats.Latency.Minimum = 1 << 63 + r.followersStats.Followers[peer.Name] = thisFollowerStats } start := time.Now() @@ -82,11 +83,11 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P if err != nil { debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) if ok { - thisPeerStats.Fail() + thisFollowerStats.Fail() } } else { if ok { - thisPeerStats.Succ(end.Sub(start)) + thisFollowerStats.Succ(end.Sub(start)) } } diff --git a/util.go b/util.go index 579f1c675..7fba80b3e 100644 --- a/util.go +++ b/util.go @@ -128,6 +128,23 @@ func sanitizeListenHost(listen string, advertised string) string { return net.JoinHostPort(listen, aport) } +func redirect(node string, etcd bool, w http.ResponseWriter, req *http.Request) { + var url string + path := req.URL.Path + + if etcd { + etcdAddr, _ := nameToEtcdURL(node) + url = etcdAddr + path + } else { + raftAddr, _ := nameToRaftURL(node) + url = raftAddr + path + } + + debugf("Redirect to %s", url) + + http.Redirect(w, req, url, http.StatusTemporaryRedirect) +} + func check(err error) { if err != nil { fatal(err)