Embed: In case KVStoreHash verification fails, close the backend.

Previously, when verification failed, the server kept the backend open,
so the db file stayed locked at the OS level.
Piotr Tabor 2021-04-29 01:03:36 +02:00
parent 2ad893b110
commit f53b70facb
6 changed files with 48 additions and 26 deletions
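For context, a minimal sketch (not part of this commit; the config field names are taken from the embed package) of an embedded-etcd caller that hits this path. Before this change, a failed initial hash-KV check left the bbolt file locked even though StartEtcd returned an error:

package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "example.etcd"
	// Run the initial hash-KV check against peers during startup.
	cfg.ExperimentalInitialCorruptCheck = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		// With this commit, the failed server has already called
		// Server.Cleanup(), so the backend file is no longer locked
		// and can be inspected or repaired offline.
		log.Fatalf("embed.StartEtcd: %v", err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
}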

View File

@@ -110,8 +110,12 @@ func BeforeTest(t TB) {
// It will detect common goroutine leaks, retrying in case there are goroutines
// not synchronously torn down, and fail the test if any goroutines are stuck.
func AfterTest(t TB) {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
// If the test has already failed, the leaked-goroutines report would hide
// the real source of the problem.
if !t.Failed() {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
}
}
}
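A minimal sketch of how these helpers are typically wired into a test (the import path and test body are assumed, not from this commit). The point of the change is that a test that has already failed no longer has its output buried under the goroutine-leak report:

package example

import (
	"testing"

	"go.etcd.io/etcd/pkg/v3/testutil" // import path assumed for this tree
)

func TestSomething(t *testing.T) {
	testutil.BeforeTest(t)
	defer testutil.AfterTest(t) // with this commit: leak check is skipped if t.Failed()

	// ... test body ...
}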

View File

@@ -53,8 +53,9 @@ func MustNewURL(t *testing.T, s string) *url.URL {
func FatalStack(t *testing.T, s string) {
stackTrace := make([]byte, 1024*1024)
n := runtime.Stack(stackTrace, true)
t.Errorf("---> Test failed: %s", s)
t.Error(string(stackTrace[:n]))
t.Fatalf(s)
t.Fatal(s)
}
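The switch from t.Fatalf(s) to t.Fatal(s) avoids treating an arbitrary message as a format string. A small illustration of the pitfall (made-up message, not from the commit):

package example

import "testing"

func TestFormatStringPitfall(t *testing.T) {
	msg := "sync reached 100% but hashes differ"
	// t.Fatalf(msg) would treat msg as a format string: the '%' starts a verb
	// and the output gets mangled (go vet's printf check also flags it).
	// t.Fatal(msg) prints the message verbatim.
	t.Fatal(msg)
}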
// ConditionFunc returns true when a condition is met.

View File

@@ -229,6 +229,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = e.Server.CheckInitialHashKV(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
e.Server.Cleanup()
e.Server = nil
return e, err
}
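The practical effect of calling e.Server.Cleanup() here is that the bbolt backend is closed, so the OS-level lock on the db file is released. A sketch (assumed operator/test code, not part of the commit) of an offline inspection that would previously have blocked on that lock:

package main

import (
	"fmt"
	"log"
	"time"

	bolt "go.etcd.io/bbolt"
	"go.etcd.io/etcd/server/v3/datadir"
)

func main() {
	dbPath := datadir.ToBackendFileName("example.etcd")
	// Before this commit, a server that failed CheckInitialHashKV still held
	// the flock on this file, so the Open below would time out.
	db, err := bolt.Open(dbPath, 0400, &bolt.Options{Timeout: time.Second, ReadOnly: true})
	if err != nil {
		log.Fatalf("cannot open backend %s: %v", dbPath, err)
	}
	defer db.Close()
	fmt.Println("backend opened read-only:", dbPath)
}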

View File

@@ -1028,7 +1028,6 @@ func (s *EtcdServer) run() {
close(s.stopping)
s.wgMu.Unlock()
s.cancel()
sched.Stop()
// wait for goroutines before closing raft so wal stays open
@@ -1040,23 +1039,8 @@ func (s *EtcdServer) run() {
// by adding a peer after raft stops the transport
s.r.stop()
// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
s.lessor.Stop()
}
if s.kv != nil {
s.kv.Close()
}
if s.authStore != nil {
s.authStore.Close()
}
if s.be != nil {
s.be.Close()
}
if s.compactor != nil {
s.compactor.Stop()
}
s.Cleanup()
close(s.done)
}()
@@ -1112,6 +1096,28 @@ func (s *EtcdServer) run() {
}
}
// Cleanup removes the objects allocated by EtcdServer.NewServer when
// EtcdServer.Start was never called (Start itself takes care of that cleanup).
func (s *EtcdServer) Cleanup() {
// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
s.lessor.Stop()
}
if s.kv != nil {
s.kv.Close()
}
if s.authStore != nil {
s.authStore.Close()
}
if s.be != nil {
s.be.Close()
}
if s.compactor != nil {
s.compactor.Stop()
}
}
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
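Cleanup factors the teardown of NewServer-allocated resources out of run(), so it can also be invoked when Start is never reached. A sketch of the intended pattern (types and import paths assumed from the server tree; this mirrors the embed change above rather than adding new behavior):

package example

import (
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver"
)

func startChecked(cfg config.ServerConfig) (*etcdserver.EtcdServer, error) {
	srv, err := etcdserver.NewServer(cfg)
	if err != nil {
		return nil, err
	}
	if err := srv.CheckInitialHashKV(); err != nil {
		// Start was never called, so release what NewServer allocated:
		// lessor, mvcc store, auth store, backend (db file lock), compactor.
		srv.Cleanup()
		return nil, err
	}
	srv.Start()
	return srv, nil
}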

View File

@@ -250,6 +250,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
t.Log("---Test logic DONE")
}
func (cx *ctlCtx) prefixArgs(eps []string) []string {

View File

@@ -19,14 +19,13 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/datadir"
)
// TODO: test with embedded etcd in integration package
@@ -49,6 +48,7 @@ func TestEtcdCorruptHash(t *testing.T) {
}
func corruptTest(cx ctlCtx) {
cx.t.Log("putting 10 keys...")
for i := 0; i < 10; i++ {
if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
@@ -57,8 +57,10 @@ func corruptTest(cx ctlCtx) {
}
}
// enough time for all nodes to sync on the same data
cx.t.Log("sleeping 3sec to let nodes sync...")
time.Sleep(3 * time.Second)
cx.t.Log("connecting clientv3...")
eps := cx.epc.EndpointsV3()
cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
if err != nil {
@@ -67,19 +69,23 @@ func corruptTest(cx ctlCtx) {
defer cli1.Close()
sresp, err := cli1.Status(context.TODO(), eps[0])
cx.t.Logf("checked status sresp:%v err:%v", sresp, err)
if err != nil {
cx.t.Fatal(err)
}
id0 := sresp.Header.GetMemberId()
cx.t.Log("stopping etcd[0]...")
cx.epc.procs[0].Stop()
// corrupt first member by modifying backend offline.
fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db")
// corrupting first member by modifying backend offline.
fp := datadir.ToBackendFileName(cx.epc.procs[0].Config().dataDirPath)
cx.t.Logf("corrupting backend: %v", fp)
if err = cx.corruptFunc(fp); err != nil {
cx.t.Fatal(err)
}
cx.t.Log("restarting etcd[0]")
ep := cx.epc.procs[0]
proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
if err != nil {
@@ -87,6 +93,7 @@ func corruptTest(cx ctlCtx) {
}
defer proc.Stop()
cx.t.Log("waiting for etcd[0] failure...")
// restarting corrupted member should fail
waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
}
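cx.corruptFunc is supplied by the test variants; below is a sketch of what such an offline corruption helper can look like (an assumed, simplified helper; the real one rewrites mvccpb.KeyValue entries in the "key" bucket):

package example

import (
	bolt "go.etcd.io/bbolt"
)

// corruptBackend overwrites one value in the "key" bucket of the member's
// bbolt file so its KV hash no longer matches its peers.
func corruptBackend(fpath string) error {
	db, err := bolt.Open(fpath, 0600, nil)
	if err != nil {
		return err
	}
	defer db.Close()
	return db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("key"))
		if b == nil {
			return nil
		}
		k, _ := b.Cursor().First()
		if k == nil {
			return nil
		}
		return b.Put(k, []byte("corrupted"))
	})
}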