server: Extract hasher to separate interface

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2022-05-19 17:33:58 +02:00
parent 80828b593a
commit 2b090e86a6
8 changed files with 64 additions and 40 deletions

View File

@ -72,7 +72,7 @@ type ClusterStatusGetter interface {
type maintenanceServer struct { type maintenanceServer struct {
lg *zap.Logger lg *zap.Logger
rg apply.RaftStatusGetter rg apply.RaftStatusGetter
kg KVGetter hasher mvcc.HashStorage
bg BackendGetter bg BackendGetter
a Alarmer a Alarmer
lt LeaderTransferrer lt LeaderTransferrer
@ -83,7 +83,7 @@ type maintenanceServer struct {
} }
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)} srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
if srv.lg == nil { if srv.lg == nil {
srv.lg = zap.NewNop() srv.lg = zap.NewNop()
} }
@ -193,7 +193,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
} }
func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, rev, err := ms.kg.KV().Hash() h, rev, err := ms.hasher.Hash()
if err != nil { if err != nil {
return nil, togRPCError(err) return nil, togRPCError(err)
} }
@ -203,7 +203,7 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H
} }
func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) { func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
h, rev, err := ms.kg.KV().HashByRev(r.Revision) h, rev, err := ms.hasher.HashByRev(r.Revision)
if err != nil { if err != nil {
return nil, togRPCError(err) return nil, togRPCError(err)
} }

View File

@ -39,7 +39,7 @@ type corruptionMonitor struct {
} }
type Hasher interface { type Hasher interface {
mvcc.Hasher mvcc.HashStorage
ReqTimeout() time.Duration ReqTimeout() time.Duration
MemberId() types.ID MemberId() types.ID
PeerHashByRev(int64) []*peerHashKVResp PeerHashByRev(int64) []*peerHashKVResp
@ -50,13 +50,13 @@ type Hasher interface {
func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
return &corruptionMonitor{ return &corruptionMonitor{
lg: lg, lg: lg,
hasher: hasherAdapter{s, s.KV()}, hasher: hasherAdapter{s, s.KV().HashStorage()},
} }
} }
type hasherAdapter struct { type hasherAdapter struct {
*EtcdServer *EtcdServer
mvcc.KV mvcc.HashStorage
} }
func (h hasherAdapter) ReqTimeout() time.Duration { func (h hasherAdapter) ReqTimeout() time.Duration {
@ -345,7 +345,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest) http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return return
} }
hash, rev, err := h.server.KV().HashByRev(req.Revision) hash, rev, err := h.server.KV().HashStorage().HashByRev(req.Revision)
if err != nil { if err != nil {
h.lg.Warn( h.lg.Warn(
"failed to get hashKV", "failed to get hashKV",

View File

@ -76,3 +76,29 @@ type KeyValueHash struct {
CompactRevision int64 CompactRevision int64
Revision int64 Revision int64
} }
type HashStorage interface {
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
}
type hashStorage struct {
store *store
}
func newHashStorage(s *store) HashStorage {
return &hashStorage{
store: s,
}
}
func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
return s.store.hash()
}
func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
return s.store.hashByRev(rev)
}

View File

@ -73,7 +73,7 @@ func TestHashByRevValue(t *testing.T) {
}, got) }, got)
} }
func TestHashByRevValueZero(t *testing.T) { func TestHashByRevValueLastRevision(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t) b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
@ -120,12 +120,11 @@ func putKVs(s *store, rev, count int64) {
} }
func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash {
hash, currentRev, err := s.HashByRev(rev)
assert.NoError(t, err, "error on rev %v", rev)
if rev == 0 { if rev == 0 {
rev = currentRev rev = s.Rev()
} }
hash, _, err := s.hashByRev(rev)
assert.NoError(t, err, "error on rev %v", rev)
_, err = s.Compact(traceutil.TODO(), rev) _, err = s.Compact(traceutil.TODO(), rev)
assert.NoError(t, err, "error on compact %v", rev) assert.NoError(t, err, "error on compact %v", rev)
return hash return hash
@ -150,7 +149,7 @@ func testCompactionHash(t *testing.T, s *store, start, stop int64) {
for i := start; i <= stop; i++ { for i := start; i <= stop; i++ {
s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0) s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0)
} }
hash1, _, err := s.HashByRev(stop) hash1, _, err := s.hashByRev(stop)
assert.NoError(t, err, "error on rev %v", stop) assert.NoError(t, err, "error on rev %v", stop)
_, prevCompactRev, err := s.updateCompactRev(stop) _, prevCompactRev, err := s.updateCompactRev(stop)

