tests: make log monitor a common helper

This is a follow-up to #15667.

This patch uses zap's zaptest/observer as the base to provide functionality
similar to pkg/expect.Expect.
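
For illustration, a minimal usage sketch of the new helper (not part of the
diff; the testutils import path below is an assumption): build a zap logger on
top of the observer core returned by NewLogObserver, then call Expect to block
until the requested number of matching lines has been logged.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/zap"

	// Assumed import path for the new testutils package added in this commit.
	"go.etcd.io/etcd/tests/v3/framework/testutils"
)

func main() {
	// NewLogObserver returns a zapcore.Core that records every entry it sees,
	// plus the LogObserver handle used to query those entries later.
	core, ob := testutils.NewLogObserver(zap.InfoLevel)
	logger := zap.New(core)

	logger.Info("compacted Raft logs")

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Expect blocks until one logged line contains the substring,
	// or returns an error once the context expires.
	lines, err := ob.Expect(ctx, "compacted Raft logs", 1)
	if err != nil {
		panic(err)
	}
	fmt.Println(lines)
}
```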

The test environment:

```bash
11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
mkdir /sys/fs/cgroup/etcd-followup-15667
echo 0-2 | tee /sys/fs/cgroup/etcd-followup-15667/cpuset.cpus # three cores
```

Before the change:

* memory.peak: ~ 681 MiB
* Elapsed (wall clock) time (h:mm:ss or m:ss): 6:14.04

After the change:

* memory.peak: ~ 671 MiB
* Elapsed (wall clock) time (h:mm:ss or m:ss): 6:13.07

Based on the test results, I think it's safe to enable this by default.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
Wei Fu 2023-04-14 18:48:24 +08:00
parent 22f3e50ada
commit 50aa00b203
4 changed files with 221 additions and 110 deletions


@@ -581,6 +581,8 @@ type Member struct {
Closed bool
GrpcServerRecorder *grpc_testing.GrpcRecorder
LogObserver *testutils.LogObserver
}
func (m *Member) GRPCURL() string { return m.GrpcURL }
@@ -730,7 +732,9 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
}
m.V2Deprecation = config.V2_DEPR_DEFAULT
m.GrpcServerRecorder = &grpc_testing.GrpcRecorder{}
m.Logger = memberLogger(t, mcfg.Name)
m.Logger, m.LogObserver = memberLogger(t, mcfg.Name)
m.StrictReconfigCheck = !mcfg.DisableStrictReconfigCheck
if err := m.listenGRPC(); err != nil {
t.Fatalf("listenGRPC FAILED: %v", err)
@@ -743,14 +747,23 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
return m
}
func memberLogger(t testutil.TB, name string) *zap.Logger {
func memberLogger(t testutil.TB, name string) (*zap.Logger, *testutils.LogObserver) {
level := zapcore.InfoLevel
if os.Getenv("CLUSTER_DEBUG") != "" {
level = zapcore.DebugLevel
}
options := zaptest.WrapOptions(zap.Fields(zap.String("member", name)))
return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name)
obCore, logOb := testutils.NewLogObserver(level)
options := zaptest.WrapOptions(
zap.Fields(zap.String("member", name)),
// copy logged entries to the log observer
zap.WrapCore(func(oldCore zapcore.Core) zapcore.Core {
return zapcore.NewTee(oldCore, obCore)
}),
)
return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name), logOb
}
// listenGRPC starts a grpc server over a unix domain socket on the member
@@ -934,7 +947,7 @@ func (m *Member) Clone(t testutil.TB) *Member {
mm.ElectionTicks = m.ElectionTicks
mm.PeerTLSInfo = m.PeerTLSInfo
mm.ClientTLSInfo = m.ClientTLSInfo
mm.Logger = memberLogger(t, mm.Name+"c")
mm.Logger, mm.LogObserver = memberLogger(t, mm.Name+"c")
return mm
}


@@ -0,0 +1,101 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testutils
import (
"context"
"fmt"
"strings"
"sync"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
zapobserver "go.uber.org/zap/zaptest/observer"
)
type LogObserver struct {
ob *zapobserver.ObservedLogs
enc zapcore.Encoder
mu sync.Mutex
// entries stores all the logged entries after syncLogs.
entries []zapobserver.LoggedEntry
}
func NewLogObserver(level zapcore.LevelEnabler) (zapcore.Core, *LogObserver) {
// align with zaptest
enc := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig())
co, ob := zapobserver.New(level)
return co, &LogObserver{
ob: ob,
enc: enc,
}
}
// Expect returns the first N lines containing the given string.
func (logOb *LogObserver) Expect(ctx context.Context, s string, count int) ([]string, error) {
return logOb.ExpectFunc(ctx, func(log string) bool { return strings.Contains(log, s) }, count)
}
// ExpectFunc returns the first N lines satisfying the function filter.
func (logOb *LogObserver) ExpectFunc(ctx context.Context, filter func(string) bool, count int) ([]string, error) {
i := 0
res := make([]string, 0, count)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
entries := logOb.syncLogs()
// The order of entries won't change because the slice is append-only,
// so it's safe to skip already-scanned entries by reusing `i`.
for ; i < len(entries); i++ {
buf, err := logOb.enc.EncodeEntry(entries[i].Entry, entries[i].Context)
if err != nil {
return nil, fmt.Errorf("failed to encode entry: %w", err)
}
logInStr := buf.String()
if filter(logInStr) {
res = append(res, logInStr)
}
if len(res) >= count {
break
}
}
if len(res) >= count {
return res, nil
}
time.Sleep(10 * time.Millisecond)
}
}
// syncLogs takes all the entries logged so far from zapobserver (truncating
// zapobserver's internal buffer) and appends them to the local entries slice.
func (logOb *LogObserver) syncLogs() []zapobserver.LoggedEntry {
logOb.mu.Lock()
defer logOb.mu.Unlock()
logOb.entries = append(logOb.entries, logOb.ob.TakeAll()...)
return logOb.entries
}


@@ -0,0 +1,83 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testutils
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestLogObserver_Timeout(t *testing.T) {
logCore, logOb := NewLogObserver(zap.InfoLevel)
logger := zap.New(logCore)
logger.Info(t.Name())
ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
_, err := logOb.Expect(ctx, "unknown", 1)
cancel()
assert.True(t, errors.Is(err, context.DeadlineExceeded))
assert.Equal(t, 1, len(logOb.entries))
}
func TestLogObserver_Expect(t *testing.T) {
logCore, logOb := NewLogObserver(zap.InfoLevel)
logger := zap.New(logCore)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
resCh := make(chan []string, 1)
go func() {
defer close(resCh)
res, err := logOb.Expect(ctx, t.Name(), 2)
require.NoError(t, err)
resCh <- res
}()
msgs := []string{"Hello " + t.Name(), t.Name() + ", World"}
for _, msg := range msgs {
logger.Info(msg)
time.Sleep(40 * time.Millisecond)
}
res := <-resCh
assert.Equal(t, 2, len(res))
// The logged message should be like
//
// 2023-04-16T11:46:19.367+0800 INFO Hello TestLogObserver_Expect
// 2023-04-16T11:46:19.408+0800 INFO TestLogObserver_Expect, World
//
// The prefix timestamp is unpredictable so we should assert the suffix
// only.
for idx := range msgs {
expected := fmt.Sprintf("\tINFO\t%s\n", msgs[idx])
assert.True(t, strings.HasSuffix(res[idx], expected))
}
assert.Equal(t, 2, len(logOb.entries))
}


@@ -17,14 +17,10 @@ package integration
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
@@ -58,9 +54,7 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *integration.Member, metric s
func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)
logMonitor := newTestingLogfMonitor(t)
clus := integration.NewCluster(logMonitor, &integration.ClusterConfig{
clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
@@ -95,6 +89,14 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
kvc := integration.ToGRPC(clus.Client(1)).KV
// to trigger snapshot from the leader to the stopped follower
for i := 0; i < 15; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", i, err)
}
}
// NOTE: When starting a new cluster with 3 members, each member will
// apply 3 ConfChange directly at the beginning before a leader is
// elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
@@ -108,24 +110,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
logSubID := "compacted"
logSub := newLineCountExpecter("compacted Raft logs", 4) // two members
logMonitor.addSubscriber(logSubID, logSub)
// to trigger snapshot from the leader to the stopped follower
for i := 0; i < 15; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", i, err)
}
}
// ensure two members has compacted the log twice.
if err := logSub.wait(5 * time.Second); err != nil {
t.Fatal("Failed to ensure that members compacted Raft log in 5 seconds")
}
logMonitor.delSubscriber(logSubID)
t.Logf("two members have compacted raft logs")
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
@@ -154,6 +139,8 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives)
}
expectMemberLog(t, clus.Members[0], 5*time.Second, "received and saved database snapshot", 1)
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
t.Logf("sleeping for 2 seconds DONE")
@@ -186,88 +173,15 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
}
type lineCountExpecter struct {
doneOnce sync.Once
doneCh chan struct{}
content string
count int64
seen int64
}
func newLineCountExpecter(expectedContent string, expectedCount int64) *lineCountExpecter {
return &lineCountExpecter{
doneCh: make(chan struct{}),
content: expectedContent,
count: expectedCount,
}
}
func (le *lineCountExpecter) Notify(log string) {
if !strings.Contains(log, le.content) {
return
}
if atomic.AddInt64(&le.seen, 1) >= le.count {
le.doneOnce.Do(func() {
close(le.doneCh)
})
}
}
func (le *lineCountExpecter) wait(timeout time.Duration) error {
func expectMemberLog(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
select {
case <-le.doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
lines, err := m.LogObserver.Expect(ctx, s, count)
if err != nil {
t.Fatalf("failed to expect (log:%s, count:%v): %v", s, count, err)
}
for _, line := range lines {
t.Logf("[expected line]: %v", line)
}
}
type testingLogfSubscriber interface {
Notify(log string)
}
// testingLogfMonitor is to monitor t.Logf output.
type testingLogfMonitor struct {
testutil.TB
mu sync.RWMutex
subscribers map[string]testingLogfSubscriber
}
func newTestingLogfMonitor(tb testutil.TB) *testingLogfMonitor {
return &testingLogfMonitor{
TB: tb,
subscribers: make(map[string]testingLogfSubscriber),
}
}
func (m *testingLogfMonitor) addSubscriber(id string, sub testingLogfSubscriber) {
m.mu.Lock()
defer m.mu.Unlock()
m.subscribers[id] = sub
}
func (m *testingLogfMonitor) delSubscriber(id string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.subscribers, id)
}
func (m *testingLogfMonitor) Logf(format string, args ...interface{}) {
m.mu.RLock()
if len(m.subscribers) > 0 {
log := fmt.Sprintf(format, args...)
for _, sub := range m.subscribers {
sub.Notify(log)
}
}
m.mu.RUnlock()
m.TB.Logf(format, args...)
}