mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Remove duplicated compaction revision
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
a3f609d742
commit
631107285a
@ -190,12 +190,12 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
|
func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
|
||||||
h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
|
h, rev, err := ms.kg.KV().HashByRev(r.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, togRPCError(err)
|
return nil, togRPCError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: compactRev}
|
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision}
|
||||||
ms.hdr.fill(resp.Header)
|
ms.hdr.fill(resp.Header)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
|
|||||||
zap.Duration("timeout", cm.hasher.ReqTimeout()),
|
zap.Duration("timeout", cm.hasher.ReqTimeout()),
|
||||||
)
|
)
|
||||||
|
|
||||||
h, rev, crev, err := cm.hasher.HashByRev(0)
|
h, rev, err := cm.hasher.HashByRev(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err)
|
return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err)
|
||||||
}
|
}
|
||||||
@ -99,7 +99,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
|
|||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||||
zap.Int64("local-member-revision", rev),
|
zap.Int64("local-member-revision", rev),
|
||||||
zap.Int64("local-member-compact-revision", crev),
|
zap.Int64("local-member-compact-revision", h.CompactRevision),
|
||||||
zap.Uint32("local-member-hash", h.Hash),
|
zap.Uint32("local-member-hash", h.Hash),
|
||||||
zap.String("remote-peer-id", peerID.String()),
|
zap.String("remote-peer-id", peerID.String()),
|
||||||
zap.Strings("remote-peer-endpoints", p.eps),
|
zap.Strings("remote-peer-endpoints", p.eps),
|
||||||
@ -109,7 +109,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if h.Hash != p.resp.Hash {
|
if h.Hash != p.resp.Hash {
|
||||||
if crev == p.resp.CompactRevision {
|
if h.CompactRevision == p.resp.CompactRevision {
|
||||||
cm.lg.Warn("found different hash values from remote peer", fields...)
|
cm.lg.Warn("found different hash values from remote peer", fields...)
|
||||||
mismatch++
|
mismatch++
|
||||||
} else {
|
} else {
|
||||||
@ -127,7 +127,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
|
|||||||
"cannot fetch hash from slow remote peer",
|
"cannot fetch hash from slow remote peer",
|
||||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||||
zap.Int64("local-member-revision", rev),
|
zap.Int64("local-member-revision", rev),
|
||||||
zap.Int64("local-member-compact-revision", crev),
|
zap.Int64("local-member-compact-revision", h.CompactRevision),
|
||||||
zap.Uint32("local-member-hash", h.Hash),
|
zap.Uint32("local-member-hash", h.Hash),
|
||||||
zap.String("remote-peer-id", p.id.String()),
|
zap.String("remote-peer-id", p.id.String()),
|
||||||
zap.Strings("remote-peer-endpoints", p.eps),
|
zap.Strings("remote-peer-endpoints", p.eps),
|
||||||
@ -138,7 +138,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
|
|||||||
"cannot fetch hash from remote peer; local member is behind",
|
"cannot fetch hash from remote peer; local member is behind",
|
||||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||||
zap.Int64("local-member-revision", rev),
|
zap.Int64("local-member-revision", rev),
|
||||||
zap.Int64("local-member-compact-revision", crev),
|
zap.Int64("local-member-compact-revision", h.CompactRevision),
|
||||||
zap.Uint32("local-member-hash", h.Hash),
|
zap.Uint32("local-member-hash", h.Hash),
|
||||||
zap.String("remote-peer-id", p.id.String()),
|
zap.String("remote-peer-id", p.id.String()),
|
||||||
zap.Strings("remote-peer-endpoints", p.eps),
|
zap.Strings("remote-peer-endpoints", p.eps),
|
||||||
@ -159,7 +159,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cm *corruptionMonitor) periodicCheck() error {
|
func (cm *corruptionMonitor) periodicCheck() error {
|
||||||
h, rev, crev, err := cm.hasher.HashByRev(0)
|
h, rev, err := cm.hasher.HashByRev(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -172,7 +172,7 @@ func (cm *corruptionMonitor) periodicCheck() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
h2, rev2, crev2, err := cm.hasher.HashByRev(0)
|
h2, rev2, err := cm.hasher.HashByRev(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -186,14 +186,14 @@ func (cm *corruptionMonitor) periodicCheck() error {
|
|||||||
cm.hasher.TriggerCorruptAlarm(id)
|
cm.hasher.TriggerCorruptAlarm(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
if h2.Hash != h.Hash && rev2 == rev && crev == crev2 {
|
if h2.Hash != h.Hash && rev2 == rev && h.CompactRevision == h2.CompactRevision {
|
||||||
cm.lg.Warn(
|
cm.lg.Warn(
|
||||||
"found hash mismatch",
|
"found hash mismatch",
|
||||||
zap.Int64("revision-1", rev),
|
zap.Int64("revision-1", rev),
|
||||||
zap.Int64("compact-revision-1", crev),
|
zap.Int64("compact-revision-1", h.CompactRevision),
|
||||||
zap.Uint32("hash-1", h.Hash),
|
zap.Uint32("hash-1", h.Hash),
|
||||||
zap.Int64("revision-2", rev2),
|
zap.Int64("revision-2", rev2),
|
||||||
zap.Int64("compact-revision-2", crev2),
|
zap.Int64("compact-revision-2", h2.CompactRevision),
|
||||||
zap.Uint32("hash-2", h2.Hash),
|
zap.Uint32("hash-2", h2.Hash),
|
||||||
)
|
)
|
||||||
mismatch(uint64(cm.hasher.MemberId()))
|
mismatch(uint64(cm.hasher.MemberId()))
|
||||||
@ -219,10 +219,10 @@ func (cm *corruptionMonitor) periodicCheck() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// leader expects follower's latest compact revision less than or equal to leader's
|
// leader expects follower's latest compact revision less than or equal to leader's
|
||||||
if p.resp.CompactRevision > crev2 {
|
if p.resp.CompactRevision > h2.CompactRevision {
|
||||||
cm.lg.Warn(
|
cm.lg.Warn(
|
||||||
"compact revision from follower must be less than or equal to leader's",
|
"compact revision from follower must be less than or equal to leader's",
|
||||||
zap.Int64("leader-compact-revision", crev2),
|
zap.Int64("leader-compact-revision", h2.CompactRevision),
|
||||||
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
||||||
zap.String("follower-peer-id", types.ID(id).String()),
|
zap.String("follower-peer-id", types.ID(id).String()),
|
||||||
)
|
)
|
||||||
@ -230,10 +230,10 @@ func (cm *corruptionMonitor) periodicCheck() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// follower's compact revision is leader's old one, then hashes must match
|
// follower's compact revision is leader's old one, then hashes must match
|
||||||
if p.resp.CompactRevision == crev && p.resp.Hash != h.Hash {
|
if p.resp.CompactRevision == h.CompactRevision && p.resp.Hash != h.Hash {
|
||||||
cm.lg.Warn(
|
cm.lg.Warn(
|
||||||
"same compact revision then hashes must match",
|
"same compact revision then hashes must match",
|
||||||
zap.Int64("leader-compact-revision", crev2),
|
zap.Int64("leader-compact-revision", h2.CompactRevision),
|
||||||
zap.Uint32("leader-hash", h.Hash),
|
zap.Uint32("leader-hash", h.Hash),
|
||||||
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
||||||
zap.Uint32("follower-hash", p.resp.Hash),
|
zap.Uint32("follower-hash", p.resp.Hash),
|
||||||
@ -384,7 +384,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
|
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision)
|
hash, rev, err := h.server.KV().HashByRev(req.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.lg.Warn(
|
h.lg.Warn(
|
||||||
"failed to get hashKV",
|
"failed to get hashKV",
|
||||||
@ -394,7 +394,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: compactRev}
|
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: hash.CompactRevision}
|
||||||
respBytes, err := json.Marshal(resp)
|
respBytes, err := json.Marshal(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.lg.Warn("failed to marshal hashKV response", zap.Error(err))
|
h.lg.Warn("failed to marshal hashKV response", zap.Error(err))
|
||||||
|
@ -76,13 +76,13 @@ func TestInitialCheck(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Peer returned different hash with same compaction rev",
|
name: "Peer returned different hash with same compaction rev",
|
||||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}},
|
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}},
|
||||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||||
expectError: true,
|
expectError: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Peer returned different hash and compaction rev",
|
name: "Peer returned different hash and compaction rev",
|
||||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}},
|
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}},
|
||||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -142,12 +142,12 @@ func TestPeriodicCheck(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Different local hash and compaction revision",
|
name: "Different local hash and compaction revision",
|
||||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, compactRev: 2}}},
|
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}}}},
|
||||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Different local hash and same revisions",
|
name: "Different local hash and same revisions",
|
||||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 1, compactRev: 1}}},
|
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 1}, revision: 1}}},
|
||||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"},
|
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"},
|
||||||
expectCorrupt: true,
|
expectCorrupt: true,
|
||||||
},
|
},
|
||||||
@ -177,7 +177,7 @@ func TestPeriodicCheck(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "Peer with same hash and compact revision",
|
name: "Peer with same hash and compact revision",
|
||||||
hasher: fakeHasher{
|
hasher: fakeHasher{
|
||||||
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}},
|
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}},
|
||||||
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}},
|
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}},
|
||||||
},
|
},
|
||||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||||
@ -185,7 +185,7 @@ func TestPeriodicCheck(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "Peer with different hash and same compact revision as first local",
|
name: "Peer with different hash and same compact revision as first local",
|
||||||
hasher: fakeHasher{
|
hasher: fakeHasher{
|
||||||
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}},
|
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}},
|
||||||
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}},
|
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}},
|
||||||
},
|
},
|
||||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"},
|
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"},
|
||||||
@ -232,24 +232,23 @@ type fakeHasher struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type hashByRev struct {
|
type hashByRev struct {
|
||||||
hash mvcc.KeyValueHash
|
hash mvcc.KeyValueHash
|
||||||
revision int64
|
revision int64
|
||||||
compactRev int64
|
err error
|
||||||
err error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) {
|
func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, compactRev int64, err error) {
|
func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, err error) {
|
||||||
f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev))
|
f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev))
|
||||||
if len(f.hashByRevResponses) == 0 {
|
if len(f.hashByRevResponses) == 0 {
|
||||||
return mvcc.KeyValueHash{}, 0, 0, nil
|
return mvcc.KeyValueHash{}, 0, nil
|
||||||
}
|
}
|
||||||
hashByRev := f.hashByRevResponses[f.hashByRevIndex]
|
hashByRev := f.hashByRevResponses[f.hashByRevIndex]
|
||||||
f.hashByRevIndex++
|
f.hashByRevIndex++
|
||||||
return hashByRev.hash, hashByRev.revision, hashByRev.compactRev, hashByRev.err
|
return hashByRev.hash, hashByRev.revision, hashByRev.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeHasher) ReqTimeout() time.Duration {
|
func (f *fakeHasher) ReqTimeout() time.Duration {
|
||||||
|
@ -120,7 +120,7 @@ func putKVs(s *store, rev, count int64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash {
|
func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash {
|
||||||
hash, currentRev, _, err := s.HashByRev(rev)
|
hash, currentRev, err := s.HashByRev(rev)
|
||||||
assert.NoError(t, err, "error on rev %v", rev)
|
assert.NoError(t, err, "error on rev %v", rev)
|
||||||
|
|
||||||
if rev == 0 {
|
if rev == 0 {
|
||||||
@ -150,7 +150,7 @@ func testCompactionHash(t *testing.T, s *store, start, stop int64) {
|
|||||||
for i := start; i <= stop; i++ {
|
for i := start; i <= stop; i++ {
|
||||||
s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0)
|
s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0)
|
||||||
}
|
}
|
||||||
hash1, _, _, err := s.HashByRev(stop)
|
hash1, _, err := s.HashByRev(stop)
|
||||||
assert.NoError(t, err, "error on rev %v", stop)
|
assert.NoError(t, err, "error on rev %v", stop)
|
||||||
|
|
||||||
_, prevCompactRev, err := s.updateCompactRev(stop)
|
_, prevCompactRev, err := s.updateCompactRev(stop)
|
||||||
|
@ -136,7 +136,7 @@ type Hasher interface {
|
|||||||
Hash() (hash uint32, revision int64, err error)
|
Hash() (hash uint32, revision int64, err error)
|
||||||
|
|
||||||
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
||||||
HashByRev(rev int64) (hash KeyValueHash, revision int64, compactRev int64, err error)
|
HashByRev(rev int64) (hash KeyValueHash, revision int64, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchableKV is a KV that can be watched.
|
// WatchableKV is a KV that can be watched.
|
||||||
|
@ -164,7 +164,8 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
|
|||||||
return h, s.currentRev, err
|
return h, s.currentRev, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compactRev int64, err error) {
|
func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) {
|
||||||
|
var compactRev int64
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
@ -174,10 +175,10 @@ func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compa
|
|||||||
|
|
||||||
if rev > 0 && rev <= compactRev {
|
if rev > 0 && rev <= compactRev {
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
return KeyValueHash{}, 0, compactRev, ErrCompacted
|
return KeyValueHash{}, 0, ErrCompacted
|
||||||
} else if rev > 0 && rev > currentRev {
|
} else if rev > 0 && rev > currentRev {
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
return KeyValueHash{}, currentRev, 0, ErrFutureRev
|
return KeyValueHash{}, currentRev, ErrFutureRev
|
||||||
}
|
}
|
||||||
|
|
||||||
if rev == 0 {
|
if rev == 0 {
|
||||||
@ -191,7 +192,7 @@ func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compa
|
|||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
hash, err = unsafeHashByRev(tx, compactRev, rev, keep)
|
hash, err = unsafeHashByRev(tx, compactRev, rev, keep)
|
||||||
hashRevSec.Observe(time.Since(start).Seconds())
|
hashRevSec.Observe(time.Since(start).Seconds())
|
||||||
return hash, currentRev, compactRev, err
|
return hash, currentRev, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
|
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
|
||||||
|
@ -552,14 +552,14 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
hash, _, compactRev, err := s.HashByRev(int64(rev))
|
hash, _, err := s.HashByRev(int64(rev))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-donec:
|
case <-donec:
|
||||||
return
|
return
|
||||||
case hashCompactc <- hashKVResult{hash.Hash, compactRev}:
|
case hashCompactc <- hashKVResult{hash.Hash, hash.CompactRevision}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -614,12 +614,12 @@ func TestHashKVZeroRevision(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hash1, _, _, err := s.HashByRev(int64(rev))
|
hash1, _, err := s.HashByRev(int64(rev))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
var hash2 KeyValueHash
|
var hash2 KeyValueHash
|
||||||
hash2, _, _, err = s.HashByRev(0)
|
hash2, _, err = s.HashByRev(0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user