diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 38cc91371..fa6df6da4 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -195,7 +195,7 @@ func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (* return nil, togRPCError(err) } - resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev} + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: compactRev} ms.hdr.fill(resp.Header) return resp, nil } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 3b02aac44..5990c8daf 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -51,26 +51,19 @@ type Hasher interface { func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { return &corruptionMonitor{ lg: lg, - hasher: hasherAdapter{s}, + hasher: hasherAdapter{s, s.KV()}, } } type hasherAdapter struct { *EtcdServer + mvcc.KV } func (h hasherAdapter) MemberId() types.ID { return h.EtcdServer.ID() } -func (h hasherAdapter) Hash() (hash uint32, revision int64, err error) { - return h.EtcdServer.KV().Hash() -} - -func (h hasherAdapter) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) { - return h.EtcdServer.KV().HashByRev(rev) -} - func (h hasherAdapter) ReqTimeout() time.Duration { return h.EtcdServer.Cfg.ReqTimeout() } @@ -107,7 +100,7 @@ func (cm *corruptionMonitor) InitialCheck() error { zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), + zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", peerID.String()), zap.Strings("remote-peer-endpoints", p.eps), zap.Int64("remote-peer-revision", p.resp.Header.Revision), @@ -115,7 +108,7 @@ func (cm *corruptionMonitor) InitialCheck() error { zap.Uint32("remote-peer-hash", p.resp.Hash), } - if h != p.resp.Hash { + if h.Hash != p.resp.Hash { if crev == p.resp.CompactRevision { cm.lg.Warn("found different hash values from remote peer", fields...) mismatch++ @@ -135,7 +128,7 @@ func (cm *corruptionMonitor) InitialCheck() error { zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), + zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", p.id.String()), zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), @@ -146,7 +139,7 @@ func (cm *corruptionMonitor) InitialCheck() error { zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), + zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", p.id.String()), zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), @@ -193,15 +186,15 @@ func (cm *corruptionMonitor) periodicCheck() error { cm.hasher.TriggerCorruptAlarm(id) } - if h2 != h && rev2 == rev && crev == crev2 { + if h2.Hash != h.Hash && rev2 == rev && crev == crev2 { cm.lg.Warn( "found hash mismatch", zap.Int64("revision-1", rev), zap.Int64("compact-revision-1", crev), - zap.Uint32("hash-1", h), + zap.Uint32("hash-1", h.Hash), zap.Int64("revision-2", rev2), zap.Int64("compact-revision-2", crev2), - zap.Uint32("hash-2", h2), + zap.Uint32("hash-2", h2.Hash), ) mismatch(uint64(cm.hasher.MemberId())) } @@ -237,11 +230,11 @@ func (cm *corruptionMonitor) periodicCheck() error { } // follower's compact revision is leader's old one, then hashes must match - if p.resp.CompactRevision == crev && p.resp.Hash != h { + if p.resp.CompactRevision == crev && p.resp.Hash != h.Hash { cm.lg.Warn( "same compact revision then hashes must match", zap.Int64("leader-compact-revision", crev2), - zap.Uint32("leader-hash", h), + zap.Uint32("leader-hash", h.Hash), zap.Int64("follower-compact-revision", p.resp.CompactRevision), zap.Uint32("follower-hash", p.resp.Hash), zap.String("follower-peer-id", types.ID(id).String()), @@ -401,7 +394,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev} + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: compactRev} respBytes, err := json.Marshal(resp) if err != nil { h.lg.Warn("failed to marshal hashKV response", zap.Error(err)) diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index c43158e46..617c2fbcd 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -24,6 +24,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap/zaptest" ) @@ -70,18 +71,18 @@ func TestInitialCheck(t *testing.T) { }, { name: "Peer returned same hash", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, }, { name: "Peer returned different hash with same compaction rev", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, expectError: true, }, { name: "Peer returned different hash and compaction rev", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, }, } @@ -136,17 +137,17 @@ func TestPeriodicCheck(t *testing.T) { }, { name: "Different local hash and revision", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, revision: 1}, {hash: 2, revision: 2}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2}}}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, }, { name: "Different local hash and compaction revision", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, compactRev: 1}, {hash: 2, compactRev: 2}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, compactRev: 2}}}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, }, { name: "Different local hash and same revisions", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, revision: 1, compactRev: 1}, {hash: 2, revision: 1, compactRev: 1}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 1, compactRev: 1}}}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"}, expectCorrupt: true, }, @@ -176,7 +177,7 @@ func TestPeriodicCheck(t *testing.T) { { name: "Peer with same hash and compact revision", hasher: fakeHasher{ - hashByRevResponses: []hashByRev{{hash: 1, revision: 1, compactRev: 1}, {hash: 2, revision: 2, compactRev: 2}}, + hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}}, }, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, @@ -184,7 +185,7 @@ func TestPeriodicCheck(t *testing.T) { { name: "Peer with different hash and same compact revision as first local", hasher: fakeHasher{ - hashByRevResponses: []hashByRev{{hash: 1, revision: 1, compactRev: 1}, {hash: 2, revision: 2, compactRev: 2}}, + hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}}, }, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"}, @@ -231,7 +232,7 @@ type fakeHasher struct { } type hashByRev struct { - hash uint32 + hash mvcc.KeyValueHash revision int64 compactRev int64 err error @@ -241,10 +242,10 @@ func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) { panic("not implemented") } -func (f *fakeHasher) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) { +func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, compactRev int64, err error) { f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev)) if len(f.hashByRevResponses) == 0 { - return 0, 0, 0, nil + return mvcc.KeyValueHash{}, 0, 0, nil } hashByRev := f.hashByRevResponses[f.hashByRevIndex] f.hashByRevIndex++ diff --git a/server/mvcc/hash.go b/server/mvcc/hash.go index 9b7b61574..b37ab0043 100644 --- a/server/mvcc/hash.go +++ b/server/mvcc/hash.go @@ -22,8 +22,8 @@ import ( "go.etcd.io/etcd/server/v3/mvcc/buckets" ) -func unsafeHashByRev(tx backend.ReadTx, lower, upper int64, keep map[revision]struct{}) (uint32, error) { - h := newKVHasher(lower, upper, keep) +func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { + h := newKVHasher(compactRevision, revision, keep) err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error { h.WriteKeyValue(k, v) return nil @@ -32,29 +32,30 @@ func unsafeHashByRev(tx backend.ReadTx, lower, upper int64, keep map[revision]st } type kvHasher struct { - hash hash.Hash32 - lower, upper int64 - keep map[revision]struct{} + hash hash.Hash32 + compactRevision int64 + revision int64 + keep map[revision]struct{} } -func newKVHasher(lower, upper int64, keep map[revision]struct{}) kvHasher { +func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher { h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) h.Write(buckets.Key.Name()) return kvHasher{ - hash: h, - lower: lower, - upper: upper, - keep: keep, + hash: h, + compactRevision: compactRev, + revision: rev, + keep: keep, } } func (h *kvHasher) WriteKeyValue(k, v []byte) { kr := bytesToRev(k) - upper := revision{main: h.upper + 1} + upper := revision{main: h.revision + 1} if !upper.GreaterThan(kr) { return } - lower := revision{main: h.lower + 1} + lower := revision{main: h.compactRevision + 1} // skip revisions that are scheduled for deletion // due to compacting; don't skip if there isn't one. if lower.GreaterThan(kr) && len(h.keep) > 0 { @@ -66,6 +67,12 @@ func (h *kvHasher) WriteKeyValue(k, v []byte) { h.hash.Write(v) } -func (h *kvHasher) Hash() uint32 { - return h.hash.Sum32() +func (h *kvHasher) Hash() KeyValueHash { + return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision} +} + +type KeyValueHash struct { + Hash uint32 + CompactRevision int64 + Revision int64 } diff --git a/server/mvcc/hash_test.go b/server/mvcc/hash_test.go index 651551c9b..f7855f128 100644 --- a/server/mvcc/hash_test.go +++ b/server/mvcc/hash_test.go @@ -41,7 +41,7 @@ func TestHashByRevValue(t *testing.T) { assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) assert.Less(t, int64(compactionCycle*10), totalRevisions) var rev int64 - var got []kvHash + var got []KeyValueHash for ; rev < totalRevisions; rev += compactionCycle { putKVs(s, rev, compactionCycle) hash := testHashByRev(t, s, rev+compactionCycle/2) @@ -50,7 +50,7 @@ func TestHashByRevValue(t *testing.T) { putKVs(s, rev, totalRevisions) hash := testHashByRev(t, s, rev+totalRevisions/2) got = append(got, hash) - assert.Equal(t, []kvHash{ + assert.Equal(t, []KeyValueHash{ {4082599214, -1, 35}, {2279933401, 35, 106}, {3284231217, 106, 177}, @@ -81,7 +81,7 @@ func TestHashByRevValueZero(t *testing.T) { assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) assert.Less(t, int64(compactionCycle*10), totalRevisions) var rev int64 - var got []kvHash + var got []KeyValueHash for ; rev < totalRevisions; rev += compactionCycle { putKVs(s, rev, compactionCycle) hash := testHashByRev(t, s, 0) @@ -90,7 +90,7 @@ func TestHashByRevValueZero(t *testing.T) { putKVs(s, rev, totalRevisions) hash := testHashByRev(t, s, 0) got = append(got, hash) - assert.Equal(t, []kvHash{ + assert.Equal(t, []KeyValueHash{ {1913897190, -1, 73}, {224860069, 73, 145}, {1565167519, 145, 217}, @@ -119,8 +119,8 @@ func putKVs(s *store, rev, count int64) { } } -func testHashByRev(t *testing.T, s *store, rev int64) kvHash { - hash, currentRev, compactRev, err := s.HashByRev(rev) +func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { + hash, currentRev, _, err := s.HashByRev(rev) assert.NoError(t, err, "error on rev %v", rev) if rev == 0 { @@ -128,13 +128,7 @@ func testHashByRev(t *testing.T, s *store, rev int64) kvHash { } _, err = s.Compact(traceutil.TODO(), rev) assert.NoError(t, err, "error on compact %v", rev) - return kvHash{hash: hash, compactRevision: compactRev, revision: rev} -} - -type kvHash struct { - hash uint32 - compactRevision int64 - revision int64 + return hash } // TODO: Change this to fuzz test diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index 9dbb29947..c71d58c42 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -136,7 +136,7 @@ type Hasher interface { Hash() (hash uint32, revision int64, err error) // HashByRev computes the hash of all MVCC revisions up to a given revision. - HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) + HashByRev(rev int64) (hash KeyValueHash, revision int64, compactRev int64, err error) } // WatchableKV is a KV that can be watched. diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index cdc318894..9042359c3 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -164,7 +164,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { return h, s.currentRev, err } -func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { +func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compactRev int64, err error) { start := time.Now() s.mu.RLock() @@ -174,10 +174,10 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev if rev > 0 && rev <= compactRev { s.mu.RUnlock() - return 0, 0, compactRev, ErrCompacted + return KeyValueHash{}, 0, compactRev, ErrCompacted } else if rev > 0 && rev > currentRev { s.mu.RUnlock() - return 0, currentRev, 0, ErrFutureRev + return KeyValueHash{}, currentRev, 0, ErrFutureRev } if rev == 0 { diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 7b8d2fef6..a1028e122 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" ) -func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32, error) { +func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) { totalStart := time.Now() keep := s.kvindex.Compact(compactMainRev) indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) @@ -67,7 +67,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32 "finished scheduled compaction", zap.Int64("compact-revision", compactMainRev), zap.Duration("took", time.Since(totalStart)), - zap.Uint32("hash", hash), + zap.Uint32("hash", hash.Hash), ) return hash, nil } @@ -82,7 +82,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32 select { case <-time.After(10 * time.Millisecond): case <-s.stopc: - return 0, fmt.Errorf("interrupted due to stop signal") + return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal") } } } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index dae5f48ee..eb9582be4 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -559,7 +559,7 @@ func TestHashKVWhenCompacting(t *testing.T) { select { case <-donec: return - case hashCompactc <- hashKVResult{hash, compactRev}: + case hashCompactc <- hashKVResult{hash.Hash, compactRev}: } } }() @@ -618,7 +618,7 @@ func TestHashKVZeroRevision(t *testing.T) { if err != nil { t.Fatal(err) } - var hash2 uint32 + var hash2 KeyValueHash hash2, _, _, err = s.HashByRev(0) if err != nil { t.Fatal(err)