mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #14049 from serathius/compact-hash
Calculate hash during compaction
This commit is contained in:
commit
2c12954158
@ -250,8 +250,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
|
||||
// newly started member ("memberInitialized==false")
|
||||
// does not need corruption check
|
||||
if memberInitialized {
|
||||
if err = e.Server.CheckInitialHashKV(); err != nil {
|
||||
if memberInitialized && srvcfg.InitialCorruptCheck {
|
||||
if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil {
|
||||
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
|
||||
// (nothing to close since rafthttp transports have not been started)
|
||||
|
||||
|
@ -70,20 +70,20 @@ type ClusterStatusGetter interface {
|
||||
}
|
||||
|
||||
type maintenanceServer struct {
|
||||
lg *zap.Logger
|
||||
rg apply.RaftStatusGetter
|
||||
kg KVGetter
|
||||
bg BackendGetter
|
||||
a Alarmer
|
||||
lt LeaderTransferrer
|
||||
hdr header
|
||||
cs ClusterStatusGetter
|
||||
d Downgrader
|
||||
vs serverversion.Server
|
||||
lg *zap.Logger
|
||||
rg apply.RaftStatusGetter
|
||||
hasher mvcc.HashStorage
|
||||
bg BackendGetter
|
||||
a Alarmer
|
||||
lt LeaderTransferrer
|
||||
hdr header
|
||||
cs ClusterStatusGetter
|
||||
d Downgrader
|
||||
vs serverversion.Server
|
||||
}
|
||||
|
||||
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
|
||||
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
|
||||
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
|
||||
if srv.lg == nil {
|
||||
srv.lg = zap.NewNop()
|
||||
}
|
||||
@ -193,7 +193,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
|
||||
}
|
||||
|
||||
func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
|
||||
h, rev, err := ms.kg.KV().Hash()
|
||||
h, rev, err := ms.hasher.Hash()
|
||||
if err != nil {
|
||||
return nil, togRPCError(err)
|
||||
}
|
||||
@ -203,12 +203,12 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H
|
||||
}
|
||||
|
||||
func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
|
||||
h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
|
||||
h, rev, err := ms.hasher.HashByRev(r.Revision)
|
||||
if err != nil {
|
||||
return nil, togRPCError(err)
|
||||
}
|
||||
|
||||
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
|
||||
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision}
|
||||
ms.hdr.fill(resp.Header)
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -32,36 +32,70 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// CheckInitialHashKV compares initial hash values with its peers
|
||||
type corruptionMonitor struct {
|
||||
lg *zap.Logger
|
||||
|
||||
hasher Hasher
|
||||
}
|
||||
|
||||
type Hasher interface {
|
||||
mvcc.HashStorage
|
||||
ReqTimeout() time.Duration
|
||||
MemberId() types.ID
|
||||
PeerHashByRev(int64) []*peerHashKVResp
|
||||
LinearizableReadNotify(context.Context) error
|
||||
TriggerCorruptAlarm(uint64)
|
||||
}
|
||||
|
||||
func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
|
||||
return &corruptionMonitor{
|
||||
lg: lg,
|
||||
hasher: hasherAdapter{s, s.KV().HashStorage()},
|
||||
}
|
||||
}
|
||||
|
||||
type hasherAdapter struct {
|
||||
*EtcdServer
|
||||
mvcc.HashStorage
|
||||
}
|
||||
|
||||
func (h hasherAdapter) ReqTimeout() time.Duration {
|
||||
return h.EtcdServer.Cfg.ReqTimeout()
|
||||
}
|
||||
|
||||
func (h hasherAdapter) PeerHashByRev(rev int64) []*peerHashKVResp {
|
||||
return h.EtcdServer.getPeerHashKVs(rev)
|
||||
}
|
||||
|
||||
func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) {
|
||||
h.EtcdServer.triggerCorruptAlarm(memberID)
|
||||
}
|
||||
|
||||
// InitialCheck compares initial hash values with its peers
|
||||
// before serving any peer/client traffic. Only mismatch when hashes
|
||||
// are different at requested revision, with same compact revision.
|
||||
func (s *EtcdServer) CheckInitialHashKV() error {
|
||||
if !s.Cfg.InitialCorruptCheck {
|
||||
return nil
|
||||
}
|
||||
func (cm *corruptionMonitor) InitialCheck() error {
|
||||
|
||||
lg := s.Logger()
|
||||
|
||||
lg.Info(
|
||||
cm.lg.Info(
|
||||
"starting initial corruption check",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.Duration("timeout", s.Cfg.ReqTimeout()),
|
||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||
zap.Duration("timeout", cm.hasher.ReqTimeout()),
|
||||
)
|
||||
|
||||
h, rev, crev, err := s.kv.HashByRev(0)
|
||||
h, rev, err := cm.hasher.HashByRev(0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s failed to fetch hash (%v)", s.MemberId(), err)
|
||||
return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err)
|
||||
}
|
||||
peers := s.getPeerHashKVs(rev)
|
||||
peers := cm.hasher.PeerHashByRev(rev)
|
||||
mismatch := 0
|
||||
for _, p := range peers {
|
||||
if p.resp != nil {
|
||||
peerID := types.ID(p.resp.Header.MemberId)
|
||||
fields := []zap.Field{
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||
zap.Int64("local-member-revision", rev),
|
||||
zap.Int64("local-member-compact-revision", crev),
|
||||
zap.Uint32("local-member-hash", h),
|
||||
zap.Int64("local-member-compact-revision", h.CompactRevision),
|
||||
zap.Uint32("local-member-hash", h.Hash),
|
||||
zap.String("remote-peer-id", peerID.String()),
|
||||
zap.Strings("remote-peer-endpoints", p.eps),
|
||||
zap.Int64("remote-peer-revision", p.resp.Header.Revision),
|
||||
@ -69,12 +103,12 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
||||
zap.Uint32("remote-peer-hash", p.resp.Hash),
|
||||
}
|
||||
|
||||
if h != p.resp.Hash {
|
||||
if crev == p.resp.CompactRevision {
|
||||
lg.Warn("found different hash values from remote peer", fields...)
|
||||
if h.Hash != p.resp.Hash {
|
||||
if h.CompactRevision == p.resp.CompactRevision {
|
||||
cm.lg.Warn("found different hash values from remote peer", fields...)
|
||||
mismatch++
|
||||
} else {
|
||||
lg.Warn("found different compact revision values from remote peer", fields...)
|
||||
cm.lg.Warn("found different compact revision values from remote peer", fields...)
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,23 +118,23 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
||||
if p.err != nil {
|
||||
switch p.err {
|
||||
case rpctypes.ErrFutureRev:
|
||||
lg.Warn(
|
||||
cm.lg.Warn(
|
||||
"cannot fetch hash from slow remote peer",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||
zap.Int64("local-member-revision", rev),
|
||||
zap.Int64("local-member-compact-revision", crev),
|
||||
zap.Uint32("local-member-hash", h),
|
||||
zap.Int64("local-member-compact-revision", h.CompactRevision),
|
||||
zap.Uint32("local-member-hash", h.Hash),
|
||||
zap.String("remote-peer-id", p.id.String()),
|
||||
zap.Strings("remote-peer-endpoints", p.eps),
|
||||
zap.Error(err),
|
||||
)
|
||||
case rpctypes.ErrCompacted:
|
||||
lg.Warn(
|
||||
cm.lg.Warn(
|
||||
"cannot fetch hash from remote peer; local member is behind",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||
zap.Int64("local-member-revision", rev),
|
||||
zap.Int64("local-member-compact-revision", crev),
|
||||
zap.Uint32("local-member-hash", h),
|
||||
zap.Int64("local-member-compact-revision", h.CompactRevision),
|
||||
zap.Uint32("local-member-hash", h.Hash),
|
||||
zap.String("remote-peer-id", p.id.String()),
|
||||
zap.Strings("remote-peer-endpoints", p.eps),
|
||||
zap.Error(err),
|
||||
@ -109,61 +143,31 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
||||
}
|
||||
}
|
||||
if mismatch > 0 {
|
||||
return fmt.Errorf("%s found data inconsistency with peers", s.MemberId())
|
||||
return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberId())
|
||||
}
|
||||
|
||||
lg.Info(
|
||||
cm.lg.Info(
|
||||
"initial corruption checking passed; no corruption",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) monitorKVHash() {
|
||||
t := s.Cfg.CorruptCheckTime
|
||||
if t == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
lg := s.Logger()
|
||||
lg.Info(
|
||||
"enabled corruption checking",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.Duration("interval", t),
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.stopping:
|
||||
return
|
||||
case <-time.After(t):
|
||||
}
|
||||
if !s.isLeader() {
|
||||
continue
|
||||
}
|
||||
if err := s.checkHashKV(); err != nil {
|
||||
lg.Warn("failed to check hash KV", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) checkHashKV() error {
|
||||
lg := s.Logger()
|
||||
|
||||
h, rev, crev, err := s.kv.HashByRev(0)
|
||||
func (cm *corruptionMonitor) periodicCheck() error {
|
||||
h, rev, err := cm.hasher.HashByRev(0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
peers := s.getPeerHashKVs(rev)
|
||||
peers := cm.hasher.PeerHashByRev(rev)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||
err = s.linearizableReadNotify(ctx)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cm.hasher.ReqTimeout())
|
||||
err = cm.hasher.LinearizableReadNotify(ctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h2, rev2, crev2, err := s.kv.HashByRev(0)
|
||||
h2, rev2, err := cm.hasher.HashByRev(0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -174,27 +178,20 @@ func (s *EtcdServer) checkHashKV() error {
|
||||
return
|
||||
}
|
||||
alarmed = true
|
||||
a := &pb.AlarmRequest{
|
||||
MemberID: id,
|
||||
Action: pb.AlarmRequest_ACTIVATE,
|
||||
Alarm: pb.AlarmType_CORRUPT,
|
||||
}
|
||||
s.GoAttach(func() {
|
||||
s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
|
||||
})
|
||||
cm.hasher.TriggerCorruptAlarm(id)
|
||||
}
|
||||
|
||||
if h2 != h && rev2 == rev && crev == crev2 {
|
||||
lg.Warn(
|
||||
if h2.Hash != h.Hash && rev2 == rev && h.CompactRevision == h2.CompactRevision {
|
||||
cm.lg.Warn(
|
||||
"found hash mismatch",
|
||||
zap.Int64("revision-1", rev),
|
||||
zap.Int64("compact-revision-1", crev),
|
||||
zap.Uint32("hash-1", h),
|
||||
zap.Int64("compact-revision-1", h.CompactRevision),
|
||||
zap.Uint32("hash-1", h.Hash),
|
||||
zap.Int64("revision-2", rev2),
|
||||
zap.Int64("compact-revision-2", crev2),
|
||||
zap.Uint32("hash-2", h2),
|
||||
zap.Int64("compact-revision-2", h2.CompactRevision),
|
||||
zap.Uint32("hash-2", h2.Hash),
|
||||
)
|
||||
mismatch(uint64(s.MemberId()))
|
||||
mismatch(uint64(cm.hasher.MemberId()))
|
||||
}
|
||||
|
||||
checkedCount := 0
|
||||
@ -207,7 +204,7 @@ func (s *EtcdServer) checkHashKV() error {
|
||||
|
||||
// leader expects follower's latest revision less than or equal to leader's
|
||||
if p.resp.Header.Revision > rev2 {
|
||||
lg.Warn(
|
||||
cm.lg.Warn(
|
||||
"revision from follower must be less than or equal to leader's",
|
||||
zap.Int64("leader-revision", rev2),
|
||||
zap.Int64("follower-revision", p.resp.Header.Revision),
|
||||
@ -217,10 +214,10 @@ func (s *EtcdServer) checkHashKV() error {
|
||||
}
|
||||
|
||||
// leader expects follower's latest compact revision less than or equal to leader's
|
||||
if p.resp.CompactRevision > crev2 {
|
||||
lg.Warn(
|
||||
if p.resp.CompactRevision > h2.CompactRevision {
|
||||
cm.lg.Warn(
|
||||
"compact revision from follower must be less than or equal to leader's",
|
||||
zap.Int64("leader-compact-revision", crev2),
|
||||
zap.Int64("leader-compact-revision", h2.CompactRevision),
|
||||
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
||||
zap.String("follower-peer-id", types.ID(id).String()),
|
||||
)
|
||||
@ -228,11 +225,11 @@ func (s *EtcdServer) checkHashKV() error {
|
||||
}
|
||||
|
||||
// follower's compact revision is leader's old one, then hashes must match
|
||||
if p.resp.CompactRevision == crev && p.resp.Hash != h {
|
||||
lg.Warn(
|
||||
if p.resp.CompactRevision == h.CompactRevision && p.resp.Hash != h.Hash {
|
||||
cm.lg.Warn(
|
||||
"same compact revision then hashes must match",
|
||||
zap.Int64("leader-compact-revision", crev2),
|
||||
zap.Uint32("leader-hash", h),
|
||||
zap.Int64("leader-compact-revision", h2.CompactRevision),
|
||||
zap.Uint32("leader-hash", h.Hash),
|
||||
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
||||
zap.Uint32("follower-hash", p.resp.Hash),
|
||||
zap.String("follower-peer-id", types.ID(id).String()),
|
||||
@ -240,10 +237,21 @@ func (s *EtcdServer) checkHashKV() error {
|
||||
mismatch(id)
|
||||
}
|
||||
}
|
||||
lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount))
|
||||
cm.lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) triggerCorruptAlarm(id uint64) {
|
||||
a := &pb.AlarmRequest{
|
||||
MemberID: id,
|
||||
Action: pb.AlarmRequest_ACTIVATE,
|
||||
Alarm: pb.AlarmType_CORRUPT,
|
||||
}
|
||||
s.GoAttach(func() {
|
||||
s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
|
||||
})
|
||||
}
|
||||
|
||||
type peerInfo struct {
|
||||
id types.ID
|
||||
eps []string
|
||||
@ -269,6 +277,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
|
||||
|
||||
lg := s.Logger()
|
||||
|
||||
cc := &http.Client{Transport: s.peerRt}
|
||||
var resps []*peerHashKVResp
|
||||
for _, p := range peers {
|
||||
if len(p.eps) == 0 {
|
||||
@ -279,7 +288,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
|
||||
var lastErr error
|
||||
for _, ep := range p.eps {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||
resp, lastErr := s.getPeerHashKVHTTP(ctx, ep, rev)
|
||||
resp, lastErr := HashByRev(ctx, cc, ep, rev)
|
||||
cancel()
|
||||
if lastErr == nil {
|
||||
resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil})
|
||||
@ -337,7 +346,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision)
|
||||
hash, rev, err := h.server.KV().HashStorage().HashByRev(req.Revision)
|
||||
if err != nil {
|
||||
h.lg.Warn(
|
||||
"failed to get hashKV",
|
||||
@ -347,7 +356,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev}
|
||||
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: hash.CompactRevision}
|
||||
respBytes, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
h.lg.Warn("failed to marshal hashKV response", zap.Error(err))
|
||||
@ -360,9 +369,8 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write(respBytes)
|
||||
}
|
||||
|
||||
// getPeerHashKVHTTP fetch hash of kv store at the given rev via http call to the given url
|
||||
func (s *EtcdServer) getPeerHashKVHTTP(ctx context.Context, url string, rev int64) (*pb.HashKVResponse, error) {
|
||||
cc := &http.Client{Transport: s.peerRt}
|
||||
// HashByRev fetch hash of kv store at the given rev via http call to the given url
|
||||
func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) {
|
||||
hashReq := &pb.HashKVRequest{Revision: rev}
|
||||
hashReqBytes, err := json.Marshal(hashReq)
|
||||
if err != nil {
|
||||
|
281
server/etcdserver/corrupt_test.go
Normal file
281
server/etcdserver/corrupt_test.go
Normal file
@ -0,0 +1,281 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestInitialCheck(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
hasher fakeHasher
|
||||
expectError bool
|
||||
expectCorrupt bool
|
||||
expectActions []string
|
||||
}{
|
||||
{
|
||||
name: "No peers",
|
||||
hasher: fakeHasher{
|
||||
hashByRevResponses: []hashByRev{{revision: 10}},
|
||||
},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(10)", "MemberId()"},
|
||||
},
|
||||
{
|
||||
name: "Error getting hash",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "MemberId()"},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "Peer with empty response",
|
||||
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"},
|
||||
},
|
||||
{
|
||||
name: "Peer returned ErrFutureRev",
|
||||
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrFutureRev}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||
},
|
||||
{
|
||||
name: "Peer returned ErrCompacted",
|
||||
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCompacted}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||
},
|
||||
{
|
||||
name: "Peer returned other error",
|
||||
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCorrupt}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"},
|
||||
},
|
||||
{
|
||||
name: "Peer returned same hash",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||
},
|
||||
{
|
||||
name: "Peer returned different hash with same compaction rev",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "Peer returned different hash and compaction rev",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}},
|
||||
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
monitor := corruptionMonitor{
|
||||
lg: zaptest.NewLogger(t),
|
||||
hasher: &tc.hasher,
|
||||
}
|
||||
err := monitor.InitialCheck()
|
||||
if gotError := err != nil; gotError != tc.expectError {
|
||||
t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
|
||||
}
|
||||
if tc.hasher.alarmTriggered != tc.expectCorrupt {
|
||||
t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
|
||||
}
|
||||
assert.Equal(t, tc.expectActions, tc.hasher.actions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeriodicCheck(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
hasher fakeHasher
|
||||
expectError bool
|
||||
expectCorrupt bool
|
||||
expectActions []string
|
||||
}{
|
||||
{
|
||||
name: "Same local hash and no peers",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{revision: 10}, {revision: 10}}},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(10)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||
},
|
||||
{
|
||||
name: "Error getting hash first time",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}},
|
||||
expectActions: []string{"HashByRev(0)"},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "Error getting hash second time",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{revision: 11}, {err: fmt.Errorf("error getting hash")}}},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(11)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "Error linearizableReadNotify",
|
||||
hasher: fakeHasher{linearizableReadNotify: fmt.Errorf("error getting linearizableReadNotify")},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()"},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "Different local hash and revision",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2}}},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||
},
|
||||
{
|
||||
name: "Different local hash and compaction revision",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}}}},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||
},
|
||||
{
|
||||
name: "Different local hash and same revisions",
|
||||
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 1}, revision: 1}}},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"},
|
||||
expectCorrupt: true,
|
||||
},
|
||||
{
|
||||
name: "Peer with nil response",
|
||||
hasher: fakeHasher{
|
||||
peerHashes: []*peerHashKVResp{{}},
|
||||
},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||
},
|
||||
{
|
||||
name: "Peer with newer revision",
|
||||
hasher: fakeHasher{
|
||||
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 42}}}},
|
||||
},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(42)"},
|
||||
expectCorrupt: true,
|
||||
},
|
||||
{
|
||||
name: "Peer with newer compact revision",
|
||||
hasher: fakeHasher{
|
||||
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10, MemberId: 88}, CompactRevision: 2}}},
|
||||
},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(88)"},
|
||||
expectCorrupt: true,
|
||||
},
|
||||
{
|
||||
name: "Peer with same hash and compact revision",
|
||||
hasher: fakeHasher{
|
||||
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}},
|
||||
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}},
|
||||
},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
|
||||
},
|
||||
{
|
||||
name: "Peer with different hash and same compact revision as first local",
|
||||
hasher: fakeHasher{
|
||||
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}},
|
||||
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}},
|
||||
},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"},
|
||||
expectCorrupt: true,
|
||||
},
|
||||
{
|
||||
name: "Multiple corrupted peers trigger one alarm",
|
||||
hasher: fakeHasher{
|
||||
peerHashes: []*peerHashKVResp{
|
||||
{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10, MemberId: 88}, CompactRevision: 2}},
|
||||
{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10, MemberId: 89}, CompactRevision: 2}},
|
||||
},
|
||||
},
|
||||
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(88)"},
|
||||
expectCorrupt: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
monitor := corruptionMonitor{
|
||||
lg: zaptest.NewLogger(t),
|
||||
hasher: &tc.hasher,
|
||||
}
|
||||
err := monitor.periodicCheck()
|
||||
if gotError := err != nil; gotError != tc.expectError {
|
||||
t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
|
||||
}
|
||||
if tc.hasher.alarmTriggered != tc.expectCorrupt {
|
||||
t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
|
||||
}
|
||||
assert.Equal(t, tc.expectActions, tc.hasher.actions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeHasher struct {
|
||||
peerHashes []*peerHashKVResp
|
||||
hashByRevIndex int
|
||||
hashByRevResponses []hashByRev
|
||||
linearizableReadNotify error
|
||||
|
||||
alarmTriggered bool
|
||||
actions []string
|
||||
}
|
||||
|
||||
type hashByRev struct {
|
||||
hash mvcc.KeyValueHash
|
||||
revision int64
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, err error) {
|
||||
f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev))
|
||||
if len(f.hashByRevResponses) == 0 {
|
||||
return mvcc.KeyValueHash{}, 0, nil
|
||||
}
|
||||
hashByRev := f.hashByRevResponses[f.hashByRevIndex]
|
||||
f.hashByRevIndex++
|
||||
return hashByRev.hash, hashByRev.revision, hashByRev.err
|
||||
}
|
||||
|
||||
func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (f *fakeHasher) ReqTimeout() time.Duration {
|
||||
f.actions = append(f.actions, "ReqTimeout()")
|
||||
return time.Second
|
||||
}
|
||||
|
||||
func (f *fakeHasher) MemberId() types.ID {
|
||||
f.actions = append(f.actions, "MemberId()")
|
||||
return 1
|
||||
}
|
||||
|
||||
func (f *fakeHasher) PeerHashByRev(rev int64) []*peerHashKVResp {
|
||||
f.actions = append(f.actions, fmt.Sprintf("PeerHashByRev(%d)", rev))
|
||||
return f.peerHashes
|
||||
}
|
||||
|
||||
func (f *fakeHasher) LinearizableReadNotify(ctx context.Context) error {
|
||||
f.actions = append(f.actions, "LinearizableReadNotify()")
|
||||
return f.linearizableReadNotify
|
||||
}
|
||||
|
||||
func (f *fakeHasher) TriggerCorruptAlarm(memberId uint64) {
|
||||
f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId))
|
||||
f.alarmTriggered = true
|
||||
}
|
@ -2187,6 +2187,34 @@ func (s *EtcdServer) monitorStorageVersion() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) monitorKVHash() {
|
||||
t := s.Cfg.CorruptCheckTime
|
||||
if t == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
lg := s.Logger()
|
||||
lg.Info(
|
||||
"enabled corruption checking",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.Duration("interval", t),
|
||||
)
|
||||
monitor := NewCorruptionMonitor(lg, s)
|
||||
for {
|
||||
select {
|
||||
case <-s.stopping:
|
||||
return
|
||||
case <-time.After(t):
|
||||
}
|
||||
if !s.isLeader() {
|
||||
continue
|
||||
}
|
||||
if err := monitor.periodicCheck(); err != nil {
|
||||
lg.Warn("failed to check hash KV", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) updateClusterVersionV2(ver string) {
|
||||
lg := s.Logger()
|
||||
|
||||
|
148
server/storage/mvcc/hash.go
Normal file
148
server/storage/mvcc/hash.go
Normal file
@ -0,0 +1,148 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
hashStorageMaxSize = 10
|
||||
)
|
||||
|
||||
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
|
||||
h := newKVHasher(compactRevision, revision, keep)
|
||||
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
|
||||
h.WriteKeyValue(k, v)
|
||||
return nil
|
||||
})
|
||||
return h.Hash(), err
|
||||
}
|
||||
|
||||
type kvHasher struct {
|
||||
hash hash.Hash32
|
||||
compactRevision int64
|
||||
revision int64
|
||||
keep map[revision]struct{}
|
||||
}
|
||||
|
||||
func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
|
||||
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
h.Write(schema.Key.Name())
|
||||
return kvHasher{
|
||||
hash: h,
|
||||
compactRevision: compactRev,
|
||||
revision: rev,
|
||||
keep: keep,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *kvHasher) WriteKeyValue(k, v []byte) {
|
||||
kr := bytesToRev(k)
|
||||
upper := revision{main: h.revision + 1}
|
||||
if !upper.GreaterThan(kr) {
|
||||
return
|
||||
}
|
||||
lower := revision{main: h.compactRevision + 1}
|
||||
// skip revisions that are scheduled for deletion
|
||||
// due to compacting; don't skip if there isn't one.
|
||||
if lower.GreaterThan(kr) && len(h.keep) > 0 {
|
||||
if _, ok := h.keep[kr]; !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
h.hash.Write(k)
|
||||
h.hash.Write(v)
|
||||
}
|
||||
|
||||
func (h *kvHasher) Hash() KeyValueHash {
|
||||
return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision}
|
||||
}
|
||||
|
||||
type KeyValueHash struct {
|
||||
Hash uint32
|
||||
CompactRevision int64
|
||||
Revision int64
|
||||
}
|
||||
|
||||
type HashStorage interface {
|
||||
// Hash computes the hash of the KV's backend.
|
||||
Hash() (hash uint32, revision int64, err error)
|
||||
|
||||
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
||||
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
|
||||
|
||||
// Store adds hash value in local cache, allowing it can be returned by HashByRev.
|
||||
Store(valueHash KeyValueHash)
|
||||
}
|
||||
|
||||
type hashStorage struct {
|
||||
store *store
|
||||
hashMu sync.RWMutex
|
||||
hashes []KeyValueHash
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func newHashStorage(lg *zap.Logger, s *store) *hashStorage {
|
||||
return &hashStorage{
|
||||
store: s,
|
||||
lg: lg,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
|
||||
return s.store.hash()
|
||||
}
|
||||
|
||||
func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
|
||||
s.hashMu.RLock()
|
||||
for _, h := range s.hashes {
|
||||
if rev == h.Revision {
|
||||
s.hashMu.RUnlock()
|
||||
|
||||
s.store.revMu.RLock()
|
||||
currentRev := s.store.currentRev
|
||||
s.store.revMu.RUnlock()
|
||||
return h, currentRev, nil
|
||||
}
|
||||
}
|
||||
s.hashMu.RUnlock()
|
||||
|
||||
return s.store.hashByRev(rev)
|
||||
}
|
||||
|
||||
func (s *hashStorage) Store(hash KeyValueHash) {
|
||||
s.lg.Info("storing new hash",
|
||||
zap.Uint32("hash", hash.Hash),
|
||||
zap.Int64("revision", hash.Revision),
|
||||
zap.Int64("compact-revision", hash.CompactRevision),
|
||||
)
|
||||
s.hashMu.Lock()
|
||||
defer s.hashMu.Unlock()
|
||||
s.hashes = append(s.hashes, hash)
|
||||
sort.Slice(s.hashes, func(i, j int) bool {
|
||||
return s.hashes[i].Revision < s.hashes[j].Revision
|
||||
})
|
||||
if len(s.hashes) > hashStorageMaxSize {
|
||||
s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:]
|
||||
}
|
||||
}
|
222
server/storage/mvcc/hash_test.go
Normal file
222
server/storage/mvcc/hash_test.go
Normal file
@ -0,0 +1,222 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
// Test HashByRevValue values to ensure we don't change the output which would
|
||||
// have catastrophic consequences. Expected output is just hardcoded, so please
|
||||
// regenerate it every time you change input parameters.
|
||||
func TestHashByRevValue(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
var totalRevisions int64 = 1210
|
||||
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
|
||||
assert.Less(t, int64(testutil.CompactionCycle*10), totalRevisions)
|
||||
var rev int64
|
||||
var got []KeyValueHash
|
||||
for ; rev < totalRevisions; rev += testutil.CompactionCycle {
|
||||
putKVs(s, rev, testutil.CompactionCycle)
|
||||
hash := testHashByRev(t, s, rev+testutil.CompactionCycle/2)
|
||||
got = append(got, hash)
|
||||
}
|
||||
putKVs(s, rev, totalRevisions)
|
||||
hash := testHashByRev(t, s, rev+totalRevisions/2)
|
||||
got = append(got, hash)
|
||||
assert.Equal(t, []KeyValueHash{
|
||||
{4082599214, -1, 35},
|
||||
{2279933401, 35, 106},
|
||||
{3284231217, 106, 177},
|
||||
{126286495, 177, 248},
|
||||
{900108730, 248, 319},
|
||||
{2475485232, 319, 390},
|
||||
{1226296507, 390, 461},
|
||||
{2503661030, 461, 532},
|
||||
{4155130747, 532, 603},
|
||||
{106915399, 603, 674},
|
||||
{406914006, 674, 745},
|
||||
{1882211381, 745, 816},
|
||||
{806177088, 816, 887},
|
||||
{664311366, 887, 958},
|
||||
{1496914449, 958, 1029},
|
||||
{2434525091, 1029, 1100},
|
||||
{3988652253, 1100, 1171},
|
||||
{1122462288, 1171, 1242},
|
||||
{724436716, 1242, 1883},
|
||||
}, got)
|
||||
}
|
||||
|
||||
func TestHashByRevValueLastRevision(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
var totalRevisions int64 = 1210
|
||||
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
|
||||
assert.Less(t, int64(testutil.CompactionCycle*10), totalRevisions)
|
||||
var rev int64
|
||||
var got []KeyValueHash
|
||||
for ; rev < totalRevisions; rev += testutil.CompactionCycle {
|
||||
putKVs(s, rev, testutil.CompactionCycle)
|
||||
hash := testHashByRev(t, s, 0)
|
||||
got = append(got, hash)
|
||||
}
|
||||
putKVs(s, rev, totalRevisions)
|
||||
hash := testHashByRev(t, s, 0)
|
||||
got = append(got, hash)
|
||||
assert.Equal(t, []KeyValueHash{
|
||||
{1913897190, -1, 73},
|
||||
{224860069, 73, 145},
|
||||
{1565167519, 145, 217},
|
||||
{1566261620, 217, 289},
|
||||
{2037173024, 289, 361},
|
||||
{691659396, 361, 433},
|
||||
{2713730748, 433, 505},
|
||||
{3919322507, 505, 577},
|
||||
{769967540, 577, 649},
|
||||
{2909194793, 649, 721},
|
||||
{1576921157, 721, 793},
|
||||
{4067701532, 793, 865},
|
||||
{2226384237, 865, 937},
|
||||
{2923408134, 937, 1009},
|
||||
{2680329256, 1009, 1081},
|
||||
{1546717673, 1081, 1153},
|
||||
{2713657846, 1153, 1225},
|
||||
{1046575299, 1225, 1297},
|
||||
{2017735779, 1297, 2508},
|
||||
}, got)
|
||||
}
|
||||
|
||||
func putKVs(s *store, rev, count int64) {
|
||||
for i := rev; i <= rev+count; i++ {
|
||||
s.Put([]byte(testutil.PickKey(i)), []byte(fmt.Sprint(i)), 0)
|
||||
}
|
||||
}
|
||||
|
||||
func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash {
|
||||
if rev == 0 {
|
||||
rev = s.Rev()
|
||||
}
|
||||
hash, _, err := s.hashByRev(rev)
|
||||
assert.NoError(t, err, "error on rev %v", rev)
|
||||
_, err = s.Compact(traceutil.TODO(), rev)
|
||||
assert.NoError(t, err, "error on compact %v", rev)
|
||||
return hash
|
||||
}
|
||||
|
||||
// TODO: Change this to fuzz test
|
||||
func TestCompactionHash(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit)
|
||||
}
|
||||
|
||||
type hashTestCase struct {
|
||||
*store
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Put(ctx context.Context, key, value string) error {
|
||||
tc.store.Put([]byte(key), []byte(value), 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Delete(ctx context.Context, key string) error {
|
||||
tc.store.DeleteRange([]byte(key), nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) {
|
||||
hash, _, err := tc.store.HashStorage().HashByRev(rev)
|
||||
return testutil.KeyValueHash{Hash: hash.Hash, CompactRevision: hash.CompactRevision, Revision: hash.Revision}, err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Defrag(ctx context.Context) error {
|
||||
return tc.store.b.Defrag()
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Compact(ctx context.Context, rev int64) error {
|
||||
done, err := tc.store.Compact(traceutil.TODO(), rev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestHasherStore(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
s := newHashStorage(lg, newFakeStore(lg))
|
||||
var hashes []KeyValueHash
|
||||
for i := 0; i < hashStorageMaxSize; i++ {
|
||||
hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100}
|
||||
hashes = append(hashes, hash)
|
||||
s.Store(hash)
|
||||
}
|
||||
|
||||
for _, want := range hashes {
|
||||
got, _, err := s.HashByRev(want.Revision)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if want.Hash != got.Hash {
|
||||
t.Errorf("Expected stored hash to match, got: %d, expected: %d", want.Hash, got.Hash)
|
||||
}
|
||||
if want.Revision != got.Revision {
|
||||
t.Errorf("Expected stored revision to match, got: %d, expected: %d", want.Revision, got.Revision)
|
||||
}
|
||||
if want.CompactRevision != got.CompactRevision {
|
||||
t.Errorf("Expected stored compact revision to match, got: %d, expected: %d", want.CompactRevision, got.CompactRevision)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHasherStoreFull(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
s := newHashStorage(lg, newFakeStore(lg))
|
||||
var minRevision int64 = 100
|
||||
var maxRevision = minRevision + hashStorageMaxSize
|
||||
for i := 0; i < hashStorageMaxSize; i++ {
|
||||
s.Store(KeyValueHash{Revision: int64(i) + minRevision})
|
||||
}
|
||||
|
||||
// Hash for old revision should be discarded as storage is already full
|
||||
s.Store(KeyValueHash{Revision: minRevision - 1})
|
||||
hash, _, err := s.HashByRev(minRevision - 1)
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error as old revision should be discarded, got: %v", hash)
|
||||
}
|
||||
// Hash for new revision should be stored even when storage is full
|
||||
s.Store(KeyValueHash{Revision: maxRevision + 1})
|
||||
_, _, err = s.HashByRev(maxRevision + 1)
|
||||
if err != nil {
|
||||
t.Errorf("Didn't expect error for new revision, err: %v", err)
|
||||
}
|
||||
}
|
@ -119,11 +119,8 @@ type KV interface {
|
||||
// Write creates a write transaction.
|
||||
Write(trace *traceutil.Trace) TxnWrite
|
||||
|
||||
// Hash computes the hash of the KV's backend.
|
||||
Hash() (hash uint32, revision int64, err error)
|
||||
|
||||
// HashByRev computes the hash of all MVCC revisions up to a given revision.
|
||||
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
|
||||
// HashStorage returns HashStorage interface for KV storage.
|
||||
HashStorage() HashStorage
|
||||
|
||||
// Compact frees all superseded keys with revisions less than rev.
|
||||
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
|
||||
|
@ -639,7 +639,7 @@ func TestKVHash(t *testing.T) {
|
||||
kv := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
|
||||
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
|
||||
hashes[i], _, err = kv.Hash()
|
||||
hashes[i], _, err = kv.hash()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get hash: %v", err)
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
@ -83,7 +82,8 @@ type store struct {
|
||||
|
||||
stopc chan struct{}
|
||||
|
||||
lg *zap.Logger
|
||||
lg *zap.Logger
|
||||
hashes HashStorage
|
||||
}
|
||||
|
||||
// NewStore returns a new store. It is useful to create a store inside
|
||||
@ -114,6 +114,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
|
||||
|
||||
lg: lg,
|
||||
}
|
||||
s.hashes = newHashStorage(lg, s)
|
||||
s.ReadView = &readView{s}
|
||||
s.WriteView = &writeView{s}
|
||||
if s.le != nil {
|
||||
@ -156,7 +157,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func (s *store) Hash() (hash uint32, revision int64, err error) {
|
||||
func (s *store) hash() (hash uint32, revision int64, err error) {
|
||||
// TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
|
||||
start := time.Now()
|
||||
|
||||
@ -167,7 +168,8 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
|
||||
return h, s.currentRev, err
|
||||
}
|
||||
|
||||
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
|
||||
func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) {
|
||||
var compactRev int64
|
||||
start := time.Now()
|
||||
|
||||
s.mu.RLock()
|
||||
@ -177,12 +179,11 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
|
||||
|
||||
if rev > 0 && rev <= compactRev {
|
||||
s.mu.RUnlock()
|
||||
return 0, 0, compactRev, ErrCompacted
|
||||
return KeyValueHash{}, 0, ErrCompacted
|
||||
} else if rev > 0 && rev > currentRev {
|
||||
s.mu.RUnlock()
|
||||
return 0, currentRev, 0, ErrFutureRev
|
||||
return KeyValueHash{}, currentRev, ErrFutureRev
|
||||
}
|
||||
|
||||
if rev == 0 {
|
||||
rev = currentRev
|
||||
}
|
||||
@ -192,48 +193,25 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
s.mu.RUnlock()
|
||||
|
||||
upper := revision{main: rev + 1}
|
||||
lower := revision{main: compactRev + 1}
|
||||
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
|
||||
h.Write(schema.Key.Name())
|
||||
err = tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
|
||||
kr := bytesToRev(k)
|
||||
if !upper.GreaterThan(kr) {
|
||||
return nil
|
||||
}
|
||||
// skip revisions that are scheduled for deletion
|
||||
// due to compacting; don't skip if there isn't one.
|
||||
if lower.GreaterThan(kr) && len(keep) > 0 {
|
||||
if _, ok := keep[kr]; !ok {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
h.Write(k)
|
||||
h.Write(v)
|
||||
return nil
|
||||
})
|
||||
hash = h.Sum32()
|
||||
|
||||
hash, err = unsafeHashByRev(tx, compactRev, rev, keep)
|
||||
hashRevSec.Observe(time.Since(start).Seconds())
|
||||
return hash, currentRev, compactRev, err
|
||||
return hash, currentRev, err
|
||||
}
|
||||
|
||||
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
||||
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
|
||||
s.revMu.Lock()
|
||||
if rev <= s.compactMainRev {
|
||||
ch := make(chan struct{})
|
||||
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
|
||||
s.fifoSched.Schedule(f)
|
||||
s.revMu.Unlock()
|
||||
return ch, ErrCompacted
|
||||
return ch, 0, ErrCompacted
|
||||
}
|
||||
if rev > s.currentRev {
|
||||
s.revMu.Unlock()
|
||||
return nil, ErrFutureRev
|
||||
return nil, 0, ErrFutureRev
|
||||
}
|
||||
|
||||
compactMainRev := s.compactMainRev
|
||||
s.compactMainRev = rev
|
||||
|
||||
SetScheduledCompact(s.b.BatchTx(), rev)
|
||||
@ -242,23 +220,23 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
||||
|
||||
s.revMu.Unlock()
|
||||
|
||||
return nil, nil
|
||||
return nil, compactMainRev, nil
|
||||
}
|
||||
|
||||
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
|
||||
ch := make(chan struct{})
|
||||
var j = func(ctx context.Context) {
|
||||
if ctx.Err() != nil {
|
||||
s.compactBarrier(ctx, ch)
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
keep := s.kvindex.Compact(rev)
|
||||
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||
if !s.scheduleCompaction(rev, keep) {
|
||||
hash, err := s.scheduleCompaction(rev, prevCompactRev)
|
||||
if err != nil {
|
||||
s.lg.Warn("Failed compaction", zap.Error(err))
|
||||
s.compactBarrier(context.TODO(), ch)
|
||||
return
|
||||
}
|
||||
s.hashes.Store(hash)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
@ -268,18 +246,18 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
||||
}
|
||||
|
||||
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
|
||||
ch, err := s.updateCompactRev(rev)
|
||||
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||
if err != nil {
|
||||
return ch, err
|
||||
}
|
||||
|
||||
return s.compact(traceutil.TODO(), rev)
|
||||
return s.compact(traceutil.TODO(), rev, prevCompactRev)
|
||||
}
|
||||
|
||||
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||
s.mu.Lock()
|
||||
|
||||
ch, err := s.updateCompactRev(rev)
|
||||
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||
trace.Step("check and update compact revision")
|
||||
if err != nil {
|
||||
s.mu.Unlock()
|
||||
@ -287,7 +265,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
return s.compact(trace, rev)
|
||||
return s.compact(trace, rev, prevCompactRev)
|
||||
}
|
||||
|
||||
func (s *store) Commit() {
|
||||
@ -538,3 +516,7 @@ func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
|
||||
func isTombstone(b []byte) bool {
|
||||
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
|
||||
}
|
||||
|
||||
func (s *store) HashStorage() HashStorage {
|
||||
return s.hashes
|
||||
}
|
||||
|
@ -16,14 +16,19 @@ package mvcc
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
|
||||
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
|
||||
totalStart := time.Now()
|
||||
keep := s.kvindex.Compact(compactMainRev)
|
||||
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
|
||||
|
||||
totalStart = time.Now()
|
||||
defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
|
||||
keyCompactions := 0
|
||||
defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
|
||||
@ -34,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
|
||||
batchNum := s.cfg.CompactionBatchLimit
|
||||
batchInterval := s.cfg.CompactionSleepInterval
|
||||
|
||||
h := newKVHasher(prevCompactRev, compactMainRev, keep)
|
||||
last := make([]byte, 8+1+8)
|
||||
for {
|
||||
var rev revision
|
||||
@ -43,24 +48,27 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.LockOutsideApply()
|
||||
keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
|
||||
for _, key := range keys {
|
||||
rev = bytesToRev(key)
|
||||
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
|
||||
for i := range keys {
|
||||
rev = bytesToRev(keys[i])
|
||||
if _, ok := keep[rev]; !ok {
|
||||
tx.UnsafeDelete(schema.Key, key)
|
||||
tx.UnsafeDelete(schema.Key, keys[i])
|
||||
keyCompactions++
|
||||
}
|
||||
h.WriteKeyValue(keys[i], values[i])
|
||||
}
|
||||
|
||||
if len(keys) < batchNum {
|
||||
UnsafeSetFinishedCompact(tx, compactMainRev)
|
||||
tx.Unlock()
|
||||
hash := h.Hash()
|
||||
s.lg.Info(
|
||||
"finished scheduled compaction",
|
||||
zap.Int64("compact-revision", compactMainRev),
|
||||
zap.Duration("took", time.Since(totalStart)),
|
||||
zap.Uint32("hash", hash.Hash),
|
||||
)
|
||||
return true
|
||||
return hash, nil
|
||||
}
|
||||
|
||||
tx.Unlock()
|
||||
@ -73,7 +81,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
select {
|
||||
case <-time.After(batchInterval):
|
||||
case <-s.stopc:
|
||||
return false
|
||||
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,10 @@ func TestScheduleCompaction(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
fi := newFakeIndex()
|
||||
fi.indexCompactRespc <- tt.keep
|
||||
s.kvindex = fi
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
|
||||
tx.Lock()
|
||||
@ -79,7 +83,10 @@ func TestScheduleCompaction(t *testing.T) {
|
||||
}
|
||||
tx.Unlock()
|
||||
|
||||
s.scheduleCompaction(tt.rev, tt.keep)
|
||||
_, err := s.scheduleCompaction(tt.rev, 0)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
tx.Lock()
|
||||
for _, rev := range tt.wrevs {
|
||||
|
@ -337,7 +337,7 @@ func TestStoreCompact(t *testing.T) {
|
||||
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
|
||||
key1 := newTestKeyBytes(lg, revision{1, 0}, false)
|
||||
key2 := newTestKeyBytes(lg, revision{2, 0}, false)
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
|
||||
|
||||
s.Compact(traceutil.TODO(), 3)
|
||||
s.fifoSched.WaitFinish(1)
|
||||
@ -560,14 +560,14 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
hash, _, compactRev, err := s.HashByRev(int64(rev))
|
||||
hash, _, err := s.HashStorage().HashByRev(int64(rev))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
select {
|
||||
case <-donec:
|
||||
return
|
||||
case hashCompactc <- hashKVResult{hash, compactRev}:
|
||||
case hashCompactc <- hashKVResult{hash.Hash, hash.CompactRevision}:
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -622,12 +622,12 @@ func TestHashKVZeroRevision(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hash1, _, _, err := s.HashByRev(int64(rev))
|
||||
hash1, _, err := s.HashStorage().HashByRev(int64(rev))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var hash2 uint32
|
||||
hash2, _, _, err = s.HashByRev(0)
|
||||
var hash2 KeyValueHash
|
||||
hash2, _, err = s.HashStorage().HashByRev(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -849,18 +849,11 @@ func newFakeStore(lg *zap.Logger) *store {
|
||||
b := &fakeBackend{&fakeBatchTx{
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
rangeRespc: make(chan rangeResp, 5)}}
|
||||
fi := &fakeIndex{
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
indexGetRespc: make(chan indexGetResp, 1),
|
||||
indexRangeRespc: make(chan indexRangeResp, 1),
|
||||
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
|
||||
indexCompactRespc: make(chan map[revision]struct{}, 1),
|
||||
}
|
||||
s := &store{
|
||||
cfg: StoreConfig{CompactionBatchLimit: 10000},
|
||||
b: b,
|
||||
le: &lease.FakeLessor{},
|
||||
kvindex: fi,
|
||||
kvindex: newFakeIndex(),
|
||||
currentRev: 0,
|
||||
compactMainRev: -1,
|
||||
fifoSched: schedule.NewFIFOScheduler(),
|
||||
@ -868,9 +861,20 @@ func newFakeStore(lg *zap.Logger) *store {
|
||||
lg: lg,
|
||||
}
|
||||
s.ReadView, s.WriteView = &readView{s}, &writeView{s}
|
||||
s.hashes = newHashStorage(lg, s)
|
||||
return s
|
||||
}
|
||||
|
||||
func newFakeIndex() *fakeIndex {
|
||||
return &fakeIndex{
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
indexGetRespc: make(chan indexGetResp, 1),
|
||||
indexRangeRespc: make(chan indexRangeResp, 1),
|
||||
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
|
||||
indexCompactRespc: make(chan map[revision]struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type rangeResp struct {
|
||||
keys [][]byte
|
||||
vals [][]byte
|
||||
|
105
server/storage/mvcc/testutil/hash.go
Normal file
105
server/storage/mvcc/testutil/hash.go
Normal file
@ -0,0 +1,105 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package testutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const (
|
||||
// CompactionCycle is high prime used to test hash calculation between compactions.
|
||||
CompactionCycle = 71
|
||||
)
|
||||
|
||||
func TestCompactionHash(ctx context.Context, t *testing.T, h CompactionHashTestCase, compactionBatchLimit int) {
|
||||
var totalRevisions int64 = 1210
|
||||
assert.Less(t, int64(compactionBatchLimit), totalRevisions)
|
||||
assert.Less(t, int64(CompactionCycle*10), totalRevisions)
|
||||
var rev int64
|
||||
for ; rev < totalRevisions; rev += CompactionCycle {
|
||||
testCompactionHash(ctx, t, h, rev, rev+CompactionCycle)
|
||||
}
|
||||
testCompactionHash(ctx, t, h, rev, rev+totalRevisions)
|
||||
}
|
||||
|
||||
type CompactionHashTestCase interface {
|
||||
Put(ctx context.Context, key, value string) error
|
||||
Delete(ctx context.Context, key string) error
|
||||
HashByRev(ctx context.Context, rev int64) (KeyValueHash, error)
|
||||
Defrag(ctx context.Context) error
|
||||
Compact(ctx context.Context, rev int64) error
|
||||
}
|
||||
|
||||
type KeyValueHash struct {
|
||||
Hash uint32
|
||||
CompactRevision int64
|
||||
Revision int64
|
||||
}
|
||||
|
||||
func testCompactionHash(ctx context.Context, t *testing.T, h CompactionHashTestCase, start, stop int64) {
|
||||
for i := start; i <= stop; i++ {
|
||||
if i%67 == 0 {
|
||||
err := h.Delete(ctx, PickKey(i+83))
|
||||
assert.NoError(t, err, "error on delete")
|
||||
} else {
|
||||
err := h.Put(ctx, PickKey(i), fmt.Sprint(i))
|
||||
assert.NoError(t, err, "error on put")
|
||||
}
|
||||
}
|
||||
hash1, err := h.HashByRev(ctx, stop)
|
||||
assert.NoError(t, err, "error on hash (rev %v)", stop)
|
||||
|
||||
err = h.Compact(ctx, stop)
|
||||
assert.NoError(t, err, "error on compact (rev %v)", stop)
|
||||
|
||||
err = h.Defrag(ctx)
|
||||
assert.NoError(t, err, "error on defrag")
|
||||
|
||||
hash2, err := h.HashByRev(ctx, stop)
|
||||
assert.NoError(t, err, "error on hash (rev %v)", stop)
|
||||
assert.Equal(t, hash1, hash2, "hashes do not match on rev %v", stop)
|
||||
}
|
||||
|
||||
func PickKey(i int64) string {
|
||||
if i%(CompactionCycle*2) == 30 {
|
||||
return "zenek"
|
||||
}
|
||||
if i%CompactionCycle == 30 {
|
||||
return "xavery"
|
||||
}
|
||||
// Use low prime number to ensure repeats without alignment
|
||||
switch i % 7 {
|
||||
case 0:
|
||||
return "alice"
|
||||
case 1:
|
||||
return "bob"
|
||||
case 2:
|
||||
return "celine"
|
||||
case 3:
|
||||
return "dominik"
|
||||
case 4:
|
||||
return "eve"
|
||||
case 5:
|
||||
return "frederica"
|
||||
case 6:
|
||||
return "gorge"
|
||||
default:
|
||||
panic("Can't count")
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
|
||||
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"google.golang.org/grpc"
|
||||
@ -69,6 +70,53 @@ func TestMaintenanceHashKV(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Change this to fuzz test
|
||||
func TestCompactionHash(t *testing.T) {
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
cc, err := clus.ClusterClient()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL()}, 1000)
|
||||
}
|
||||
|
||||
type hashTestCase struct {
|
||||
*clientv3.Client
|
||||
url string
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Put(ctx context.Context, key, value string) error {
|
||||
_, err := tc.Client.Put(ctx, key, value)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Delete(ctx context.Context, key string) error {
|
||||
_, err := tc.Client.Delete(ctx, key)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) {
|
||||
resp, err := tc.Client.HashKV(ctx, tc.url, rev)
|
||||
return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: resp.Header.Revision}, err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Defrag(ctx context.Context) error {
|
||||
_, err := tc.Client.Defragment(ctx, tc.url)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Compact(ctx context.Context, rev int64) error {
|
||||
_, err := tc.Client.Compact(ctx, rev)
|
||||
// Wait for compaction to be compacted
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
return err
|
||||
}
|
||||
|
||||
func TestMaintenanceMoveLeader(t *testing.T) {
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
|
84
tests/integration/hashkv_test.go
Normal file
84
tests/integration/hashkv_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
|
||||
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
)
|
||||
|
||||
// TODO: Change this to fuzz test
|
||||
func TestCompactionHash(t *testing.T) {
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
cc, err := clus.ClusterClient()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||
return net.Dial("unix", clus.Members[0].PeerURLs[0].Host)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client}, 1000)
|
||||
}
|
||||
|
||||
type hashTestCase struct {
|
||||
*clientv3.Client
|
||||
url string
|
||||
http *http.Client
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Put(ctx context.Context, key, value string) error {
|
||||
_, err := tc.Client.Put(ctx, key, value)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Delete(ctx context.Context, key string) error {
|
||||
_, err := tc.Client.Delete(ctx, key)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) {
|
||||
resp, err := etcdserver.HashByRev(ctx, tc.http, "http://unix", rev)
|
||||
return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: resp.Header.Revision}, err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Defrag(ctx context.Context) error {
|
||||
_, err := tc.Client.Defragment(ctx, tc.url)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tc hashTestCase) Compact(ctx context.Context, rev int64) error {
|
||||
_, err := tc.Client.Compact(ctx, rev)
|
||||
// Wait for compaction to be compacted
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
return err
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user