Mirror of https://github.com/etcd-io/etcd.git

commit 1e213b7ab6 (parent b30c1eb2c8)

*: Add experimental-compaction-batch-limit flag

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
@@ -446,6 +446,11 @@ Follow the instructions when using these flags.
 + default: 0s
 + env variable: ETCD_EXPERIMENTAL_CORRUPT_CHECK_TIME
 
+### --experimental-compaction-batch-limit
++ Sets the maximum revisions deleted in each compaction batch.
++ default: 1000
++ env variable: ETCD_EXPERIMENTAL_COMPACTION_BATCH_LIMIT
+
 [build-cluster]: clustering.md#static
 [reconfig]: runtime-configuration.md
 [discovery]: clustering.md#discovery
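For reference, the same limit can also be set programmatically when embedding etcd. The sketch below is illustrative only and is not part of this change; it assumes the 3.4-era import path `go.etcd.io/etcd/embed` and relies on the `ExperimentalCompactionBatchLimit` field added in this commit (a zero value falls back to the built-in default of 1000, per the `NewStore` hunk further down).

```go
package main

import (
    "log"

    "go.etcd.io/etcd/embed"
)

func main() {
    cfg := embed.NewConfig()
    cfg.Dir = "default.etcd"
    // Mirrors --experimental-compaction-batch-limit; each compaction batch
    // deletes at most this many revisions before committing.
    cfg.ExperimentalCompactionBatchLimit = 1000

    e, err := embed.StartEtcd(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()
    <-e.Server.ReadyNotify() // server is ready to serve client requests
}
```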
@@ -383,7 +383,7 @@ func (s *v3Manager) saveDB() error {
     // a lessor never timeouts leases
     lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
 
-    mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
+    mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
     txn := mvs.Write()
     btx := be.BatchTx()
     del := func(k, v []byte) error {
@@ -280,6 +280,7 @@ type Config struct {
     ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
     // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
     ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+    ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
 
     // ForceNewCluster starts a new cluster even if previously started; unsafe.
     ForceNewCluster bool `json:"force-new-cluster"`
@@ -205,6 +205,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
         ForceNewCluster:       cfg.ForceNewCluster,
         EnableGRPCGateway:     cfg.EnableGRPCGateway,
         EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
+        CompactionBatchLimit:  cfg.ExperimentalCompactionBatchLimit,
     }
     print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
     if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -255,6 +255,7 @@ func newConfig() *config {
     fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
     fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
     fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+    fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 
     // unsafe
     fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
@@ -204,6 +204,8 @@ Experimental feature:
     ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types).
   --experimental-enable-lease-checkpoint
     ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+  --experimental-compaction-batch-limit
+    ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
 
 Unsafe feature:
   --force-new-cluster 'false'
@@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
 // case, replace the db with the snapshot db sent by the leader.
 func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
     var cIndex consistentIndex
-    kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex)
+    kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
     defer kv.Close()
     if snapshot.Metadata.Index <= kv.ConsistentIndex() {
         return oldbe, nil
@@ -112,6 +112,7 @@ type ServerConfig struct {
 
     AutoCompactionRetention time.Duration
     AutoCompactionMode      string
+    CompactionBatchLimit    int
     QuotaBackendBytes       int64
     MaxTxnOps               uint
 
@@ -539,7 +539,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
         CheckpointInterval:         cfg.LeaseCheckpointInterval,
         ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
     })
-    srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
+    srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
     if beExist {
         kvindex := srv.kv.ConsistentIndex()
         // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
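Taken together, the hunks above thread the new limit from the public flag down to the MVCC store: etcdmain parses `--experimental-compaction-batch-limit` into `embed.Config`, `embed.StartEtcd` copies it into the server config, and `etcdserver.NewServer` hands it to `mvcc.New`. The following self-contained sketch shows only that flow, with stand-in structs in place of the real types (which carry many more fields):

```go
package main

import "fmt"

// Stand-ins for the fields relevant to this change; names mirror the diff.
type embedConfig struct{ ExperimentalCompactionBatchLimit int } // embed.Config
type serverConfig struct{ CompactionBatchLimit int }            // etcdserver.ServerConfig
type storeConfig struct{ CompactionBatchLimit int }             // mvcc.StoreConfig

func main() {
    // etcdmain: fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", ...)
    ec := embedConfig{ExperimentalCompactionBatchLimit: 1000}

    // embed.StartEtcd: CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit
    sc := serverConfig{CompactionBatchLimit: ec.ExperimentalCompactionBatchLimit}

    // etcdserver.NewServer: mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}
    mc := storeConfig{CompactionBatchLimit: sc.CompactionBatchLimit}

    fmt.Println(mc.CompactionBatchLimit) // 1000
}
```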
@@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) {
         r:       *r,
         v2store: st,
     }
-    srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
+    srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
     srv.be = be
 
     ch := make(chan struct{}, 2)
@@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) {
 
     be, tmpPath := backend.NewDefaultTmpBackend()
     defer os.RemoveAll(tmpPath)
-    s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
+    s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
     s.be = be
 
     s.start()
@@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) {
     }
     srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
 
-    srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
+    srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
     srv.be = be
 
     srv.start()
@@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
     defer func() {
         os.RemoveAll(tmpPath)
     }()
-    s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
+    s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
     s.be = be
 
     s.start()
@@ -169,7 +169,7 @@ func TestV3CorruptAlarm(t *testing.T) {
     clus.Members[0].Stop(t)
     fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
     be := backend.NewDefaultBackend(fp)
-    s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13})
+    s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
     // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
     s.Put([]byte("abc"), []byte("def"), 0)
     s.Put([]byte("xyz"), []byte("123"), 0)
@@ -76,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     kvs := put3TestKVs(s)
@@ -142,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     kvs := put3TestKVs(s)
@@ -178,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     put3TestKVs(s)
@@ -211,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     kvs := put3TestKVs(s)
@@ -252,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     for i := 0; i < 10; i++ {
@@ -314,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 
     for i, tt := range tests {
         b, tmpPath := backend.NewDefaultTmpBackend()
-        s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+        s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
         s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
         s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -334,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -355,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     for i := 0; i < 10; i++ {
@@ -402,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) {
 
 func TestKVTxnBlockWriteOperations(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     tests := []func(){
         func() { s.Put([]byte("foo"), nil, lease.NoLease) },
@@ -435,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {
 
 func TestKVTxnNonBlockRange(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     txn := s.Write()
@@ -456,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTxnOperationInSequence(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     for i := 0; i < 10; i++ {
@@ -506,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 
 func TestKVCompactReserveLastValue(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -560,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 
 func TestKVCompactBad(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -593,7 +593,7 @@ func TestKVHash(t *testing.T) {
     for i := 0; i < len(hashes); i++ {
         var err error
         b, tmpPath := backend.NewDefaultTmpBackend()
-        kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+        kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
         kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
         kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
         hashes[i], _, err = kv.Hash()
@@ -631,7 +631,7 @@ func TestKVRestore(t *testing.T) {
     }
     for i, tt := range tests {
         b, tmpPath := backend.NewDefaultTmpBackend()
-        s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+        s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
         tt(s)
         var kvss [][]mvccpb.KeyValue
         for k := int64(0); k < 10; k++ {
@@ -643,7 +643,7 @@ func TestKVRestore(t *testing.T) {
         s.Close()
 
         // ns should recover the the previous state from backend.
-        ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+        ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
         if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
             t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
@@ -675,7 +675,7 @@ func readGaugeInt(g prometheus.Gauge) int {
 
 func TestKVSnapshot(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     wkvs := put3TestKVs(s)
@@ -695,7 +695,7 @@ func TestKVSnapshot(t *testing.T) {
     }
     f.Close()
 
-    ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer ns.Close()
     r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
     if err != nil {
@@ -711,7 +711,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -60,6 +60,7 @@ const (
 )
 
 var restoreChunkKeys = 10000 // non-const for testing
+var defaultCompactBatchLimit = 1000
 
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
@@ -68,10 +69,16 @@ type ConsistentIndexGetter interface {
     ConsistentIndex() uint64
 }
 
+type StoreConfig struct {
+    CompactionBatchLimit int
+}
+
 type store struct {
     ReadView
     WriteView
 
+    cfg StoreConfig
+
     // consistentIndex caches the "consistent_index" key's value. Accessed
     // through atomics so must be 64-bit aligned.
     consistentIndex uint64
@@ -108,8 +115,12 @@ type store struct {
 
 // NewStore returns a new store. It is useful to create a store inside
 // mvcc pkg. It should only be used for testing externally.
-func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
+    if cfg.CompactionBatchLimit == 0 {
+        cfg.CompactionBatchLimit = defaultCompactBatchLimit
+    }
     s := &store{
+        cfg:     cfg,
         b:       b,
         ig:      ig,
         kvindex: newTreeIndex(lg),
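Because a zero `CompactionBatchLimit` falls back to `defaultCompactBatchLimit`, callers that do not care about batching can simply pass an empty `StoreConfig{}`, which is exactly what the updated tests below do. A minimal sketch of the call pattern, mirroring those tests (import paths assume the pre-module `go.etcd.io/etcd/...` layout; `NewStore` is documented as test-only for external callers):

```go
package main

import (
    "os"

    "go.etcd.io/etcd/lease"
    "go.etcd.io/etcd/mvcc"
    "go.etcd.io/etcd/mvcc/backend"

    "go.uber.org/zap"
)

func main() {
    be, tmpPath := backend.NewDefaultTmpBackend()
    defer os.Remove(tmpPath)

    // Zero-value config: NewStore substitutes defaultCompactBatchLimit (1000).
    s := mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, mvcc.StoreConfig{})
    defer s.Close()

    // An explicit limit caps how many revisions one compaction batch may delete:
    //   mvcc.StoreConfig{CompactionBatchLimit: 100}
}
```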
@@ -33,7 +33,7 @@ func (i *fakeConsistentIndex) ConsistentIndex() uint64 {
 func BenchmarkStorePut(b *testing.B) {
     var i fakeConsistentIndex
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     // arbitrary number of bytes
@@ -53,7 +53,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
 func benchmarkStoreRange(b *testing.B, n int) {
     var i fakeConsistentIndex
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     // 64 byte key/val
@@ -81,7 +81,7 @@ func benchmarkStoreRange(b *testing.B, n int) {
 func BenchmarkConsistentIndex(b *testing.B) {
     fci := fakeConsistentIndex(10)
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     tx := s.b.BatchTx()
@@ -100,7 +100,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
 func BenchmarkStorePutUpdate(b *testing.B) {
     var i fakeConsistentIndex
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     // arbitrary number of bytes
@@ -119,7 +119,7 @@ func BenchmarkStorePutUpdate(b *testing.B) {
 func BenchmarkStoreTxnPut(b *testing.B) {
     var i fakeConsistentIndex
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     // arbitrary number of bytes
@@ -140,7 +140,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
     var i fakeConsistentIndex
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
     // use closure to capture 's' to pick up the reassignment
     defer func() { cleanup(s, be, tmpPath) }()
 
@@ -160,7 +160,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 
     b.ReportAllocs()
     b.ResetTimer()
-    s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {
@@ -30,25 +30,23 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
     end := make([]byte, 8)
     binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
 
-    batchsize := int64(1000)
     last := make([]byte, 8+1+8)
     for {
         var rev revision
 
         start := time.Now()
 
         tx := s.b.BatchTx()
         tx.Lock()
-        keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
+        keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
         for _, key := range keys {
             rev = bytesToRev(key)
             if _, ok := keep[rev]; !ok {
                 tx.UnsafeDelete(keyBucketName, key)
-                keyCompactions++
             }
         }
 
-        if len(keys) < int(batchsize) {
+        if len(keys) < s.cfg.CompactionBatchLimit {
             rbytes := make([]byte, 8+1+8)
             revToBytes(revision{main: compactMainRev}, rbytes)
             tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
@@ -60,7 +58,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
                     zap.Duration("took", time.Since(totalStart)),
                 )
             } else {
-                plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+                plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
             }
             return true
         }
@@ -68,6 +66,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
         // update last
         revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
         tx.Unlock()
+        // Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
         s.b.ForceCommit()
         dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
 
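The rewritten loop above now deletes at most `CompactionBatchLimit` revisions per pass, then unlocks the batch transaction and force-commits before ranging over the next slice, so a large compaction no longer monopolizes the lock or the write buffer. The sketch below reproduces only the shape of that batching loop, with a plain map standing in for etcd's bolt-backed transaction; it is a hedged illustration, not the actual implementation:

```go
package main

import "fmt"

// compactInBatches deletes every key below boundary in slices of at most
// batchLimit, with a "commit point" between slices. In scheduleCompaction the
// commit point is tx.Unlock() followed by s.b.ForceCommit(), which lets other
// readers and writers run between batches.
func compactInBatches(kv map[int]string, boundary, batchLimit int) {
    for {
        deleted := 0
        for k := range kv {
            if k < boundary {
                delete(kv, k) // tx.UnsafeDelete(keyBucketName, key) in the real code
                deleted++
                if deleted == batchLimit {
                    break
                }
            }
        }
        // commit point between batches
        if deleted < batchLimit {
            return // less than a full batch was left: compaction is done
        }
    }
}

func main() {
    kv := make(map[int]string)
    for i := 0; i < 2500; i++ {
        kv[i] = "v"
    }
    compactInBatches(kv, 2000, 1000) // compact away revisions below 2000
    fmt.Println(len(kv))             // 500 keys remain
}
```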
@@ -65,7 +65,7 @@ func TestScheduleCompaction(t *testing.T) {
     }
     for i, tt := range tests {
         b, tmpPath := backend.NewDefaultTmpBackend()
-        s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+        s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
         tx := s.b.BatchTx()
 
         tx.Lock()
@@ -99,7 +99,7 @@ func TestScheduleCompaction(t *testing.T) {
 
 func TestCompactAllAndRestore(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer os.Remove(tmpPath)
 
     s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -125,7 +125,7 @@ func TestCompactAllAndRestore(t *testing.T) {
         t.Fatal(err)
     }
 
-    s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     if s1.Rev() != rev {
         t.Errorf("rev = %v, want %v", s1.Rev(), rev)
     }
@@ -40,7 +40,7 @@ import (
 
 func TestStoreRev(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer s.Close()
     defer os.Remove(tmpPath)
 
@@ -424,7 +424,7 @@ func TestRestoreDelete(t *testing.T) {
     defer func() { restoreChunkKeys = oldChunk }()
 
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer os.Remove(tmpPath)
 
     keys := make(map[string]struct{})
@@ -450,7 +450,7 @@ func TestRestoreDelete(t *testing.T) {
     }
     s.Close()
 
-    s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer s.Close()
     for i := 0; i < 20; i++ {
         ks := fmt.Sprintf("foo-%d", i)
@@ -472,7 +472,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
     tests := []string{"recreate", "restore"}
     for _, test := range tests {
         b, tmpPath := backend.NewDefaultTmpBackend()
-        s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+        s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
         defer os.Remove(tmpPath)
 
         s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -492,7 +492,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
         var s *store
         switch test {
         case "recreate":
-            s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+            s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
         case "restore":
             s0.Restore(b)
             s = s0
@@ -534,7 +534,7 @@ type hashKVResult struct {
 // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
 func TestHashKVWhenCompacting(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer os.Remove(tmpPath)
 
     rev := 10000
@@ -602,10 +602,10 @@ func TestHashKVWhenCompacting(t *testing.T) {
 // correct hash value with latest revision.
 func TestHashKVZeroRevision(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer os.Remove(tmpPath)
 
-    rev := 1000
+    rev := 10000
     for i := 2; i <= rev; i++ {
         s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
     }
@@ -635,7 +635,7 @@ func TestTxnPut(t *testing.T) {
     vals := createBytesSlice(bytesN, sliceN)
 
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, b, tmpPath)
 
     for i := 0; i < sliceN; i++ {
@@ -651,7 +651,7 @@ func TestTxnPut(t *testing.T) {
 // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
 func TestConcurrentReadNotBlockingWrite(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer os.Remove(tmpPath)
 
     // write something to read later
@@ -720,7 +720,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
         mu sync.Mutex // mu protectes committedKVs
     )
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
     defer os.Remove(tmpPath)
 
     var wg sync.WaitGroup
@@ -846,6 +846,7 @@ func newFakeStore() *store {
         indexCompactRespc: make(chan map[revision]struct{}, 1),
     }
     s := &store{
+        cfg:     StoreConfig{CompactionBatchLimit: 10000},
         b:       b,
         le:      &lease.FakeLessor{},
         kvindex: fi,
@@ -68,13 +68,13 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
-    return newWatchableStore(lg, b, le, ig)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+    return newWatchableStore(lg, b, le, ig, cfg)
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
     s := &watchableStore{
-        store:    NewStore(lg, b, le, ig),
+        store:    NewStore(lg, b, le, ig, cfg),
         victimc:  make(chan struct{}, 1),
         unsynced: newWatcherGroup(),
         synced:   newWatcherGroup(),
@@ -27,7 +27,7 @@ import (
 
 func BenchmarkWatchableStorePut(b *testing.B) {
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+    s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     // arbitrary number of bytes
@@ -48,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
     var i fakeConsistentIndex
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+    s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     // arbitrary number of bytes
@@ -79,7 +79,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s, be, tmpPath)
 
     k := []byte("testkey")
@@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+    s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
     // manually create watchableStore instead of newWatchableStore
     // because newWatchableStore periodically calls syncWatchersLoop
@@ -179,7 +179,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
     be, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -31,7 +31,7 @@ import (
 
 func TestWatch(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -53,7 +53,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -85,7 +85,7 @@ func TestCancelUnsynced(t *testing.T) {
     // method to sync watchers in unsynced map. We want to keep watchers
     // in unsynced to test if syncWatchers works as expected.
     s := &watchableStore{
-        store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+        store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
         unsynced: newWatcherGroup(),
 
         // to make the test not crash from assigning to nil map.
@@ -140,7 +140,7 @@ func TestSyncWatchers(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
 
     s := &watchableStore{
-        store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+        store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
         unsynced: newWatcherGroup(),
         synced:   newWatcherGroup(),
     }
@@ -223,7 +223,7 @@ func TestSyncWatchers(t *testing.T) {
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -260,7 +260,7 @@ func TestWatchCompacted(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -301,7 +301,7 @@ func TestWatchRestore(t *testing.T) {
     test := func(delay time.Duration) func(t *testing.T) {
         return func(t *testing.T) {
             b, tmpPath := backend.NewDefaultTmpBackend()
-            s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+            s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
             defer cleanup(s, b, tmpPath)
 
             testKey := []byte("foo")
@@ -309,7 +309,7 @@ func TestWatchRestore(t *testing.T) {
             rev := s.Put(testKey, testValue, lease.NoLease)
 
             newBackend, newPath := backend.NewDefaultTmpBackend()
-            newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil)
+            newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, StoreConfig{})
             defer cleanup(newStore, newBackend, newPath)
 
             w := newStore.NewWatchStream()
@@ -347,11 +347,11 @@ func TestWatchRestore(t *testing.T) {
 // 5. choose the watcher from step 1, without panic
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
     b1, b1Path := backend.NewDefaultTmpBackend()
-    s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil)
+    s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s1, b1, b1Path)
 
     b2, b2Path := backend.NewDefaultTmpBackend()
-    s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil)
+    s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, StoreConfig{})
     defer cleanup(s2, b2, b2Path)
 
     testKey, testValue := []byte("foo"), []byte("bar")
@@ -398,7 +398,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     oldMaxRevs := watchBatchMaxRevs
     defer func() {
@@ -532,7 +532,7 @@ func TestWatchVictims(t *testing.T) {
     oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -610,7 +610,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -26,7 +26,7 @@ import (
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
     be, tmpPath := backend.NewDefaultTmpBackend()
-    watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+    watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer cleanup(watchable, be, tmpPath)
 
@@ -32,7 +32,7 @@ import (
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 func TestWatcherRequestsCustomID(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // does not create watcher, which panics when canceling in range tree.
 func TestWatcherWatchWrongRange(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
 
 func TestWatchDeleteRange(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+    s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
     defer func() {
         s.store.Close()
@@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -295,7 +295,7 @@ func TestWatcherRequestProgress(t *testing.T) {
     // method to sync watchers in unsynced map. We want to keep watchers
    // in unsynced to test if syncWatchers works as expected.
     s := &watchableStore{
-        store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+        store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
         unsynced: newWatcherGroup(),
         synced:   newWatcherGroup(),
     }
@@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 
 func TestWatcherWatchWithFilter(t *testing.T) {
     b, tmpPath := backend.NewDefaultTmpBackend()
-    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+    s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
     defer cleanup(s, b, tmpPath)
 
     w := s.NewWatchStream()
@@ -38,7 +38,7 @@ func initMVCC() {
     bcfg := backend.DefaultBackendConfig()
     bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = "mvcc-bench", time.Duration(batchInterval)*time.Millisecond, batchLimit
     be := backend.New(bcfg)
-    s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+    s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, mvcc.StoreConfig{})
     os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok
 }
 