*: fix auth revision corruption bug

This commit is contained in:
tangcong 2020-02-24 17:17:50 +08:00
parent 9fd7e2b802
commit 140bf5321d
11 changed files with 184 additions and 32 deletions

View File

@ -90,6 +90,10 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}
// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)
// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
AuthEnable() error
@ -178,6 +182,9 @@ type AuthStore interface {
// HasRole checks that user has role
HasRole(user, role string) bool
// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}
type TokenProvider interface {
@ -200,9 +207,14 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
tokenProvider TokenProvider
tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
bcryptCost int // the algorithm cost / strength for hashing auth passwords
}
func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
@ -252,6 +264,7 @@ func (as *authStore) AuthDisable() {
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
tx.Unlock()
b.ForceCommit()
@ -368,6 +381,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
putUser(tx, newUser)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("added a new user: %s", r.Name)
@ -392,6 +406,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
delUser(tx, r.Name)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
@ -428,6 +443,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
putUser(tx, updatedUser)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
@ -468,6 +484,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
as.invalidateCachedPerm(r.User)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("granted role %s to user %s", r.Role, r.User)
return &pb.AuthUserGrantRoleResponse{}, nil
@ -536,6 +553,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
as.invalidateCachedPerm(r.Name)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("revoked role %s from user %s", r.Role, r.Name)
return &pb.AuthUserRevokeRoleResponse{}, nil
@ -600,6 +618,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
as.clearCachedPerm()
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("revoked key %s from role %s", r.Key, r.Role)
return &pb.AuthRoleRevokePermissionResponse{}, nil
@ -645,6 +664,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("deleted role %s", r.Role)
return &pb.AuthRoleDeleteResponse{}, nil
@ -667,6 +687,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
putRole(tx, newRole)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("Role %s is created", r.Name)
@ -706,6 +727,16 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
})
if idx < len(role.KeyPermission) && bytes.Equal(role.KeyPermission[idx].Key, r.Perm.Key) && bytes.Equal(role.KeyPermission[idx].RangeEnd, r.Perm.RangeEnd) {
if role.KeyPermission[idx].PermType == r.Perm.PermType {
as.lg.Warn(
"ignored grant permission request to a role, existing permission",
zap.String("role-name", r.Name),
zap.ByteString("key", r.Perm.Key),
zap.ByteString("range-end", r.Perm.RangeEnd),
zap.String("permission-type", authpb.Permission_Type_name[int32(r.Perm.PermType)]),
)
return &pb.AuthRoleGrantPermissionResponse{}, nil
}
// update existing permission
role.KeyPermission[idx].PermType = r.Perm.PermType
} else {
@ -727,6 +758,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
as.clearCachedPerm()
as.commitRevision(tx)
as.saveConsistentIndex(tx)
plog.Noticef("role %s's permission of key %s is updated as %s", r.Name, r.Perm.Key, authpb.Permission_Type_name[int32(r.Perm.PermType)])
@ -931,6 +963,7 @@ func NewAuthStore(be backend.Backend, tp TokenProvider) *authStore {
if as.Revision() == 0 {
as.commitRevision(tx)
as.saveConsistentIndex(tx)
}
tx.Unlock()
@ -1134,3 +1167,11 @@ func (as *authStore) HasRole(user, role string) bool {
return false
}
func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
} else {
plog.Errorf("failed to save consistentIndex,syncConsistentIndex is nil")
}
}

View File

@ -301,6 +301,55 @@ func TestListUsers(t *testing.T) {
}
}
func TestRoleGrantPermissionRevision(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)
_, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test-1"})
if err != nil {
t.Fatal(err)
}
perm := &authpb.Permission{
PermType: authpb.WRITE,
Key: []byte("Keys"),
RangeEnd: []byte("RangeEnd"),
}
_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
Name: "role-test-1",
Perm: perm,
})
if err != nil {
t.Fatal(err)
}
r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"})
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(perm, r.Perm[0]) {
t.Errorf("expected %v, got %v", perm, r.Perm[0])
}
oldRevision := as.Revision()
_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
Name: "role-test-1",
Perm: perm,
})
if err != nil {
t.Error(err)
}
newRevision := as.Revision()
if oldRevision != newRevision {
t.Errorf("expected revision diff is 0, got %d", newRevision-oldRevision)
}
}
func TestRoleGrantPermission(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)

View File

@ -71,7 +71,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil

View File

@ -447,8 +447,27 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
srv.lessor = lease.NewLessor(
srv.getLogger(),
srv.be,
lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
)
if err != nil {
plog.Warningf("failed to create token provider,err is %v", err)
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
@ -470,16 +489,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}()
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
tp, err := auth.NewTokenProvider(cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
)
if err != nil {
plog.Errorf("failed to create token provider: %s", err)
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.be, tp)
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {

View File

@ -916,7 +916,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
store: st,
}
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be
ch := make(chan struct{}, 2)
@ -994,7 +994,7 @@ func TestSnapshotOrdering(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.be = be
s.start()
@ -1052,7 +1052,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be
srv.start()
@ -1121,7 +1121,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.be = be
s.start()

View File

@ -710,7 +710,7 @@ func TestKVSnapshot(t *testing.T) {
func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()

View File

@ -15,6 +15,7 @@
package mvcc
import (
"go.etcd.io/etcd/auth"
"sync"
"time"
@ -67,11 +68,11 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newWatchableStore(b, le, ig)
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
return newWatchableStore(lg, b, le, as, ig, cfg)
}
func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
s := &watchableStore{
store: NewStore(b, le, ig),
victimc: make(chan struct{}, 1),
@ -85,6 +86,10 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
if as != nil {
// TODO: encapsulating consistentindex into a separate package
as.SetConsistentIndexSyncer(s.store.saveIndex)
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()

View File

@ -25,7 +25,7 @@ import (
func BenchmarkWatchableStorePut(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
s := New(be, &lease.FakeLessor{}, nil)
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -46,7 +46,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := New(be, &lease.FakeLessor{}, &i)
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -67,7 +67,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
// many synced watchers receiving a Put notification.
func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(be, &lease.FakeLessor{}, nil)
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s, be, tmpPath)
k := []byte("testkey")
@ -162,7 +162,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(be, &lease.FakeLessor{}, nil)
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()

View File

@ -30,7 +30,11 @@ import (
func TestWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer func() {
s.store.Close()
@ -52,7 +56,11 @@ func TestWatch(t *testing.T) {
func TestNewWatcherCancel(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer func() {
s.store.Close()
@ -222,7 +230,11 @@ func TestSyncWatchers(t *testing.T) {
// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer func() {
s.store.Close()
@ -259,7 +271,11 @@ func TestWatchCompacted(t *testing.T) {
func TestWatchFutureRev(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer func() {
s.store.Close()
@ -300,7 +316,11 @@ func TestWatchRestore(t *testing.T) {
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer cleanup(s, b, tmpPath)
testKey := []byte("foo")
@ -308,7 +328,11 @@ func TestWatchRestore(t *testing.T) {
rev := s.Put(testKey, testValue, lease.NoLease)
newBackend, newPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
=======
newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer cleanup(newStore, newBackend, newPath)
w := newStore.NewWatchStream()
@ -346,11 +370,19 @@ func TestWatchRestore(t *testing.T) {
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
defer cleanup(s1, b1, b1Path)
b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
=======
s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s1, b1, b1Path)
b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer cleanup(s2, b2, b2Path)
testKey, testValue := []byte("foo"), []byte("bar")
@ -397,7 +429,11 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
oldMaxRevs := watchBatchMaxRevs
defer func() {
@ -531,7 +567,11 @@ func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer func() {
s.store.Close()
@ -609,7 +649,11 @@ func TestWatchVictims(t *testing.T) {
// canceling its watches.
func TestStressWatchCancelClose(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer func() {
s.store.Close()

View File

@ -24,7 +24,11 @@ import (
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
<<<<<<< HEAD
watchable := newWatchableStore(be, &lease.FakeLessor{}, nil)
=======
watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug
defer cleanup(watchable, be, tmpPath)

View File

@ -31,7 +31,7 @@ import (
// and the watched event attaches the correct watchID.
func TestWatcherWatchID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -83,7 +83,7 @@ func TestWatcherWatchID(t *testing.T) {
// and returns events with matching prefixes.
func TestWatcherWatchPrefix(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -157,7 +157,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
// does not create watcher, which panics when canceling in range tree.
func TestWatcherWatchWrongRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -177,7 +177,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
func TestWatchDeleteRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -216,7 +216,7 @@ func TestWatchDeleteRange(t *testing.T) {
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -308,7 +308,7 @@ func TestWatcherRequestProgress(t *testing.T) {
func TestWatcherWatchWithFilter(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()