From 306085db5f33bdab0e4401f584cdd6976b0b8e71 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 3 Aug 2015 09:06:31 +0800 Subject: [PATCH 1/4] Godeps: add probing dependency --- Godeps/Godeps.json | 4 + .../src/github.com/xiang90/probing/.gitignore | 24 ++++ .../src/github.com/xiang90/probing/LICENSE | 22 ++++ .../src/github.com/xiang90/probing/README.md | 39 ++++++ .../src/github.com/xiang90/probing/prober.go | 112 ++++++++++++++++++ .../github.com/xiang90/probing/prober_test.go | 90 ++++++++++++++ .../src/github.com/xiang90/probing/server.go | 25 ++++ .../src/github.com/xiang90/probing/status.go | 96 +++++++++++++++ 8 files changed, 412 insertions(+) create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/.gitignore create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/LICENSE create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/README.md create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/prober.go create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/server.go create mode 100644 Godeps/_workspace/src/github.com/xiang90/probing/status.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index daabbeedf..3a6a7614d 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -102,6 +102,10 @@ "ImportPath": "github.com/stretchr/testify/assert", "Rev": "9cc77fa25329013ce07362c7742952ff887361f2" }, + { + "ImportPath": "github.com/xiang90/probing", + "Rev": "e8a0407769cb84c61c2ddf8f1d9cdae9fb489b9b" + }, { "ImportPath": "golang.org/x/crypto/bcrypt", "Rev": "1351f936d976c60a0a48d728281922cf63eafb8d" diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/.gitignore b/Godeps/_workspace/src/github.com/xiang90/probing/.gitignore new file mode 100644 index 000000000..daf913b1b --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/LICENSE b/Godeps/_workspace/src/github.com/xiang90/probing/LICENSE new file mode 100644 index 000000000..cde8b8b05 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Xiang Li + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/README.md b/Godeps/_workspace/src/github.com/xiang90/probing/README.md new file mode 100644 index 000000000..2ff682057 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/README.md @@ -0,0 +1,39 @@ +## Getting Started + +### Install the handler + +We first need to serve the probing HTTP handler. + +```go + http.HandleFunc("/health", probing.NewHandler()) + err := http.ListenAndServe(":12345", nil) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } +``` + +### Start to probe + +Now we can start to probe the endpoint. + +``` go + id := "example" + probingInterval = 5 * time.Second + url := "http://example.com:12345/health" + p.AddHTTP(id, probingInterval, url) + + time.Sleep(13 * time.Second) + status, err := p.Status(id) + fmt.Printf("Total Probing: %d, Total Loss: %d, Estimated RTT: %v, Estimated Clock Difference: %v\n", + status.Total(), status.Loss(), status.SRTT(), status.ClockDiff()) + // Total Probing: 2, Total Loss: 0, Estimated RTT: 320.771µs, Estimated Clock Difference: -35.869µs +``` + +### TODOs: + +- TCP probing +- UDP probing +- Gossip based probing +- More accurate RTT estimation +- More accurate Clock difference estimation +- Use a clock interface rather than the real clock diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/prober.go b/Godeps/_workspace/src/github.com/xiang90/probing/prober.go new file mode 100644 index 000000000..e2aa212a2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/prober.go @@ -0,0 +1,112 @@ +package probing + +import ( + "encoding/json" + "errors" + "net/http" + "sync" + "time" +) + +var ( + ErrNotFound = errors.New("probing: id not found") + ErrExist = errors.New("probing: id exists") +) + +type Prober interface { + AddHTTP(id string, probingInterval time.Duration, endpoints []string) error + Remove(id string) error + Reset(id string) error + Status(id string) (Status, error) +} + +type prober struct { + mu sync.Mutex + targets map[string]*status +} + +func NewProber() Prober { + return &prober{targets: make(map[string]*status)} +} + +func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error { + p.mu.Lock() + defer p.mu.Unlock() + if _, ok := p.targets[id]; ok { + return ErrExist + } + + s := &status{stopC: make(chan struct{})} + p.targets[id] = s + + ticker := time.NewTicker(probingInterval) + + go func() { + pinned := 0 + for { + select { + case <-ticker.C: + start := time.Now() + resp, err := http.Get(endpoints[pinned]) + if err != nil { + s.recordFailure() + pinned = (pinned + 1) % len(endpoints) + continue + } + + var hh Health + d := json.NewDecoder(resp.Body) + err = d.Decode(&hh) + resp.Body.Close() + if err != nil || !hh.OK { + s.recordFailure() + pinned = (pinned + 1) % len(endpoints) + continue + } + + s.record(time.Since(start), hh.Now) + case <-s.stopC: + ticker.Stop() + return + } + } + }() + + return nil +} + +func (p *prober) Remove(id string) error { + p.mu.Lock() + defer p.mu.Unlock() + + s, ok := p.targets[id] + if !ok { + return ErrNotFound + } + close(s.stopC) + delete(p.targets, id) + return nil +} + +func (p *prober) Reset(id string) error { + p.mu.Lock() + defer p.mu.Unlock() + + s, ok := p.targets[id] + if !ok { + return ErrNotFound + } + s.reset() + return nil +} + +func (p *prober) Status(id string) (Status, error) { + p.mu.Lock() + defer p.mu.Unlock() + + s, ok := p.targets[id] + if !ok { + return nil, ErrNotFound + } + return s, nil +} diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go b/Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go new file mode 100644 index 000000000..d80bbcaa7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go @@ -0,0 +1,90 @@ +package probing + +import ( + "net/http/httptest" + "testing" + "time" +) + +var ( + testID = "testID" +) + +func TestProbe(t *testing.T) { + s := httptest.NewServer(NewHandler()) + + p := NewProber() + p.AddHTTP(testID, time.Millisecond, []string{s.URL}) + defer p.Remove(testID) + + time.Sleep(100 * time.Millisecond) + status, err := p.Status(testID) + if err != nil { + t.Fatalf("err = %v, want %v", err, nil) + } + if total := status.Total(); total < 50 || total > 150 { + t.Fatalf("total = %v, want around %v", total, 100) + } + if health := status.Health(); health != true { + t.Fatalf("health = %v, want %v", health, true) + } + + // become unhealthy + s.Close() + + time.Sleep(100 * time.Millisecond) + if total := status.Total(); total < 150 || total > 250 { + t.Fatalf("total = %v, want around %v", total, 200) + } + if loss := status.Loss(); loss < 50 || loss > 150 { + t.Fatalf("loss = %v, want around %v", loss, 200) + } + if health := status.Health(); health != false { + t.Fatalf("health = %v, want %v", health, false) + } +} + +func TestProbeReset(t *testing.T) { + s := httptest.NewServer(NewHandler()) + defer s.Close() + + p := NewProber() + p.AddHTTP(testID, time.Millisecond, []string{s.URL}) + defer p.Remove(testID) + + time.Sleep(100 * time.Millisecond) + status, err := p.Status(testID) + if err != nil { + t.Fatalf("err = %v, want %v", err, nil) + } + if total := status.Total(); total < 50 || total > 150 { + t.Fatalf("total = %v, want around %v", total, 100) + } + if health := status.Health(); health != true { + t.Fatalf("health = %v, want %v", health, true) + } + + p.Reset(testID) + + time.Sleep(100 * time.Millisecond) + if total := status.Total(); total < 50 || total > 150 { + t.Fatalf("total = %v, want around %v", total, 100) + } + if health := status.Health(); health != true { + t.Fatalf("health = %v, want %v", health, true) + } +} + +func TestProbeRemove(t *testing.T) { + s := httptest.NewServer(NewHandler()) + defer s.Close() + + p := NewProber() + p.AddHTTP(testID, time.Millisecond, []string{s.URL}) + + p.Remove(testID) + _, err := p.Status(testID) + if err != ErrNotFound { + t.Fatalf("err = %v, want %v", err, ErrNotFound) + } +} diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/server.go b/Godeps/_workspace/src/github.com/xiang90/probing/server.go new file mode 100644 index 000000000..0e7b797d2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/server.go @@ -0,0 +1,25 @@ +package probing + +import ( + "encoding/json" + "net/http" + "time" +) + +func NewHandler() http.Handler { + return &httpHealth{} +} + +type httpHealth struct { +} + +type Health struct { + OK bool + Now time.Time +} + +func (h *httpHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) { + health := Health{OK: true, Now: time.Now()} + e := json.NewEncoder(w) + e.Encode(health) +} diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/status.go b/Godeps/_workspace/src/github.com/xiang90/probing/status.go new file mode 100644 index 000000000..bdfab2761 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/status.go @@ -0,0 +1,96 @@ +package probing + +import ( + "sync" + "time" +) + +var ( + // weight factor + α = 0.125 +) + +type Status interface { + Total() int64 + Loss() int64 + Health() bool + // Estimated smoothed round trip time + SRTT() time.Duration + // Estimated clock difference + ClockDiff() time.Duration + StopNotify() <-chan struct{} +} + +type status struct { + mu sync.Mutex + srtt time.Duration + total int64 + loss int64 + health bool + clockdiff time.Duration + stopC chan struct{} +} + +// SRTT = (1-α) * SRTT + α * RTT +func (s *status) SRTT() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + return s.srtt +} + +func (s *status) Total() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.total +} + +func (s *status) Loss() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.loss +} + +func (s *status) Health() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.health +} + +func (s *status) ClockDiff() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + return s.clockdiff +} + +func (s *status) StopNotify() <-chan struct{} { + return s.stopC +} + +func (s *status) record(rtt time.Duration, when time.Time) { + s.mu.Lock() + defer s.mu.Unlock() + + s.total += 1 + s.health = true + s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt)) + s.clockdiff = time.Now().Sub(when) - s.srtt/2 +} + +func (s *status) recordFailure() { + s.mu.Lock() + defer s.mu.Unlock() + + s.total++ + s.health = false + s.loss += 1 +} + +func (s *status) reset() { + s.mu.Lock() + defer s.mu.Unlock() + + s.srtt = 0 + s.total = 0 + s.health = false + s.clockdiff = 0 +} From 0fc764200da4c64d74541a20db1e74e9bdfd6ef4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 3 Aug 2015 09:07:16 +0800 Subject: [PATCH 2/4] rafthttp: monitor connection --- rafthttp/http.go | 1 + rafthttp/probing_status.go | 60 ++++++++++++++++++++++++++++++++++++++ rafthttp/transport.go | 15 ++++++++-- rafthttp/transport_test.go | 7 ++++- 4 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 rafthttp/probing_status.go diff --git a/rafthttp/http.go b/rafthttp/http.go index 8720fb22c..4febf6ff2 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -33,6 +33,7 @@ const ( var ( RaftPrefix = "/raft" + ProbingPrefix = path.Join(RaftPrefix, "probing") RaftStreamPrefix = path.Join(RaftPrefix, "stream") errIncompatibleVersion = errors.New("incompatible version") diff --git a/rafthttp/probing_status.go b/rafthttp/probing_status.go new file mode 100644 index 000000000..ed042a683 --- /dev/null +++ b/rafthttp/probing_status.go @@ -0,0 +1,60 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing" +) + +var ( + // proberInterval must be shorter than read timeout. + // Or the connection will time-out. + proberInterval = ConnReadTimeout - time.Second + statusMonitoringInterval = 30 * time.Second +) + +func addPeerToProber(p probing.Prober, id string, us []string) { + hus := make([]string, len(us)) + for i := range us { + hus[i] = us[i] + ProbingPrefix + } + + p.AddHTTP(id, proberInterval, hus) + + s, err := p.Status(id) + if err != nil { + plog.Errorf("failed to add peer %s into prober", id) + } else { + go monitorProbingStatus(s, id) + } +} + +func monitorProbingStatus(s probing.Status, id string) { + for { + select { + case <-time.After(statusMonitoringInterval): + if !s.Health() { + plog.Warningf("the connection to peer %s is unhealthy", id) + } + if s.ClockDiff() > time.Second { + plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) + } + case <-s.StopNotify(): + return + } + } +} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index d3b9607fa..8441dcba5 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" @@ -83,7 +84,9 @@ type transport struct { term uint64 // the latest term that has been observed remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - errorc chan error + + prober probing.Prober + errorc chan error } func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter { @@ -96,7 +99,9 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan leaderStats: ls, remotes: make(map[types.ID]*remote), peers: make(map[types.ID]Peer), - errorc: errorc, + + prober: probing.NewProber(), + errorc: errorc, } } @@ -106,6 +111,7 @@ func (t *transport) Handler() http.Handler { mux := http.NewServeMux() mux.Handle(RaftPrefix, pipelineHandler) mux.Handle(RaftStreamPrefix+"/", streamHandler) + mux.Handle(ProbingPrefix, probing.NewHandler()) return mux } @@ -195,6 +201,7 @@ func (t *transport) AddPeer(id types.ID, us []string) { } fs := t.leaderStats.Follower(id.String()) t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc, t.term) + addPeerToProber(t.prober, id.String(), us) } func (t *transport) RemovePeer(id types.ID) { @@ -220,6 +227,7 @@ func (t *transport) removePeer(id types.ID) { } delete(t.peers, id) delete(t.leaderStats.Followers, id.String()) + t.prober.Remove(id.String()) } func (t *transport) UpdatePeer(id types.ID, us []string) { @@ -234,6 +242,9 @@ func (t *transport) UpdatePeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } t.peers[id].Update(urls) + + t.prober.Remove(id.String()) + addPeerToProber(t.prober, id.String(), us) } type Pausable interface { diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index 6c4a3a37c..d0492109a 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" @@ -73,6 +74,7 @@ func TestTransportAdd(t *testing.T) { leaderStats: ls, term: term, peers: make(map[types.ID]Peer), + prober: probing.NewProber(), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -104,6 +106,7 @@ func TestTransportRemove(t *testing.T) { roundTripper: &roundTripperRecorder{}, leaderStats: stats.NewLeaderStats(""), peers: make(map[types.ID]Peer), + prober: probing.NewProber(), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -117,7 +120,8 @@ func TestTransportRemove(t *testing.T) { func TestTransportUpdate(t *testing.T) { peer := newFakePeer() tr := &transport{ - peers: map[types.ID]Peer{types.ID(1): peer}, + peers: map[types.ID]Peer{types.ID(1): peer}, + prober: probing.NewProber(), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -133,6 +137,7 @@ func TestTransportErrorc(t *testing.T) { roundTripper: newRespRoundTripper(http.StatusForbidden, nil), leaderStats: stats.NewLeaderStats(""), peers: make(map[types.ID]Peer), + prober: probing.NewProber(), errorc: errorc, } tr.AddPeer(1, []string{"http://localhost:2380"}) From 709718ed97bbdaec0ed09e05b0b2da7937a67e06 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 4 Aug 2015 17:40:39 +0800 Subject: [PATCH 3/4] godeps: update probing pkg --- Godeps/Godeps.json | 2 +- .../src/github.com/xiang90/probing/prober.go | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 3a6a7614d..8086eb795 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -104,7 +104,7 @@ }, { "ImportPath": "github.com/xiang90/probing", - "Rev": "e8a0407769cb84c61c2ddf8f1d9cdae9fb489b9b" + "Rev": "11caf1c32ca4055f97e55541e92a75966635981d" }, { "ImportPath": "golang.org/x/crypto/bcrypt", diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/prober.go b/Godeps/_workspace/src/github.com/xiang90/probing/prober.go index e2aa212a2..d34fa2b22 100644 --- a/Godeps/_workspace/src/github.com/xiang90/probing/prober.go +++ b/Godeps/_workspace/src/github.com/xiang90/probing/prober.go @@ -16,6 +16,7 @@ var ( type Prober interface { AddHTTP(id string, probingInterval time.Duration, endpoints []string) error Remove(id string) error + RemoveAll() Reset(id string) error Status(id string) (Status, error) } @@ -88,6 +89,16 @@ func (p *prober) Remove(id string) error { return nil } +func (p *prober) RemoveAll() { + p.mu.Lock() + defer p.mu.Unlock() + + for _, s := range p.targets { + close(s.stopC) + } + p.targets = make(map[string]*status) +} + func (p *prober) Reset(id string) error { p.mu.Lock() defer p.mu.Unlock() From 1e048b5c24312a013042a7618ef007952745af9b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 4 Aug 2015 17:42:51 +0800 Subject: [PATCH 4/4] rafthttp: cleanup prober when stopping the transport --- rafthttp/transport.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 8441dcba5..e856ed2ce 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -171,6 +171,7 @@ func (t *transport) Stop() { for _, p := range t.peers { p.Stop() } + t.prober.RemoveAll() if tr, ok := t.roundTripper.(*http.Transport); ok { tr.CloseIdleConnections() }