From 0e5d81704f69d7c46d0ce3b384ef4c8c17d9cde9 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Thu, 1 Oct 2020 14:34:58 +0200
Subject: [PATCH 1/5] .travis.yml, scripts: Fix minor bugs in the test script.

1. Setting an environment variable cannot be inside quotes: 'CPU=2' does
   not set CPU; quote only the value, as in CPU='2'.
2. "--race" testing for unit tests is supposed to be part of the
   linux-amd64-unit-4-cpu-race config.
3. The 'run' function in the test script should log_error when a command
   fails (the comparison used '!=', which compares strings in bash;
   integers are compared with '-ne').
---
 .travis.yml         | 2 +-
 scripts/test_lib.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index a47c98451..830d89bac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -125,7 +125,7 @@ script:
       sudo HOST_TMP_DIR=/tmp TEST_OPTS="VERBOSE='1'" make docker-test-coverage
       ;;
     linux-amd64-fmt-unit-go-tip-2-cpu)
-      GOARCH=amd64 PASSES='fmt unit' 'CPU=2' ./test -p=2
+      GOARCH=amd64 PASSES='fmt unit' CPU='2' RACE='false' ./test -p=2
       ;;
     linux-386-unit-1-cpu)
       docker run --rm \
diff --git a/scripts/test_lib.sh b/scripts/test_lib.sh
index 6207c9c8e..51279c4ca 100644
--- a/scripts/test_lib.sh
+++ b/scripts/test_lib.sh
@@ -72,7 +72,7 @@ function run {
   log_callout "% ${repro}"
   "${@}"
   local error_code=$?
-  if [ ${error_code} != 0 ]; then
+  if [ ${error_code} -ne 0 ]; then
     log_error -e "FAIL: (code:${error_code}):\n  % ${repro}"
     return ${error_code}
   fi

From 528f5315d63da7f0585799a9c6ef4751db382f3e Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Thu, 1 Oct 2020 16:03:05 +0200
Subject: [PATCH 2/5] auth: Fix "race" - auth unit tests leaking goroutines

- We were leaking goroutines in the auth tests.
- The goroutines depended on and modified global test state
  (simpleTokenTTLDefault), leading to races.

Removed the leaked goroutines, and expanded the 'auth' package to be
covered by leaked-goroutine detection.
---
 auth/main_test.go                 | 15 +++++++++++++++
 auth/simple_token.go              |  1 +
 auth/simple_token_test.go         |  1 +
 auth/store_test.go                | 22 +++++++++++++---------
 client/integration/main_test.go   |  7 +------
 clientv3/integration/main_test.go |  7 +------
 integration/main_test.go          |  7 +------
 pkg/testutil/leak.go              | 29 +++++++++++++++++------------
 8 files changed, 50 insertions(+), 39 deletions(-)
 create mode 100644 auth/main_test.go

diff --git a/auth/main_test.go b/auth/main_test.go
new file mode 100644
index 000000000..442c18ed6
--- /dev/null
+++ b/auth/main_test.go
@@ -0,0 +1,15 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package auth
+
+import (
+	"testing"
+
+	"go.etcd.io/etcd/v3/pkg/testutil"
+)
+
+func TestMain(m *testing.M) {
+	testutil.MustTestMainWithLeakDetection(m)
+}
diff --git a/auth/simple_token.go b/auth/simple_token.go
index 24871b0ee..7b1b094ae 100644
--- a/auth/simple_token.go
+++ b/auth/simple_token.go
@@ -36,6 +36,7 @@ const (
 )
 
 // var for testing purposes
+// TODO: Remove this mutable global state - as it's race-prone.
 var (
 	simpleTokenTTLDefault    = 300 * time.Second
 	simpleTokenTTLResolution = 1 * time.Second
diff --git a/auth/simple_token_test.go b/auth/simple_token_test.go
index bc6c0f0eb..1bea56961 100644
--- a/auth/simple_token_test.go
+++ b/auth/simple_token_test.go
@@ -50,6 +50,7 @@ func TestSimpleTokenDisabled(t *testing.T) {
 func TestSimpleTokenAssign(t *testing.T) {
 	tp := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter, simpleTokenTTLDefault)
 	tp.enable()
+	defer tp.disable()
 	ctx := context.WithValue(context.WithValue(context.TODO(), AuthenticateParamIndex{}, uint64(1)), AuthenticateParamSimpleTokenPrefix{}, "dummy")
 	token, err := tp.assign(ctx, "user1", 0)
 	if err != nil {
diff --git a/auth/store_test.go b/auth/store_test.go
index c620cd4c1..760126f23 100644
--- a/auth/store_test.go
+++ b/auth/store_test.go
@@ -64,10 +64,10 @@ func TestNewAuthStoreRevision(t *testing.T) {
 
 	// no changes to commit
 	b2 := backend.NewDefaultBackend(tPath)
+	defer b2.Close()
 	as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost)
+	defer as.Close()
 	new := as.Revision()
-	as.Close()
-	b2.Close()
 
 	if old != new {
 		t.Fatalf("expected revision %d, got %d", old, new)
@@ -77,6 +77,7 @@
 // TestNewAuthStoreBryptCost ensures that NewAuthStore uses default when given bcrypt-cost is invalid
 func TestNewAuthStoreBcryptCost(t *testing.T) {
 	b, tPath := backend.NewDefaultTmpBackend()
+	defer b.Close()
 	defer os.Remove(tPath)
 
 	tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
@@ -87,13 +88,11 @@
 	invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
 	for _, invalidCost := range invalidCosts {
 		as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost)
+		defer as.Close()
 		if as.BcryptCost() != bcrypt.DefaultCost {
 			t.Fatalf("expected DefaultCost when bcryptcost is invalid")
 		}
-		as.Close()
 	}
-
-	b.Close()
 }
 
 func encodePassword(s string) string {
@@ -175,6 +174,7 @@ func TestUserAdd(t *testing.T) {
 
 func TestRecover(t *testing.T) {
 	as, tearDown := setupAuthStore(t)
+	defer as.Close()
 	defer tearDown(t)
 
 	as.enabled = false
@@ -654,6 +654,7 @@ func TestIsAuthEnabled(t *testing.T) {
 
 // TestAuthRevisionRace ensures that access to authStore.revision is thread-safe.
 func TestAuthInfoFromCtxRace(t *testing.T) {
 	b, tPath := backend.NewDefaultTmpBackend()
+	defer b.Close()
 	defer os.Remove(tPath)
 
 	tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
@@ -709,7 +710,8 @@ func TestIsAdminPermitted(t *testing.T) {
 }
 
 func TestRecoverFromSnapshot(t *testing.T) {
-	as, _ := setupAuthStore(t)
+	as, teardown := setupAuthStore(t)
+	defer teardown(t)
 
 	ua := &pb.AuthUserAddRequest{Name: "foo", Options: &authpb.UserAddOptions{NoPassword: false}}
 	_, err := as.UserAdd(ua) // add an existing user
@@ -733,9 +735,7 @@
 		t.Fatal(err)
 	}
 	as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost)
-	defer func(a *authStore) {
-		a.Close()
-	}(as2)
+	defer as2.Close()
 
 	if !as2.IsAuthEnabled() {
 		t.Fatal("recovering authStore from existing backend failed")
@@ -808,13 +808,16 @@
 // TestRolesOrder tests authpb.User.Roles is sorted
 func TestRolesOrder(t *testing.T) {
 	b, tPath := backend.NewDefaultTmpBackend()
+	defer b.Close()
 	defer os.Remove(tPath)
 
 	tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
+	defer tp.disable()
 	if err != nil {
 		t.Fatal(err)
 	}
 	as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
+	defer as.Close()
 	err = enableAuthAndCreateRoot(as)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -863,6 +866,7 @@
 // testAuthInfoFromCtxWithRoot ensures "WithRoot" properly embeds token in the context.
 func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
 	b, tPath := backend.NewDefaultTmpBackend()
+	defer b.Close()
 	defer os.Remove(tPath)
 
 	tp, err := NewTokenProvider(zap.NewExample(), opts, dummyIndexWaiter, simpleTokenTTLDefault)
diff --git a/client/integration/main_test.go b/client/integration/main_test.go
index 4dec98da4..0e2151345 100644
--- a/client/integration/main_test.go
+++ b/client/integration/main_test.go
@@ -5,16 +5,11 @@
 package integration
 
 import (
-	"os"
 	"testing"
 
 	"go.etcd.io/etcd/v3/pkg/testutil"
 )
 
 func TestMain(m *testing.M) {
-	v := m.Run()
-	if v == 0 && testutil.CheckLeakedGoroutine() {
-		os.Exit(1)
-	}
-	os.Exit(v)
+	testutil.MustTestMainWithLeakDetection(m)
 }
diff --git a/clientv3/integration/main_test.go b/clientv3/integration/main_test.go
index 4dec98da4..0e2151345 100644
--- a/clientv3/integration/main_test.go
+++ b/clientv3/integration/main_test.go
@@ -5,16 +5,11 @@
 package integration
 
 import (
-	"os"
 	"testing"
 
 	"go.etcd.io/etcd/v3/pkg/testutil"
 )
 
 func TestMain(m *testing.M) {
-	v := m.Run()
-	if v == 0 && testutil.CheckLeakedGoroutine() {
-		os.Exit(1)
-	}
-	os.Exit(v)
+	testutil.MustTestMainWithLeakDetection(m)
 }
diff --git a/integration/main_test.go b/integration/main_test.go
index 4dec98da4..0e2151345 100644
--- a/integration/main_test.go
+++ b/integration/main_test.go
@@ -5,16 +5,11 @@
 package integration
 
 import (
-	"os"
 	"testing"
 
 	"go.etcd.io/etcd/v3/pkg/testutil"
 )
 
 func TestMain(m *testing.M) {
-	v := m.Run()
-	if v == 0 && testutil.CheckLeakedGoroutine() {
-		os.Exit(1)
-	}
-	os.Exit(v)
+	testutil.MustTestMainWithLeakDetection(m)
 }
diff --git a/pkg/testutil/leak.go b/pkg/testutil/leak.go
index 546ab05da..51b119f22 100644
--- a/pkg/testutil/leak.go
+++ b/pkg/testutil/leak.go
@@ -24,11 +24,7 @@
 running(leaking) after all tests.
 	import "go.etcd.io/etcd/v3/pkg/testutil"
 
 	func TestMain(m *testing.M) {
-		v := m.Run()
-		if v == 0 && testutil.CheckLeakedGoroutine() {
-			os.Exit(1)
-		}
-		os.Exit(v)
+		testutil.MustTestMainWithLeakDetection(m)
 	}
 
 	func TestSample(t *testing.T) {
 		...
 */
 
 func CheckLeakedGoroutine() bool {
-	if testing.Short() {
-		// not counting goroutines for leakage in -short mode
-		return false
-	}
 	gs := interestingGoroutines()
 	if len(gs) == 0 {
 		return false
@@ -66,9 +58,6 @@
 // Waits for go-routines shutdown for 'd'.
 func CheckAfterTest(d time.Duration) error {
 	http.DefaultTransport.(*http.Transport).CloseIdleConnections()
-	if testing.Short() {
-		return nil
-	}
 	var bad string
 	badSubstring := map[string]string{
 		").writeLoop(": "a Transport",
@@ -140,3 +129,19 @@
 	sort.Strings(gs)
 	return gs
 }
+
+// MustTestMainWithLeakDetection expands standard m.Run with leaked
+// goroutines detection.
+func MustTestMainWithLeakDetection(m *testing.M) {
+	v := m.Run()
+
+	http.DefaultTransport.(*http.Transport).CloseIdleConnections()
+
+	// Let the other goroutines finalize.
+	runtime.Gosched()
+
+	if v == 0 && CheckLeakedGoroutine() {
+		os.Exit(1)
+	}
+	os.Exit(v)
+}

From 220f711a2a6cd07b9fa8d9273f9e186bd8578b8f Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Thu, 1 Oct 2020 17:56:28 +0200
Subject: [PATCH 3/5] clientv3/integration: Fix leaked goroutine in case of
 skipped test.

---
 clientv3/integration/metrics_test.go | 29 ++++++++++---------
 etcdserver/api/v3rpc/interceptor.go  |  6 ++--
 etcdserver/corrupt.go                |  2 +-
 etcdserver/server.go                 | 43 ++++++++++++++++------------
 4 files changed, 44 insertions(+), 36 deletions(-)

diff --git a/clientv3/integration/metrics_test.go b/clientv3/integration/metrics_test.go
index 1065a1ced..6ef833f20 100644
--- a/clientv3/integration/metrics_test.go
+++ b/clientv3/integration/metrics_test.go
@@ -43,21 +43,27 @@ func TestV3ClientMetrics(t *testing.T) {
 		ln net.Listener
 	)
 
-	// listen for all Prometheus metrics
+	srv := &http.Server{Handler: promhttp.Handler()}
+	srv.SetKeepAlivesEnabled(false)
+
+	ln, err := transport.NewUnixListener(addr)
+	if err != nil {
+		t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
+	}
 
 	donec := make(chan struct{})
+	defer func() {
+		ln.Close()
+		<-donec
+	}()
+
+	// listen for all Prometheus metrics
+
 	go func() {
 		var err error
 		defer close(donec)
 
-		srv := &http.Server{Handler: promhttp.Handler()}
-		srv.SetKeepAlivesEnabled(false)
-
-		ln, err = transport.NewUnixListener(addr)
-		if err != nil {
-			t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
-		}
-
 		err = srv.Serve(ln)
 		if err != nil && !transport.IsClosedConnError(err) {
 			t.Errorf("Err serving http requests: %v", err)
@@ -88,7 +94,7 @@
 
 	pBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
 
-	_, err := cli.Put(context.Background(), "foo", "bar")
+	_, err = cli.Put(context.Background(), "foo", "bar")
 	if err != nil {
 		t.Errorf("Error putting value in key store")
 	}
@@ -109,9 +115,6 @@
 	if wBefore+1 != wAfter {
 		t.Errorf("grpc_client_msg_received_total expected %d, got %d", 1, wAfter-wBefore)
 	}
-
-	ln.Close()
-	<-donec
 }
 
 func sumCountersForMetricAndLabels(t *testing.T, url string, metricName string, matchingLabelValues ...string) int {
diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go
index 45ea5d734..fb0328166 100644
--- a/etcdserver/api/v3rpc/interceptor.go
+++ b/etcdserver/api/v3rpc/interceptor.go
@@ -264,13 +264,13 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
 		streams: make(map[grpc.ServerStream]struct{}),
 	}
 
-	go func() {
+	s.GoAttach(func() {
 		election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
 		noLeaderCnt := 0
 
 		for {
 			select {
-			case <-s.StopNotify():
+			case <-s.StoppingNotify():
 				return
 			case <-time.After(election):
 				if s.Leader() == types.ID(raft.None) {
@@ -295,7 +295,7 @@
 				}
 			}
 		}
-	}()
+	})
 
 	return smap
 }
diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go
index 45aa45357..5e68c4bd0 100644
--- a/etcdserver/corrupt.go
+++ b/etcdserver/corrupt.go
@@ -180,7 +180,7 @@ func (s *EtcdServer) checkHashKV() error {
 			Action: pb.AlarmRequest_ACTIVATE,
 			Alarm:  pb.AlarmType_CORRUPT,
 		}
-		s.goAttach(func() {
+		s.GoAttach(func() {
 			s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
 		})
 	}
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 1d04c721b..6e04a14d8 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -698,13 +698,13 @@ func (s *EtcdServer) adjustTicks() {
 // should be implemented in goroutines.
 func (s *EtcdServer) Start() {
 	s.start()
-	s.goAttach(func() { s.adjustTicks() })
-	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
-	s.goAttach(s.purgeFile)
-	s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
-	s.goAttach(s.monitorVersions)
-	s.goAttach(s.linearizableReadLoop)
-	s.goAttach(s.monitorKVHash)
+	s.GoAttach(func() { s.adjustTicks() })
+	s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
+	s.GoAttach(s.purgeFile)
+	s.GoAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
+	s.GoAttach(s.monitorVersions)
+	s.GoAttach(s.linearizableReadLoop)
+	s.GoAttach(s.monitorKVHash)
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -939,7 +939,7 @@ func (s *EtcdServer) run() {
 	}
 
 	defer func() {
-		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
+		s.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
 		close(s.stopping)
 		s.wgMu.Unlock()
 		s.cancel()
@@ -986,7 +986,7 @@ func (s *EtcdServer) run() {
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:
-			s.goAttach(func() {
+			s.GoAttach(func() {
 				// Increases throughput of expired leases deletion process through parallelization
 				c := make(chan struct{}, maxPendingRevokes)
 				for _, lease := range leases {
@@ -996,7 +996,7 @@ func (s *EtcdServer) run() {
 						return
 					}
 					lid := lease.ID
-					s.goAttach(func() {
+					s.GoAttach(func() {
 						ctx := s.authStore.WithRoot(s.ctx)
 						_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
 						if lerr == nil {
@@ -1347,6 +1347,10 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
 // when the server is stopped.
 func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
 
+// StoppingNotify returns a channel that receives an empty struct
+// when the server is being stopped.
+func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
+
 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
 
 func (s *EtcdServer) LeaderStats() []byte {
@@ -1767,7 +1771,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
 	ctx, cancel := context.WithTimeout(s.ctx, timeout)
-	s.goAttach(func() {
+	s.GoAttach(func() {
 		s.r.Propose(ctx, data)
 		cancel()
 	})
@@ -1908,7 +1912,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 	s.r.transport.SendSnapshot(merged)
 	lg.Info("sending merged snapshot", fields...)
 
-	s.goAttach(func() {
+	s.GoAttach(func() {
 		select {
 		case ok := <-merged.CloseNotify():
 			// delay releasing inflight snapshot for another 30 seconds to
@@ -2051,7 +2055,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 			zap.Error(ar.err),
 		)
 
-		s.goAttach(func() {
+		s.GoAttach(func() {
 			a := &pb.AlarmRequest{
 				MemberID: uint64(s.ID()),
 				Action:   pb.AlarmRequest_ACTIVATE,
@@ -2144,7 +2148,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	// the go routine created below.
 	s.KV().Commit()
 
-	s.goAttach(func() {
+	s.GoAttach(func() {
 		lg := s.getLogger()
 
 		d, err := clone.SaveNoCopy()
@@ -2268,12 +2272,12 @@ func (s *EtcdServer) monitorVersions() {
 			if v != nil {
 				verStr = v.String()
 			}
-			s.goAttach(func() { s.updateClusterVersion(verStr) })
+			s.GoAttach(func() { s.updateClusterVersion(verStr) })
 			continue
 		}
 
 		if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
-			s.goAttach(func() { s.updateClusterVersion(v.String()) })
+			s.GoAttach(func() { s.updateClusterVersion(v.String()) })
 		}
 	}
 }
@@ -2372,15 +2376,16 @@ func (s *EtcdServer) restoreAlarms() error {
 	return nil
 }
 
-// goAttach creates a goroutine on a given function and tracks it using
+// GoAttach creates a goroutine on a given function and tracks it using
 // the etcdserver waitgroup.
-func (s *EtcdServer) goAttach(f func()) {
+// The passed function should interrupt on s.StoppingNotify().
+func (s *EtcdServer) GoAttach(f func()) {
 	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
 	defer s.wgMu.RUnlock()
 	select {
 	case <-s.stopping:
 		lg := s.getLogger()
-		lg.Warn("server has stopped; skipping goAttach")
+		lg.Warn("server has stopped; skipping GoAttach")
 		return
 	default:
 	}

From 98b123f034a7dcdc33d2620fc07c4f5ff11a540c Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Fri, 2 Oct 2020 19:36:18 +0200
Subject: [PATCH 4/5] mvcc: Fix races between metrics gathering and
 mvcc.Restore

The race was manifesting as the following flakes (the full race-detector
output is quoted below). See: https://github.com/etcd-io/etcd/issues/12336

I'm taking the locks for a short duration of time (instead of for the
whole duration of Restore) to allow metrics to be gathered while the
server restoration is in progress.
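
To make the intended pattern concrete, here is a minimal, self-contained
sketch of that locking granularity. The type and method names are
hypothetical stand-ins; in the real change the fields and `revMu` live in
`mvcc/kvstore.go`, shown in the diff below:

```go
package mvccsketch

import "sync"

// store stands in for mvcc.store: revMu guards only the revision
// fields, not the whole restore operation.
type store struct {
	revMu          sync.RWMutex
	currentRev     int64
	compactMainRev int64
}

// restore performs the long-running work unlocked and takes revMu only
// around writes to the shared fields, so a concurrent metrics reader
// blocks briefly instead of waiting for the whole restore.
func (s *store) restore(restoredRev, compactRev int64) {
	// ... long-running index rebuild happens here, outside the lock ...

	s.revMu.Lock()
	s.compactMainRev = compactRev
	s.currentRev = restoredRev
	// Keys below the compaction revision may all have been deleted by
	// compaction, so the compaction revision is the correct floor.
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	s.revMu.Unlock()
}

// rev is what a concurrent metrics gatherer would call.
func (s *store) rev() int64 {
	s.revMu.RLock()
	defer s.revMu.RUnlock()
	return s.currentRev
}
```

Holding `revMu` for the whole of Restore would also fix the race, but it
would stall metrics scrapes for the duration of the restoration, which is
why the narrower critical sections were chosen.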
```
{"level":"warn","ts":"2020-09-26T13:33:13.010Z","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-c9c21e47-2013-4776-8e83-e331b2caa9ae/localhost:14422410081761184170","attempt":0,"error":"rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial unix localhost:14422410081761184170: connect: no such file or directory\""}
{"level":"warn","ts":"2020-09-26T13:33:13.011Z","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-c9c21e47-2013-4776-8e83-e331b2caa9ae/localhost:14422410081761184170","attempt":0,"error":"rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial unix localhost:14422410081761184170: connect: no such file or directory\""}
{"level":"warn","ts":"2020-09-26T13:33:16.285Z","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-b504e954-e000-42a4-aa4f-70ded8dbef39/localhost:55672762955698614610","attempt":0,"error":"rpc error: code = NotFound desc = etcdserver: requested lease not found"}
{"level":"warn","ts":"2020-09-26T13:33:21.434Z","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-7945004b-f67e-42aa-af11-a7b40fbbe6fc/localhost:49623072144007561240","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
==================
WARNING: DATA RACE
Write at 0x00c000905f78 by goroutine 764:
  go.etcd.io/etcd/v3/mvcc.(*store).restore()
      /go/src/go.etcd.io/etcd/mvcc/kvstore.go:397 +0x773
  go.etcd.io/etcd/v3/mvcc.(*store).Restore()
      /go/src/go.etcd.io/etcd/mvcc/kvstore.go:343 +0x5f1
  go.etcd.io/etcd/v3/mvcc.(*watchableStore).Restore()
      /go/src/go.etcd.io/etcd/mvcc/watchable_store.go:199 +0xe2
  go.etcd.io/etcd/v3/etcdserver.(*EtcdServer).applySnapshot()
      /go/src/go.etcd.io/etcd/etcdserver/server.go:1107 +0xa49
  go.etcd.io/etcd/v3/etcdserver.(*EtcdServer).applyAll()
      /go/src/go.etcd.io/etcd/etcdserver/server.go:1031 +0x6d
  go.etcd.io/etcd/v3/etcdserver.(*EtcdServer).run.func8()
      /go/src/go.etcd.io/etcd/etcdserver/server.go:986 +0x53
  go.etcd.io/etcd/v3/pkg/schedule.(*fifo).run()
      /go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:157 +0x11e

Previous read at 0x00c000905f78 by goroutine 180:
  [failed to restore the stack]

Goroutine 764 (running) created at:
  go.etcd.io/etcd/v3/pkg/schedule.NewFIFOScheduler()
      /go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:70 +0x2b1
  go.etcd.io/etcd/v3/etcdserver.(*EtcdServer).run()
      /go/src/go.etcd.io/etcd/etcdserver/server.go:871 +0x32c

Goroutine 180 (running) created at:
  net/http.(*Server).Serve()
      /usr/local/go/src/net/http/server.go:2933 +0x5b6
  net/http/httptest.(*Server).goServe.func1()
      /usr/local/go/src/net/http/httptest/server.go:308 +0xd3
==================
--- FAIL: TestV3WatchRestoreSnapshotUnsync (6.74s)
    testing.go:906: race detected during execution of test
FAIL
coverage: 83.5% of statements
FAIL	go.etcd.io/etcd/v3/integration	231.272s
FAIL
Command 'go test -timeout=30m -cpu=1 --race --cover=true go.etcd.io/etcd/v3/integration' failed.
```
---
 mvcc/kvstore.go | 31 +++++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index ba3854615..6bafb270a 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -333,8 +333,15 @@ func (s *store) Restore(b backend.Backend) error {
 
 	s.b = b
 	s.kvindex = newTreeIndex(s.lg)
-	s.currentRev = 1
-	s.compactMainRev = -1
+
+	{
+		// During restore the metrics might report 'special' values
+		s.revMu.Lock()
+		s.currentRev = 1
+		s.compactMainRev = -1
+		s.revMu.Unlock()
+	}
+
 	s.fifoSched = schedule.NewFIFOScheduler()
 	s.stopc = make(chan struct{})
 	s.ci.SetBatchTx(b.BatchTx())
@@ -358,6 +365,7 @@ func (s *store) restore() error {
 
 	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
 	if len(finishedCompactBytes) != 0 {
+		s.revMu.Lock()
 		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
 
 		s.lg.Info(
@@ -366,6 +374,7 @@
 			zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
 			zap.Int64("restored-compact-revision", s.compactMainRev),
 		)
+		s.revMu.Unlock()
 	}
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	scheduledCompact := int64(0)
@@ -394,14 +403,20 @@ func (s *store) restore() error {
 		revToBytes(newMin, min)
 	}
 	close(rkvc)
-	s.currentRev = <-revc
 
-	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
-	// the correct revision should be set to compaction revision in the case, not the largest revision
-	// we have seen.
-	if s.currentRev < s.compactMainRev {
-		s.currentRev = s.compactMainRev
+	{
+		s.revMu.Lock()
+		s.currentRev = <-revc
+
+		// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
+		// the correct revision should be set to compaction revision in the case, not the largest revision
+		// we have seen.
+		if s.currentRev < s.compactMainRev {
+			s.currentRev = s.compactMainRev
+		}
+		s.revMu.Unlock()
 	}
+
 	if scheduledCompact <= s.compactMainRev {
 		scheduledCompact = 0
 	}

From 97820f1c6e790c48887f0a7fb3b2159a959dca76 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Fri, 2 Oct 2020 20:35:51 +0200
Subject: [PATCH 5/5] integration: Fix flakes of TestV3WatchRestoreSnapshotUnsync

The flakes manifested as:

```
--- FAIL: TestV3WatchRestoreSnapshotUnsync (3.59s)
    v3_watch_restore_test.go:82: inflight snapshot sends expected 0 or 1, got ""
FAIL
coverage: 55.2% of statements
FAIL	go.etcd.io/etcd/v3/integration	3.646s
FAIL
```

The root reason is that all the SnapMsg processing happens on both ends
(leader, follower) asynchronously, in goroutines (e.g. on the fifo
scheduler within EtcdServer.run), so when we observe through metrics we
don't know whether it finished (or even got started).

Ideally we should have an EtcdServer.Drain() call that returns once the
server has processed all of its internal 'queues' and is idle.
---
 integration/cluster.go               |  4 ++-
 integration/v3_watch_restore_test.go | 53 ++++++++++++++++++++++------
 2 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/integration/cluster.go b/integration/cluster.go
index 45d356144..e29f66616 100644
--- a/integration/cluster.go
+++ b/integration/cluster.go
@@ -429,9 +429,11 @@ func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
 	}
 }
 
+// WaitLeader returns index of the member in c.Members that is leader (or -1).
 func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
 
-// waitLeader waits until given members agree on the same leader.
+// waitLeader waits until given members agree on the same leader,
+// and returns its 'index' in the 'membs' list (or -1).
 func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
 	possibleLead := make(map[uint64]bool)
 	var lead uint64
diff --git a/integration/v3_watch_restore_test.go b/integration/v3_watch_restore_test.go
index 16b1f906b..f2f64fae9 100644
--- a/integration/v3_watch_restore_test.go
+++ b/integration/v3_watch_restore_test.go
@@ -23,6 +23,28 @@ import (
 	pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
 )
 
+// MustFetchNotEmptyMetric attempts to fetch the given 'metric' from 'member',
+// waiting for a not-empty value or 'timeout'.
+func MustFetchNotEmptyMetric(tb testing.TB, member *member, metric string, timeout <-chan time.Time) string {
+	metricValue := ""
+	tick := time.Tick(tickDuration)
+	for metricValue == "" {
+		tb.Logf("Waiting for metric: %v", metric)
+		select {
+		case <-timeout:
+			tb.Fatalf("Failed to fetch metric %v", metric)
+			return ""
+		case <-tick:
+			var err error
+			metricValue, err = member.Metric(metric)
+			if err != nil {
+				tb.Fatal(err)
+			}
+		}
+	}
+	return metricValue
+}
+
 // TestV3WatchRestoreSnapshotUnsync tests whether slow follower can restore
 // from leader snapshot, and still notify on watchers from an old revision
 // that were created in synced watcher group in the first place.
@@ -55,8 +77,11 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	}
 
 	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
-	clus.waitLeader(t, clus.Members[1:])
+	initialLead := clus.waitLeader(t, clus.Members[1:])
+	t.Logf("elected lead: %v", clus.Members[initialLead].s.ID())
+	t.Logf("sleeping for 2 seconds")
 	time.Sleep(2 * time.Second)
+	t.Logf("sleeping for 2 seconds DONE")
 
 	kvc := toGRPC(clus.Client(1)).KV
 
@@ -71,26 +96,32 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	// trigger snapshot send from leader to this slow follower
 	// which then calls watchable store Restore
 	clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
+	// We don't expect leadership change here, just recompute the leader's index
+	// within clus.Members list.
 	lead := clus.WaitLeader(t)
 
-	sends, err := clus.Members[lead].Metric("etcd_network_snapshot_send_inflights_total")
-	if err != nil {
-		t.Fatal(err)
-	}
-	if sends != "0" && sends != "1" {
+	// Sending is scheduled on fifo 'sched' within EtcdServer::run,
+	// so it can start delayed after recovery.
+	send := MustFetchNotEmptyMetric(t, clus.Members[lead],
+		"etcd_network_snapshot_send_inflights_total",
+		time.After(5*time.Second))
+
+	if send != "0" && send != "1" {
 		// 0 if already sent, 1 if sending
-		t.Fatalf("inflight snapshot sends expected 0 or 1, got %q", sends)
-	}
-	receives, err := clus.Members[(lead+1)%3].Metric("etcd_network_snapshot_receive_inflights_total")
-	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("inflight snapshot_send_inflights_total expected 0 or 1, got %q", send)
 	}
+
+	receives := MustFetchNotEmptyMetric(t, clus.Members[(lead+1)%3],
+		"etcd_network_snapshot_receive_inflights_total",
+		time.After(5*time.Second))
 	if receives != "0" && receives != "1" {
 		// 0 if already received, 1 if receiving
 		t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives)
 	}
+	t.Logf("sleeping for 2 seconds")
 	time.Sleep(2 * time.Second)
+	t.Logf("sleeping for 2 seconds DONE")
 
 	// slow follower now applies leader snapshot
 	// should be able to notify on old-revision watchers in unsynced
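For reference, the poll-until-non-empty pattern that MustFetchNotEmptyMetric
introduces above can be sketched as a stand-alone helper. This is a minimal
sketch under stated assumptions: `fetch` is a hypothetical stand-in for
`member.Metric`, and the tick/timeout pair mirrors `tickDuration` and
`time.After(5*time.Second)` from the test:

```go
package metricpoll

import (
	"errors"
	"time"
)

// pollNotEmpty re-invokes fetch on every tick until it returns a
// non-empty value, or gives up once the timeout channel fires. Metrics
// such as snapshot in-flight gauges are published asynchronously, so a
// single read taken right after recovery may observe "" even though
// the value is about to appear.
func pollNotEmpty(fetch func() (string, error), tick time.Duration, timeout <-chan time.Time) (string, error) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			return "", errors.New("timed out waiting for a non-empty value")
		case <-ticker.C:
			v, err := fetch()
			if err != nil {
				return "", err
			}
			if v != "" {
				return v, nil
			}
		}
	}
}
```

The design point is that the snapshot send is scheduled asynchronously on
the server's fifo scheduler, so a one-shot read of the gauge races with the
sender; polling until a value appears, bounded by a hard timeout, removes
the flake without masking real failures.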