mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
20488b498a
@ -170,8 +170,9 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
|
||||
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
|
||||
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
|
||||
|
||||
// add peer stats
|
||||
if c.Name != r.Name() {
|
||||
r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
|
||||
r.peersStats.Peers[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
|
||||
}
|
||||
|
||||
return b, err
|
||||
@ -198,7 +199,9 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
|
||||
key := path.Join("_etcd/machines", c.Name)
|
||||
|
||||
_, err := etcdStore.Delete(key, raftServer.CommitIndex())
|
||||
delete(r.peersStats, c.Name)
|
||||
|
||||
// delete from stats
|
||||
delete(r.peersStats.Peers, c.Name)
|
||||
|
||||
if err != nil {
|
||||
return []byte{0}, err
|
||||
|
@ -22,7 +22,7 @@ func NewEtcdMuxer() *http.ServeMux {
|
||||
etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
|
||||
etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
|
||||
etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
|
||||
etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
|
||||
etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
|
||||
etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
|
||||
etcdMux.HandleFunc("/test/", TestHttpHandler)
|
||||
return etcdMux
|
||||
@ -167,22 +167,8 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er
|
||||
return etcdErr.NewError(300, "")
|
||||
}
|
||||
|
||||
// tell the client where is the leader
|
||||
path := req.URL.Path
|
||||
redirect(leader, etcd, w, req)
|
||||
|
||||
var url string
|
||||
|
||||
if etcd {
|
||||
etcdAddr, _ := nameToEtcdURL(leader)
|
||||
url = etcdAddr + path
|
||||
} else {
|
||||
raftAddr, _ := nameToRaftURL(leader)
|
||||
url = raftAddr + path
|
||||
}
|
||||
|
||||
debugf("Redirect to %s", url)
|
||||
|
||||
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
|
||||
return nil
|
||||
}
|
||||
return etcdErr.NewError(300, "")
|
||||
@ -227,9 +213,28 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
|
||||
// Handler to return the basic stats of etcd
|
||||
func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(etcdStore.Stats())
|
||||
w.Write(r.Stats())
|
||||
option := req.URL.Path[len("/v1/stats/"):]
|
||||
|
||||
switch option {
|
||||
case "self":
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(r.Stats())
|
||||
case "leader":
|
||||
if r.State() == raft.Leader {
|
||||
w.Write(r.PeerStats())
|
||||
} else {
|
||||
leader := r.Leader()
|
||||
// current no leader
|
||||
if leader == "" {
|
||||
return etcdErr.NewError(300, "")
|
||||
}
|
||||
redirect(leader, true, w, req)
|
||||
}
|
||||
case "store":
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(etcdStore.Stats())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ type raftServer struct {
|
||||
listenHost string
|
||||
tlsConf *TLSConfig
|
||||
tlsInfo *TLSInfo
|
||||
peersStats map[string]*raftPeerStats
|
||||
peersStats *raftPeersStats
|
||||
serverStats *raftServerStats
|
||||
}
|
||||
|
||||
@ -48,7 +48,10 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
|
||||
listenHost: listenHost,
|
||||
tlsConf: tlsConf,
|
||||
tlsInfo: tlsInfo,
|
||||
peersStats: make(map[string]*raftPeerStats),
|
||||
peersStats: &raftPeersStats{
|
||||
Leader: name,
|
||||
Peers: make(map[string]*raftPeerStats),
|
||||
},
|
||||
serverStats: &raftServerStats{
|
||||
StartTime: time.Now(),
|
||||
sendRateQueue: &statsQueue{
|
||||
@ -63,7 +66,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
|
||||
|
||||
// Start the raft server
|
||||
func (r *raftServer) ListenAndServe() {
|
||||
|
||||
// Setup commands.
|
||||
registerCommands()
|
||||
|
||||
@ -282,7 +284,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
|
||||
}
|
||||
|
||||
func (r *raftServer) Stats() []byte {
|
||||
r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String()
|
||||
r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String()
|
||||
|
||||
queue := r.serverStats.sendRateQueue
|
||||
|
||||
@ -292,20 +294,17 @@ func (r *raftServer) Stats() []byte {
|
||||
|
||||
r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate()
|
||||
|
||||
sBytes, err := json.Marshal(r.serverStats)
|
||||
b, _ := json.Marshal(r.serverStats)
|
||||
|
||||
if err != nil {
|
||||
warn(err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (r *raftServer) PeerStats() []byte {
|
||||
if r.State() == raft.Leader {
|
||||
pBytes, _ := json.Marshal(r.peersStats)
|
||||
|
||||
b := append(sBytes, pBytes...)
|
||||
b, _ := json.Marshal(r.peersStats)
|
||||
return b
|
||||
}
|
||||
|
||||
return sBytes
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register commands to raft server
|
||||
|
@ -33,10 +33,14 @@ func (ps *packageStats) Time() time.Time {
|
||||
}
|
||||
|
||||
type raftServerStats struct {
|
||||
State string `json:"state"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
Leader string `json:"leader"`
|
||||
LeaderUptime string `json:"leaderUptime"`
|
||||
State string `json:"state"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
|
||||
LeaderInfo struct {
|
||||
Name string `json:"leader"`
|
||||
Uptime string `json:"uptime"`
|
||||
startTime time.Time
|
||||
} `json:"leaderInfo"`
|
||||
|
||||
RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"`
|
||||
RecvingPkgRate float64 `json:"recvPkgRate,omitempty"`
|
||||
@ -46,16 +50,15 @@ type raftServerStats struct {
|
||||
SendingPkgRate float64 `json:"sendPkgRate,omitempty"`
|
||||
SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
|
||||
|
||||
leaderStartTime time.Time
|
||||
sendRateQueue *statsQueue
|
||||
recvRateQueue *statsQueue
|
||||
sendRateQueue *statsQueue
|
||||
recvRateQueue *statsQueue
|
||||
}
|
||||
|
||||
func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
|
||||
ss.State = raft.Follower
|
||||
if leaderName != ss.Leader {
|
||||
ss.Leader = leaderName
|
||||
ss.leaderStartTime = time.Now()
|
||||
if leaderName != ss.LeaderInfo.Name {
|
||||
ss.LeaderInfo.Name = leaderName
|
||||
ss.LeaderInfo.startTime = time.Now()
|
||||
}
|
||||
|
||||
ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
|
||||
@ -64,17 +67,23 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
|
||||
|
||||
func (ss *raftServerStats) SendAppendReq(pkgSize int) {
|
||||
now := time.Now()
|
||||
|
||||
if ss.State != raft.Leader {
|
||||
ss.State = raft.Leader
|
||||
ss.Leader = r.Name()
|
||||
ss.leaderStartTime = now
|
||||
ss.LeaderInfo.Name = r.Name()
|
||||
ss.LeaderInfo.startTime = now
|
||||
}
|
||||
|
||||
ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
|
||||
ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
|
||||
|
||||
ss.SendAppendRequestCnt++
|
||||
}
|
||||
|
||||
type raftPeersStats struct {
|
||||
Leader string `json:"leader"`
|
||||
Peers map[string]*raftPeerStats `json:"peers"`
|
||||
}
|
||||
|
||||
type raftPeerStats struct {
|
||||
Latency float64 `json:"latency"`
|
||||
AvgLatency float64 `json:"averageLatency"`
|
||||
|
@ -66,7 +66,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
|
||||
|
||||
debugf("Send LogEntries to %s ", u)
|
||||
|
||||
thisPeerStats, ok := r.peersStats[peer.Name]
|
||||
thisPeerStats, ok := r.peersStats.Peers[peer.Name]
|
||||
|
||||
start := time.Now()
|
||||
|
||||
@ -85,7 +85,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
|
||||
}
|
||||
}
|
||||
|
||||
r.peersStats[peer.Name] = thisPeerStats
|
||||
r.peersStats.Peers[peer.Name] = thisPeerStats
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
|
17
util.go
17
util.go
@ -128,6 +128,23 @@ func sanitizeListenHost(listen string, advertised string) string {
|
||||
return net.JoinHostPort(listen, aport)
|
||||
}
|
||||
|
||||
func redirect(node string, etcd bool, w http.ResponseWriter, req *http.Request) {
|
||||
var url string
|
||||
path := req.URL.Path
|
||||
|
||||
if etcd {
|
||||
etcdAddr, _ := nameToEtcdURL(node)
|
||||
url = etcdAddr + path
|
||||
} else {
|
||||
raftAddr, _ := nameToRaftURL(node)
|
||||
url = raftAddr + path
|
||||
}
|
||||
|
||||
debugf("Redirect to %s", url)
|
||||
|
||||
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
|
||||
}
|
||||
|
||||
func check(err error) {
|
||||
if err != nil {
|
||||
fatal(err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user