From 221c8878190cd1617bee4805d82501a685131a35 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Tue, 29 Jun 2021 13:29:42 +0200
Subject: [PATCH] etcdserver: Extract functions for setting and reading
 compaction information in backend

---
 server/mvcc/kvstore.go                 | 21 ++----
 server/mvcc/kvstore_compaction.go      |  4 +-
 server/mvcc/kvstore_compaction_test.go |  7 +-
 server/mvcc/kvstore_test.go            |  2 +-
 server/mvcc/store.go                   | 60 ++++++++++++++++
 server/mvcc/store_test.go              | 96 ++++++++++++++++++++++++++
 6 files changed, 166 insertions(+), 24 deletions(-)
 create mode 100644 server/mvcc/store.go
 create mode 100644 server/mvcc/store_test.go

diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go
index f60aeb6f4..2fb3f60eb 100644
--- a/server/mvcc/kvstore.go
+++ b/server/mvcc/kvstore.go
@@ -236,13 +236,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
 
 	s.compactMainRev = rev
 
-	rbytes := newRevBytes()
-	revToBytes(revision{main: rev}, rbytes)
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
-	tx.Unlock()
+	SetScheduledCompact(s.b.BatchTx(), rev)
 
 	// ensure that desired compaction is persisted
 	s.b.ForceCommit()
@@ -339,10 +333,10 @@ func (s *store) restore() error {
 
 	tx := s.b.BatchTx()
 	tx.Lock()
-	_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
-	if len(finishedCompactBytes) != 0 {
+	finishedCompact, found := UnsafeReadFinishedCompact(tx)
+	if found {
 		s.revMu.Lock()
-		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
+		s.compactMainRev = finishedCompact
 
 		s.lg.Info(
 			"restored last compact revision",
@@ -352,12 +346,7 @@ func (s *store) restore() error {
 		)
 		s.revMu.Unlock()
 	}
-	_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0)
-	scheduledCompact := int64(0)
-	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
-	}
-
+	scheduledCompact, _ := UnsafeReadScheduledCompact(tx)
 	// index keys concurrently as they're loaded in from tx
 	keysGauge.Set(0)
 	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go
index a3a02fe48..d6d186d83 100644
--- a/server/mvcc/kvstore_compaction.go
+++ b/server/mvcc/kvstore_compaction.go
@@ -53,9 +53,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 		}
 
 		if len(keys) < batchNum {
-			rbytes := make([]byte, 8+1+8)
-			revToBytes(revision{main: compactMainRev}, rbytes)
-			tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes)
+			UnsafeSetFinishedCompact(tx, compactMainRev)
 			tx.Unlock()
 			s.lg.Info(
 				"finished scheduled compaction",
diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go
index 7af751124..36eebad02 100644
--- a/server/mvcc/kvstore_compaction_test.go
+++ b/server/mvcc/kvstore_compaction_test.go
@@ -89,10 +89,9 @@ func TestScheduleCompaction(t *testing.T) {
 				t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))
 			}
 		}
-		_, vals := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
-		revToBytes(revision{main: tt.rev}, ibytes)
-		if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) {
-			t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, w)
+		vals, _ := UnsafeReadFinishedCompact(tx)
+		if !reflect.DeepEqual(vals, tt.rev) {
+			t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, tt.rev)
 		}
 		tx.Unlock()
 
diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go
index 0dcab8dbb..12049d561 100644
--- a/server/mvcc/kvstore_test.go
+++ b/server/mvcc/kvstore_test.go
@@ -512,7 +512,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	// The disk compaction is done asynchronously and requires more time on slow disk.
 	// try 5 times for CI with slow IO.
 	for i := 0; i < 5; i++ {
-		tx = s.b.BatchTx()
+		tx := s.b.BatchTx()
 		tx.Lock()
 		ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0)
 		tx.Unlock()
diff --git a/server/mvcc/store.go b/server/mvcc/store.go
new file mode 100644
index 000000000..04ef2e3cb
--- /dev/null
+++ b/server/mvcc/store.go
@@ -0,0 +1,60 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"go.etcd.io/etcd/server/v3/mvcc/backend"
+	"go.etcd.io/etcd/server/v3/mvcc/buckets"
+)
+
+func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedCompact int64, found bool) {
+	_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
+	if len(finishedCompactBytes) != 0 {
+		return bytesToRev(finishedCompactBytes[0]).main, true
+	}
+	return 0, false
+}
+
+func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledCompact int64, found bool) {
+	_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0)
+	if len(scheduledCompactBytes) != 0 {
+		return bytesToRev(scheduledCompactBytes[0]).main, true
+	}
+	return 0, false
+}
+
+func SetScheduledCompact(tx backend.BatchTx, value int64) {
+	tx.Lock()
+	defer tx.Unlock()
+	UnsafeSetScheduledCompact(tx, value)
+}
+
+func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
+	rbytes := newRevBytes()
+	revToBytes(revision{main: value}, rbytes)
+	tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
+}
+
+func SetFinishedCompact(tx backend.BatchTx, value int64) {
+	tx.Lock()
+	defer tx.Unlock()
+	UnsafeSetFinishedCompact(tx, value)
+}
+
+func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) {
+	rbytes := newRevBytes()
+	revToBytes(revision{main: value}, rbytes)
+	tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes)
+}
diff --git a/server/mvcc/store_test.go b/server/mvcc/store_test.go
new file mode 100644
index 000000000..f83f0d731
--- /dev/null
+++ b/server/mvcc/store_test.go
@@ -0,0 +1,96 @@
+package mvcc
+
+import (
+	"fmt"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"go.etcd.io/etcd/server/v3/mvcc/backend"
+	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
+	"go.etcd.io/etcd/server/v3/mvcc/buckets"
+)
+
+// TestScheduledCompact ensures that UnsafeSetScheduledCompact and UnsafeReadScheduledCompact work well together.
+func TestScheduledCompact(t *testing.T) {
+	tcs := []struct {
+		value int64
+	}{
+		{
+			value: 1,
+		},
+		{
+			value: 0,
+		},
+		{
+			value: math.MaxInt64,
+		},
+		{
+			value: math.MinInt64,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprint(tc.value), func(t *testing.T) {
+			be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
+			tx := be.BatchTx()
+			if tx == nil {
+				t.Fatal("batch tx is nil")
+			}
+			tx.Lock()
+			tx.UnsafeCreateBucket(buckets.Meta)
+			UnsafeSetScheduledCompact(tx, tc.value)
+			tx.Unlock()
+			be.ForceCommit()
+			be.Close()
+
+			b := backend.NewDefaultBackend(tmpPath)
+			defer b.Close()
+			v, found := UnsafeReadScheduledCompact(b.BatchTx())
+			assert.Equal(t, true, found)
+			assert.Equal(t, tc.value, v)
+		})
+	}
+}
+
+// TestFinishedCompact ensures that UnsafeSetFinishedCompact and UnsafeReadFinishedCompact work well together.
+func TestFinishedCompact(t *testing.T) {
+	tcs := []struct {
+		value int64
+	}{
+		{
+			value: 1,
+		},
+		{
+			value: 0,
+		},
+		{
+			value: math.MaxInt64,
+		},
+		{
+			value: math.MinInt64,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprint(tc.value), func(t *testing.T) {
+			be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
+			tx := be.BatchTx()
+			if tx == nil {
+				t.Fatal("batch tx is nil")
+			}
+			tx.Lock()
+			tx.UnsafeCreateBucket(buckets.Meta)
+			UnsafeSetFinishedCompact(tx, tc.value)
+			tx.Unlock()
+			be.ForceCommit()
+			be.Close()
+
+			b := backend.NewDefaultBackend(tmpPath)
+			defer b.Close()
+			v, found := UnsafeReadFinishedCompact(b.BatchTx())
+			assert.Equal(t, true, found)
+			assert.Equal(t, tc.value, v)
+		})
+	}
+}
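-- 
Below the signature, a minimal usage sketch of the helpers this patch
extracts (illustrative only, not part of the diff; persistCompactRevisions,
be, and rev are hypothetical names). The Set* variants take the batch-tx
lock themselves, while the Unsafe* variants expect the caller to already
hold it:

package mvcc

import "go.etcd.io/etcd/server/v3/mvcc/backend"

// persistCompactRevisions records rev as both the scheduled and the
// finished compact revision, then reads the finished value back.
// Hypothetical caller for illustration; not part of this patch.
func persistCompactRevisions(be backend.Backend, rev int64) (int64, bool) {
	// Locked helper: safe to call without holding the tx lock.
	SetScheduledCompact(be.BatchTx(), rev)

	// Unsafe helpers: hold the lock across the whole write+read sequence.
	tx := be.BatchTx()
	tx.Lock()
	UnsafeSetFinishedCompact(tx, rev)
	finished, found := UnsafeReadFinishedCompact(tx)
	tx.Unlock()

	// Persist both meta-bucket keys to disk.
	be.ForceCommit()
	return finished, found
}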