diff --git a/pkg/logutil/merge_logger.go b/pkg/logutil/merge_logger.go new file mode 100644 index 000000000..98e4c7c5a --- /dev/null +++ b/pkg/logutil/merge_logger.go @@ -0,0 +1,192 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package logutil includes utilities to faciliate logging. +package logutil + +import ( + "fmt" + "sync" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" +) + +var ( + defaultMergePeriod = time.Second + + outputInterval = time.Second +) + +// line represents a log line that can be printed out +// through capnslog.PackageLogger. +type line struct { + level capnslog.LogLevel + str string +} + +func (l line) append(s string) line { + return line{ + level: l.level, + str: l.str + " " + s, + } +} + +// status represents the merge status of a line. +type status struct { + period time.Duration + + start time.Time // start time of latest merge period + count int // number of merged lines from starting +} + +func (s *status) isInMergePeriod(now time.Time) bool { + return s.period == 0 || s.start.Add(s.period).After(now) +} + +func (s *status) isEmpty() bool { return s.count == 0 } + +func (s *status) summary(now time.Time) string { + return fmt.Sprintf("[merged %d repeated lines in %s]", s.count, now.Sub(s.start)) +} + +func (s *status) reset(now time.Time) { + s.start = now + s.count = 0 +} + +// MergeLogger supports merge logging, which merges repeated log lines +// and prints summary log lines instead. +// +// For merge logging, MergeLogger prints out the line when the line appears +// at the first time. MergeLogger holds the same log line printed within +// defaultMergePeriod, and prints out summary log line at the end of defaultMergePeriod. +// It stops merging when the line doesn't appear within the +// defaultMergePeriod. +type MergeLogger struct { + *capnslog.PackageLogger + + mu sync.Mutex // protect statusm + statusm map[line]*status +} + +func NewMergeLogger(logger *capnslog.PackageLogger) *MergeLogger { + l := &MergeLogger{ + PackageLogger: logger, + statusm: make(map[line]*status), + } + go l.outputLoop() + return l +} + +func (l *MergeLogger) MergeInfo(entries ...interface{}) { + l.merge(line{ + level: capnslog.INFO, + str: fmt.Sprint(entries...), + }) +} + +func (l *MergeLogger) MergeInfof(format string, args ...interface{}) { + l.merge(line{ + level: capnslog.INFO, + str: fmt.Sprintf(format, args...), + }) +} + +func (l *MergeLogger) MergeNotice(entries ...interface{}) { + l.merge(line{ + level: capnslog.NOTICE, + str: fmt.Sprint(entries...), + }) +} + +func (l *MergeLogger) MergeNoticef(format string, args ...interface{}) { + l.merge(line{ + level: capnslog.NOTICE, + str: fmt.Sprintf(format, args...), + }) +} + +func (l *MergeLogger) MergeWarning(entries ...interface{}) { + l.merge(line{ + level: capnslog.WARNING, + str: fmt.Sprint(entries...), + }) +} + +func (l *MergeLogger) MergeWarningf(format string, args ...interface{}) { + l.merge(line{ + level: capnslog.WARNING, + str: fmt.Sprintf(format, args...), + }) +} + +func (l *MergeLogger) MergeError(entries ...interface{}) { + l.merge(line{ + level: capnslog.ERROR, + str: fmt.Sprint(entries...), + }) +} + +func (l *MergeLogger) MergeErrorf(format string, args ...interface{}) { + l.merge(line{ + level: capnslog.ERROR, + str: fmt.Sprintf(format, args...), + }) +} + +func (l *MergeLogger) merge(ln line) { + l.mu.Lock() + + // increase count if the logger is merging the line + if status, ok := l.statusm[ln]; ok { + status.count++ + l.mu.Unlock() + return + } + + // initialize status of the line + l.statusm[ln] = &status{ + period: defaultMergePeriod, + start: time.Now(), + } + // release the lock before IO operation + l.mu.Unlock() + // print out the line at its first time + l.PackageLogger.Logf(ln.level, ln.str) +} + +func (l *MergeLogger) outputLoop() { + for now := range time.Tick(outputInterval) { + var outputs []line + + l.mu.Lock() + for ln, status := range l.statusm { + if status.isInMergePeriod(now) { + continue + } + if status.isEmpty() { + delete(l.statusm, ln) + continue + } + outputs = append(outputs, ln.append(status.summary(now))) + status.reset(now) + } + l.mu.Unlock() + + for _, o := range outputs { + l.PackageLogger.Logf(o.level, o.str) + } + } +} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 8a95d5031..f55628ef3 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -171,7 +171,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } if status.isActive() { - plog.Warningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) + plog.MergeWarningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) } else { plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) } diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go index 97893fc92..d838b7ab3 100644 --- a/rafthttp/peer_status.go +++ b/rafthttp/peer_status.go @@ -31,14 +31,12 @@ type peerStatus struct { id types.ID mu sync.Mutex // protect variables below active bool - failureMap map[failureType]string activeSince time.Time } func newPeerStatus(id types.ID) *peerStatus { return &peerStatus{ - id: id, - failureMap: make(map[failureType]string), + id: id, } } @@ -49,25 +47,21 @@ func (s *peerStatus) activate() { plog.Infof("the connection with %s became active", s.id) s.active = true s.activeSince = time.Now() - s.failureMap = make(map[failureType]string) } } func (s *peerStatus) deactivate(failure failureType, reason string) { s.mu.Lock() defer s.mu.Unlock() + msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason) if s.active { + plog.Errorf(msg) plog.Infof("the connection with %s became inactive", s.id) s.active = false s.activeSince = time.Time{} - } - logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason) - if r, ok := s.failureMap[failure]; ok && r == reason { - plog.Debugf(logline) return } - s.failureMap[failure] = reason - plog.Errorf(logline) + plog.Debugf(msg) } func (s *peerStatus) isActive() bool { diff --git a/rafthttp/remote.go b/rafthttp/remote.go index 9b3bfcd36..0e63dee8b 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -42,7 +42,7 @@ func (g *remote) send(m raftpb.Message) { case g.pipeline.msgc <- m: default: if g.status.isActive() { - plog.Warningf("dropped %s to %s since sending buffer is full", m.Type, g.id) + plog.MergeWarningf("dropped %s to %s since sending buffer is full", m.Type, g.id) } else { plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id) } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index bc3674bd7..e83392e30 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -331,7 +331,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { case recvc <- m: default: if cr.status.isActive() { - plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) + plog.MergeWarningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) } else { plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 6a6fa12f6..f0020d117 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -24,13 +24,14 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" ) -var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp") +var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")) type Raft interface { Process(ctx context.Context, m raftpb.Message) error