diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index fa6df6da4..03f1142a9 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -190,12 +190,12 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H } func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) { - h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision) + h, rev, err := ms.kg.KV().HashByRev(r.Revision) if err != nil { return nil, togRPCError(err) } - resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: compactRev} + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision} ms.hdr.fill(resp.Header) return resp, nil } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 5990c8daf..9eaadfba7 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -87,7 +87,7 @@ func (cm *corruptionMonitor) InitialCheck() error { zap.Duration("timeout", cm.hasher.ReqTimeout()), ) - h, rev, crev, err := cm.hasher.HashByRev(0) + h, rev, err := cm.hasher.HashByRev(0) if err != nil { return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err) } @@ -99,7 +99,7 @@ func (cm *corruptionMonitor) InitialCheck() error { fields := []zap.Field{ zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), + zap.Int64("local-member-compact-revision", h.CompactRevision), zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", peerID.String()), zap.Strings("remote-peer-endpoints", p.eps), @@ -109,7 +109,7 @@ func (cm *corruptionMonitor) InitialCheck() error { } if h.Hash != p.resp.Hash { - if crev == p.resp.CompactRevision { + if h.CompactRevision == p.resp.CompactRevision { cm.lg.Warn("found different hash values from remote peer", fields...) mismatch++ } else { @@ -127,7 +127,7 @@ func (cm *corruptionMonitor) InitialCheck() error { "cannot fetch hash from slow remote peer", zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), + zap.Int64("local-member-compact-revision", h.CompactRevision), zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", p.id.String()), zap.Strings("remote-peer-endpoints", p.eps), @@ -138,7 +138,7 @@ func (cm *corruptionMonitor) InitialCheck() error { "cannot fetch hash from remote peer; local member is behind", zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), + zap.Int64("local-member-compact-revision", h.CompactRevision), zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", p.id.String()), zap.Strings("remote-peer-endpoints", p.eps), @@ -159,7 +159,7 @@ func (cm *corruptionMonitor) InitialCheck() error { } func (cm *corruptionMonitor) periodicCheck() error { - h, rev, crev, err := cm.hasher.HashByRev(0) + h, rev, err := cm.hasher.HashByRev(0) if err != nil { return err } @@ -172,7 +172,7 @@ func (cm *corruptionMonitor) periodicCheck() error { return err } - h2, rev2, crev2, err := cm.hasher.HashByRev(0) + h2, rev2, err := cm.hasher.HashByRev(0) if err != nil { return err } @@ -186,14 +186,14 @@ func (cm *corruptionMonitor) periodicCheck() error { cm.hasher.TriggerCorruptAlarm(id) } - if h2.Hash != h.Hash && rev2 == rev && crev == crev2 { + if h2.Hash != h.Hash && rev2 == rev && h.CompactRevision == h2.CompactRevision { cm.lg.Warn( "found hash mismatch", zap.Int64("revision-1", rev), - zap.Int64("compact-revision-1", crev), + zap.Int64("compact-revision-1", h.CompactRevision), zap.Uint32("hash-1", h.Hash), zap.Int64("revision-2", rev2), - zap.Int64("compact-revision-2", crev2), + zap.Int64("compact-revision-2", h2.CompactRevision), zap.Uint32("hash-2", h2.Hash), ) mismatch(uint64(cm.hasher.MemberId())) @@ -219,10 +219,10 @@ func (cm *corruptionMonitor) periodicCheck() error { } // leader expects follower's latest compact revision less than or equal to leader's - if p.resp.CompactRevision > crev2 { + if p.resp.CompactRevision > h2.CompactRevision { cm.lg.Warn( "compact revision from follower must be less than or equal to leader's", - zap.Int64("leader-compact-revision", crev2), + zap.Int64("leader-compact-revision", h2.CompactRevision), zap.Int64("follower-compact-revision", p.resp.CompactRevision), zap.String("follower-peer-id", types.ID(id).String()), ) @@ -230,10 +230,10 @@ func (cm *corruptionMonitor) periodicCheck() error { } // follower's compact revision is leader's old one, then hashes must match - if p.resp.CompactRevision == crev && p.resp.Hash != h.Hash { + if p.resp.CompactRevision == h.CompactRevision && p.resp.Hash != h.Hash { cm.lg.Warn( "same compact revision then hashes must match", - zap.Int64("leader-compact-revision", crev2), + zap.Int64("leader-compact-revision", h2.CompactRevision), zap.Uint32("leader-hash", h.Hash), zap.Int64("follower-compact-revision", p.resp.CompactRevision), zap.Uint32("follower-hash", p.resp.Hash), @@ -384,7 +384,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error unmarshalling request", http.StatusBadRequest) return } - hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision) + hash, rev, err := h.server.KV().HashByRev(req.Revision) if err != nil { h.lg.Warn( "failed to get hashKV", @@ -394,7 +394,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: compactRev} + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: hash.CompactRevision} respBytes, err := json.Marshal(resp) if err != nil { h.lg.Warn("failed to marshal hashKV response", zap.Error(err)) diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 617c2fbcd..eef5d19a2 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -76,13 +76,13 @@ func TestInitialCheck(t *testing.T) { }, { name: "Peer returned different hash with same compaction rev", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, expectError: true, }, { name: "Peer returned different hash and compaction rev", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, }, } @@ -142,12 +142,12 @@ func TestPeriodicCheck(t *testing.T) { }, { name: "Different local hash and compaction revision", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, compactRev: 2}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}}}}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, }, { name: "Different local hash and same revisions", - hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 1, compactRev: 1}}}, + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 1}, revision: 1}}}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"}, expectCorrupt: true, }, @@ -177,7 +177,7 @@ func TestPeriodicCheck(t *testing.T) { { name: "Peer with same hash and compact revision", hasher: fakeHasher{ - hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}}, + hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}}, }, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, @@ -185,7 +185,7 @@ func TestPeriodicCheck(t *testing.T) { { name: "Peer with different hash and same compact revision as first local", hasher: fakeHasher{ - hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}}, + hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}}, }, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"}, @@ -232,24 +232,23 @@ type fakeHasher struct { } type hashByRev struct { - hash mvcc.KeyValueHash - revision int64 - compactRev int64 - err error + hash mvcc.KeyValueHash + revision int64 + err error } func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) { panic("not implemented") } -func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, compactRev int64, err error) { +func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, err error) { f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev)) if len(f.hashByRevResponses) == 0 { - return mvcc.KeyValueHash{}, 0, 0, nil + return mvcc.KeyValueHash{}, 0, nil } hashByRev := f.hashByRevResponses[f.hashByRevIndex] f.hashByRevIndex++ - return hashByRev.hash, hashByRev.revision, hashByRev.compactRev, hashByRev.err + return hashByRev.hash, hashByRev.revision, hashByRev.err } func (f *fakeHasher) ReqTimeout() time.Duration { diff --git a/server/mvcc/hash_test.go b/server/mvcc/hash_test.go index f7855f128..5d73ffcd7 100644 --- a/server/mvcc/hash_test.go +++ b/server/mvcc/hash_test.go @@ -120,7 +120,7 @@ func putKVs(s *store, rev, count int64) { } func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { - hash, currentRev, _, err := s.HashByRev(rev) + hash, currentRev, err := s.HashByRev(rev) assert.NoError(t, err, "error on rev %v", rev) if rev == 0 { @@ -150,7 +150,7 @@ func testCompactionHash(t *testing.T, s *store, start, stop int64) { for i := start; i <= stop; i++ { s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0) } - hash1, _, _, err := s.HashByRev(stop) + hash1, _, err := s.HashByRev(stop) assert.NoError(t, err, "error on rev %v", stop) _, prevCompactRev, err := s.updateCompactRev(stop) diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index c71d58c42..5a3fbc323 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -136,7 +136,7 @@ type Hasher interface { Hash() (hash uint32, revision int64, err error) // HashByRev computes the hash of all MVCC revisions up to a given revision. - HashByRev(rev int64) (hash KeyValueHash, revision int64, compactRev int64, err error) + HashByRev(rev int64) (hash KeyValueHash, revision int64, err error) } // WatchableKV is a KV that can be watched. diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 9042359c3..622374933 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -164,7 +164,8 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { return h, s.currentRev, err } -func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compactRev int64, err error) { +func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) { + var compactRev int64 start := time.Now() s.mu.RLock() @@ -174,10 +175,10 @@ func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compa if rev > 0 && rev <= compactRev { s.mu.RUnlock() - return KeyValueHash{}, 0, compactRev, ErrCompacted + return KeyValueHash{}, 0, ErrCompacted } else if rev > 0 && rev > currentRev { s.mu.RUnlock() - return KeyValueHash{}, currentRev, 0, ErrFutureRev + return KeyValueHash{}, currentRev, ErrFutureRev } if rev == 0 { @@ -191,7 +192,7 @@ func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compa s.mu.RUnlock() hash, err = unsafeHashByRev(tx, compactRev, rev, keep) hashRevSec.Observe(time.Since(start).Seconds()) - return hash, currentRev, compactRev, err + return hash, currentRev, err } func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index eb9582be4..cc4b186da 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -552,14 +552,14 @@ func TestHashKVWhenCompacting(t *testing.T) { go func() { defer wg.Done() for { - hash, _, compactRev, err := s.HashByRev(int64(rev)) + hash, _, err := s.HashByRev(int64(rev)) if err != nil { t.Error(err) } select { case <-donec: return - case hashCompactc <- hashKVResult{hash.Hash, compactRev}: + case hashCompactc <- hashKVResult{hash.Hash, hash.CompactRevision}: } } }() @@ -614,12 +614,12 @@ func TestHashKVZeroRevision(t *testing.T) { t.Fatal(err) } - hash1, _, _, err := s.HashByRev(int64(rev)) + hash1, _, err := s.HashByRev(int64(rev)) if err != nil { t.Fatal(err) } var hash2 KeyValueHash - hash2, _, _, err = s.HashByRev(0) + hash2, _, err = s.HashByRev(0) if err != nil { t.Fatal(err) }