From f53b70facb261b1b00665aa63e650cefce3df62b Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 29 Apr 2021 01:03:36 +0200 Subject: [PATCH] Embed: In case KVStoreHash verification fails, close the backend. In case of failed verification, the server used to keep the backend open (so the file was locked at the OS level). --- client/pkg/testutil/leak.go | 8 +++++-- client/pkg/testutil/testutil.go | 3 ++- server/embed/etcd.go | 3 +++ server/etcdserver/server.go | 42 +++++++++++++++++++-------------- tests/e2e/ctl_v3_test.go | 1 + tests/e2e/etcd_corrupt_test.go | 17 +++++++++---- 6 files changed, 48 insertions(+), 26 deletions(-) diff --git a/client/pkg/testutil/leak.go b/client/pkg/testutil/leak.go index 2bc13ab44..f786b5ccd 100644 --- a/client/pkg/testutil/leak.go +++ b/client/pkg/testutil/leak.go @@ -110,8 +110,12 @@ func BeforeTest(t TB) { // It will detect common goroutine leaks, retrying in case there are goroutines // not synchronously torn down, and fail the test if any goroutines are stuck. func AfterTest(t TB) { - if err := CheckAfterTest(1 * time.Second); err != nil { - t.Errorf("Test %v", err) + // If the test failed, the leaked goroutines list is hiding the real + // source of the problem. + if !t.Failed() { + if err := CheckAfterTest(1 * time.Second); err != nil { + t.Errorf("Test %v", err) + } } } diff --git a/client/pkg/testutil/testutil.go b/client/pkg/testutil/testutil.go index 3eb94a328..6dc55d0df 100644 --- a/client/pkg/testutil/testutil.go +++ b/client/pkg/testutil/testutil.go @@ -53,8 +53,9 @@ func MustNewURL(t *testing.T, s string) *url.URL { func FatalStack(t *testing.T, s string) { stackTrace := make([]byte, 1024*1024) n := runtime.Stack(stackTrace, true) + t.Errorf("---> Test failed: %s", s) t.Error(string(stackTrace[:n])) - t.Fatalf(s) + t.Fatal(s) } // ConditionFunc returns true when a condition is met. 
diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 7dcdbc56d..2ccff9797 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -229,6 +229,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if err = e.Server.CheckInitialHashKV(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) + + e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err)) + e.Server.Cleanup() e.Server = nil return e, err } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index c0565e3aa..373506386 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1028,7 +1028,6 @@ func (s *EtcdServer) run() { close(s.stopping) s.wgMu.Unlock() s.cancel() - sched.Stop() // wait for gouroutines before closing raft so wal stays open @@ -1040,23 +1039,8 @@ func (s *EtcdServer) run() { // by adding a peer after raft stops the transport s.r.stop() - // kv, lessor and backend can be nil if running without v3 enabled - // or running unit tests. - if s.lessor != nil { - s.lessor.Stop() - } - if s.kv != nil { - s.kv.Close() - } - if s.authStore != nil { - s.authStore.Close() - } - if s.be != nil { - s.be.Close() - } - if s.compactor != nil { - s.compactor.Stop() - } + s.Cleanup() + close(s.done) }() @@ -1112,6 +1096,28 @@ func (s *EtcdServer) run() { } } +// Cleanup removes allocated objects by EtcdServer.NewServer in +// the situation that EtcdServer.Start was not called (that takes care of cleanup). +func (s *EtcdServer) Cleanup() { + // kv, lessor and backend can be nil if running without v3 enabled + // or running unit tests. 
+ if s.lessor != nil { + s.lessor.Stop() + } + if s.kv != nil { + s.kv.Close() + } + if s.authStore != nil { + s.authStore.Close() + } + if s.be != nil { + s.be.Close() + } + if s.compactor != nil { + s.compactor.Stop() + } +} + func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index e7efb0a44..d1743e088 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -250,6 +250,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) case <-donec: } + t.Log("---Test logic DONE") } func (cx *ctlCtx) prefixArgs(eps []string) []string { diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go index 84daab668..edc95c010 100644 --- a/tests/e2e/etcd_corrupt_test.go +++ b/tests/e2e/etcd_corrupt_test.go @@ -19,14 +19,13 @@ import ( "errors" "fmt" "os" - "path/filepath" "testing" "time" + bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/v3" - - bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/server/v3/datadir" ) // TODO: test with embedded etcd in integration package @@ -49,6 +48,7 @@ func TestEtcdCorruptHash(t *testing.T) { } func corruptTest(cx ctlCtx) { + cx.t.Log("putting 10 keys...") for i := 0; i < 10; i++ { if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil { if cx.dialTimeout > 0 && !isGRPCTimedout(err) { @@ -57,8 +57,10 @@ func corruptTest(cx ctlCtx) { } } // enough time for all nodes sync on the same data + cx.t.Log("sleeping 3sec to let nodes sync...") time.Sleep(3 * time.Second) + cx.t.Log("connecting clientv3...") eps := cx.epc.EndpointsV3() cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second}) if err != nil { @@ -67,19 +69,23 @@ func corruptTest(cx ctlCtx) { defer cli1.Close() sresp, err := 
cli1.Status(context.TODO(), eps[0]) + cx.t.Logf("checked status sresp:%v err:%v", sresp, err) if err != nil { cx.t.Fatal(err) } id0 := sresp.Header.GetMemberId() + cx.t.Log("stopping etcd[0]...") cx.epc.procs[0].Stop() - // corrupt first member by modifying backend offline. - fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db") + // corrupting first member by modifying backend offline. + fp := datadir.ToBackendFileName(cx.epc.procs[0].Config().dataDirPath) + cx.t.Logf("corrupting backend: %v", fp) if err = cx.corruptFunc(fp); err != nil { cx.t.Fatal(err) } + cx.t.Log("restarting etcd[0]") ep := cx.epc.procs[0] proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...)) if err != nil { @@ -87,6 +93,7 @@ func corruptTest(cx ctlCtx) { } defer proc.Stop() + cx.t.Log("waiting for etcd[0] failure...") // restarting corrupted member should fail waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) }