From c30b82b59623dea12f8c48527564db35995734d3 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Thu, 16 Oct 2014 16:56:58 -0700 Subject: [PATCH 1/5] etcdserver: fix data race in retrieving self stats --- etcdserver/etcdhttp/http.go | 12 +++--------- etcdserver/etcdhttp/http_test.go | 9 +++++---- etcdserver/server.go | 27 +++++++++++++++++++-------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 3bc4e880d..4c8bac0e2 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -42,6 +42,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { server: server, clusterStore: server.ClusterStore, stats: server, + storestats: server, timer: server, timeout: defaultServerTimeout, } @@ -175,22 +176,15 @@ func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "application/json") - w.Write(h.storestats.JSON()) + w.Write(h.storestats.StoreStatsJSON()) } func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } - s := h.stats.SelfStats() - b, err := json.Marshal(s) - if err != nil { - log.Printf("error marshalling stats: %v\n", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } w.Header().Set("Content-Type", "application/json") - w.Write(b) + w.Write(h.stats.SelfStatsJSON()) } func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) { diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 50d0edfd6..e867e532d 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -639,11 +639,12 @@ func TestServeMachines(t *testing.T) { } type dummyServerStats struct { - ss *stats.ServerStats + js []byte ls *stats.LeaderStats } -func (dss *dummyServerStats) SelfStats() *stats.ServerStats { return dss.ss } +func (dss *dummyServerStats) SelfStatsJSON() []byte { return dss.js } +func (dss *dummyServerStats) SelfStats() *stats.ServerStats { return nil } func (dss *dummyServerStats) LeaderStats() *stats.LeaderStats { return dss.ls } func TestServeSelfStats(t *testing.T) { @@ -657,7 +658,7 @@ func TestServeSelfStats(t *testing.T) { } sh := &serverHandler{ stats: &dummyServerStats{ - ss: ss, + js: w, }, } rw := httptest.NewRecorder() @@ -737,7 +738,7 @@ type dummyStoreStats struct { data []byte } -func (dss *dummyStoreStats) JSON() []byte { return dss.data } +func (dss *dummyStoreStats) StoreStatsJSON() []byte { return dss.data } func TestServeStoreStats(t *testing.T) { w := "foobarbaz" diff --git a/etcdserver/server.go b/etcdserver/server.go index 3dafd31f8..85bdf3191 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -91,17 +91,18 @@ type Server interface { } type ServerStats interface { - // SelfStats returns the statistics of this server + // SelfStats returns the struct representing statistics of this server SelfStats() *stats.ServerStats + // SelfStats returns the statistics of this server in JSON form + SelfStatsJSON() []byte // LeaderStats returns the statistics of all followers in the cluster // if this server is leader. Otherwise, nil is returned. LeaderStats() *stats.LeaderStats } type StoreStats interface { - // JSON returns statistics of the underlying Store used by the - // EtcdServer, in JSON format - JSON() []byte + // StoreStatsJSON returns statistics of the store in JSON format + StoreStatsJSON() []byte } type RaftTimer interface { @@ -364,18 +365,28 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } func (s *EtcdServer) SelfStats() *stats.ServerStats { - s.stats.LeaderInfo.Uptime = time.Now().Sub(s.stats.LeaderInfo.StartTime).String() - s.stats.SendingPkgRate, s.stats.SendingBandwidthRate = s.stats.SendRates() - s.stats.RecvingPkgRate, s.stats.RecvingBandwidthRate = s.stats.RecvRates() return s.stats } +func (s *EtcdServer) SelfStatsJSON() []byte { + stats := *s.stats + stats.LeaderInfo.Uptime = time.Now().Sub(stats.LeaderInfo.StartTime).String() + stats.SendingPkgRate, stats.SendingBandwidthRate = stats.SendRates() + stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.RecvRates() + b, err := json.Marshal(s.stats) + // TODO(jonboulle): appropriate error handling? + if err != nil { + log.Printf("error marshalling self stats: %v", err) + } + return b +} + func (s *EtcdServer) LeaderStats() *stats.LeaderStats { // TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ... return s.lstats } -func (s *EtcdServer) StoreStats() []byte { +func (s *EtcdServer) StoreStatsJSON() []byte { return s.store.JsonStats() } From c28907ba95dc808dd33594fa7f0a2836afd62c8a Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Thu, 16 Oct 2014 18:39:55 -0700 Subject: [PATCH 2/5] etcdserver: fix race and improve stats interfaces --- etcdserver/cluster_store.go | 3 +- etcdserver/etcdhttp/http.go | 20 +++-------- etcdserver/etcdhttp/http_test.go | 59 ++++++++++---------------------- etcdserver/member.go | 2 +- etcdserver/server.go | 52 ++++++++++++---------------- etcdserver/stats/leader.go | 6 ++++ etcdserver/stats/server.go | 16 +++++++++ 7 files changed, 70 insertions(+), 88 deletions(-) diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index 68a5fef11..743bc3fc2 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -6,7 +6,6 @@ import ( "fmt" "log" "net/http" - "strconv" "time" etcdErr "github.com/coreos/etcd/error" @@ -148,7 +147,7 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerSt if m.Type == raftpb.MsgApp { ss.SendAppendReq(len(data)) } - to := strconv.FormatUint(m.To, 16) + to := idAsHex(m.To) fs := ls.Follower(to) start := time.Now() diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 4c8bac0e2..ac89a6b96 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -42,7 +42,6 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { server: server, clusterStore: server.ClusterStore, stats: server, - storestats: server, timer: server, timeout: defaultServerTimeout, } @@ -77,8 +76,7 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { type serverHandler struct { timeout time.Duration server etcdserver.Server - stats etcdserver.ServerStats - storestats etcdserver.StoreStats + stats etcdserver.Stats timer etcdserver.RaftTimer clusterStore etcdserver.ClusterStore } @@ -176,7 +174,7 @@ func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "application/json") - w.Write(h.storestats.StoreStatsJSON()) + w.Write(h.stats.StoreStats()) } func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) { @@ -184,22 +182,15 @@ func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "application/json") - w.Write(h.stats.SelfStatsJSON()) + w.Write(h.stats.SelfStats()) } func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } - s := h.stats.LeaderStats() - b, err := json.Marshal(s) - if err != nil { - log.Printf("error marshalling stats: %v\n", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } w.Header().Set("Content-Type", "application/json") - w.Write(b) + w.Write(h.stats.LeaderStats()) } func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { @@ -221,8 +212,7 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { } log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m) if m.Type == raftpb.MsgApp { - // TODO(jonboulle): standardize id uint-->string process: always base 16? - h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength)) + h.stats.UpdateRecvApp(m.From, r.ContentLength) } if err := h.server.Process(context.TODO(), m); err != nil { log.Println("etcdhttp: error processing raft message:", err) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index e867e532d..87e39aab1 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -19,7 +19,6 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) @@ -638,28 +637,20 @@ func TestServeMachines(t *testing.T) { } } -type dummyServerStats struct { - js []byte - ls *stats.LeaderStats +type dummyStats struct { + data []byte } -func (dss *dummyServerStats) SelfStatsJSON() []byte { return dss.js } -func (dss *dummyServerStats) SelfStats() *stats.ServerStats { return nil } -func (dss *dummyServerStats) LeaderStats() *stats.LeaderStats { return dss.ls } +func (ds *dummyStats) SelfStats() []byte { return ds.data } +func (ds *dummyStats) LeaderStats() []byte { return ds.data } +func (ds *dummyStats) StoreStats() []byte { return ds.data } +func (ds *dummyStats) UpdateRecvApp(_ uint64, _ int64) {} func TestServeSelfStats(t *testing.T) { - ss := &stats.ServerStats{ - Name: "foobar", - RecvingPkgRate: 123.4, - } - w, err := json.Marshal(ss) - if err != nil { - t.Fatal("error marshaling: %v", err) - } + wb := []byte("some statistics") + w := string(wb) sh := &serverHandler{ - stats: &dummyServerStats{ - js: w, - }, + stats: &dummyStats{data: wb}, } rw := httptest.NewRecorder() sh.serveSelfStats(rw, &http.Request{Method: "GET"}) @@ -670,8 +661,8 @@ func TestServeSelfStats(t *testing.T) { if gct := rw.Header().Get("Content-Type"); gct != wct { t.Errorf("Content-Type = %q, want %q", gct, wct) } - if g := rw.Body.String(); g != string(w) { - t.Errorf("body = %s, want %s", g, string(w)) + if g := rw.Body.String(); g != w { + t.Errorf("body = %s, want %s", g, w) } } @@ -708,17 +699,10 @@ func TestLeaderServeStatsBad(t *testing.T) { } func TestServeLeaderStats(t *testing.T) { - ls := &stats.LeaderStats{ - Leader: "foobar", - } - w, err := json.Marshal(ls) - if err != nil { - t.Fatal("error marshaling: %v", err) - } + wb := []byte("some statistics") + w := string(wb) sh := &serverHandler{ - stats: &dummyServerStats{ - ls: ls, - }, + stats: &dummyStats{data: wb}, } rw := httptest.NewRecorder() sh.serveLeaderStats(rw, &http.Request{Method: "GET"}) @@ -729,21 +713,16 @@ func TestServeLeaderStats(t *testing.T) { if gct := rw.Header().Get("Content-Type"); gct != wct { t.Errorf("Content-Type = %q, want %q", gct, wct) } - if g := rw.Body.String(); g != string(w) { - t.Errorf("body = %s, want %s", g, string(w)) + if g := rw.Body.String(); g != w { + t.Errorf("body = %s, want %s", g, w) } } -type dummyStoreStats struct { - data []byte -} - -func (dss *dummyStoreStats) StoreStatsJSON() []byte { return dss.data } - func TestServeStoreStats(t *testing.T) { - w := "foobarbaz" + wb := []byte("some statistics") + w := string(wb) sh := &serverHandler{ - storestats: &dummyStoreStats{data: []byte(w)}, + stats: &dummyStats{data: wb}, } rw := httptest.NewRecorder() sh.serveStoreStats(rw, &http.Request{Method: "GET"}) diff --git a/etcdserver/member.go b/etcdserver/member.go index 5bce76b6d..1831f0002 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -55,7 +55,7 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member { } func (m Member) storeKey() string { - return path.Join(membersKVPrefix, strconv.FormatUint(m.ID, 16)) + return path.Join(membersKVPrefix, idAsHex(m.ID)) } func parseMemberID(key string) uint64 { diff --git a/etcdserver/server.go b/etcdserver/server.go index 85bdf3191..456f1818a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -90,19 +90,16 @@ type Server interface { RemoveMember(ctx context.Context, id uint64) error } -type ServerStats interface { +type Stats interface { // SelfStats returns the struct representing statistics of this server - SelfStats() *stats.ServerStats - // SelfStats returns the statistics of this server in JSON form - SelfStatsJSON() []byte + SelfStats() []byte // LeaderStats returns the statistics of all followers in the cluster // if this server is leader. Otherwise, nil is returned. - LeaderStats() *stats.LeaderStats -} - -type StoreStats interface { - // StoreStatsJSON returns statistics of the store in JSON format - StoreStatsJSON() []byte + LeaderStats() []byte + // StoreStats returns statistics of the store backing this EtcdServer + StoreStats() []byte + // UpdateRecvApp updates the underlying statistics in response to a receiving an Append request + UpdateRecvApp(from uint64, length int64) } type RaftTimer interface { @@ -195,9 +192,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer { sstats := &stats.ServerStats{ Name: cfg.Name, - ID: strconv.FormatUint(cfg.ID(), 16), + ID: idAsHex(cfg.ID()), } - lstats := stats.NewLeaderStats(strconv.FormatUint(cfg.ID(), 16)) + lstats := stats.NewLeaderStats(idAsHex(cfg.ID())) s := &EtcdServer{ store: st, @@ -364,32 +361,23 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } -func (s *EtcdServer) SelfStats() *stats.ServerStats { - return s.stats +func (s *EtcdServer) SelfStats() []byte { + return s.stats.JSON() } -func (s *EtcdServer) SelfStatsJSON() []byte { - stats := *s.stats - stats.LeaderInfo.Uptime = time.Now().Sub(stats.LeaderInfo.StartTime).String() - stats.SendingPkgRate, stats.SendingBandwidthRate = stats.SendRates() - stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.RecvRates() - b, err := json.Marshal(s.stats) - // TODO(jonboulle): appropriate error handling? - if err != nil { - log.Printf("error marshalling self stats: %v", err) - } - return b -} - -func (s *EtcdServer) LeaderStats() *stats.LeaderStats { +func (s *EtcdServer) LeaderStats() []byte { // TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ... - return s.lstats + return s.lstats.JSON() } -func (s *EtcdServer) StoreStatsJSON() []byte { +func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } +func (s *EtcdServer) UpdateRecvApp(from uint64, length int64) { + s.stats.RecvAppendReq(idAsHex(from), int(length)) +} + func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { // TODO: move Member to protobuf type b, err := json.Marshal(memb) @@ -691,3 +679,7 @@ func containsUint64(a []uint64, x uint64) bool { } return false } + +func idAsHex(id uint64) string { + return strconv.FormatUint(id, 16) +} diff --git a/etcdserver/stats/leader.go b/etcdserver/stats/leader.go index 96537859c..de5c36647 100644 --- a/etcdserver/stats/leader.go +++ b/etcdserver/stats/leader.go @@ -1,6 +1,7 @@ package stats import ( + "encoding/json" "math" "sync" "time" @@ -24,6 +25,11 @@ func NewLeaderStats(id string) *LeaderStats { } } +func (ls *LeaderStats) JSON() []byte { + b, _ := json.Marshal(ls) + return b +} + func (ls *LeaderStats) Follower(name string) *FollowerStats { ls.Lock() defer ls.Unlock() diff --git a/etcdserver/stats/server.go b/etcdserver/stats/server.go index 320b453f2..4233ddbbb 100644 --- a/etcdserver/stats/server.go +++ b/etcdserver/stats/server.go @@ -1,6 +1,8 @@ package stats import ( + "encoding/json" + "log" "sync" "time" @@ -36,6 +38,20 @@ type ServerStats struct { sync.Mutex } +func (ss *ServerStats) JSON() []byte { + ss.Lock() + defer ss.Unlock() + ss.LeaderInfo.Uptime = time.Now().Sub(ss.LeaderInfo.StartTime).String() + ss.SendingPkgRate, ss.SendingBandwidthRate = ss.SendRates() + ss.RecvingPkgRate, ss.RecvingBandwidthRate = ss.RecvRates() + b, err := json.Marshal(ss) + // TODO(jonboulle): appropriate error handling? + if err != nil { + log.Printf("error marshalling server stats: %v", err) + } + return b +} + // Initialize clears the statistics of ServerStats and resets its start time func (ss *ServerStats) Initialize() { if ss == nil { From 233e940410b87a6893181705a4794df24be04d5d Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Thu, 16 Oct 2014 19:49:35 -0700 Subject: [PATCH 3/5] etcdserver: copy stats instead of marshaling with lock --- etcdserver/stats/server.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/etcdserver/stats/server.go b/etcdserver/stats/server.go index 4233ddbbb..2b9e2a3d9 100644 --- a/etcdserver/stats/server.go +++ b/etcdserver/stats/server.go @@ -40,11 +40,12 @@ type ServerStats struct { func (ss *ServerStats) JSON() []byte { ss.Lock() - defer ss.Unlock() - ss.LeaderInfo.Uptime = time.Now().Sub(ss.LeaderInfo.StartTime).String() - ss.SendingPkgRate, ss.SendingBandwidthRate = ss.SendRates() - ss.RecvingPkgRate, ss.RecvingBandwidthRate = ss.RecvRates() - b, err := json.Marshal(ss) + stats := *ss + ss.Unlock() + stats.LeaderInfo.Uptime = time.Now().Sub(stats.LeaderInfo.StartTime).String() + stats.SendingPkgRate, stats.SendingBandwidthRate = stats.SendRates() + stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.RecvRates() + b, err := json.Marshal(stats) // TODO(jonboulle): appropriate error handling? if err != nil { log.Printf("error marshalling server stats: %v", err) From 82023c591daaf26268813c12ad8e0fa76f64e186 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Thu, 16 Oct 2014 19:51:38 -0700 Subject: [PATCH 4/5] etcdserver/stats: log any marshaling error --- etcdserver/stats/leader.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/etcdserver/stats/leader.go b/etcdserver/stats/leader.go index de5c36647..ad03ec0b0 100644 --- a/etcdserver/stats/leader.go +++ b/etcdserver/stats/leader.go @@ -2,6 +2,7 @@ package stats import ( "encoding/json" + "log" "math" "sync" "time" @@ -26,7 +27,11 @@ func NewLeaderStats(id string) *LeaderStats { } func (ls *LeaderStats) JSON() []byte { - b, _ := json.Marshal(ls) + b, err := json.Marshal(ls) + // TODO(jonboulle): appropriate error handling? + if err != nil { + log.Printf("error marshalling leader stats: %v", err) + } return b } From da64e7509c48ee11dfffee6462fc843ac5318254 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Fri, 17 Oct 2014 00:11:25 -0700 Subject: [PATCH 5/5] etcdserver/stats: lock on leaderstats too --- etcdserver/stats/leader.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/etcdserver/stats/leader.go b/etcdserver/stats/leader.go index ad03ec0b0..c8d8d4eb0 100644 --- a/etcdserver/stats/leader.go +++ b/etcdserver/stats/leader.go @@ -27,7 +27,10 @@ func NewLeaderStats(id string) *LeaderStats { } func (ls *LeaderStats) JSON() []byte { - b, err := json.Marshal(ls) + ls.Lock() + stats := *ls + ls.Unlock() + b, err := json.Marshal(stats) // TODO(jonboulle): appropriate error handling? if err != nil { log.Printf("error marshalling leader stats: %v", err)