Mirror of https://github.com/etcd-io/etcd.git, synced 2024-09-27 06:25:44 +00:00

Merge pull request #11652 from tangcong/fix-auth-store-corruption-bug

auth/store: save consistentIndex to fix a data corruption bug
This commit is contained in: cb633418a2

auth/metrics.go (new file, 42 lines added)

@@ -0,0 +1,42 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package auth
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"sync"
+)
+
+var (
+	currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "auth",
+		Name:      "revision",
+		Help:      "The current revision of auth store.",
+	},
+		func() float64 {
+			reportCurrentAuthRevMu.RLock()
+			defer reportCurrentAuthRevMu.RUnlock()
+			return reportCurrentAuthRev()
+		},
+	)
+	// overridden by auth store initialization
+	reportCurrentAuthRevMu sync.RWMutex
+	reportCurrentAuthRev   = func() float64 { return 0 }
+)
+
+func init() {
+	prometheus.MustRegister(currentAuthRevision)
+}
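Note: the gauge above is registered once with a stub callback and re-pointed at the live auth store later (see setupMetricsReporter further down in this diff). A minimal standalone sketch of that late-binding pattern, using stdlib only and illustrative names not taken from etcd:

package main

import (
	"fmt"
	"sync"
)

var (
	reportMu sync.RWMutex
	// Stub callback; swapped for the real one once the data source
	// exists, exactly like reportCurrentAuthRev above.
	report = func() float64 { return 0 }
)

// scrape models what Prometheus does when it collects a GaugeFunc.
func scrape() float64 {
	reportMu.RLock()
	defer reportMu.RUnlock()
	return report()
}

func main() {
	fmt.Println(scrape()) // 0: stub still in place

	// Initialization swaps in the real reporter under the lock.
	reportMu.Lock()
	report = func() float64 { return 42 }
	reportMu.Unlock()

	fmt.Println(scrape()) // 42 afterwards
}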
auth/store.go

@@ -91,6 +91,9 @@ type AuthenticateParamIndex struct{}
 // AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
 type AuthenticateParamSimpleTokenPrefix struct{}
 
+// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
+type saveConsistentIndexFunc func(tx backend.BatchTx)
+
 // AuthStore defines auth storage interface.
 type AuthStore interface {
 	// AuthEnable turns on the authentication feature

@@ -183,6 +186,9 @@ type AuthStore interface {
 
 	// HasRole checks that user has role
 	HasRole(user, role string) bool
+
+	// SetConsistentIndexSyncer sets consistentIndex syncer
+	SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
 }
 
 type TokenProvider interface {
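Note on the shape of this hook: package auth cannot import package mvcc (mvcc already imports auth), so the syncer is declared here as a bare function type and injected at startup rather than expressed as a dependency on the KV store. A compressed, runnable sketch of that injection, with a stand-in BatchTx; everything here is illustrative, not etcd's actual code:

package main

import "fmt"

// BatchTx stands in for backend.BatchTx; only the method used here is modeled.
type BatchTx interface {
	UnsafePut(bucket, key, value []byte)
}

// The "auth" side declares only the function type; no mvcc import needed.
type saveConsistentIndexFunc func(tx BatchTx)

type authStore struct {
	syncConsistentIndex saveConsistentIndexFunc
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
	as.syncConsistentIndex = syncer
}

func (as *authStore) saveConsistentIndex(tx BatchTx) {
	if as.syncConsistentIndex != nil {
		as.syncConsistentIndex(tx)
	} else {
		fmt.Println("syncConsistentIndex is nil; index not persisted")
	}
}

type printTx struct{}

func (printTx) UnsafePut(bucket, key, value []byte) {
	fmt.Printf("put %s/%s\n", bucket, key)
}

func main() {
	as := &authStore{}
	// The "mvcc" side injects its saveIndex at startup.
	as.SetConsistentIndexSyncer(func(tx BatchTx) {
		tx.UnsafePut([]byte("meta"), []byte("consistent_index"), nil)
	})
	as.saveConsistentIndex(printTx{})
}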
@@ -207,9 +213,13 @@ type authStore struct {
 	rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
 
 	tokenProvider TokenProvider
+	syncConsistentIndex saveConsistentIndexFunc
 	bcryptCost    int // the algorithm cost / strength for hashing auth passwords
 }
 
+func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
+	as.syncConsistentIndex = syncer
+}
 func (as *authStore) AuthEnable() error {
 	as.enabledMu.Lock()
 	defer as.enabledMu.Unlock()

@@ -258,6 +268,7 @@ func (as *authStore) AuthDisable() {
 	tx.Lock()
 	tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 	tx.Unlock()
 	b.ForceCommit()
 
@@ -403,6 +414,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
 	putUser(as.lg, tx, newUser)
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info("added a user", zap.String("user-name", r.Name))
 	return &pb.AuthUserAddResponse{}, nil

@@ -426,6 +438,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
 	delUser(tx, r.Name)
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.invalidateCachedPerm(r.Name)
 	as.tokenProvider.invalidateUser(r.Name)

@@ -470,6 +483,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
 	putUser(as.lg, tx, updatedUser)
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.invalidateCachedPerm(r.Name)
 	as.tokenProvider.invalidateUser(r.Name)

@@ -518,6 +532,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
 	as.invalidateCachedPerm(r.User)
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info(
 		"granted a role to a user",

@@ -596,6 +611,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
 	as.invalidateCachedPerm(r.Name)
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info(
 		"revoked a role from a user",

@@ -666,6 +682,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
 	as.clearCachedPerm()
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info(
 		"revoked a permission on range",

@@ -717,6 +734,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
 	}
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info("deleted a role", zap.String("role-name", r.Role))
 	return &pb.AuthRoleDeleteResponse{}, nil

@@ -743,6 +761,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
 	putRole(as.lg, tx, newRole)
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info("created a role", zap.String("role-name", r.Name))
 	return &pb.AuthRoleAddResponse{}, nil

@@ -781,6 +800,16 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
 	})
 
 	if idx < len(role.KeyPermission) && bytes.Equal(role.KeyPermission[idx].Key, r.Perm.Key) && bytes.Equal(role.KeyPermission[idx].RangeEnd, r.Perm.RangeEnd) {
+		if role.KeyPermission[idx].PermType == r.Perm.PermType {
+			as.lg.Warn(
+				"ignored grant permission request to a role, existing permission",
+				zap.String("role-name", r.Name),
+				zap.ByteString("key", r.Perm.Key),
+				zap.ByteString("range-end", r.Perm.RangeEnd),
+				zap.String("permission-type", authpb.Permission_Type_name[int32(r.Perm.PermType)]),
+			)
+			return &pb.AuthRoleGrantPermissionResponse{}, nil
+		}
 		// update existing permission
 		role.KeyPermission[idx].PermType = r.Perm.PermType
 	} else {

@@ -802,6 +831,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
 	as.clearCachedPerm()
 
 	as.commitRevision(tx)
+	as.saveConsistentIndex(tx)
 
 	as.lg.Info(
 		"granted/updated a permission to a user",
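Every mutation path above now pairs commitRevision with saveConsistentIndex in the same backend transaction. The failure mode this closes: an auth-only raft entry used to bump authRevision without advancing the persisted consistent index, so after a crash the entry was replayed and the revision bumped a second time, leaving the store inconsistent with peers. A toy replay model of that divergence (illustrative only, not etcd code):

package main

import "fmt"

type node struct {
	authRevision  uint64 // persisted with the mutation
	consistentIdx uint64 // only persisted when saveIdx is true
}

// apply models applying one raft entry that mutates the auth store.
func (n *node) apply(entryIdx uint64, saveIdx bool) {
	if entryIdx <= n.consistentIdx {
		return // entry already applied; replay is skipped
	}
	n.authRevision++
	if saveIdx {
		n.consistentIdx = entryIdx // saved atomically with the mutation
	}
}

func main() {
	buggy, fixed := &node{}, &node{}
	buggy.apply(1, false)
	fixed.apply(1, true)

	// Crash and restart: raft replays entry 1.
	buggy.apply(1, false)
	fixed.apply(1, true)

	fmt.Println("without index save, authRevision =", buggy.authRevision) // 2: diverged
	fmt.Println("with index save, authRevision =", fixed.authRevision)    // 1: replay skipped
}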
@@ -1035,8 +1065,11 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
 
 	if as.Revision() == 0 {
 		as.commitRevision(tx)
+		as.saveConsistentIndex(tx)
 	}
 
+	as.setupMetricsReporter()
+
 	tx.Unlock()
 	be.ForceCommit()
 
@@ -1279,3 +1312,19 @@ func (as *authStore) HasRole(user, role string) bool {
 func (as *authStore) BcryptCost() int {
 	return as.bcryptCost
 }
+
+func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
+	if as.syncConsistentIndex != nil {
+		as.syncConsistentIndex(tx)
+	} else {
+		as.lg.Error("failed to save consistentIndex, syncConsistentIndex is nil")
+	}
+}
+
+func (as *authStore) setupMetricsReporter() {
+	reportCurrentAuthRevMu.Lock()
+	reportCurrentAuthRev = func() float64 {
+		return float64(as.Revision())
+	}
+	reportCurrentAuthRevMu.Unlock()
+}
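saveConsistentIndex delegates to store.saveIndex, which in outline writes the raft apply index into the backend alongside the mutation. A self-contained sketch of that encoding; the meta/consistent_index bucket and key names match what etcd's mvcc store uses, the rest is illustrative:

package main

import (
	"encoding/binary"
	"fmt"
)

// put stands in for backend.BatchTx.UnsafePut.
type put func(bucket, key, value []byte)

// saveIndex persists the raft apply index, big-endian encoded, in the
// same transaction as the mutation it accompanies.
func saveIndex(tx put, index uint64) {
	bs := make([]byte, 8)
	binary.BigEndian.PutUint64(bs, index)
	tx([]byte("meta"), []byte("consistent_index"), bs)
}

func main() {
	saveIndex(func(bucket, key, value []byte) {
		fmt.Printf("put %s/%s = %x\n", bucket, key, value)
	}, 42)
}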
auth/store_test.go

@@ -414,6 +414,55 @@ func TestListUsers(t *testing.T) {
 	}
 }
 
+func TestRoleGrantPermissionRevision(t *testing.T) {
+	as, tearDown := setupAuthStore(t)
+	defer tearDown(t)
+
+	_, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test-1"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	perm := &authpb.Permission{
+		PermType: authpb.WRITE,
+		Key:      []byte("Keys"),
+		RangeEnd: []byte("RangeEnd"),
+	}
+	_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
+		Name: "role-test-1",
+		Perm: perm,
+	})
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !reflect.DeepEqual(perm, r.Perm[0]) {
+		t.Errorf("expected %v, got %v", perm, r.Perm[0])
+	}
+
+	oldRevision := as.Revision()
+
+	_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
+		Name: "role-test-1",
+		Perm: perm,
+	})
+
+	if err != nil {
+		t.Error(err)
+	}
+	newRevision := as.Revision()
+
+	if oldRevision != newRevision {
+		t.Errorf("expected revision diff is 0, got %d", newRevision-oldRevision)
+	}
+}
+
 func TestRoleGrantPermission(t *testing.T) {
 	as, tearDown := setupAuthStore(t)
 	defer tearDown(t)
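The added TestRoleGrantPermissionRevision pins the idempotency behaviour: re-granting an identical permission must not move the auth revision. As a usage note, an invocation along the lines of go test ./auth -run TestRoleGrantPermissionRevision from the repository root should run it in isolation (a standard Go toolchain is assumed; the command itself is not part of this change).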
etcdserver/backend.go

@@ -95,7 +95,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
 // case, replace the db with the snapshot db sent by the leader.
 func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
 	var cIndex consistentIndex
-	kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+	kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 	defer kv.Close()
 	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
 		return oldbe, nil
|
|||||||
CheckpointInterval: cfg.LeaseCheckpointInterval,
|
CheckpointInterval: cfg.LeaseCheckpointInterval,
|
||||||
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
|
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
|
||||||
})
|
})
|
||||||
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
|
||||||
|
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
|
||||||
|
func(index uint64) <-chan struct{} {
|
||||||
|
return srv.applyWait.Wait(index)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
|
||||||
|
|
||||||
|
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
||||||
if beExist {
|
if beExist {
|
||||||
kvindex := srv.kv.ConsistentIndex()
|
kvindex := srv.kv.ConsistentIndex()
|
||||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||||
@ -539,16 +551,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||||
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
|
|
||||||
func(index uint64) <-chan struct{} {
|
|
||||||
return srv.applyWait.Wait(index)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
|
|
||||||
if num := cfg.AutoCompactionRetention; num != 0 {
|
if num := cfg.AutoCompactionRetention; num != 0 {
|
||||||
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
|
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
etcdserver/server_test.go

@@ -983,7 +983,7 @@ func TestSnapshot(t *testing.T) {
 		r:       *r,
 		v2store: st,
 	}
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
 	srv.be = be
 
 	ch := make(chan struct{}, 2)

@@ -1064,7 +1064,7 @@ func TestSnapshotOrdering(t *testing.T) {
 
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer os.RemoveAll(tmpPath)
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
 	s.be = be
 
 	s.start()

@@ -1125,7 +1125,7 @@ func TestTriggerSnap(t *testing.T) {
 	}
 	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
 
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
 	srv.be = be
 
 	srv.start()

@@ -1197,7 +1197,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	defer func() {
 		os.RemoveAll(tmpPath)
 	}()
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
 	s.be = be
 
 	s.start()
mvcc/kv_test.go

@@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
mvcc/watchable_store.go

@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"go.etcd.io/etcd/auth"
 	"sync"
 	"time"
 

@@ -69,11 +70,11 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
-	return newWatchableStore(lg, b, le, ig, cfg)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+	return newWatchableStore(lg, b, le, as, ig, cfg)
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
 	if lg == nil {
 		lg = zap.NewNop()
 	}

@@ -90,6 +91,10 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co
 		// use this store as the deleter so revokes trigger watch events
 		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
 	}
+	if as != nil {
+		// TODO: encapsulating consistentindex into a separate package
+		as.SetConsistentIndexSyncer(s.store.saveIndex)
+	}
 	s.wg.Add(2)
 	go s.syncWatchersLoop()
 	go s.syncVictimsLoop()
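Construction order now matters: NewServer builds the auth store first (see the server.go hunks above) so that newWatchableStore can hand it the syncer during construction, and callers that pass a nil auth store, as the tests and benchmarks below do, simply skip the wiring. A small runnable toy of that handshake, with illustrative names:

package main

import "fmt"

type authStore struct{ syncer func() }

type kvStore struct{}

func (kv *kvStore) saveIndex() { fmt.Println("consistent index persisted") }

// newKV mirrors the new mvcc.New shape: it accepts the auth store and
// wires the syncer during construction; nil skips the wiring, which is
// why the test call sites gain an extra nil argument.
func newKV(as *authStore) *kvStore {
	kv := &kvStore{}
	if as != nil {
		as.syncer = kv.saveIndex
	}
	return kv
}

func main() {
	as := &authStore{} // built before the KV store, as NewServer now does
	_ = newKV(as)
	as.syncer() // an auth mutation can now persist the index
}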
mvcc/watchable_store_bench_test.go

@@ -28,7 +28,7 @@ import (
 
 func BenchmarkWatchableStorePut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes

@@ -49,7 +49,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes

@@ -80,7 +80,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	k := []byte("testkey")

@@ -180,7 +180,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
mvcc/watchable_store_test.go

@@ -32,7 +32,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()

@@ -54,7 +54,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()

@@ -224,7 +224,7 @@ func TestSyncWatchers(t *testing.T) {
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()

@@ -261,7 +261,7 @@ func TestWatchCompacted(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()

@@ -302,7 +302,7 @@ func TestWatchRestore(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 		return func(t *testing.T) {
 			b, tmpPath := backend.NewDefaultTmpBackend()
-			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 			defer cleanup(s, b, tmpPath)
 
 			testKey := []byte("foo")

@@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) {
 			rev := s.Put(testKey, testValue, lease.NoLease)
 
 			newBackend, newPath := backend.NewDefaultTmpBackend()
-			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, StoreConfig{})
+			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 			defer cleanup(newStore, newBackend, newPath)
 
 			w := newStore.NewWatchStream()

@@ -348,11 +348,11 @@ func TestWatchRestore(t *testing.T) {
 // 5. choose the watcher from step 1, without panic
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
 	b1, b1Path := backend.NewDefaultTmpBackend()
-	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, StoreConfig{})
+	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 	defer cleanup(s1, b1, b1Path)
 
 	b2, b2Path := backend.NewDefaultTmpBackend()
-	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, StoreConfig{})
+	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 	defer cleanup(s2, b2, b2Path)
 
 	testKey, testValue := []byte("foo"), []byte("bar")

@@ -399,7 +399,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	oldMaxRevs := watchBatchMaxRevs
 	defer func() {

@@ -533,7 +533,7 @@ func TestWatchVictims(t *testing.T) {
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()

@@ -611,7 +611,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
mvcc/watcher_bench_test.go

@@ -26,7 +26,7 @@ import (
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
+	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer cleanup(watchable, be, tmpPath)
 
mvcc/watcher_test.go

@@ -32,7 +32,7 @@ import (
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

@@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 func TestWatcherRequestsCustomID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

@@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

@@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // does not create watcher, which panics when canceling in range tree.
 func TestWatcherWatchWrongRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

@@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
 
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()

@@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

@@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
tools/etcd-dump-db/backend.go

@@ -17,6 +17,7 @@ package main
 import (
 	"encoding/binary"
 	"fmt"
+	"go.etcd.io/etcd/auth/authpb"
 	"path/filepath"
 
 	"go.etcd.io/etcd/lease/leasepb"

@@ -54,6 +55,9 @@ type decoder func(k, v []byte)
 var decoders = map[string]decoder{
 	"key":   keyDecoder,
 	"lease": leaseDecoder,
+	"auth":      authDecoder,
+	"authRoles": authRolesDecoder,
+	"authUsers": authUsersDecoder,
 }
 
 type revision struct {

@@ -93,6 +97,33 @@ func leaseDecoder(k, v []byte) {
 	fmt.Printf("lease ID=%016x, TTL=%ds\n", leaseID, lpb.TTL)
 }
 
+func authDecoder(k, v []byte) {
+	if string(k) == "authRevision" {
+		rev := binary.BigEndian.Uint64(v)
+		fmt.Printf("key=%q, value=%v\n", k, rev)
+	} else {
+		fmt.Printf("key=%q, value=%v\n", k, v)
+	}
+}
+
+func authRolesDecoder(k, v []byte) {
+	role := &authpb.Role{}
+	err := role.Unmarshal(v)
+	if err != nil {
+		panic(err)
+	}
+	fmt.Printf("role=%q, keyPermission=%v\n", string(role.Name), role.KeyPermission)
+}
+
+func authUsersDecoder(k, v []byte) {
+	user := &authpb.User{}
+	err := user.Unmarshal(v)
+	if err != nil {
+		panic(err)
+	}
+	fmt.Printf("user=%q, roles=%q, password=%q, option=%v\n", user.Name, user.Roles, string(user.Password), user.Options)
+}
+
 func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error) {
 	db, err := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: flockTimeout})
 	if err != nil {
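With these decoders registered, etcd-dump-db can pretty-print the auth, authRoles, and authUsers buckets. For reference, reading the auth bucket directly is also possible with bbolt; a sketch follows (bucket and key names come from the diff above, the path and error handling are illustrative):

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Path to a member's backend database; adjust for your data dir.
	db, err := bolt.Open("member/snap/db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("auth"))
		if b == nil {
			return fmt.Errorf("no auth bucket")
		}
		return b.ForEach(func(k, v []byte) error {
			if string(k) == "authRevision" {
				fmt.Printf("key=%q, value=%d\n", k, binary.BigEndian.Uint64(v))
			} else {
				fmt.Printf("key=%q, value=%v\n", k, v)
			}
			return nil
		})
	})
	if err != nil {
		log.Fatal(err)
	}
}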