View File

@ -112,7 +112,6 @@ const (
type KV interface { type KV interface {
ReadView ReadView
WriteView WriteView
Hasher
// Read creates a read transaction. // Read creates a read transaction.
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead
@ -120,6 +119,9 @@ type KV interface {
// Write creates a write transaction. // Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite Write(trace *traceutil.Trace) TxnWrite
// HashStorage returns HashStorage interface for KV storage.
HashStorage() HashStorage
// Compact frees all superseded keys with revisions less than rev. // Compact frees all superseded keys with revisions less than rev.
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
@ -131,14 +133,6 @@ type KV interface {
Close() error Close() error
} }
type Hasher interface {
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash KeyValueHash, revision int64, err error)
}
// WatchableKV is a KV that can be watched. // WatchableKV is a KV that can be watched.
type WatchableKV interface { type WatchableKV interface {
KV KV

View File

@ -639,7 +639,7 @@ func TestKVHash(t *testing.T) {
kv := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) kv := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
hashes[i], _, err = kv.Hash() hashes[i], _, err = kv.hash()
if err != nil { if err != nil {
t.Fatalf("failed to get hash: %v", err) t.Fatalf("failed to get hash: %v", err)
} }

View File

@ -83,6 +83,7 @@ type store struct {
stopc chan struct{} stopc chan struct{}
lg *zap.Logger lg *zap.Logger
hashes HashStorage
} }
// NewStore returns a new store. It is useful to create a store inside // NewStore returns a new store. It is useful to create a store inside
@ -113,6 +114,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
lg: lg, lg: lg,
} }
s.hashes = newHashStorage(s)
s.ReadView = &readView{s} s.ReadView = &readView{s}
s.WriteView = &writeView{s} s.WriteView = &writeView{s}
if s.le != nil { if s.le != nil {
@ -155,7 +157,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
close(ch) close(ch)
} }
func (s *store) Hash() (hash uint32, revision int64, err error) { func (s *store) hash() (hash uint32, revision int64, err error) {
// TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly // TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
start := time.Now() start := time.Now()
@ -166,7 +168,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err return h, s.currentRev, err
} }
func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) { func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) {
var compactRev int64 var compactRev int64
start := time.Now() start := time.Now()
@ -182,7 +184,6 @@ func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e
s.mu.RUnlock() s.mu.RUnlock()
return KeyValueHash{}, currentRev, ErrFutureRev return KeyValueHash{}, currentRev, ErrFutureRev
} }
if rev == 0 { if rev == 0 {
rev = currentRev rev = currentRev
} }
@ -513,3 +514,7 @@ func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
func isTombstone(b []byte) bool { func isTombstone(b []byte) bool {
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
} }
func (s *store) HashStorage() HashStorage {
return s.hashes
}

View File

@ -560,7 +560,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
for { for {
hash, _, err := s.HashByRev(int64(rev)) hash, _, err := s.HashStorage().HashByRev(int64(rev))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -622,12 +622,12 @@ func TestHashKVZeroRevision(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
hash1, _, err := s.HashByRev(int64(rev)) hash1, _, err := s.HashStorage().HashByRev(int64(rev))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var hash2 KeyValueHash var hash2 KeyValueHash
hash2, _, err = s.HashByRev(0) hash2, _, err = s.HashStorage().HashByRev(0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }