Merge pull request #11752 from tangcong/automated-cherry-pick-of-#11652-#11670-#11710-origin-release-3.4

Automated cherry pick of #11652 #11670 #11710
Jingyi Hu 2020-04-10 23:21:45 +08:00 committed by GitHub
commit f1eca4e1fa
14 changed files with 210 additions and 53 deletions

auth/metrics.go (new file)

@@ -0,0 +1,42 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"github.com/prometheus/client_golang/prometheus"
"sync"
)
var (
currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "auth",
Name: "revision",
Help: "The current revision of auth store.",
},
func() float64 {
reportCurrentAuthRevMu.RLock()
defer reportCurrentAuthRevMu.RUnlock()
return reportCurrentAuthRev()
},
)
// overridden by auth store initialization
reportCurrentAuthRevMu sync.RWMutex
reportCurrentAuthRev = func() float64 { return 0 }
)
func init() {
prometheus.MustRegister(currentAuthRevision)
}
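
auth/metrics.go registers a GaugeFunc whose value is read through a mutex-guarded closure that starts as a stub and is swapped in once the auth store exists (see setupMetricsReporter in auth/store.go below). A minimal standalone sketch of that late-binding pattern, assuming only the standard prometheus client and its testutil package (all names here are illustrative, not from this diff):

package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var (
	mu     sync.RWMutex
	report = func() float64 { return 0 } // stub until the real source is wired in

	gauge = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_revision",
		Help: "Gauge backed by a swappable closure.",
	}, func() float64 {
		mu.RLock()
		defer mu.RUnlock()
		return report()
	})
)

func main() {
	prometheus.MustRegister(gauge)
	fmt.Println(testutil.ToFloat64(gauge)) // 0: stub still in place

	mu.Lock()
	report = func() float64 { return 42 } // swapped in later, as the auth store does
	mu.Unlock()
	fmt.Println(testutil.ToFloat64(gauge)) // 42
}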

auth/store.go

@@ -94,6 +94,9 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}
// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)
// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
@@ -186,6 +189,9 @@ type AuthStore interface {
// HasRole checks that user has role
HasRole(user, role string) bool
// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}
type TokenProvider interface {
@@ -209,10 +215,14 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
- tokenProvider TokenProvider
- bcryptCost int // the algorithm cost / strength for hashing auth passwords
+ tokenProvider TokenProvider
+ syncConsistentIndex saveConsistentIndexFunc
+ bcryptCost int // the algorithm cost / strength for hashing auth passwords
}
func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
@@ -269,6 +279,7 @@ func (as *authStore) AuthDisable() {
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
tx.Unlock()
b.ForceCommit()
@@ -430,6 +441,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
putUser(as.lg, tx, newUser)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info("added a user", zap.String("user-name", r.Name))
@@ -461,6 +473,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
delUser(tx, r.Name)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
@@ -513,6 +526,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
putUser(as.lg, tx, updatedUser)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
@@ -569,6 +583,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
as.invalidateCachedPerm(r.User)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info(
@@ -655,6 +670,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
as.invalidateCachedPerm(r.Name)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info(
@@ -729,6 +745,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
as.clearCachedPerm()
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info(
@@ -788,6 +805,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info("deleted a role", zap.String("role-name", r.Role))
@@ -818,6 +836,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
putRole(as.lg, tx, newRole)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info("created a role", zap.String("role-name", r.Name))
@@ -881,6 +900,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
as.clearCachedPerm()
as.commitRevision(tx)
as.saveConsistentIndex(tx)
if as.lg != nil {
as.lg.Info(
@@ -904,8 +924,21 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
if revision == 0 {
return ErrUserEmpty
}
- if revision < as.Revision() {
+ rev := as.Revision()
+ if revision < rev {
if as.lg != nil {
as.lg.Warn("request auth revision is less than current node auth revision",
zap.Uint64("current node auth revision", rev),
zap.Uint64("request auth revision", revision),
zap.ByteString("request key", key),
zap.Error(ErrAuthOldRevision))
} else {
plog.Warningf("request auth revision is less than current node auth revision,"+
"current node auth revision is %d,"+
"request auth revision is %d,"+
"request key is %s, "+
"err is %v", rev, revision, key, ErrAuthOldRevision)
}
return ErrAuthOldRevision
}
@@ -1145,6 +1178,8 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
as.commitRevision(tx)
}
as.setupMetricsReporter()
tx.Unlock()
be.ForceCommit()
@@ -1419,3 +1454,23 @@ func (as *authStore) HasRole(user, role string) bool {
func (as *authStore) BcryptCost() int {
return as.bcryptCost
}
func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
} else {
if as.lg != nil {
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
} else {
plog.Error("failed to save consistentIndex,syncConsistentIndex is nil")
}
}
}
func (as *authStore) setupMetricsReporter() {
reportCurrentAuthRevMu.Lock()
reportCurrentAuthRev = func() float64 {
return float64(as.Revision())
}
reportCurrentAuthRevMu.Unlock()
}
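
The thread running through all of the store changes above: every auth mutation now calls saveConsistentIndex alongside commitRevision in the same batch transaction. Without the persisted index, auth entries replayed from the raft log after a restart were applied a second time, bumping authRevision and letting members drift apart. A toy model of the failure mode, under the assumption that replay detection compares an entry's raft index against the saved consistent index (types here are illustrative, not etcd's):

package main

import "fmt"

// store models the auth bucket: a revision plus the last applied raft index.
type store struct {
	authRevision    uint64
	consistentIndex uint64
}

// apply handles a raft entry at index idx; persistIndex mirrors the fix
// (saveConsistentIndex called on every auth mutation).
func (s *store) apply(idx uint64, persistIndex bool) {
	if idx <= s.consistentIndex {
		return // already applied; replayed entries are skipped
	}
	s.authRevision++
	if persistIndex {
		s.consistentIndex = idx
	}
}

func main() {
	broken := &store{}
	broken.apply(7, false)
	broken.apply(7, false) // replay after restart bumps the revision again
	fmt.Println("without fix:", broken.authRevision) // 2

	fixed := &store{}
	fixed.apply(7, true)
	fixed.apply(7, true) // replay detected via consistentIndex, skipped
	fmt.Println("with fix:", fixed.authRevision) // 1
}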

etcdserver/apply.go

@@ -116,6 +116,9 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if ar.err != nil {
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
}(time.Now())
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
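
The deferred hook above now logs failed applies in addition to the existing expensive-request warning; warnOfFailedRequest itself is added in etcdserver/util.go further down. A standalone analogue of the call shape, with a hypothetical request stringer standing in for pb.InternalRaftStringer:

package main

import (
	"errors"
	"fmt"
	"time"

	"go.uber.org/zap"
)

// warnFailed mirrors warnOfFailedRequest: duration, request text, and error.
func warnFailed(lg *zap.Logger, start time.Time, req fmt.Stringer, err error) {
	lg.Warn("failed to apply request",
		zap.Duration("took", time.Since(start)),
		zap.String("request", req.String()),
		zap.Error(err),
	)
}

type fakeReq struct{}

func (fakeReq) String() string { return `header:<ID:1 > put:<key:"k" >` }

func main() {
	start := time.Now()
	// ... apply the entry; suppose it fails an auth check ...
	err := errors.New("auth: revision in header is old")
	if err != nil {
		warnFailed(zap.NewExample(), start, fakeReq{}, err)
	}
}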

etcdserver/backend.go

@@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
- kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+ kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil

etcdserver/server.go

@@ -540,7 +540,23 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
- srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+ tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
+ func(index uint64) <-chan struct{} {
+ return srv.applyWait.Wait(index)
+ },
+ )
+ if err != nil {
+ if cfg.Logger != nil {
+ cfg.Logger.Warn("failed to create token provider", zap.Error(err))
+ } else {
+ plog.Warningf("failed to create token provider,err is %v", err)
+ }
+ return nil, err
+ }
+ srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
+ srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
@@ -569,20 +585,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}()
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
- tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
- func(index uint64) <-chan struct{} {
- return srv.applyWait.Wait(index)
- },
- )
- if err != nil {
- if cfg.Logger != nil {
- cfg.Logger.Warn("failed to create token provider", zap.Error(err))
- } else {
- plog.Errorf("failed to create token provider: %s", err)
- }
- return nil, err
- }
- srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {

etcdserver/server_test.go

@@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
v2store: st,
}
- srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
+ srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be
ch := make(chan struct{}, 2)
@@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
- s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
+ s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.be = be
s.start()
@@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
- srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
+ srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be
srv.start()
@@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
- s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
+ s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.be = be
s.start()

etcdserver/util.go

@@ -111,6 +111,25 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin
warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err)
}
func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
d := time.Since(now)
if lg != nil {
lg.Warn(
"failed to apply request",
zap.Duration("took", d),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
zap.Error(err),
)
} else {
plog.Warningf("failed to apply request %q with response %q took (%v) to execute, err is %v", reqStringer.String(), resp, d, err)
}
}
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string

mvcc/kv_test.go

@@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) {
func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()

mvcc/watchable_store.go

@@ -15,6 +15,7 @@
package mvcc
import (
"go.etcd.io/etcd/auth"
"sync"
"time"
@@ -69,11 +70,11 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()
- func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
- return newWatchableStore(lg, b, le, ig, cfg)
+ func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+ return newWatchableStore(lg, b, le, as, ig, cfg)
}
- func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
+ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
s := &watchableStore{
store: NewStore(lg, b, le, ig, cfg),
victimc: make(chan struct{}, 1),
@@ -87,6 +88,10 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
}
if as != nil {
// TODO: encapsulating consistentindex into a separate package
as.SetConsistentIndexSyncer(s.store.saveIndex)
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
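
The nil check above is also why the constructor signatures changed throughout this diff: mvcc owns the index-saving logic (store.saveIndex), the auth store needs to invoke it, and auth cannot import mvcc. The setter lets the watchable store inject the callback when an auth store is present, while tests simply pass nil. A small sketch of that injection pattern with illustrative stand-in types (not the real etcd interfaces):

package main

import "fmt"

// saveFunc stands in for saveConsistentIndexFunc.
type saveFunc func()

// authStore only holds the callback; it never imports the kv package.
type authStore struct{ syncConsistentIndex saveFunc }

func (as *authStore) SetConsistentIndexSyncer(s saveFunc) { as.syncConsistentIndex = s }

// kvStore owns the consistent index and knows how to persist it.
type kvStore struct{ index uint64 }

func (s *kvStore) saveIndex() { fmt.Println("persisted consistent_index =", s.index) }

func main() {
	as := &authStore{}
	kv := &kvStore{index: 42}
	// mirrors newWatchableStore: if as != nil { as.SetConsistentIndexSyncer(s.store.saveIndex) }
	as.SetConsistentIndexSyncer(kv.saveIndex)
	as.syncConsistentIndex() // called from each auth mutation's batch tx
}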

mvcc/watchable_store_bench_test.go

@@ -28,7 +28,7 @@ import (
func BenchmarkWatchableStorePut(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
- s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@@ -49,7 +49,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
- s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
+ s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@@ -80,7 +80,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
be, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s, be, tmpPath)
k := []byte("testkey")
@@ -180,7 +180,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()

mvcc/watchable_store_test.go

@@ -32,7 +32,7 @@ import (
func TestWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@@ -54,7 +54,7 @@ func TestWatch(t *testing.T) {
func TestNewWatcherCancel(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@@ -224,7 +224,7 @@ func TestSyncWatchers(t *testing.T) {
// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@@ -261,7 +261,7 @@ func TestWatchCompacted(t *testing.T) {
func TestWatchFutureRev(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@@ -302,7 +302,7 @@ func TestWatchRestore(t *testing.T) {
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s, b, tmpPath)
testKey := []byte("foo")
@@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) {
rev := s.Put(testKey, testValue, lease.NoLease)
newBackend, newPath := backend.NewDefaultTmpBackend()
- newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, StoreConfig{})
+ newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(newStore, newBackend, newPath)
w := newStore.NewWatchStream()
@@ -348,11 +348,11 @@ func TestWatchRestore(t *testing.T) {
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
- s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, StoreConfig{})
+ s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s1, b1, b1Path)
b2, b2Path := backend.NewDefaultTmpBackend()
- s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, StoreConfig{})
+ s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(s2, b2, b2Path)
testKey, testValue := []byte("foo"), []byte("bar")
@@ -399,7 +399,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
oldMaxRevs := watchBatchMaxRevs
defer func() {
@@ -533,7 +533,7 @@ func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@@ -611,7 +611,7 @@ func TestWatchVictims(t *testing.T) {
// canceling its watches.
func TestStressWatchCancelClose(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()

mvcc/watcher_bench_test.go

@@ -26,7 +26,7 @@ import (
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
- watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+ watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(watchable, be, tmpPath)

mvcc/watcher_test.go

@@ -32,7 +32,7 @@ import (
// and the watched event attaches the correct watchID.
func TestWatcherWatchID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
func TestWatcherRequestsCustomID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
// and returns events with matching prefixes.
func TestWatcherWatchPrefix(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
// does not create watcher, which panics when canceling in range tree.
func TestWatcherWatchWrongRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
func TestWatchDeleteRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+ s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer func() {
s.store.Close()
@@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
func TestWatcherWatchWithFilter(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
- s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+ s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()

tools/etcd-dump-db/backend.go

@@ -17,6 +17,7 @@ package main
import (
"encoding/binary"
"fmt"
"go.etcd.io/etcd/auth/authpb"
"path/filepath"
"go.etcd.io/etcd/lease/leasepb"
@@ -52,8 +53,11 @@ func getBuckets(dbPath string) (buckets []string, err error) {
type decoder func(k, v []byte)
var decoders = map[string]decoder{
- "key": keyDecoder,
- "lease": leaseDecoder,
+ "key": keyDecoder,
+ "lease": leaseDecoder,
+ "auth": authDecoder,
+ "authRoles": authRolesDecoder,
+ "authUsers": authUsersDecoder,
}
type revision struct {
@@ -93,6 +97,33 @@ func leaseDecoder(k, v []byte) {
fmt.Printf("lease ID=%016x, TTL=%ds\n", leaseID, lpb.TTL)
}
func authDecoder(k, v []byte) {
if string(k) == "authRevision" {
rev := binary.BigEndian.Uint64(v)
fmt.Printf("key=%q, value=%v\n", k, rev)
} else {
fmt.Printf("key=%q, value=%v\n", k, v)
}
}
func authRolesDecoder(k, v []byte) {
role := &authpb.Role{}
err := role.Unmarshal(v)
if err != nil {
panic(err)
}
fmt.Printf("role=%q, keyPermission=%v\n", string(role.Name), role.KeyPermission)
}
func authUsersDecoder(k, v []byte) {
user := &authpb.User{}
err := user.Unmarshal(v)
if err != nil {
panic(err)
}
fmt.Printf("user=%q, roles=%q, password=%q, option=%v\n", user.Name, user.Roles, string(user.Password), user.Options)
}
func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error) {
db, err := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: flockTimeout})
if err != nil {
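
The three decoders added above cover the auth buckets: authRevision is a raw big-endian uint64, while users and roles are marshaled authpb messages. A quick sketch of what the decoders consume, assuming etcd 3.4's authpb package (the sample user is made up):

package main

import (
	"encoding/binary"
	"fmt"

	"go.etcd.io/etcd/auth/authpb"
)

func main() {
	// authRevision value as stored in the "auth" bucket: 8 big-endian bytes.
	rev := make([]byte, 8)
	binary.BigEndian.PutUint64(rev, 3)
	fmt.Printf("key=%q, value=%v\n", "authRevision", binary.BigEndian.Uint64(rev))

	// "authUsers" values are marshaled authpb.User records; round-trip one
	// through the same Unmarshal call authUsersDecoder uses.
	u := &authpb.User{Name: []byte("root"), Roles: []string{"root"}}
	b, err := u.Marshal()
	if err != nil {
		panic(err)
	}
	var out authpb.User
	if err := out.Unmarshal(b); err != nil {
		panic(err)
	}
	fmt.Printf("user=%q, roles=%q\n", out.Name, out.Roles)
}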