diff --git a/command.go b/command.go index 0147811da..dc0684707 100644 --- a/command.go +++ b/command.go @@ -170,8 +170,9 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) + // add peer stats if c.Name != r.Name() { - r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63} + r.peersStats.Peers[c.Name] = &raftPeerStats{MinLatency: 1 << 63} } return b, err @@ -198,7 +199,9 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) _, err := etcdStore.Delete(key, raftServer.CommitIndex()) - delete(r.peersStats, c.Name) + + // delete from stats + delete(r.peersStats.Peers, c.Name) if err != nil { return []byte{0}, err diff --git a/etcd_handlers.go b/etcd_handlers.go index b7cf0e791..24d7bb8aa 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -22,7 +22,7 @@ func NewEtcdMuxer() *http.ServeMux { etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler)) etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) - etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler)) + etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler)) etcdMux.Handle("/version", errorHandler(VersionHttpHandler)) etcdMux.HandleFunc("/test/", TestHttpHandler) return etcdMux @@ -167,22 +167,8 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er return etcdErr.NewError(300, "") } - // tell the client where is the leader - path := req.URL.Path + redirect(leader, etcd, w, req) - var url string - - if etcd { - etcdAddr, _ := nameToEtcdURL(leader) - url = etcdAddr + path - } else { - raftAddr, _ := nameToRaftURL(leader) - url = raftAddr + path - } - - debugf("Redirect to %s", url) - - http.Redirect(w, req, url, http.StatusTemporaryRedirect) return nil } return etcdErr.NewError(300, "") @@ -227,9 +213,28 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return the basic stats of etcd func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { - w.WriteHeader(http.StatusOK) - w.Write(etcdStore.Stats()) - w.Write(r.Stats()) + option := req.URL.Path[len("/v1/stats/"):] + + switch option { + case "self": + w.WriteHeader(http.StatusOK) + w.Write(r.Stats()) + case "leader": + if r.State() == raft.Leader { + w.Write(r.PeerStats()) + } else { + leader := r.Leader() + // current no leader + if leader == "" { + return etcdErr.NewError(300, "") + } + redirect(leader, true, w, req) + } + case "store": + w.WriteHeader(http.StatusOK) + w.Write(etcdStore.Stats()) + } + return nil } diff --git a/raft_server.go b/raft_server.go index 9342e2997..ec3388aa9 100644 --- a/raft_server.go +++ b/raft_server.go @@ -24,7 +24,7 @@ type raftServer struct { listenHost string tlsConf *TLSConfig tlsInfo *TLSInfo - peersStats map[string]*raftPeerStats + peersStats *raftPeersStats serverStats *raftServerStats } @@ -48,7 +48,10 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi listenHost: listenHost, tlsConf: tlsConf, tlsInfo: tlsInfo, - peersStats: make(map[string]*raftPeerStats), + peersStats: &raftPeersStats{ + Leader: name, + Peers: make(map[string]*raftPeerStats), + }, serverStats: &raftServerStats{ StartTime: time.Now(), sendRateQueue: &statsQueue{ @@ -63,7 +66,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi // Start the raft server func (r *raftServer) ListenAndServe() { - // Setup commands. registerCommands() @@ -282,7 +284,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { } func (r *raftServer) Stats() []byte { - r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String() + r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String() queue := r.serverStats.sendRateQueue @@ -292,20 +294,17 @@ func (r *raftServer) Stats() []byte { r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate() - sBytes, err := json.Marshal(r.serverStats) + b, _ := json.Marshal(r.serverStats) - if err != nil { - warn(err) - } + return b +} +func (r *raftServer) PeerStats() []byte { if r.State() == raft.Leader { - pBytes, _ := json.Marshal(r.peersStats) - - b := append(sBytes, pBytes...) + b, _ := json.Marshal(r.peersStats) return b } - - return sBytes + return nil } // Register commands to raft server diff --git a/raft_stats.go b/raft_stats.go index 175a1be55..439c14ce9 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -33,10 +33,14 @@ func (ps *packageStats) Time() time.Time { } type raftServerStats struct { - State string `json:"state"` - StartTime time.Time `json:"startTime"` - Leader string `json:"leader"` - LeaderUptime string `json:"leaderUptime"` + State string `json:"state"` + StartTime time.Time `json:"startTime"` + + LeaderInfo struct { + Name string `json:"leader"` + Uptime string `json:"uptime"` + startTime time.Time + } `json:"leaderInfo"` RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` @@ -46,16 +50,15 @@ type raftServerStats struct { SendingPkgRate float64 `json:"sendPkgRate,omitempty"` SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` - leaderStartTime time.Time - sendRateQueue *statsQueue - recvRateQueue *statsQueue + sendRateQueue *statsQueue + recvRateQueue *statsQueue } func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { ss.State = raft.Follower - if leaderName != ss.Leader { - ss.Leader = leaderName - ss.leaderStartTime = time.Now() + if leaderName != ss.LeaderInfo.Name { + ss.LeaderInfo.Name = leaderName + ss.LeaderInfo.startTime = time.Now() } ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) @@ -64,17 +67,23 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { func (ss *raftServerStats) SendAppendReq(pkgSize int) { now := time.Now() + if ss.State != raft.Leader { ss.State = raft.Leader - ss.Leader = r.Name() - ss.leaderStartTime = now + ss.LeaderInfo.Name = r.Name() + ss.LeaderInfo.startTime = now } - ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) + ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize)) ss.SendAppendRequestCnt++ } +type raftPeersStats struct { + Leader string `json:"leader"` + Peers map[string]*raftPeerStats `json:"peers"` +} + type raftPeerStats struct { Latency float64 `json:"latency"` AvgLatency float64 `json:"averageLatency"` diff --git a/transporter.go b/transporter.go index 461741ce6..f0145b63c 100644 --- a/transporter.go +++ b/transporter.go @@ -66,7 +66,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P debugf("Send LogEntries to %s ", u) - thisPeerStats, ok := r.peersStats[peer.Name] + thisPeerStats, ok := r.peersStats.Peers[peer.Name] start := time.Now() @@ -85,7 +85,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P } } - r.peersStats[peer.Name] = thisPeerStats + r.peersStats.Peers[peer.Name] = thisPeerStats if resp != nil { defer resp.Body.Close() diff --git a/util.go b/util.go index 579f1c675..7fba80b3e 100644 --- a/util.go +++ b/util.go @@ -128,6 +128,23 @@ func sanitizeListenHost(listen string, advertised string) string { return net.JoinHostPort(listen, aport) } +func redirect(node string, etcd bool, w http.ResponseWriter, req *http.Request) { + var url string + path := req.URL.Path + + if etcd { + etcdAddr, _ := nameToEtcdURL(node) + url = etcdAddr + path + } else { + raftAddr, _ := nameToRaftURL(node) + url = raftAddr + path + } + + debugf("Redirect to %s", url) + + http.Redirect(w, req, url, http.StatusTemporaryRedirect) +} + func check(err error) { if err != nil { fatal(err)