From 1e8d66ef95dc5c7a805a0d55612ee36f771ca81c Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 20 Dec 2023 13:43:01 +0100 Subject: [PATCH 1/3] Add beforeOpenSnapshotBackend failpoint Signed-off-by: Marek Siarkowicz --- server/etcdserver/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1feb8bc37..00c385674 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1001,6 +1001,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { // wait for raftNode to persist snapshot onto the disk <-toApply.notifyc + // gofail: var beforeOpenSnapshotBackend struct{} newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) From d39d86a21499b6c5ee8ca2d3f35c32d3aa1d9c6b Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 20 Dec 2023 16:26:27 +0100 Subject: [PATCH 2/3] Improve logs around recovering from snapshot backend Signed-off-by: Marek Siarkowicz --- server/etcdserver/bootstrap.go | 16 ++++++++-------- server/storage/backend.go | 2 ++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index b9948f30e..7a0c09cc2 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -227,6 +227,14 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s } } if beExist { + s1, s2 := be.Size(), be.SizeInUse() + cfg.Logger.Info( + "recovered v3 backend", + zap.Int64("backend-size-bytes", s1), + zap.String("backend-size", humanize.Bytes(uint64(s1))), + zap.Int64("backend-size-in-use-bytes", s2), + zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), + ) if err = schema.Validate(cfg.Logger, be.ReadTx()); err != nil { cfg.Logger.Error("Failed to validate schema", zap.Error(err)) return nil, err @@ -414,14 +422,6 
@@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe // already been closed in this case, so we should set the backend again. ci.SetBackend(be) - s1, s2 := be.Size(), be.SizeInUse() - cfg.Logger.Info( - "recovered v3 backend from snapshot", - zap.Int64("backend-size-bytes", s1), - zap.String("backend-size", humanize.Bytes(uint64(s1))), - zap.Int64("backend-size-in-use-bytes", s2), - zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), - ) if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. diff --git a/server/storage/backend.go b/server/storage/backend.go index 9f518f11c..1137d8389 100644 --- a/server/storage/backend.go +++ b/server/storage/backend.go @@ -105,8 +105,10 @@ func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { + cfg.Logger.Info("Skipping snapshot backend", zap.Uint64("consistent-index", consistentIndex), zap.Uint64("snapshot-index", snapshot.Metadata.Index)) return oldbe, nil } + cfg.Logger.Info("Recovering from snapshot backend", zap.Uint64("consistent-index", consistentIndex), zap.Uint64("snapshot-index", snapshot.Metadata.Index)) oldbe.Close() return OpenSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks) } From 3471ef133daed5b8e7608fe66da33ef9bbdab5e6 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 20 Dec 2023 15:32:55 +0100 Subject: [PATCH 3/3] Add an e2e test and robustness failpoint around recovering from snapshot backend Signed-off-by: Marek Siarkowicz --- server/etcdserver/server.go | 2 +- tests/e2e/leader_snapshot_no_proxy_test.go | 98 ++++++++++++++++++++++ tests/robustness/failpoint/failpoint.go | 1 + tests/robustness/failpoint/gofail.go | 1 + tests/robustness/failpoint/network.go | 4 +- 5 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 
tests/e2e/leader_snapshot_no_proxy_test.go diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 00c385674..3f75a9899 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1001,7 +1001,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { // wait for raftNode to persist snapshot onto the disk <-toApply.notifyc - // gofail: var beforeOpenSnapshotBackend struct{} + // gofail: var applyBeforeOpenSnapshot struct{} newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) diff --git a/tests/e2e/leader_snapshot_no_proxy_test.go b/tests/e2e/leader_snapshot_no_proxy_test.go new file mode 100644 index 000000000..1dffbe956 --- /dev/null +++ b/tests/e2e/leader_snapshot_no_proxy_test.go @@ -0,0 +1,98 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/failpoint" +) + +func TestRecoverSnapshotBackend(t *testing.T) { + e2e.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + epc, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithClusterSize(3), + e2e.WithKeepDataDir(true), + e2e.WithPeerProxy(true), + e2e.WithSnapshotCatchUpEntries(50), + e2e.WithSnapshotCount(50), + e2e.WithGoFailEnabled(true), + e2e.WithIsPeerTLS(true), + ) + require.NoError(t, err) + + defer epc.Close() + + blackholedMember := epc.Procs[0] + otherMember := epc.Procs[1] + + wg := sync.WaitGroup{} + + trafficCtx, trafficCancel := context.WithCancel(ctx) + c, err := clientv3.New(clientv3.Config{ + Endpoints: otherMember.EndpointsGRPC(), + Logger: zap.NewNop(), + DialKeepAliveTime: 10 * time.Second, + DialKeepAliveTimeout: 100 * time.Millisecond, + }) + require.NoError(t, err) + defer c.Close() + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-trafficCtx.Done(): + return + default: + } + putCtx, putCancel := context.WithTimeout(trafficCtx, 50*time.Millisecond) + c.Put(putCtx, "a", "b") + putCancel() + time.Sleep(10 * time.Millisecond) + } + }() + + err = blackholedMember.Failpoints().SetupHTTP(ctx, "applyBeforeOpenSnapshot", "panic") + require.NoError(t, err) + err = failpoint.Blackhole(ctx, t, blackholedMember, epc, true) + require.NoError(t, err) + err = blackholedMember.Wait(ctx) + require.NoError(t, err) + trafficCancel() + wg.Wait() + err = blackholedMember.Start(ctx) + require.NoError(t, err) + _, err = blackholedMember.Logs().ExpectWithContext(ctx, 
expect.ExpectedResponse{Value: "Recovering from snapshot backend"}) + assert.NoError(t, err) + err = blackholedMember.Etcdctl().Put(ctx, "a", "1", config.PutOptions{}) + assert.NoError(t, err) +} diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index de5bfe53e..12a72b69d 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -50,6 +50,7 @@ var ( DropPeerNetwork, RaftBeforeSaveSleep, RaftAfterSaveSleep, + ApplyBeforeOpenSnapshot, } ) diff --git a/tests/robustness/failpoint/gofail.go b/tests/robustness/failpoint/gofail.go index 7c12945ef..3d90c5ddd 100644 --- a/tests/robustness/failpoint/gofail.go +++ b/tests/robustness/failpoint/gofail.go @@ -53,6 +53,7 @@ var ( RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackhole{waitTillSnapshot: true}, Follower} RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower} RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower} + ApplyBeforeOpenSnapshot Failpoint = goPanicFailpoint{"applyBeforeOpenSnapshot", triggerBlackhole{waitTillSnapshot: true}, Follower} BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second} RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second} RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second} diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index a765c427e..5d59fba3d 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -51,7 +51,7 @@ type triggerBlackhole struct { } func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error { - return blackhole(ctx, t, member, clus, 
tb.waitTillSnapshot) + return Blackhole(ctx, t, member, clus, tb.waitTillSnapshot) } func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool { @@ -61,7 +61,7 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces return config.ClusterSize > 1 && process.PeerProxy() != nil } -func blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error { +func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error { proxy := member.PeerProxy() // Blackholing will cause peers to not be able to use streamWriters registered with member