From d6520303c64fedd1d50361bd56de4e2831b2771e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 29 Feb 2016 16:04:14 -0800 Subject: [PATCH] etcdserver: detect raft starvation caused by contention --- etcdserver/raft.go | 12 +++++++ etcdserver/server.go | 9 +++++ pkg/contention/contention.go | 69 ++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 pkg/contention/contention.go diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 5d299ef7d..a70bbeb31 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -25,6 +25,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -117,6 +118,8 @@ type raftNode struct { // If transport is nil, server will panic. transport rafthttp.Transporter + td *contention.TimeoutDetector + stopped chan struct{} done chan struct{} } @@ -130,6 +133,14 @@ func (r *raftNode) start(s *EtcdServer) { r.stopped = make(chan struct{}) r.done = make(chan struct{}) + heartbeat := 200 * time.Millisecond + if s.cfg != nil { + heartbeat = time.Duration(s.cfg.TickMs) * time.Millisecond + } + // set up contention detectors for raft heartbeat message. + // expect to send a heartbeat within 2 heartbeat intervals. 
+ r.td = contention.NewTimeoutDetector(2 * heartbeat) + go func() { var syncC <-chan time.Time @@ -162,6 +173,7 @@ func (r *raftNode) start(s *EtcdServer) { if r.s.compactor != nil { r.s.compactor.Resume() } + r.td.Reset() } else { if r.s.lessor != nil { r.s.lessor.Demote() diff --git a/etcdserver/server.go b/etcdserver/server.go index 0b79fecd4..3aa43d151 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -919,6 +919,7 @@ func (s *EtcdServer) publish(timeout time.Duration) { } } +// TODO: move this function into raft.go func (s *EtcdServer) send(ms []raftpb.Message) { for i := range ms { if s.cluster.IsIDRemoved(types.ID(ms[i].To)) { @@ -940,6 +941,14 @@ func (s *EtcdServer) send(ms []raftpb.Message) { ms[i].To = 0 } } + if ms[i].Type == raftpb.MsgHeartbeat { + ok, exceed := s.r.td.Observe(ms[i].To) + if !ok { + // TODO: limit request rate. + plog.Warningf("failed to send out heartbeat on time (deadline exceeded for %v)", exceed) + plog.Warningf("server is likely overloaded") + } + } } s.r.transport.Send(ms) diff --git a/pkg/contention/contention.go b/pkg/contention/contention.go new file mode 100644 index 000000000..9a96a8d77 --- /dev/null +++ b/pkg/contention/contention.go @@ -0,0 +1,69 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// TimeoutDetector detects routine starvation by observing how much time
// passes between two consecutive occurrences of the same event, which is
// expected to happen at a fixed interval. If the observed interval is
// longer than the configured expectation, Observe reports it.
//
// The zero value is ready to use and behaves like a detector created with
// a maxDuration of zero.
type TimeoutDetector struct {
	mu          sync.Mutex // protects all fields below
	maxDuration time.Duration
	// records maps an event id to the time at which that event was
	// last observed.
	records map[uint64]time.Time
}

// NewTimeoutDetector creates a TimeoutDetector that tolerates at most
// maxDuration between two consecutive observations of the same event.
func NewTimeoutDetector(maxDuration time.Duration) *TimeoutDetector {
	return &TimeoutDetector{
		maxDuration: maxDuration,
		records:     make(map[uint64]time.Time),
	}
}

// Reset resets the TimeoutDetector by forgetting every previously observed
// event. The next Observe call for any id is treated as a first observation.
func (td *TimeoutDetector) Reset() {
	td.mu.Lock()
	defer td.mu.Unlock()

	td.records = make(map[uint64]time.Time)
}

// Observe records an occurrence of the event identified by which and
// compares the time elapsed since its previous occurrence with maxDuration.
//
// On the first observation of an id (or the first one after Reset) it
// returns true and a zero duration. Otherwise the returned duration is the
// elapsed time minus maxDuration; the boolean is false only when that
// difference is positive, i.e. the interval was longer than expected. When
// the boolean is true the duration is zero or negative.
func (td *TimeoutDetector) Observe(which uint64) (bool, time.Duration) {
	td.mu.Lock()
	defer td.mu.Unlock()

	// A zero-value TimeoutDetector carries a nil map; allocate it on first
	// use so that recording the event below cannot panic.
	if td.records == nil {
		td.records = make(map[uint64]time.Time)
	}

	ok := true
	now := time.Now()
	exceed := time.Duration(0)

	if pt, found := td.records[which]; found {
		exceed = now.Sub(pt) - td.maxDuration
		if exceed > 0 {
			ok = false
		}
	}
	// Always remember the latest occurrence, even after a miss, so the next
	// interval is measured from this observation.
	td.records[which] = now
	return ok, exceed
}