From 5da9cac1931852698bf70c68535969ea05cc2238 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Sun, 4 Apr 2021 15:25:10 +0200 Subject: [PATCH] embed: etcd.Close() is closing Errc() channel as well. Inspired by https://github.com/etcd-io/etcd/pull/9612 by purpleidea@. --- CHANGELOG-3.5.md | 1 + server/embed/etcd.go | 14 +++++++++++--- server/embed/serve_test.go | 3 +-- .../clientv3/snapshot/v3_snapshot_test.go | 6 ++---- tests/integration/embed/embed_test.go | 11 +++++++---- tests/integration/snapshot/v3_snapshot_test.go | 1 - 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index bf8cb510c..fd798d490 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -68,6 +68,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and - Previously supported [GRPCResolver was decomissioned](https://github.com/etcd-io/etcd/pull/12675). Use [resolver](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go) instead. - Turned on [--pre-vote by default](https://github.com/etcd-io/etcd/pull/12770). Should prevent disrupting RAFT leader by an individual member. - [ETCD_CLIENT_DEBUG env](https://github.com/etcd-io/etcd/pull/12786): Now supports log levels (debug, info, warn, error, dpanic, panic, fatal). Only when set, overrides application-wide grpc logging settings. +- [Embed Etcd.Close()](https://github.com/etcd-io/etcd/pull/12828) needs to called exactly once and closes Etcd.Err() stream. ### - Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896). diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 01d5f6916..7bf8ed935 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -341,7 +341,9 @@ func (e *Etcd) Close() { lg.Sync() }() - e.closeOnce.Do(func() { close(e.stopc) }) + e.closeOnce.Do(func() { + close(e.stopc) + }) // close client requests with request timeout timeout := 2 * time.Second @@ -383,12 +385,14 @@ func (e *Etcd) Close() { cancel() } } + if e.errc != nil { + close(e.errc) + } } func stopServers(ctx context.Context, ss *servers) { // first, close the http.Server ss.http.Shutdown(ctx) - // do not grpc.Server.GracefulStop with TLS enabled etcd server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 @@ -418,7 +422,11 @@ func stopServers(ctx context.Context, ss *servers) { } } -func (e *Etcd) Err() <-chan error { return e.errc } +// Err - return channel used to report errors during etcd run/shutdown. +// Since etcd 3.5 the channel is being closed when the etcd is over. +func (e *Etcd) Err() <-chan error { + return e.errc +} func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil { diff --git a/server/embed/serve_test.go b/server/embed/serve_test.go index b8e6b1318..aada585f0 100644 --- a/server/embed/serve_test.go +++ b/server/embed/serve_test.go @@ -26,12 +26,11 @@ import ( // TestStartEtcdWrongToken ensures that StartEtcd with wrong configs returns with error. func TestStartEtcdWrongToken(t *testing.T) { - tdir, err := ioutil.TempDir(os.TempDir(), "token-test") + tdir, err := ioutil.TempDir(t.TempDir(), "token-test") if err != nil { t.Fatal(err) } - defer os.RemoveAll(tdir) cfg := NewConfig() diff --git a/tests/integration/clientv3/snapshot/v3_snapshot_test.go b/tests/integration/clientv3/snapshot/v3_snapshot_test.go index 3f8f200eb..ade06e018 100644 --- a/tests/integration/clientv3/snapshot/v3_snapshot_test.go +++ b/tests/integration/clientv3/snapshot/v3_snapshot_test.go @@ -30,8 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/tests/v3/integration" - - "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) // TestSaveSnapshotFilePermissions ensures that the snapshot is saved with @@ -99,11 +98,10 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { } dpPath := filepath.Join(t.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond())) - if err = snapshot.Save(context.Background(), zap.NewExample(), ccfg, dpPath); err != nil { + if err = snapshot.Save(context.Background(), zaptest.NewLogger(t), ccfg, dpPath); err != nil { t.Fatal(err) } - srv.Close() return dpPath } diff --git a/tests/integration/embed/embed_test.go b/tests/integration/embed/embed_test.go index ac8c9291b..9f7dae7e9 100644 --- a/tests/integration/embed/embed_test.go +++ b/tests/integration/embed/embed_test.go @@ -119,8 +119,9 @@ func TestEmbedEtcd(t *testing.T) { e.Close() select { case err := <-e.Err(): - t.Errorf("#%d: unexpected error on close (%v)", i, err) - default: + if err != nil { + t.Errorf("#%d: unexpected error on close (%v)", i, err) + } } } } @@ -174,12 +175,14 @@ func testEmbedEtcdGracefulStop(t *testing.T, secure bool) { close(donec) }() select { - case err := <-e.Err(): - t.Fatal(err) case <-donec: case <-time.After(2*time.Second + e.Server.Cfg.ReqTimeout()): t.Fatalf("took too long to close server") } + err = <-e.Err() + if err != nil { + t.Fatal(err) + } } func newEmbedURLs(secure bool, n int) (urls []url.URL) { diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index b4fbbd6fc..0b549bb63 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -216,7 +216,6 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { } os.RemoveAll(cfg.Dir) - srv.Close() return dpPath }