From 7358362c990ffc2593ce62845abf3c16859aa25f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 19 May 2022 17:33:58 +0200 Subject: [PATCH] server: Extract hasher to separate interface Signed-off-by: Marek Siarkowicz --- server/etcdserver/api/v3rpc/maintenance.go | 25 +++++++++++---------- server/etcdserver/corrupt.go | 8 +++---- server/mvcc/hash.go | 26 ++++++++++++++++++++++ server/mvcc/hash_test.go | 11 +++++---- server/mvcc/kv.go | 12 +++------- server/mvcc/kv_test.go | 2 +- server/mvcc/kvstore.go | 13 +++++++---- server/mvcc/kvstore_test.go | 6 ++--- 8 files changed, 64 insertions(+), 39 deletions(-) diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 03f1142a9..42f8b0da2 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -66,19 +66,20 @@ type ClusterStatusGetter interface { } type maintenanceServer struct { - lg *zap.Logger - rg etcdserver.RaftStatusGetter - kg KVGetter - bg BackendGetter - a Alarmer - lt LeaderTransferrer - hdr header - cs ClusterStatusGetter - d Downgrader + lg *zap.Logger + rg etcdserver.RaftStatusGetter + hasher mvcc.HashStorage + kg KVGetter + bg BackendGetter + a Alarmer + lt LeaderTransferrer + hdr header + cs ClusterStatusGetter + d Downgrader } func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { - srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s} + srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s} if srv.lg == nil { srv.lg = zap.NewNop() } @@ -180,7 +181,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance } func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { - h, rev, err := ms.kg.KV().Hash() + h, rev, err := ms.hasher.Hash() if err != nil { return nil, togRPCError(err) } @@ -190,7 +191,7 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H } func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) { - h, rev, err := ms.kg.KV().HashByRev(r.Revision) + h, rev, err := ms.hasher.HashByRev(r.Revision) if err != nil { return nil, togRPCError(err) } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 9eaadfba7..be7af1967 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -40,7 +40,7 @@ type corruptionMonitor struct { } type Hasher interface { - mvcc.Hasher + mvcc.HashStorage ReqTimeout() time.Duration MemberId() types.ID PeerHashByRev(int64) []*peerHashKVResp @@ -51,13 +51,13 @@ type Hasher interface { func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { return &corruptionMonitor{ lg: lg, - hasher: hasherAdapter{s, s.KV()}, + hasher: hasherAdapter{s, s.KV().HashStorage()}, } } type hasherAdapter struct { *EtcdServer - mvcc.KV + mvcc.HashStorage } func (h hasherAdapter) MemberId() types.ID { @@ -384,7 +384,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error unmarshalling request", http.StatusBadRequest) return } - hash, rev, err := h.server.KV().HashByRev(req.Revision) + hash, rev, err := h.server.KV().HashStorage().HashByRev(req.Revision) if err != nil { h.lg.Warn( "failed to get hashKV", diff --git a/server/mvcc/hash.go b/server/mvcc/hash.go index b37ab0043..1b83a98d0 100644 --- a/server/mvcc/hash.go +++ b/server/mvcc/hash.go @@ -76,3 +76,29 @@ type KeyValueHash struct { CompactRevision int64 Revision int64 } + +type HashStorage interface { + // Hash computes the hash of the KV's backend. + Hash() (hash uint32, revision int64, err error) + + // HashByRev computes the hash of all MVCC revisions up to a given revision. + HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) +} + +type hashStorage struct { + store *store +} + +func newHashStorage(s *store) HashStorage { + return &hashStorage{ + store: s, + } +} + +func (s *hashStorage) Hash() (hash uint32, revision int64, err error) { + return s.store.hash() +} + +func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) { + return s.store.hashByRev(rev) +} diff --git a/server/mvcc/hash_test.go b/server/mvcc/hash_test.go index 5d73ffcd7..c93f5e022 100644 --- a/server/mvcc/hash_test.go +++ b/server/mvcc/hash_test.go @@ -73,7 +73,7 @@ func TestHashByRevValue(t *testing.T) { }, got) } -func TestHashByRevValueZero(t *testing.T) { +func TestHashByRevValueLastRevision(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) @@ -120,12 +120,11 @@ func putKVs(s *store, rev, count int64) { } func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { - hash, currentRev, err := s.HashByRev(rev) - assert.NoError(t, err, "error on rev %v", rev) - if rev == 0 { - rev = currentRev + rev = s.Rev() } + hash, _, err := s.hashByRev(rev) + assert.NoError(t, err, "error on rev %v", rev) _, err = s.Compact(traceutil.TODO(), rev) assert.NoError(t, err, "error on compact %v", rev) return hash @@ -150,7 +149,7 @@ func testCompactionHash(t *testing.T, s *store, start, stop int64) { for i := start; i <= stop; i++ { s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0) } - hash1, _, err := s.HashByRev(stop) + hash1, _, err := s.hashByRev(stop) assert.NoError(t, err, "error on rev %v", stop) _, prevCompactRev, err := s.updateCompactRev(stop) diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index 5a3fbc323..109b0d7cc 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -112,7 +112,6 @@ const ( type KV interface { ReadView WriteView - Hasher // Read creates a read transaction. Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead @@ -120,6 +119,9 @@ type KV interface { // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite + // HashStorage returns HashStorage interface for KV storage. + HashStorage() HashStorage + // Compact frees all superseded keys with revisions less than rev. Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) @@ -131,14 +133,6 @@ type KV interface { Close() error } -type Hasher interface { - // Hash computes the hash of the KV's backend. - Hash() (hash uint32, revision int64, err error) - - // HashByRev computes the hash of all MVCC revisions up to a given revision. - HashByRev(rev int64) (hash KeyValueHash, revision int64, err error) -} - // WatchableKV is a KV that can be watched. type WatchableKV interface { KV diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index ad33b4041..0f84a4d11 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -605,7 +605,7 @@ func TestKVHash(t *testing.T) { kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) - hashes[i], _, err = kv.Hash() + hashes[i], _, err = kv.hash() if err != nil { t.Fatalf("failed to get hash: %v", err) } diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 622374933..e7d465baf 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -83,7 +83,8 @@ type store struct { stopc chan struct{} - lg *zap.Logger + lg *zap.Logger + hashes HashStorage } // NewStore returns a new store. It is useful to create a store inside @@ -111,6 +112,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi lg: lg, } + s.hashes = newHashStorage(s) s.ReadView = &readView{s} s.WriteView = &writeView{s} if s.le != nil { @@ -153,7 +155,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { close(ch) } -func (s *store) Hash() (hash uint32, revision int64, err error) { +func (s *store) hash() (hash uint32, revision int64, err error) { // TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly start := time.Now() @@ -164,7 +166,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { return h, s.currentRev, err } -func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) { +func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) { var compactRev int64 start := time.Now() @@ -180,7 +182,6 @@ func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e s.mu.RUnlock() return KeyValueHash{}, currentRev, ErrFutureRev } - if rev == 0 { rev = currentRev } @@ -528,3 +529,7 @@ func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { func isTombstone(b []byte) bool { return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone } + +func (s *store) HashStorage() HashStorage { + return s.hashes +} diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index cc4b186da..bae7ba599 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -552,7 +552,7 @@ func TestHashKVWhenCompacting(t *testing.T) { go func() { defer wg.Done() for { - hash, _, err := s.HashByRev(int64(rev)) + hash, _, err := s.HashStorage().HashByRev(int64(rev)) if err != nil { t.Error(err) } @@ -614,12 +614,12 @@ func TestHashKVZeroRevision(t *testing.T) { t.Fatal(err) } - hash1, _, err := s.HashByRev(int64(rev)) + hash1, _, err := s.HashStorage().HashByRev(int64(rev)) if err != nil { t.Fatal(err) } var hash2 KeyValueHash - hash2, _, err = s.HashByRev(0) + hash2, _, err = s.HashStorage().HashByRev(0) if err != nil { t.Fatal(err) }