From ff3729c4d5d084504af43ba4310119cf5ef4e9ae Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 18 Aug 2021 17:43:58 +0200 Subject: [PATCH] server: Implement storage schema migration to follow cluster version change and panic if unknown storage version is found Storage version should follow cluster version. During upgrades this should be immidiate as storage version can be always upgraded as storage is backward compatible. During downgrades it will be delayed and will require time for incompatible changes to be snapshotted. As storage version change can happen long after cluster is running, we need to add a step during bootstrap to validate if loaded data can be understood by migrator. --- CHANGELOG-3.6.md | 4 + server/etcdserver/adapters.go | 43 ++++++ server/etcdserver/bootstrap.go | 17 ++- server/etcdserver/server.go | 33 +++-- server/etcdserver/version/monitor.go | 24 ++++ server/etcdserver/version/monitor_test.go | 75 ++++++++++ server/storage/schema/bucket.go | 3 +- server/storage/schema/schema.go | 39 ++++-- server/storage/schema/schema_test.go | 161 +++++++++++++++------- 9 files changed, 319 insertions(+), 80 deletions(-) diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md index 1094c2663..360f25f49 100644 --- a/CHANGELOG-3.6.md +++ b/CHANGELOG-3.6.md @@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). +### Breaking Changes + +- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command. + ### etcdctl v3 - Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133). diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index b00e721f7..a6611359c 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -18,6 +18,8 @@ import ( "context" "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" "go.etcd.io/etcd/api/v3/version" @@ -28,6 +30,14 @@ import ( // serverVersionAdapter implements Server interface needed by serverversion.Monitor type serverVersionAdapter struct { *EtcdServer + tx backend.BatchTx +} + +func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter { + return &serverVersionAdapter{ + EtcdServer: s, + tx: nil, + } } var _ serverversion.Server = (*serverVersionAdapter)(nil) @@ -56,3 +66,36 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo { func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions { return getVersions(s.lg, s.cluster, s.id, s.peerRt) } + +func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { + if s.tx == nil { + s.Lock() + defer s.Unlock() + } + v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx) + if err != nil { + return nil + } + return &v +} + +func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) { + if s.tx == nil { + s.Lock() + defer s.Unlock() + } + err := schema.UnsafeMigrate(s.lg, s.tx, target) + if err != nil { + s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err)) + } +} + +func (s *serverVersionAdapter) Lock() { + s.tx = s.be.BatchTx() + s.tx.Lock() +} + +func (s *serverVersionAdapter) Unlock() { + s.tx.Unlock() + s.tx = nil +} diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 9a31f659c..da9cc6da0 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -147,16 +147,29 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co ci = cindex.NewConsistentIndex(nil) beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci) be = serverstorage.OpenBackend(cfg, beHooks) + defer func() { + if err != nil && be != nil { + be.Close() + } + }() ci.SetBackend(be) schema.CreateMetaBucket(be.BatchTx()) if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { - err := maybeDefragBackend(cfg, be) + err = maybeDefragBackend(cfg, be) if err != nil { - be.Close() return nil, nil, false, nil, err } } cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex())) + + // TODO(serathius): Implement schema setup in fresh storage + if beExist { + err = schema.Validate(cfg.Logger, be.BatchTx()) + if err != nil { + cfg.Logger.Error("Failed to validate schema", zap.Error(err)) + return nil, nil, false, nil, err + } + } return be, ci, beExist, beHooks, nil } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 5db8f31ce..c491b8225 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -291,9 +291,6 @@ type EtcdServer struct { firstCommitInTerm *notify.Notifier *AccessController - - // Ensure that storage schema is updated only once. - updateStorageSchema sync.Once } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -516,7 +513,8 @@ func (s *EtcdServer) Start() { s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) s.GoAttach(s.purgeFile) s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) }) - s.GoAttach(s.monitorVersions) + s.GoAttach(s.monitorClusterVersions) + s.GoAttach(s.monitorStorageVersion) s.GoAttach(s.linearizableReadLoop) s.GoAttach(s.monitorKVHash) s.GoAttach(s.monitorDowngrade) @@ -2071,12 +2069,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { "saved snapshot", zap.Uint64("snapshot-index", snap.Metadata.Index), ) - s.updateStorageSchema.Do(func() { - err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx()) - if err != nil { - s.lg.Warn("failed to update storage version", zap.Error(err)) - } - }) // When sending a snapshot, etcd will pause compaction. // After receives a snapshot, the slow follower needs to get all the entries right after @@ -2137,9 +2129,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { return s.cluster.Version() } -// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed. -func (s *EtcdServer) monitorVersions() { - monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s}) +// monitorClusterVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed. +func (s *EtcdServer) monitorClusterVersions() { + monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s)) for { select { case <-s.firstCommitInTerm.Receive(): @@ -2155,6 +2147,19 @@ func (s *EtcdServer) monitorVersions() { } } +// monitorStorageVersion every monitorVersionInterval updates storage version if needed. +func (s *EtcdServer) monitorStorageVersion() { + monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s)) + for { + select { + case <-time.After(monitorVersionInterval): + case <-s.stopping: + return + } + monitor.UpdateStorageVersionIfNeeded() + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() @@ -2233,7 +2238,7 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) { // monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed. func (s *EtcdServer) monitorDowngrade() { - monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s}) + monitor := serverversion.NewMonitor(s.Logger(), newServerVersionAdapter(s)) t := s.Cfg.DowngradeCheckTime if t == 0 { return diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go index 0db7be625..19e91f7ef 100644 --- a/server/etcdserver/version/monitor.go +++ b/server/etcdserver/version/monitor.go @@ -34,6 +34,12 @@ type Server interface { GetVersions() map[string]*version.Versions UpdateClusterVersion(string) DowngradeCancel() + + GetStorageVersion() *semver.Version + UpdateStorageVersion(semver.Version) + + Lock() + Unlock() } func NewMonitor(lg *zap.Logger, storage Server) *Monitor { @@ -73,6 +79,24 @@ func (m *Monitor) UpdateClusterVersionIfNeeded() { } } +// UpdateStorageVersionIfNeeded updates the storage version if it differs from cluster version. +func (m *Monitor) UpdateStorageVersionIfNeeded() { + cv := m.s.GetClusterVersion() + if cv == nil { + return + } + m.s.Lock() + defer m.s.Unlock() + sv := m.s.GetStorageVersion() + + if sv == nil || sv.Major != cv.Major || sv.Minor != cv.Minor { + if sv != nil { + m.lg.Info("storage version differs from storage version.", zap.String("cluster-version", cv.String()), zap.String("storage-version", sv.String())) + } + m.s.UpdateStorageVersion(semver.Version{Major: cv.Major, Minor: cv.Minor}) + } +} + func (m *Monitor) CancelDowngradeIfNeeded() { d := m.s.GetDowngradeInfo() if !d.Enabled { diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go index 1a712ef08..b76915b11 100644 --- a/server/etcdserver/version/monitor_test.go +++ b/server/etcdserver/version/monitor_test.go @@ -13,6 +13,11 @@ import ( var testLogger = zap.NewExample() +var ( + V3_5 = semver.Version{Major: 3, Minor: 5} + V3_6 = semver.Version{Major: 3, Minor: 6} +) + func TestDecideClusterVersion(t *testing.T) { tests := []struct { vers map[string]*version.Versions @@ -52,6 +57,55 @@ func TestDecideClusterVersion(t *testing.T) { } } +func TestDecideStorageVersion(t *testing.T) { + tests := []struct { + name string + clusterVersion *semver.Version + storageVersion *semver.Version + expectStorageVersion *semver.Version + }{ + { + name: "No action if cluster version is nil", + }, + { + name: "Should set storage version if cluster version is set", + clusterVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "No action if storage version was already set", + storageVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "No action if storage version equals cluster version", + clusterVersion: &V3_5, + storageVersion: &V3_5, + expectStorageVersion: &V3_5, + }, + { + name: "Should set storage version to cluster version", + clusterVersion: &V3_6, + storageVersion: &V3_5, + expectStorageVersion: &V3_6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &storageMock{ + clusterVersion: tt.clusterVersion, + storageVersion: tt.storageVersion, + } + monitor := NewMonitor(testLogger, s) + monitor.UpdateStorageVersionIfNeeded() + if !reflect.DeepEqual(s.storageVersion, tt.expectStorageVersion) { + t.Errorf("Unexpected storage version value, got = %+v, want %+v", s.storageVersion, tt.expectStorageVersion) + } + }) + } +} + func TestVersionMatchTarget(t *testing.T) { tests := []struct { name string @@ -107,7 +161,9 @@ func TestVersionMatchTarget(t *testing.T) { type storageMock struct { versions map[string]*version.Versions clusterVersion *semver.Version + storageVersion *semver.Version downgradeInfo *membership.DowngradeInfo + locked bool } var _ Server = (*storageMock)(nil) @@ -131,3 +187,22 @@ func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo { func (s *storageMock) GetVersions() map[string]*version.Versions { return s.versions } + +func (s *storageMock) GetStorageVersion() *semver.Version { + return s.storageVersion +} + +func (s *storageMock) UpdateStorageVersion(v semver.Version) { + s.storageVersion = &v +} + +func (s *storageMock) Lock() { + if s.locked { + panic("Deadlock") + } + s.locked = true +} + +func (s *storageMock) Unlock() { + s.locked = false +} diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go index c39ed71e0..e5eda721b 100644 --- a/server/storage/schema/bucket.go +++ b/server/storage/schema/bucket.go @@ -88,8 +88,9 @@ var ( func DefaultIgnores(bucket, key []byte) bool { // consistent index & term might be changed due to v2 internal sync, which // is not controllable by the user. + // storage version might change after wal snapshot and is not controller by user. return bytes.Compare(bucket, Meta.Name()) == 0 && - (bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0) + (bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0 || bytes.Compare(key, MetaStorageVersionName) == 0) } func BackendMemberKey(id types.ID) []byte { diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index e498e8285..0fbdb7bb9 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -25,28 +25,45 @@ import ( ) var ( - V3_5 = semver.Version{Major: 3, Minor: 5} - V3_6 = semver.Version{Major: 3, Minor: 6} - currentVersion semver.Version + V3_5 = semver.Version{Major: 3, Minor: 5} + V3_6 = semver.Version{Major: 3, Minor: 6} ) -func init() { - v := semver.New(version.Version) - currentVersion = semver.Version{Major: v.Major, Minor: v.Minor} +// Validate checks provided backend to confirm that schema used is supported. +func Validate(lg *zap.Logger, tx backend.BatchTx) error { + tx.Lock() + defer tx.Unlock() + return unsafeValidate(lg, tx) } -// UpdateStorageSchema updates storage schema to etcd binary version. -func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error { - return Migrate(lg, tx, currentVersion) +func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error { + current, err := UnsafeDetectSchemaVersion(lg, tx) + if err != nil { + // v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version. + lg.Warn("Failed to detect storage schema version. Please wait till wal snapshot before upgrading cluster.") + return nil + } + _, err = newPlan(lg, current, localBinaryVersion()) + return err +} + +func localBinaryVersion() semver.Version { + v := semver.New(version.Version) + return semver.Version{Major: v.Major, Minor: v.Minor} } // Migrate updates storage schema to provided target version. func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { tx.Lock() defer tx.Unlock() + return UnsafeMigrate(lg, tx, target) +} + +// UnsafeMigrate is non-threadsafe version of Migrate. +func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { - return fmt.Errorf("cannot determine storage version: %w", err) + return fmt.Errorf("cannot detect storage schema version: %w", err) } plan, err := newPlan(lg, current, target) if err != nil { @@ -65,7 +82,7 @@ func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, e return UnsafeDetectSchemaVersion(lg, tx) } -// UnsafeDetectSchemaVersion non thread safe version of DetectSchemaVersion. +// UnsafeDetectSchemaVersion non-threadsafe version of DetectSchemaVersion. func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { vp := UnsafeReadStorageVersion(tx) if vp != nil { diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index c919751c2..bb4a0e3e9 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -31,6 +31,70 @@ var ( V3_7 = semver.Version{Major: 3, Minor: 7} ) +func TestValidate(t *testing.T) { + tcs := []struct { + name string + version semver.Version + // Overrides which keys should be set (default based on version) + overrideKeys func(tx backend.BatchTx) + expectError bool + expectErrorMsg string + }{ + // As storage version field was added in v3.6, for v3.5 we will not set it. + // For storage to be considered v3.5 it have both confstate and term key set. + { + name: `V3.4 schema is correct`, + version: V3_4, + }, + { + name: `V3.5 schema without confstate and term fields is correct`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) {}, + }, + { + name: `V3.5 schema without term field is correct`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) { + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + }, + }, + { + name: `V3.5 schema with all fields is correct`, + version: V3_5, + overrideKeys: func(tx backend.BatchTx) { + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + }, + }, + { + name: `V3.6 schema is correct`, + version: V3_6, + }, + { + name: `V3.7 schema is unknown and should return error`, + version: V3_7, + expectError: true, + expectErrorMsg: "downgrades are not yet supported", + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zap.NewNop() + dataPath := setupBackendData(t, tc.version, tc.overrideKeys) + + b := backend.NewDefaultBackend(dataPath) + defer b.Close() + err := Validate(lg, b.BatchTx()) + if (err != nil) != tc.expectError { + t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError) + } + if err != nil && err.Error() != tc.expectErrorMsg { + t.Errorf("Validate(lg, tx) = %q, expected error message: %q", err, tc.expectErrorMsg) + } + }) + } +} + func TestMigrate(t *testing.T) { tcs := []struct { name string @@ -52,7 +116,7 @@ func TestMigrate(t *testing.T) { targetVersion: V3_6, expectVersion: nil, expectError: true, - expectErrorMsg: `cannot determine storage version: missing confstate information`, + expectErrorMsg: `cannot detect storage schema version: missing confstate information`, }, { name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`, @@ -63,7 +127,7 @@ func TestMigrate(t *testing.T) { targetVersion: V3_6, expectVersion: nil, expectError: true, - expectErrorMsg: `cannot determine storage version: missing term information`, + expectErrorMsg: `cannot detect storage schema version: missing term information`, }, { name: `Upgrading v3.5 to v3.6 should be succeed all required fields are set`, @@ -125,23 +189,9 @@ func TestMigrate(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { lg := zap.NewNop() - be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) - tx := be.BatchTx() - if tx == nil { - t.Fatal("batch tx is nil") - } - tx.Lock() - UnsafeCreateMetaBucket(tx) - if tc.overrideKeys != nil { - tc.overrideKeys(tx) - } else { - setupKeys(t, tx, tc.version) - } - tx.Unlock() - be.ForceCommit() - be.Close() + dataPath := setupBackendData(t, tc.version, tc.overrideKeys) - b := backend.NewDefaultBackend(tmpPath) + b := backend.NewDefaultBackend(dataPath) defer b.Close() err := Migrate(lg, b.BatchTx(), tc.targetVersion) if (err != nil) != tc.expectError { @@ -182,39 +232,31 @@ func TestMigrateIsReversible(t *testing.T) { for _, tc := range tcs { t.Run(tc.initialVersion.String(), func(t *testing.T) { lg := zap.NewNop() - be, _ := betesting.NewTmpBackend(t, time.Microsecond, 10) + dataPath := setupBackendData(t, tc.initialVersion, nil) + + be := backend.NewDefaultBackend(dataPath) defer be.Close() tx := be.BatchTx() - if tx == nil { - t.Fatal("batch tx is nil") - } tx.Lock() - UnsafeCreateMetaBucket(tx) - setupKeys(t, tx, tc.initialVersion) + defer tx.Unlock() assertBucketState(t, tx, Meta, tc.state) - tx.Unlock() // Upgrade to current version - tx.Lock() - err := testUnsafeMigrate(lg, be.BatchTx(), currentVersion) + ver := localBinaryVersion() + err := testUnsafeMigrate(lg, tx, ver) if err != nil { - t.Errorf("Migrate(lg, tx, %q) returned error %+v", currentVersion, err) + t.Errorf("Migrate(lg, tx, %q) returned error %+v", ver, err) } - assert.Equal(t, ¤tVersion, UnsafeReadStorageVersion(tx)) - tx.Unlock() + assert.Equal(t, &ver, UnsafeReadStorageVersion(tx)) // Downgrade back to initial version - tx.Lock() - err = testUnsafeMigrate(lg, be.BatchTx(), tc.initialVersion) + err = testUnsafeMigrate(lg, tx, tc.initialVersion) if err != nil { t.Errorf("Migrate(lg, tx, %q) returned error %+v", tc.initialVersion, err) } - tx.Unlock() // Assert that all changes were revered - tx.Lock() assertBucketState(t, tx, Meta, tc.state) - tx.Unlock() }) } } @@ -233,23 +275,38 @@ func testUnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version return plan.unsafeExecute(lg, tx) } -func setupKeys(t *testing.T, tx backend.BatchTx, ver semver.Version) { +func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx backend.BatchTx)) string { t.Helper() - switch ver { - case V3_4: - case V3_5: - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - case V3_6: - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - UnsafeSetStorageVersion(tx, &V3_6) - case V3_7: - MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) - UnsafeUpdateConsistentIndex(tx, 1, 1, false) - UnsafeSetStorageVersion(tx, &V3_7) - tx.UnsafePut(Meta, []byte("future-key"), []byte("")) - default: - t.Fatalf("Unsupported storage version") + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") } + tx.Lock() + UnsafeCreateMetaBucket(tx) + if overrideKeys != nil { + overrideKeys(tx) + } else { + switch version { + case V3_4: + case V3_5: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + case V3_6: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + UnsafeSetStorageVersion(tx, &V3_6) + case V3_7: + MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) + UnsafeUpdateConsistentIndex(tx, 1, 1, false) + UnsafeSetStorageVersion(tx, &V3_7) + tx.UnsafePut(Meta, []byte("future-key"), []byte("")) + default: + t.Fatalf("Unsupported storage version") + } + } + tx.Unlock() + be.ForceCommit() + be.Close() + return tmpPath }