Simplify KVStore interaction with cindex thanks to hooks.

commit 2dbecea5b2 (parent fe3254aee3)
Author: Piotr Tabor
Date:   2021-04-12 19:49:58 +02:00

10 changed files with 84 additions and 103 deletions

server/etcdserver/api/v3rpc/maintenance.go

@@ -34,7 +34,7 @@ import (
 )
 
 type KVGetter interface {
-	KV() mvcc.ConsistentWatchableKV
+	KV() mvcc.WatchableKV
 }
 
 type BackendGetter interface {

server/etcdserver/cindex/cindex.go

@@ -28,6 +28,10 @@ var (
 	ConsistentIndexKeyName = []byte("consistent_index")
 )
 
+type Backend interface {
+	BatchTx() backend.BatchTx
+}
+
 // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
 type ConsistentIndexer interface {
@@ -41,32 +45,38 @@ type ConsistentIndexer interface {
 	// It saves consistentIndex to the underlying stable storage.
 	UnsafeSave(tx backend.BatchTx)
 
-	// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
-	SetBatchTx(tx backend.BatchTx)
+	// SetBackend sets the available backend.BatchTx for ConsistentIndexer.
+	SetBackend(be Backend)
 }
 
 // consistentIndex implements the ConsistentIndexer interface.
 type consistentIndex struct {
-	tx backend.BatchTx
 	// consistentIndex represents the offset of an entry in a consistent replica log.
-	// it caches the "consistent_index" key's value. Accessed
-	// through atomics so must be 64-bit aligned.
+	// It caches the "consistent_index" key's value.
+	// Accessed through atomics so must be 64-bit aligned.
 	consistentIndex uint64
 
+	// be is used for the initial read of consistentIndex.
+	be Backend
+	// mutex is protecting be.
 	mutex sync.Mutex
 }
 
-func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
-	return &consistentIndex{tx: tx}
+// NewConsistentIndex creates a new consistent index.
+// If `be` is nil, it must be set (SetBackend) before the first access using `ConsistentIndex()`.
+func NewConsistentIndex(be Backend) ConsistentIndexer {
+	return &consistentIndex{be: be}
 }
 
 func (ci *consistentIndex) ConsistentIndex() uint64 {
 	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
 		return index
 	}
 
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	v := ReadConsistentIndex(ci.tx)
+
+	v := ReadConsistentIndex(ci.be.BatchTx())
+	atomic.StoreUint64(&ci.consistentIndex, v)
 	return v
 }
@@ -76,18 +86,15 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
 func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
 	index := atomic.LoadUint64(&ci.consistentIndex)
-	if index == 0 {
-		// Never save 0 as it means that we didn't loaded the real index yet.
-		return
-	}
-	unsafeUpdateConsistentIndex(tx, index)
+	UnsafeUpdateConsistentIndex(tx, index, true)
 }
 
-func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
+func (ci *consistentIndex) SetBackend(be Backend) {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	ci.tx = tx
+	ci.be = be
+	// After the backend is changed, the first access should re-read it.
+	ci.SetConsistentIndex(0)
 }
 
 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
@@ -102,13 +109,21 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
 	atomic.StoreUint64(&f.index, index)
 }
 
-func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
-func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
+func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
+func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}
 
+// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
 func UnsafeCreateMetaBucket(tx backend.BatchTx) {
 	tx.UnsafeCreateBucket(MetaBucketName)
 }
 
+// CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
+func CreateMetaBucket(tx backend.BatchTx) {
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafeCreateBucket(MetaBucketName)
+}
+
 // unsafeGetConsistentIndex loads consistent index from given transaction.
 // returns 0 if the data are not found.
 func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
@@ -128,7 +143,19 @@ func ReadConsistentIndex(tx backend.ReadTx) uint64 {
 	return unsafeReadConsistentIndex(tx)
 }
 
-func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
+func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
+	if index == 0 {
+		// Never save 0, as it means that we didn't load the real index yet.
+		return
+	}
+	if onlyGrow {
+		oldi := unsafeReadConsistentIndex(tx)
+		if index <= oldi {
+			return
+		}
+	}
+
 	bs := make([]byte, 8) // this is kept on stack (not heap) so it's quick.
 	binary.BigEndian.PutUint64(bs, index)
 	// put the index into the underlying backend
@@ -136,13 +163,8 @@ func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
 	tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
 }
 
-func UpdateConsistentIndex(tx backend.BatchTx, index uint64) {
+func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
 	tx.Lock()
 	defer tx.Unlock()
-
-	oldi := unsafeReadConsistentIndex(tx)
-	if index <= oldi {
-		return
-	}
-	unsafeUpdateConsistentIndex(tx, index)
+	UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
 }
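Taken together, the new cindex flow is: ConsistentIndex() serves reads from an atomically cached value, lazily populating it from the backend under the mutex, while UnsafeSave never persists 0 and, with onlyGrow set, never moves the stored index backwards. A minimal usage sketch, assuming `be` is any value satisfying the new cindex.Backend interface (for example a backend.Backend):

ci := cindex.NewConsistentIndex(be)

// First access reads "consistent_index" from the meta bucket and caches it;
// later accesses are a single atomic load.
idx := ci.ConsistentIndex()

// Bump in memory, then persist. UnsafeSave passes onlyGrow=true, so a zero
// or non-growing value is silently dropped rather than written.
ci.SetConsistentIndex(idx + 1)

tx := be.BatchTx()
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()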

server/etcdserver/cindex/cindex_test.go

@@ -27,13 +27,14 @@ import (
 func TestConsistentIndex(t *testing.T) {
 	be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
-	ci := NewConsistentIndex(be.BatchTx())
+	ci := NewConsistentIndex(be)
+
 	tx := be.BatchTx()
 	if tx == nil {
 		t.Fatal("batch tx is nil")
 	}
 	tx.Lock()
 	UnsafeCreateMetaBucket(tx)
 	tx.Unlock()
 	be.ForceCommit()
@@ -51,14 +52,13 @@ func TestConsistentIndex(t *testing.T) {
 	b := backend.NewDefaultBackend(tmpPath)
 	defer b.Close()
-	ci.SetConsistentIndex(0)
-	ci.SetBatchTx(b.BatchTx())
+	ci.SetBackend(b)
 	index = ci.ConsistentIndex()
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)
 	}
-	ci = NewConsistentIndex(b.BatchTx())
+	ci = NewConsistentIndex(b)
 	index = ci.ConsistentIndex()
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)

server/etcdserver/server.go

@@ -256,7 +256,7 @@ type EtcdServer struct {
 	applyV3Internal applierV3Internal
 	applyWait       wait.WaitTime
 
-	kv     mvcc.ConsistentWatchableKV
+	kv     mvcc.WatchableKV
 	lessor lease.Lessor
 	bemu   sync.Mutex
 	be     backend.Backend
@@ -1210,8 +1210,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		lg.Panic("failed to restore mvcc store", zap.Error(err))
 	}
 
-	s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
-	lg.Info("restored mvcc store")
+	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
 
 	// Closing old backend might block until all the txns
 	// on the backend are finished.
@@ -2522,7 +2521,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	}
 }
 
-func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
+func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
 func (s *EtcdServer) Backend() backend.Backend {
 	s.bemu.Lock()
 	defer s.bemu.Unlock()

server/lease/lessor.go

@@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 	}
 
 	le.leaseMap[id] = l
-	l.persistTo(le.b, le.ci)
+	l.persistTo(le.b)
 
 	leaseTotalTTLs.Observe(float64(l.ttl))
 	leaseGranted.Inc()
@@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error {
 	// kv deletion. Or we might end up with not executing the revoke or not
 	// deleting the keys if etcdserver fails in between.
 	le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
-	// if len(keys) > 0, txn.End() will call ci.UnsafeSave function.
-	if le.ci != nil && len(keys) == 0 {
-		le.ci.UnsafeSave(le.b.BatchTx())
-	}
 
 	txn.End()
@@ -828,7 +824,7 @@ func (l *Lease) expired() bool {
 	return l.Remaining() <= 0
 }
 
-func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
+func (l *Lease) persistTo(b backend.Backend) {
 	key := int64ToBytes(int64(l.ID))
 
 	lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
@@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
 	b.BatchTx().Lock()
 	b.BatchTx().UnsafePut(leaseBucketName, key, val)
-	if ci != nil {
-		ci.UnsafeSave(b.BatchTx())
-	}
 	b.BatchTx().Unlock()
 }

server/mvcc/kv.go

@@ -139,14 +139,3 @@ type Watchable interface {
 	// watch events happened or happening on the KV.
 	NewWatchStream() WatchStream
 }
-
-// ConsistentWatchableKV is a WatchableKV that understands the consistency
-// algorithm and consistent index.
-// If the consistent index of executing entry is not larger than the
-// consistent index of ConsistentWatchableKV, all operations in
-// this entry are skipped and return empty response.
-type ConsistentWatchableKV interface {
-	WatchableKV
-
-	// ConsistentIndex returns the current consistent index of the KV.
-	ConsistentIndex() uint64
-}
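With ConsistentWatchableKV gone, components that only read or watch depend on the narrower mvcc.WatchableKV, and the consistent index is reachable solely through cindex.ConsistentIndexer. A hypothetical consumer illustrating the split (the snapshotter type below is invented for this example, not part of the diff):

type snapshotter struct {
	kv mvcc.WatchableKV         // reads and watches only
	ci cindex.ConsistentIndexer // the applied index is now a separate dependency
}

// appliedIndex no longer goes through the KV interface.
func (s *snapshotter) appliedIndex() uint64 {
	return s.ci.ConsistentIndex()
}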

server/mvcc/kvstore.go

@@ -69,8 +69,6 @@ type store struct {
 	// mu read locks for txns and write locks for non-txn store changes.
 	mu sync.RWMutex
 
-	ci cindex.ConsistentIndexer
-
 	b       backend.Backend
 	kvindex index
@@ -94,7 +92,7 @@ type store struct {
 // NewStore returns a new store. It is useful to create a store inside
 // mvcc pkg. It should only be used for testing externally.
-func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
 	if lg == nil {
 		lg = zap.NewNop()
 	}
@@ -104,7 +102,6 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
 	s := &store{
 		cfg:     cfg,
 		b:       b,
-		ci:      ci,
 		kvindex: newTreeIndex(lg),
 		le:      le,
@@ -314,11 +311,6 @@ func init() {
 func (s *store) Commit() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	s.saveIndex(tx)
-	tx.Unlock()
 	s.b.ForceCommit()
 }
@@ -342,8 +334,6 @@ func (s *store) Restore(b backend.Backend) error {
 	s.fifoSched = schedule.NewFIFOScheduler()
 	s.stopc = make(chan struct{})
-	s.ci.SetBatchTx(b.BatchTx())
-	s.ci.SetConsistentIndex(0)
 
 	return s.restore()
 }
@@ -436,9 +426,7 @@ func (s *store) restore() error {
 	tx.Unlock()
 
-	s.lg.Info("kvstore restored",
-		zap.Uint64("consistent-index", s.ConsistentIndex()),
-		zap.Int64("current-rev", s.currentRev))
+	s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))
 
 	if scheduledCompact != 0 {
 		if _, err := s.compactLockfree(scheduledCompact); err != nil {
@@ -533,19 +521,6 @@ func (s *store) Close() error {
 	return nil
 }
 
-func (s *store) saveIndex(tx backend.BatchTx) {
-	if s.ci != nil {
-		s.ci.UnsafeSave(tx)
-	}
-}
-
-func (s *store) ConsistentIndex() uint64 {
-	if s.ci != nil {
-		return s.ci.ConsistentIndex()
-	}
-	return 0
-}
-
 func (s *store) setupMetricsReporter() {
 	b := s.b
 	reportDbTotalSizeInBytesMu.Lock()
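This deletion is the payoff of the commit title: Commit, Restore, and the write-txn path no longer thread saveIndex and a ConsistentIndexer through the store, because persisting the consistent index moves into a backend hook that fires on every batch commit. The hook API itself comes from the surrounding commit series, so the sketch below is an assumption about its shape, not code from this diff:

// Assumed shape: the backend calls OnPreCommitUnsafe inside its commit
// path while holding the batch-tx lock.
type indexSavingHooks struct {
	ci cindex.ConsistentIndexer
}

func (h *indexSavingHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
	// One central place writes "consistent_index", instead of every write
	// path (kvstore.Commit, lessor, storeTxnWrite.End) remembering to.
	h.ci.UnsafeSave(tx)
}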

server/mvcc/kvstore_bench_test.go

@@ -18,6 +18,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
 	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
 	"go.etcd.io/etcd/server/v3/lease"
@@ -28,7 +29,7 @@ import (
 func BenchmarkStorePut(b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -47,7 +48,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
 func benchmarkStoreRange(b *testing.B, n int) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// 64 byte key/val
@@ -73,26 +74,30 @@ func benchmarkStoreRange(b *testing.B, n int) {
 }
 
 func BenchmarkConsistentIndex(b *testing.B) {
-	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
-	defer cleanup(s, be, tmpPath)
+	be, _ := betesting.NewDefaultTmpBackend(b)
+	ci := cindex.NewConsistentIndex(be)
+	defer betesting.Close(b, be)
 
-	tx := s.b.BatchTx()
+	// This will force the index to be reread from scratch on each call.
+	ci.SetConsistentIndex(0)
+
+	tx := be.BatchTx()
 	tx.Lock()
-	s.saveIndex(tx)
+	cindex.UnsafeCreateMetaBucket(tx)
+	ci.UnsafeSave(tx)
 	tx.Unlock()
 
 	b.ReportAllocs()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		s.ConsistentIndex()
+		ci.ConsistentIndex()
 	}
 }
 
 // BenchmarkStoreTxnPutUpdate is same as above, but instead updates single key
 func BenchmarkStorePutUpdate(b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -110,7 +115,7 @@ func BenchmarkStorePutUpdate(b *testing.B) {
 // some synchronization operations, such as mutex locking.
 func BenchmarkStoreTxnPut(b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -130,7 +135,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 // benchmarkStoreRestore benchmarks the restore operation
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	// use closure to capture 's' to pick up the reassignment
 	defer func() { cleanup(s, be, tmpPath) }()

@@ -146,11 +151,11 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 			txn.End()
 		}
 	}
-	s.Close()
+	assert.NoError(b, s.Close())
 
 	b.ReportAllocs()
 	b.ResetTimer()
-	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {
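The reworked BenchmarkConsistentIndex exercises the cindex package directly. Because SetConsistentIndex(0) keeps the atomic cache at zero and UnsafeSave never persists zero, the `index > 0` fast path never triggers and every iteration measures a real backend read, which is what the "reread from scratch" comment is about. Assuming the upstream layout (package mvcc inside the server module), it can be run with something like:

	go test -run '^$' -bench BenchmarkConsistentIndex ./mvcc/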

server/mvcc/kvstore_txn.go

@@ -104,7 +104,6 @@ func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
 func (tw *storeTxnWrite) End() {
 	// only update index if the txn modifies the mvcc state.
 	if len(tw.changes) != 0 {
-		tw.s.saveIndex(tw.tx)
 		// hold revMu lock to prevent new read txns from opening until writeback.
 		tw.s.revMu.Lock()
 		tw.s.currentRev++

server/mvcc/watchable_store.go

@@ -20,7 +20,6 @@ import (
 	"go.etcd.io/etcd/api/v3/mvccpb"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
-	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
 	"go.etcd.io/etcd/server/v3/lease"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
@@ -70,16 +69,16 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV {
-	return newWatchableStore(lg, b, le, ci, cfg)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
+	return newWatchableStore(lg, b, le, cfg)
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
 	if lg == nil {
 		lg = zap.NewNop()
 	}
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, ci, cfg),
+		store:    NewStore(lg, b, le, cfg),
 		victimc:  make(chan struct{}, 1),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
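After this last file, constructing a watchable KV no longer mentions the consistent index at all. A sketch of the new call shape from inside the mvcc package, reusing the test helpers seen in the benchmarks above (tb is any testing.TB):

be, _ := betesting.NewDefaultTmpBackend(tb)
defer betesting.Close(tb, be)

// The ConsistentIndexer parameter is gone; the index is maintained
// elsewhere (via backend hooks), not by the store itself.
kv := New(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
defer kv.Close()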