From 6ef18b1ae3cf3c06c11ca73d670faef1900f2a67 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 21 Aug 2013 13:35:15 -0700 Subject: [PATCH] thread-saft queue --- raft_server.go | 22 +++++++++------ raft_stats.go | 72 ++++++++++++++++++++++++++++++++------------------ transporter.go | 7 +++-- 3 files changed, 65 insertions(+), 36 deletions(-) diff --git a/raft_server.go b/raft_server.go index 651e5d032..d447b16aa 100644 --- a/raft_server.go +++ b/raft_server.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "container/list" "crypto/tls" "encoding/binary" "encoding/json" @@ -49,8 +48,10 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo tlsInfo: tlsInfo, peersStats: make(map[string]*raftPeerStats), serverStats: &raftServerStats{ - StartTime: time.Now(), - sendRateQueue: list.New(), + StartTime: time.Now(), + sendRateQueue: &statsQueue{ + back: -1, + }, }, } } @@ -281,14 +282,19 @@ func (r *raftServer) Stats() []byte { queue := r.serverStats.sendRateQueue - frontValue, _ := queue.Front().Value.(time.Time) - backValue, _ := queue.Back().Value.(time.Time) + front, back := queue.FrontAndBack() - sampleDuration := backValue.Sub(frontValue) + sampleDuration := back.Time().Sub(front.Time()) - r.serverStats.SendingRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second) + r.serverStats.SendingPkgRate = float64(queue.Len()) / float64(sampleDuration) * float64(time.Second) - sBytes, _ := json.Marshal(r.serverStats) + r.serverStats.SendingBandwidthRate = float64(queue.Size()) / float64(sampleDuration) * float64(time.Second) + + sBytes, err := json.Marshal(r.serverStats) + + if err != nil { + warn(err) + } pBytes, _ := json.Marshal(r.peersStats) diff --git a/raft_stats.go b/raft_stats.go index 9c572afc9..3e125e9a2 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -2,7 +2,9 @@ package main import ( "container/list" + "fmt" "math" + "sync" "time" "github.com/coreos/go-raft" @@ -17,7 +19,18 @@ type runtimeStats struct { type packageStats struct { sendingTime time.Time - size uint64 + size int +} + +func NewPackageStats(now time.Time, size int) *packageStats { + return &packageStats{ + sendingTime: now, + size: size, + } +} + +func (ps *packageStats) Time() time.Time { + return ps.sendingTime } type raftServerStats struct { @@ -29,9 +42,10 @@ type raftServerStats struct { RecvAppendRequestCnt uint64 SendAppendRequestCnt uint64 SendAppendReqeustRate uint64 - sendRateQueue *list.List + sendRateQueue *statsQueue recvRateQueue *list.List - SendingRate float64 + SendingPkgRate float64 + SendingBandwidthRate float64 } func (ss *raftServerStats) RecvAppendReq(leaderName string) { @@ -44,7 +58,7 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string) { ss.RecvAppendRequestCnt++ } -func (ss *raftServerStats) SendAppendReq() { +func (ss *raftServerStats) SendAppendReq(pkgSize int) { now := time.Now() if ss.State != raft.Leader { ss.State = raft.Leader @@ -52,12 +66,7 @@ func (ss *raftServerStats) SendAppendReq() { ss.leaderStartTime = now } - if ss.sendRateQueue.Len() < 200 { - ss.sendRateQueue.PushBack(now) - } else { - ss.sendRateQueue.PushBack(now) - ss.sendRateQueue.Remove(ss.sendRateQueue.Front()) - } + ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) ss.SendAppendRequestCnt++ } @@ -102,36 +111,47 @@ func (ps *raftPeerStats) Succ(d time.Duration) { } type statsQueue struct { - items [queueCapacity]*packageStats - size int - front int - back int + items [queueCapacity]*packageStats + size int + front int + back int + totalPkgSize int + rwl sync.RWMutex } func (q *statsQueue) Len() int { return q.size } -func (q *statsQueue) Front() *packageStats { - if q.size != 0 { - return q.items[q.front] - } - return nil +func (q *statsQueue) Size() int { + return q.totalPkgSize } -func (q *statsQueue) Back() *packageStats { +// FrontAndBack gets the front and back elements in the queue +// We must grab front and back together with the protection of the lock +func (q *statsQueue) FrontAndBack() (*packageStats, *packageStats) { + q.rwl.RLock() + defer q.rwl.RUnlock() if q.size != 0 { - return q.items[q.back] + return q.items[q.front], q.items[q.back] } - return nil + return nil, nil } func (q *statsQueue) Insert(p *packageStats) { - q.back = (q.back + 1) % queueCapacity - q.items[q.back] = p - if q.size == queueCapacity { - q.front = (q.back + 1) % queueCapacity + q.rwl.Lock() + defer q.rwl.Unlock() + + if q.size == queueCapacity { //dequeue + q.totalPkgSize -= q.items[q.front].size + q.front = (q.back + 2) % queueCapacity } else { q.size++ } + + q.back = (q.back + 1) % queueCapacity + q.items[q.back] = p + q.totalPkgSize += q.items[q.back].size + + fmt.Println(q.front, q.back, q.size) } diff --git a/transporter.go b/transporter.go index e476fdf05..e82d3668c 100644 --- a/transporter.go +++ b/transporter.go @@ -48,11 +48,14 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var aersp *raft.AppendEntriesResponse var b bytes.Buffer - r.serverStats.SendAppendReq() - json.NewEncoder(&b).Encode(req) + size := b.Len() + + r.serverStats.SendAppendReq(size) + u, _ := nameToRaftURL(peer.Name) + debugf("Send LogEntries to %s ", u) thisPeerStats, ok := r.peersStats[peer.Name]