From 5b6f4579fbf0799758611c18f24664d50f654c3e Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 8 Jul 2021 15:50:55 +0200 Subject: [PATCH] server: Rename buckets to schema --- etcdutl/etcdutl/backup_command.go | 8 +- etcdutl/snapshot/v3_snapshot.go | 8 +- server/auth/range_perm_cache.go | 6 +- server/auth/store.go | 82 +++++++++---------- server/etcdserver/api/v3alarm/alarms.go | 10 +-- server/etcdserver/api/v3rpc/maintenance.go | 4 +- server/etcdserver/backend.go | 4 +- server/etcdserver/bootstrap.go | 10 +-- server/etcdserver/cindex/cindex.go | 8 +- server/etcdserver/cindex/cindex_test.go | 4 +- server/etcdserver/server.go | 6 +- server/etcdserver/server_test.go | 8 +- server/etcdserver/version/version.go | 14 ++-- server/etcdserver/version/version_test.go | 28 +++---- server/lease/lessor.go | 10 +-- server/lease/lessor_test.go | 6 +- server/storage/backend/backend_bench_test.go | 6 +- server/storage/backend/backend_test.go | 58 ++++++------- server/storage/backend/batch_tx_test.go | 34 ++++---- server/storage/backend/hooks_test.go | 4 +- server/storage/mvcc/kvstore.go | 22 ++--- server/storage/mvcc/kvstore_bench_test.go | 4 +- server/storage/mvcc/kvstore_compaction.go | 6 +- .../storage/mvcc/kvstore_compaction_test.go | 8 +- server/storage/mvcc/kvstore_test.go | 32 ++++---- server/storage/mvcc/kvstore_txn.go | 8 +- server/storage/mvcc/store.go | 10 +-- server/storage/mvcc/store_test.go | 6 +- server/storage/mvcc/util.go | 4 +- server/storage/mvcc/watchable_store.go | 4 +- server/storage/{buckets => schema}/alarm.go | 2 +- server/storage/{buckets => schema}/auth.go | 2 +- .../storage/{buckets => schema}/auth_roles.go | 2 +- .../storage/{buckets => schema}/auth_test.go | 2 +- .../storage/{buckets => schema}/auth_users.go | 2 +- server/storage/{buckets => schema}/bucket.go | 2 +- server/storage/{buckets => schema}/cindex.go | 2 +- .../storage/{buckets => schema}/confstate.go | 2 +- .../{buckets => schema}/confstate_test.go | 2 +- server/storage/{buckets => schema}/lease.go | 2 +- .../storage/{buckets => schema}/membership.go | 2 +- server/storage/{buckets => schema}/version.go | 2 +- .../{buckets => schema}/version_test.go | 2 +- server/verify/verify.go | 4 +- tools/etcd-dump-db/backend.go | 4 +- 45 files changed, 228 insertions(+), 228 deletions(-) rename server/storage/{buckets => schema}/alarm.go (99%) rename server/storage/{buckets => schema}/auth.go (99%) rename server/storage/{buckets => schema}/auth_roles.go (99%) rename server/storage/{buckets => schema}/auth_test.go (99%) rename server/storage/{buckets => schema}/auth_users.go (99%) rename server/storage/{buckets => schema}/bucket.go (99%) rename server/storage/{buckets => schema}/cindex.go (99%) rename server/storage/{buckets => schema}/confstate.go (99%) rename server/storage/{buckets => schema}/confstate_test.go (99%) rename server/storage/{buckets => schema}/lease.go (99%) rename server/storage/{buckets => schema}/membership.go (99%) rename server/storage/{buckets => schema}/version.go (99%) rename server/storage/{buckets => schema}/version_test.go (99%) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 7fc594362..1f249ca5a 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -32,7 +32,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -311,7 +311,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir be := backend.NewDefaultBackend(destDB) defer be.Close() - ms := buckets.NewMembershipStore(lg, be) + ms := schema.NewMembershipStore(lg, be) if err := ms.TrimClusterFromBackend(); err != nil { lg.Fatal("bbolt tx.Membership failed", zap.Error(err)) } @@ -325,8 +325,8 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir tx := be.BatchTx() tx.Lock() defer tx.Unlock() - buckets.UnsafeCreateMetaBucket(tx) - buckets.UnsafeUpdateConsistentIndex(tx, idx, term, false) + schema.UnsafeCreateMetaBucket(tx) + schema.UnsafeUpdateConsistentIndex(tx, idx, term, false) } else { // Thanks to translateWAL not moving entries, but just replacing them with // 'empty', there is no need to update the consistency index. diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index fe86cc1af..eefcdb812 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -41,7 +41,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -136,7 +136,7 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) { return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings)) } ds.TotalSize = tx.Size() - v := buckets.ReadStorageVersionFromSnapshot(tx) + v := schema.ReadStorageVersionFromSnapshot(tx) if v != nil { ds.Version = v.String() } @@ -306,7 +306,7 @@ func (s *v3Manager) saveDB() error { be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - err = buckets.NewMembershipStore(s.lg, be).TrimMembershipFromBackend() + err = schema.NewMembershipStore(s.lg, be).TrimMembershipFromBackend() if err != nil { return err } @@ -403,7 +403,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { s.cl.SetStore(st) be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - s.cl.SetBackend(buckets.NewMembershipStore(s.lg, be)) + s.cl.SetBackend(schema.NewMembershipStore(s.lg, be)) for _, m := range s.cl.Members() { s.cl.AddMember(m, true) } diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index f4d0a1a92..aed9e6451 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -18,13 +18,13 @@ import ( "go.etcd.io/etcd/api/v3/authpb" "go.etcd.io/etcd/pkg/v3/adt" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions { - user := buckets.UnsafeGetUser(lg, tx, userName) + user := schema.UnsafeGetUser(lg, tx, userName) if user == nil { return nil } @@ -33,7 +33,7 @@ func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifie writePerms := adt.NewIntervalTree() for _, roleName := range user.Roles { - role := buckets.UnsafeGetRole(lg, tx, roleName) + role := schema.UnsafeGetRole(lg, tx, roleName) if role == nil { continue } diff --git a/server/auth/store.go b/server/auth/store.go index 05d016049..ceab53cdb 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -29,7 +29,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" @@ -225,7 +225,7 @@ func (as *authStore) AuthEnable() error { b.ForceCommit() }() - u := buckets.UnsafeGetUser(as.lg, tx, rootUser) + u := schema.UnsafeGetUser(as.lg, tx, rootUser) if u == nil { return ErrRootUserNotExist } @@ -234,7 +234,7 @@ func (as *authStore) AuthEnable() error { return ErrRootRoleNotExist } - buckets.UnsafeSaveAuthEnabled(tx, true) + schema.UnsafeSaveAuthEnabled(tx, true) as.enabled = true as.tokenProvider.enable() @@ -256,7 +256,7 @@ func (as *authStore) AuthDisable() { b := as.be tx := b.BatchTx() tx.Lock() - buckets.UnsafeSaveAuthEnabled(tx, false) + schema.UnsafeSaveAuthEnabled(tx, false) as.commitRevision(tx) tx.Unlock() b.ForceCommit() @@ -286,7 +286,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, username) + user := schema.UnsafeGetUser(as.lg, tx, username) if user == nil { return nil, ErrAuthFailed } @@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { tx.Lock() defer tx.Unlock() - user = buckets.UnsafeGetUser(as.lg, tx, username) + user = schema.UnsafeGetUser(as.lg, tx, username) if user == nil { return 0, ErrAuthFailed } @@ -351,7 +351,7 @@ func (as *authStore) Recover(be backend.Backend) { tx := be.BatchTx() tx.Lock() - enabled := buckets.UnsafeReadAuthEnabled(tx) + enabled := schema.UnsafeReadAuthEnabled(tx) as.setRevision(getRevision(tx)) tx.Unlock() @@ -381,7 +381,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, r.Name) + user := schema.UnsafeGetUser(as.lg, tx, r.Name) if user != nil { return nil, ErrUserAlreadyExist } @@ -409,7 +409,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, Options: options, } - buckets.UnsafePutUser(as.lg, tx, newUser) + schema.UnsafePutUser(as.lg, tx, newUser) as.commitRevision(tx) @@ -427,12 +427,12 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, r.Name) + user := schema.UnsafeGetUser(as.lg, tx, r.Name) if user == nil { return nil, ErrUserNotFound } - buckets.UnsafeDeleteUser(tx, r.Name) + schema.UnsafeDeleteUser(tx, r.Name) as.commitRevision(tx) @@ -452,7 +452,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, r.Name) + user := schema.UnsafeGetUser(as.lg, tx, r.Name) if user == nil { return nil, ErrUserNotFound } @@ -474,7 +474,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p Options: user.Options, } - buckets.UnsafePutUser(as.lg, tx, updatedUser) + schema.UnsafePutUser(as.lg, tx, updatedUser) as.commitRevision(tx) @@ -494,13 +494,13 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, r.User) + user := schema.UnsafeGetUser(as.lg, tx, r.User) if user == nil { return nil, ErrUserNotFound } if r.Role != rootRole { - role := buckets.UnsafeGetRole(as.lg, tx, r.Role) + role := schema.UnsafeGetRole(as.lg, tx, r.Role) if role == nil { return nil, ErrRoleNotFound } @@ -520,7 +520,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser user.Roles = append(user.Roles, r.Role) sort.Strings(user.Roles) - buckets.UnsafePutUser(as.lg, tx, user) + schema.UnsafePutUser(as.lg, tx, user) as.invalidateCachedPerm(r.User) @@ -538,7 +538,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { tx := as.be.BatchTx() tx.Lock() - user := buckets.UnsafeGetUser(as.lg, tx, r.Name) + user := schema.UnsafeGetUser(as.lg, tx, r.Name) tx.Unlock() if user == nil { @@ -553,7 +553,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { tx := as.be.BatchTx() tx.Lock() - users := buckets.UnsafeGetAllUsers(as.lg, tx) + users := schema.UnsafeGetAllUsers(as.lg, tx) tx.Unlock() resp := &pb.AuthUserListResponse{Users: make([]string, len(users))} @@ -577,7 +577,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, r.Name) + user := schema.UnsafeGetUser(as.lg, tx, r.Name) if user == nil { return nil, ErrUserNotFound } @@ -598,7 +598,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs return nil, ErrRoleNotGranted } - buckets.UnsafePutUser(as.lg, tx, updatedUser) + schema.UnsafePutUser(as.lg, tx, updatedUser) as.invalidateCachedPerm(r.Name) @@ -621,7 +621,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, var resp pb.AuthRoleGetResponse - role := buckets.UnsafeGetRole(as.lg, tx, r.Role) + role := schema.UnsafeGetRole(as.lg, tx, r.Role) if role == nil { return nil, ErrRoleNotFound } @@ -636,7 +636,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { tx := as.be.BatchTx() tx.Lock() - roles := buckets.UnsafeGetAllRoles(as.lg, tx) + roles := schema.UnsafeGetAllRoles(as.lg, tx) tx.Unlock() resp := &pb.AuthRoleListResponse{Roles: make([]string, len(roles))} @@ -651,7 +651,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) tx.Lock() defer tx.Unlock() - role := buckets.UnsafeGetRole(as.lg, tx, r.Role) + role := schema.UnsafeGetRole(as.lg, tx, r.Role) if role == nil { return nil, ErrRoleNotFound } @@ -670,7 +670,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) return nil, ErrPermissionNotGranted } - buckets.UnsafePutRole(as.lg, tx, updatedRole) + schema.UnsafePutRole(as.lg, tx, updatedRole) // TODO(mitake): currently single role update invalidates every cache // It should be optimized. @@ -697,14 +697,14 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete tx.Lock() defer tx.Unlock() - role := buckets.UnsafeGetRole(as.lg, tx, r.Role) + role := schema.UnsafeGetRole(as.lg, tx, r.Role) if role == nil { return nil, ErrRoleNotFound } - buckets.UnsafeDeleteRole(tx, r.Role) + schema.UnsafeDeleteRole(tx, r.Role) - users := buckets.UnsafeGetAllUsers(as.lg, tx) + users := schema.UnsafeGetAllUsers(as.lg, tx) for _, user := range users { updatedUser := &authpb.User{ Name: user.Name, @@ -722,7 +722,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete continue } - buckets.UnsafePutUser(as.lg, tx, updatedUser) + schema.UnsafePutUser(as.lg, tx, updatedUser) as.invalidateCachedPerm(string(user.Name)) } @@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, tx.Lock() defer tx.Unlock() - role := buckets.UnsafeGetRole(as.lg, tx, r.Name) + role := schema.UnsafeGetRole(as.lg, tx, r.Name) if role != nil { return nil, ErrRoleAlreadyExist } @@ -751,7 +751,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, Name: []byte(r.Name), } - buckets.UnsafePutRole(as.lg, tx, newRole) + schema.UnsafePutRole(as.lg, tx, newRole) as.commitRevision(tx) @@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( tx.Lock() defer tx.Unlock() - role := buckets.UnsafeGetRole(as.lg, tx, r.Name) + role := schema.UnsafeGetRole(as.lg, tx, r.Name) if role == nil { return nil, ErrRoleNotFound } @@ -810,7 +810,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( sort.Sort(permSlice(role.KeyPermission)) } - buckets.UnsafePutRole(as.lg, tx, role) + schema.UnsafePutRole(as.lg, tx, role) // TODO(mitake): currently single role update invalidates every cache // It should be optimized. @@ -850,7 +850,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE tx.Lock() defer tx.Unlock() - user := buckets.UnsafeGetUser(as.lg, tx, userName) + user := schema.UnsafeGetUser(as.lg, tx, userName) if user == nil { as.lg.Error("cannot find a user for permission check", zap.String("user-name", userName)) return ErrPermissionDenied @@ -890,7 +890,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { tx := as.be.BatchTx() tx.Lock() - u := buckets.UnsafeGetUser(as.lg, tx, authInfo.Username) + u := schema.UnsafeGetUser(as.lg, tx, authInfo.Username) tx.Unlock() if u == nil { @@ -930,11 +930,11 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo tx := be.BatchTx() tx.Lock() - buckets.UnsafeCreateAuthBucket(tx) - tx.UnsafeCreateBucket(buckets.AuthUsers) - tx.UnsafeCreateBucket(buckets.AuthRoles) + schema.UnsafeCreateAuthBucket(tx) + tx.UnsafeCreateBucket(schema.AuthUsers) + tx.UnsafeCreateBucket(schema.AuthRoles) - enabled := buckets.UnsafeReadAuthEnabled(tx) + enabled := schema.UnsafeReadAuthEnabled(tx) as := &authStore{ revision: getRevision(tx), @@ -970,11 +970,11 @@ func hasRootRole(u *authpb.User) bool { func (as *authStore) commitRevision(tx backend.BatchTx) { atomic.AddUint64(&as.revision, 1) - buckets.UnsafeSaveAuthRevision(tx, as.Revision()) + schema.UnsafeSaveAuthRevision(tx, as.Revision()) } func getRevision(tx backend.BatchTx) uint64 { - return buckets.UnsafeReadAuthRevision(tx) + return schema.UnsafeReadAuthRevision(tx) } func (as *authStore) setRevision(rev uint64) { @@ -1169,7 +1169,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context { func (as *authStore) HasRole(user, role string) bool { tx := as.be.BatchTx() tx.Lock() - u := buckets.UnsafeGetUser(as.lg, tx, user) + u := schema.UnsafeGetUser(as.lg, tx, user) tx.Unlock() if u == nil { diff --git a/server/etcdserver/api/v3alarm/alarms.go b/server/etcdserver/api/v3alarm/alarms.go index 1cda05071..4dc56f106 100644 --- a/server/etcdserver/api/v3alarm/alarms.go +++ b/server/etcdserver/api/v3alarm/alarms.go @@ -21,7 +21,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -59,7 +59,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember { return m } - buckets.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm) + schema.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm) return newAlarm } @@ -79,7 +79,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember { delete(t, id) - buckets.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m) + schema.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m) return m } @@ -105,8 +105,8 @@ func (a *AlarmStore) restore() error { tx := b.BatchTx() tx.Lock() - buckets.UnsafeCreateAlarmBucket(tx) - ms, err := buckets.UnsafeGetAllAlarms(tx) + schema.UnsafeCreateAlarmBucket(tx) + ms, err := schema.UnsafeGetAllAlarms(tx) tx.Unlock() if err != nil { return err diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 02b2ac874..59732f619 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -28,8 +28,8 @@ import ( "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -101,7 +101,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe const snapshotSendBufferSize = 32 * 1024 func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { - ver := buckets.ReadStorageVersion(ms.bg.Backend().ReadTx()) + ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx()) storageVersion := "" if ver != nil { storageVersion = ver.String() diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 885a5e54a..e0da65c6c 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -23,7 +23,7 @@ import ( "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = buckets.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 831577c70..d0f95f4c4 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -42,7 +42,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" ) @@ -147,7 +147,7 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co beHooks = &backendHooks{lg: cfg.Logger, indexer: ci} be = openBackend(cfg, beHooks) ci.SetBackend(be) - buckets.CreateMetaBucket(be.BatchTx()) + schema.CreateMetaBucket(be.BatchTx()) if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { err := maybeDefragBackend(cfg, be) if err != nil { @@ -200,7 +200,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe remotes := existingCluster.Members() cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) - cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be)) br := bootstrapRaftFromCluster(cfg, cl, nil) cl.SetID(br.wal.id, existingCluster.ID()) return &bootstrappedServer{ @@ -240,7 +240,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } } cl.SetStore(st) - cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be)) br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs()) cl.SetID(br.wal.id, cl.ID()) return &bootstrappedServer{ @@ -330,7 +330,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back } r.raft.cl.SetStore(st) - r.raft.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + r.raft.cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be)) r.raft.cl.Recover(api.UpdateCapability) if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { bepath := cfg.BackendPath() diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index c73abd737..24dad6603 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -19,7 +19,7 @@ import ( "sync/atomic" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) type Backend interface { @@ -73,7 +73,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := buckets.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -86,7 +86,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) term := atomic.LoadUint64(&ci.term) - buckets.UnsafeUpdateConsistentIndex(tx, index, term, true) + schema.UnsafeUpdateConsistentIndex(tx, index, term, true) } func (ci *consistentIndex) SetBackend(be Backend) { @@ -119,5 +119,5 @@ func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { tx.Lock() defer tx.Unlock() - buckets.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) + schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index 36d48b00b..9122c6e33 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) // TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together. @@ -37,7 +37,7 @@ func TestConsistentIndex(t *testing.T) { } tx.Lock() - buckets.UnsafeCreateMetaBucket(tx) + schema.UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() r := uint64(7890123) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 54f76fb0d..48608fb31 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -63,8 +63,8 @@ import ( "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.etcd.io/etcd/server/v3/storage/schema" ) const ( @@ -313,7 +313,7 @@ func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { bh.confStateLock.Lock() defer bh.confStateLock.Unlock() if bh.confStateDirty { - buckets.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) + schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) // save bh.confState bh.confStateDirty = false } @@ -1075,7 +1075,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Info("restored v2 store") - s.cluster.SetBackend(buckets.NewMembershipStore(lg, newbe)) + s.cluster.SetBackend(schema.NewMembershipStore(lg, newbe)) lg.Info("restoring cluster configuration") diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index d0230e02f..bda37ceaf 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -52,8 +52,8 @@ import ( "go.etcd.io/etcd/server/v3/mock/mockstore" "go.etcd.io/etcd/server/v3/mock/mockwait" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -651,7 +651,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) - buckets.CreateMetaBucket(be.BatchTx()) + schema.CreateMetaBucket(be.BatchTx()) ci := cindex.NewConsistentIndex(be) srv := &EtcdServer{ @@ -696,9 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { tx.Lock() defer tx.Unlock() srv.beHooks.OnPreCommitUnsafe(tx) - assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx)) + assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := buckets.ReadConsistentIndex(be.BatchTx()) + rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) assert.Equal(t, consistIndex, rindex) assert.Equal(t, uint64(4), rterm) } diff --git a/server/etcdserver/version/version.go b/server/etcdserver/version/version.go index d498abf00..485156745 100644 --- a/server/etcdserver/version/version.go +++ b/server/etcdserver/version/version.go @@ -21,7 +21,7 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) var ( @@ -41,7 +41,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error { case V3_5: lg.Warn("setting storage version", zap.String("storage-version", V3_6.String())) // All meta keys introduced in v3.6 should be filled in here. - buckets.UnsafeSetStorageVersion(tx, &V3_6) + schema.UnsafeSetStorageVersion(tx, &V3_6) case V3_6: default: lg.Warn("unknown storage version", zap.String("storage-version", v.String())) @@ -50,17 +50,17 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error { } func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) { - v := buckets.UnsafeReadStorageVersion(tx) + v := schema.UnsafeReadStorageVersion(tx) if v != nil { return v, nil } - confstate := buckets.UnsafeConfStateFromBackend(lg, tx) + confstate := schema.UnsafeConfStateFromBackend(lg, tx) if confstate == nil { - return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName) + return nil, fmt.Errorf("missing %q key", schema.MetaConfStateName) } - _, term := buckets.UnsafeReadConsistentIndex(tx) + _, term := schema.UnsafeReadConsistentIndex(tx) if term == 0 { - return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName) + return nil, fmt.Errorf("missing %q key", schema.MetaTermKeyName) } copied := V3_5 return &copied, nil diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go index 6b82b7120..4c34bb18d 100644 --- a/server/etcdserver/version/version_test.go +++ b/server/etcdserver/version/version_test.go @@ -24,7 +24,7 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -47,7 +47,7 @@ func TestUpdateStorageVersion(t *testing.T) { { name: `Backend before 3.6 without "term" should be rejected`, version: "", - metaKeys: [][]byte{buckets.MetaConfStateName}, + metaKeys: [][]byte{schema.MetaConfStateName}, expectVersion: nil, expectError: true, expectedErrorMsg: `cannot determine storage version: missing "term" key`, @@ -55,25 +55,25 @@ func TestUpdateStorageVersion(t *testing.T) { { name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6", version: "", - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName}, + metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName}, expectVersion: &semver.Version{Major: 3, Minor: 6}, }, { name: "Backend in 3.6.0 should be skipped", version: "3.6.0", - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName}, expectVersion: &semver.Version{Major: 3, Minor: 6}, }, { name: "Backend with current version should be skipped", version: version.Version, - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName}, + metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName}, expectVersion: &semver.Version{Major: 3, Minor: 6}, }, { name: "Backend in 3.7.0 should be skipped", version: "3.7.0", - metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")}, + metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName, []byte("future-key")}, expectVersion: &semver.Version{Major: 3, Minor: 7}, }, } @@ -86,19 +86,19 @@ func TestUpdateStorageVersion(t *testing.T) { t.Fatal("batch tx is nil") } tx.Lock() - buckets.UnsafeCreateMetaBucket(tx) + schema.UnsafeCreateMetaBucket(tx) for _, k := range tc.metaKeys { switch string(k) { - case string(buckets.MetaConfStateName): - buckets.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{}) - case string(buckets.MetaTermKeyName): - buckets.UnsafeUpdateConsistentIndex(tx, 1, 1, false) + case string(schema.MetaConfStateName): + schema.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{}) + case string(schema.MetaTermKeyName): + schema.UnsafeUpdateConsistentIndex(tx, 1, 1, false) default: - tx.UnsafePut(buckets.Meta, k, []byte{}) + tx.UnsafePut(schema.Meta, k, []byte{}) } } if tc.version != "" { - buckets.UnsafeSetStorageVersion(tx, semver.New(tc.version)) + schema.UnsafeSetStorageVersion(tx, semver.New(tc.version)) } tx.Unlock() be.ForceCommit() @@ -113,7 +113,7 @@ func TestUpdateStorageVersion(t *testing.T) { if err != nil && err.Error() != tc.expectedErrorMsg { t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg) } - v := buckets.UnsafeReadStorageVersion(b.BatchTx()) + v := schema.UnsafeReadStorageVersion(b.BatchTx()) assert.Equal(t, tc.expectVersion, v) }) } diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 676c32fe1..715b82079 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -27,7 +27,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -336,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error { // lease deletion needs to be in the same backend transaction with the // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. - buckets.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)}) + schema.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)}) txn.End() @@ -770,8 +770,8 @@ func (le *lessor) initAndRecover() { tx := le.b.BatchTx() tx.Lock() - buckets.UnsafeCreateLeaseBucket(tx) - lpbs := buckets.MustUnsafeGetAllLeases(tx) + schema.UnsafeCreateLeaseBucket(tx) + lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() for _, lpb := range lpbs { ID := LeaseID(lpb.ID) @@ -818,7 +818,7 @@ func (l *Lease) persistTo(b backend.Backend) { tx := b.BatchTx() tx.Lock() defer tx.Unlock() - buckets.MustUnsafePutLease(tx, &lpb) + schema.MustUnsafePutLease(tx, &lpb) } // TTL returns the TTL of the Lease. diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index f62af386d..99d57a70f 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -28,7 +28,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -95,7 +95,7 @@ func TestLessorGrant(t *testing.T) { tx := be.BatchTx() tx.Lock() defer tx.Unlock() - lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID)) + lpb := schema.MustUnsafeGetLease(tx, int64(l.ID)) if lpb == nil { t.Errorf("lpb = %d, want not nil", lpb) } @@ -199,7 +199,7 @@ func TestLessorRevoke(t *testing.T) { tx := be.BatchTx() tx.Lock() defer tx.Unlock() - lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID)) + lpb := schema.MustUnsafeGetLease(tx, int64(l.ID)) if lpb != nil { t.Errorf("lpb = %d, want nil", lpb) } diff --git a/server/storage/backend/backend_bench_test.go b/server/storage/backend/backend_bench_test.go index 950707658..204d7de61 100644 --- a/server/storage/backend/backend_bench_test.go +++ b/server/storage/backend/backend_bench_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) func BenchmarkBackendPut(b *testing.B) { @@ -42,13 +42,13 @@ func BenchmarkBackendPut(b *testing.B) { batchTx := backend.BatchTx() batchTx.Lock() - batchTx.UnsafeCreateBucket(buckets.Test) + batchTx.UnsafeCreateBucket(schema.Test) batchTx.Unlock() b.ResetTimer() for i := 0; i < b.N; i++ { batchTx.Lock() - batchTx.UnsafePut(buckets.Test, keys[i], value) + batchTx.UnsafePut(schema.Test, keys[i], value) batchTx.Unlock() } } diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index c18b63189..ff210ae6b 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -25,7 +25,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) func TestBackendClose(t *testing.T) { @@ -53,8 +53,8 @@ func TestBackendSnapshot(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) - tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) tx.Unlock() b.ForceCommit() @@ -78,7 +78,7 @@ func TestBackendSnapshot(t *testing.T) { newTx := nb.BatchTx() newTx.Lock() - ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0) + ks, _ := newTx.UnsafeRange(schema.Test, []byte("foo"), []byte("goo"), 0) if len(ks) != 1 { t.Errorf("len(kvs) = %d, want 1", len(ks)) } @@ -95,8 +95,8 @@ func TestBackendBatchIntervalCommit(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) - tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) tx.Unlock() for i := 0; i < 10; i++ { @@ -127,9 +127,9 @@ func TestBackendDefrag(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafeCreateBucket(schema.Test) for i := 0; i < backend.DefragLimitForTest()+100; i++ { - tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar")) + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar")) } tx.Unlock() b.ForceCommit() @@ -138,7 +138,7 @@ func TestBackendDefrag(t *testing.T) { tx = b.BatchTx() tx.Lock() for i := 0; i < 50; i++ { - tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i))) + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("foo_%d", i))) } tx.Unlock() b.ForceCommit() @@ -172,8 +172,8 @@ func TestBackendDefrag(t *testing.T) { // try put more keys after shrink. tx = b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) - tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar")) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("more"), []byte("bar")) tx.Unlock() b.ForceCommit() } @@ -185,15 +185,15 @@ func TestBackendWriteback(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Key) - tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar")) - tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz")) - tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1")) + tx.UnsafeCreateBucket(schema.Key) + tx.UnsafePut(schema.Key, []byte("abc"), []byte("bar")) + tx.UnsafePut(schema.Key, []byte("def"), []byte("baz")) + tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("1")) tx.Unlock() // overwrites should be propagated too tx.Lock() - tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2")) + tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("2")) tx.Unlock() keys := []struct { @@ -246,7 +246,7 @@ func TestBackendWriteback(t *testing.T) { func() { rtx.RLock() defer rtx.RUnlock() - k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit) + k, v := rtx.UnsafeRange(schema.Key, tt.key, tt.end, tt.limit) if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) { t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v) } @@ -261,20 +261,20 @@ func TestConcurrentReadTx(t *testing.T) { wtx1 := b.BatchTx() wtx1.Lock() - wtx1.UnsafeCreateBucket(buckets.Key) - wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC")) - wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1")) + wtx1.UnsafeCreateBucket(schema.Key) + wtx1.UnsafePut(schema.Key, []byte("abc"), []byte("ABC")) + wtx1.UnsafePut(schema.Key, []byte("overwrite"), []byte("1")) wtx1.Unlock() wtx2 := b.BatchTx() wtx2.Lock() - wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF")) - wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2")) + wtx2.UnsafePut(schema.Key, []byte("def"), []byte("DEF")) + wtx2.UnsafePut(schema.Key, []byte("overwrite"), []byte("2")) wtx2.Unlock() rtx := b.ConcurrentReadTx() rtx.RLock() // no-op - k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0) + k, v := rtx.UnsafeRange(schema.Key, []byte("abc"), []byte("\xff"), 0) rtx.RUnlock() wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")} wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")} @@ -291,10 +291,10 @@ func TestBackendWritebackForEach(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Key) + tx.UnsafeCreateBucket(schema.Key) for i := 0; i < 5; i++ { k := []byte(fmt.Sprintf("%04d", i)) - tx.UnsafePut(buckets.Key, k, []byte("bar")) + tx.UnsafePut(schema.Key, k, []byte("bar")) } tx.Unlock() @@ -302,10 +302,10 @@ func TestBackendWritebackForEach(t *testing.T) { b.ForceCommit() tx.Lock() - tx.UnsafeCreateBucket(buckets.Key) + tx.UnsafeCreateBucket(schema.Key) for i := 5; i < 20; i++ { k := []byte(fmt.Sprintf("%04d", i)) - tx.UnsafePut(buckets.Key, k, []byte("bar")) + tx.UnsafePut(schema.Key, k, []byte("bar")) } tx.Unlock() @@ -316,7 +316,7 @@ func TestBackendWritebackForEach(t *testing.T) { } rtx := b.ReadTx() rtx.RLock() - assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq)) + assert.NoError(t, rtx.UnsafeForEach(schema.Key, getSeq)) rtx.RUnlock() partialSeq := seq @@ -325,7 +325,7 @@ func TestBackendWritebackForEach(t *testing.T) { b.ForceCommit() tx.Lock() - assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq)) + assert.NoError(t, tx.UnsafeForEach(schema.Key, getSeq)) tx.Unlock() if seq != partialSeq { diff --git a/server/storage/backend/batch_tx_test.go b/server/storage/backend/batch_tx_test.go index d5e8febf3..6fd2bbae6 100644 --- a/server/storage/backend/batch_tx_test.go +++ b/server/storage/backend/batch_tx_test.go @@ -22,7 +22,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) func TestBatchTxPut(t *testing.T) { @@ -34,18 +34,18 @@ func TestBatchTxPut(t *testing.T) { tx.Lock() // create bucket - tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafeCreateBucket(schema.Test) // put v := []byte("bar") - tx.UnsafePut(buckets.Test, []byte("foo"), v) + tx.UnsafePut(schema.Test, []byte("foo"), v) tx.Unlock() // check put result before and after tx is committed for k := 0; k < 2; k++ { tx.Lock() - _, gv := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0) + _, gv := tx.UnsafeRange(schema.Test, []byte("foo"), nil, 0) tx.Unlock() if !reflect.DeepEqual(gv[0], v) { t.Errorf("v = %s, want %s", string(gv[0]), string(v)) @@ -62,12 +62,12 @@ func TestBatchTxRange(t *testing.T) { tx.Lock() defer tx.Unlock() - tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafeCreateBucket(schema.Test) // put keys allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")} allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")} for i := range allKeys { - tx.UnsafePut(buckets.Test, allKeys[i], allVals[i]) + tx.UnsafePut(schema.Test, allKeys[i], allVals[i]) } tests := []struct { @@ -115,7 +115,7 @@ func TestBatchTxRange(t *testing.T) { }, } for i, tt := range tests { - keys, vals := tx.UnsafeRange(buckets.Test, tt.key, tt.endKey, tt.limit) + keys, vals := tx.UnsafeRange(schema.Test, tt.key, tt.endKey, tt.limit) if !reflect.DeepEqual(keys, tt.wkeys) { t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys) } @@ -132,17 +132,17 @@ func TestBatchTxDelete(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) - tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) - tx.UnsafeDelete(buckets.Test, []byte("foo")) + tx.UnsafeDelete(schema.Test, []byte("foo")) tx.Unlock() // check put result before and after tx is committed for k := 0; k < 2; k++ { tx.Lock() - ks, _ := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0) + ks, _ := tx.UnsafeRange(schema.Test, []byte("foo"), nil, 0) tx.Unlock() if len(ks) != 0 { t.Errorf("keys on foo = %v, want nil", ks) @@ -157,15 +157,15 @@ func TestBatchTxCommit(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) - tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) tx.Unlock() tx.Commit() // check whether put happens via db view backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error { - bucket := tx.Bucket(buckets.Test.Name()) + bucket := tx.Bucket(schema.Test.Name()) if bucket == nil { t.Errorf("bucket test does not exit") return nil @@ -186,14 +186,14 @@ func TestBatchTxBatchLimitCommit(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Test) - tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) tx.Unlock() // batch limit commit should have been triggered // check whether put happens via db view backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error { - bucket := tx.Bucket(buckets.Test.Name()) + bucket := tx.Bucket(schema.Test.Name()) if bucket == nil { t.Errorf("bucket test does not exit") return nil diff --git a/server/storage/backend/hooks_test.go b/server/storage/backend/hooks_test.go index d1a93f8dc..5197ee2d7 100644 --- a/server/storage/backend/hooks_test.go +++ b/server/storage/backend/hooks_test.go @@ -22,11 +22,11 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) var ( - bucket = buckets.Test + bucket = schema.Test key = []byte("key") ) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index be30d6a43..135509bcf 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -28,7 +28,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -122,8 +122,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi tx := s.b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Key) - tx.UnsafeCreateBucket(buckets.Meta) + tx.UnsafeCreateBucket(schema.Key) + tx.UnsafeCreateBucket(schema.Meta) tx.Unlock() s.b.ForceCommit() @@ -161,7 +161,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { start := time.Now() s.b.ForceCommit() - h, err := s.b.Hash(buckets.DefaultIgnores) + h, err := s.b.Hash(schema.DefaultIgnores) hashSec.Observe(time.Since(start).Seconds()) return h, s.currentRev, err @@ -197,8 +197,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev lower := revision{main: compactRev + 1} h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - h.Write(buckets.Key.Name()) - err = tx.UnsafeForEach(buckets.Key, func(k, v []byte) error { + h.Write(schema.Key.Name()) + err = tx.UnsafeForEach(schema.Key, func(k, v []byte) error { kr := bytesToRev(k) if !upper.GreaterThan(kr) { return nil @@ -340,8 +340,8 @@ func (s *store) restore() error { s.lg.Info( "restored last compact revision", - zap.Stringer("meta-bucket-name", buckets.Meta), - zap.String("meta-bucket-name-key", string(buckets.FinishedCompactKeyName)), + zap.Stringer("meta-bucket-name", schema.Meta), + zap.String("meta-bucket-name-key", string(schema.FinishedCompactKeyName)), zap.Int64("restored-compact-revision", s.compactMainRev), ) s.revMu.Unlock() @@ -351,7 +351,7 @@ func (s *store) restore() error { keysGauge.Set(0) rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) for { - keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys)) + keys, vals := tx.UnsafeRange(schema.Key, min, max, int64(restoreChunkKeys)) if len(keys) == 0 { break } @@ -412,8 +412,8 @@ func (s *store) restore() error { s.lg.Info( "resume scheduled compaction", - zap.Stringer("meta-bucket-name", buckets.Meta), - zap.String("meta-bucket-name-key", string(buckets.ScheduledCompactKeyName)), + zap.Stringer("meta-bucket-name", schema.Meta), + zap.String("meta-bucket-name-key", string(schema.ScheduledCompactKeyName)), zap.Int64("scheduled-compact-revision", scheduledCompact), ) } diff --git a/server/storage/mvcc/kvstore_bench_test.go b/server/storage/mvcc/kvstore_bench_test.go index d30046d20..8f4ff6ce9 100644 --- a/server/storage/mvcc/kvstore_bench_test.go +++ b/server/storage/mvcc/kvstore_bench_test.go @@ -23,7 +23,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -84,7 +84,7 @@ func BenchmarkConsistentIndex(b *testing.B) { tx := be.BatchTx() tx.Lock() - buckets.UnsafeCreateMetaBucket(tx) + schema.UnsafeCreateMetaBucket(tx) ci.UnsafeSave(tx) tx.Unlock() diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index 17e59f570..ba9440082 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -18,7 +18,7 @@ import ( "encoding/binary" "time" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -43,11 +43,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc tx := s.b.BatchTx() tx.Lock() - keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum)) + keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) if _, ok := keep[rev]; !ok { - tx.UnsafeDelete(buckets.Key, key) + tx.UnsafeDelete(schema.Key, key) keyCompactions++ } } diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go index a8d3e3baa..53209e3db 100644 --- a/server/storage/mvcc/kvstore_compaction_test.go +++ b/server/storage/mvcc/kvstore_compaction_test.go @@ -24,7 +24,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -75,7 +75,7 @@ func TestScheduleCompaction(t *testing.T) { ibytes := newRevBytes() for _, rev := range revs { revToBytes(rev, ibytes) - tx.UnsafePut(buckets.Key, ibytes, []byte("bar")) + tx.UnsafePut(schema.Key, ibytes, []byte("bar")) } tx.Unlock() @@ -84,14 +84,14 @@ func TestScheduleCompaction(t *testing.T) { tx.Lock() for _, rev := range tt.wrevs { revToBytes(rev, ibytes) - keys, _ := tx.UnsafeRange(buckets.Key, ibytes, nil, 0) + keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0) if len(keys) != 1 { t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys)) } } vals, _ := UnsafeReadFinishedCompact(tx) if !reflect.DeepEqual(vals, tt.rev) { - t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, tt.rev) + t.Errorf("#%d: vals on %v = %+v, want %+v", i, schema.FinishedCompactKeyName, vals, tt.rev) } tx.Unlock() diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 3dd500e29..87708e796 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -37,7 +37,7 @@ import ( "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -149,12 +149,12 @@ func TestStorePut(t *testing.T) { } wact := []testutil.Action{ - {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}}, + {Name: "seqput", Params: []interface{}{schema.Key, tt.wkey, data}}, } if tt.rr != nil { wact = []testutil.Action{ - {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}}, + {Name: "seqput", Params: []interface{}{schema.Key, tt.wkey, data}}, } } @@ -229,7 +229,7 @@ func TestStoreRange(t *testing.T) { wstart := newRevBytes() revToBytes(tt.idxr.revs[0], wstart) wact := []testutil.Action{ - {Name: "range", Params: []interface{}{buckets.Key, wstart, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{schema.Key, wstart, []byte(nil), int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -304,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) { t.Errorf("#%d: marshal err = %v, want nil", i, err) } wact := []testutil.Action{ - {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}}, + {Name: "seqput", Params: []interface{}{schema.Key, tt.wkey, data}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -343,10 +343,10 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ - {Name: "put", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, - {Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}}, - {Name: "delete", Params: []interface{}{buckets.Key, key2}}, - {Name: "put", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}}, + {Name: "delete", Params: []interface{}{schema.Key, key2}}, + {Name: "put", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -384,8 +384,8 @@ func TestStoreRestore(t *testing.T) { if err != nil { t.Fatal(err) } - b.tx.rangeRespc <- rangeResp{[][]byte{buckets.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} - b.tx.rangeRespc <- rangeResp{[][]byte{buckets.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{nil, nil} @@ -399,9 +399,9 @@ func TestStoreRestore(t *testing.T) { t.Errorf("current rev = %v, want 5", s.currentRev) } wact := []testutil.Action{ - {Name: "range", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, []byte(nil), int64(0)}}, - {Name: "range", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, []byte(nil), int64(0)}}, - {Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, + {Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{schema.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { revToBytes(revision{main: 2}, rbytes) tx := s0.b.BatchTx() tx.Lock() - tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes) + tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes) tx.Unlock() s0.Close() @@ -514,7 +514,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { for i := 0; i < 5; i++ { tx := s.b.BatchTx() tx.Lock() - ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0) + ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0) tx.Unlock() if len(ks) != 0 { time.Sleep(100 * time.Millisecond) diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index 0db10ed13..56c2335c2 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -21,7 +21,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -160,7 +160,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i default: } revToBytes(revpair, revBytes) - _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0) + _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0) if len(vs) != 1 { tr.s.lg.Fatal( "range failed to find revision pair", @@ -215,7 +215,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } tw.trace.Step("marshal mvccpb.KeyValue") - tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d) + tw.tx.UnsafeSeqPut(schema.Key, ibytes, d) tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv) tw.trace.Step("store kv pair into bolt db") @@ -276,7 +276,7 @@ func (tw *storeTxnWrite) delete(key []byte) { ) } - tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d) + tw.tx.UnsafeSeqPut(schema.Key, ibytes, d) err = tw.s.kvindex.Tombstone(key, idxRev) if err != nil { tw.storeTxnRead.s.lg.Fatal( diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index 60745fb7f..e530c82f4 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -16,11 +16,11 @@ package mvcc import ( "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) { - _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0) + _, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { return bytesToRev(finishedCompactBytes[0]).main, true } @@ -28,7 +28,7 @@ func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found b } func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) { - _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0) + _, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0) if len(scheduledCompactBytes) != 0 { return bytesToRev(scheduledCompactBytes[0]).main, true } @@ -44,7 +44,7 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) { func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) { rbytes := newRevBytes() revToBytes(revision{main: value}, rbytes) - tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes) + tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes) } func SetFinishedCompact(tx backend.BatchTx, value int64) { @@ -56,5 +56,5 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) { func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) { rbytes := newRevBytes() revToBytes(revision{main: value}, rbytes) - tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes) + tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes) } diff --git a/server/storage/mvcc/store_test.go b/server/storage/mvcc/store_test.go index e3276462a..4d9db4435 100644 --- a/server/storage/mvcc/store_test.go +++ b/server/storage/mvcc/store_test.go @@ -10,7 +10,7 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) // TestScheduledCompact ensures that UnsafeSetScheduledCompact&UnsafeReadScheduledCompact work well together. @@ -39,7 +39,7 @@ func TestScheduledCompact(t *testing.T) { t.Fatal("batch tx is nil") } tx.Lock() - tx.UnsafeCreateBucket(buckets.Meta) + tx.UnsafeCreateBucket(schema.Meta) UnsafeSetScheduledCompact(tx, tc.value) tx.Unlock() be.ForceCommit() @@ -80,7 +80,7 @@ func TestFinishedCompact(t *testing.T) { t.Fatal("batch tx is nil") } tx.Lock() - tx.UnsafeCreateBucket(buckets.Meta) + tx.UnsafeCreateBucket(schema.Meta) UnsafeSetFinishedCompact(tx, tc.value) tx.Unlock() be.ForceCommit() diff --git a/server/storage/mvcc/util.go b/server/storage/mvcc/util.go index 309527097..bf5d9c196 100644 --- a/server/storage/mvcc/util.go +++ b/server/storage/mvcc/util.go @@ -19,7 +19,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" ) func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { @@ -32,6 +32,6 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { } be.BatchTx().Lock() - be.BatchTx().UnsafePut(buckets.Key, ibytes, d) + be.BatchTx().UnsafePut(schema.Key, ibytes, d) be.BatchTx().Unlock() } diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index c78b3b802..3e9606b19 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -22,7 +22,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -354,7 +354,7 @@ func (s *watchableStore) syncWatchers() int { // values are actual key-value pairs in backend. tx := s.store.b.ReadTx() tx.RLock() - revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0) + revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) tx.RUnlock() evs := kvsToEvents(s.store.lg, wg, revs, vs) diff --git a/server/storage/buckets/alarm.go b/server/storage/schema/alarm.go similarity index 99% rename from server/storage/buckets/alarm.go rename to server/storage/schema/alarm.go index cc3160df3..7400dc470 100644 --- a/server/storage/buckets/alarm.go +++ b/server/storage/schema/alarm.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "go.etcd.io/etcd/api/v3/etcdserverpb" diff --git a/server/storage/buckets/auth.go b/server/storage/schema/auth.go similarity index 99% rename from server/storage/buckets/auth.go rename to server/storage/schema/auth.go index 950810b82..cf5616e0f 100644 --- a/server/storage/buckets/auth.go +++ b/server/storage/schema/auth.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "bytes" diff --git a/server/storage/buckets/auth_roles.go b/server/storage/schema/auth_roles.go similarity index 99% rename from server/storage/buckets/auth_roles.go rename to server/storage/schema/auth_roles.go index af151ada9..c568f3a1b 100644 --- a/server/storage/buckets/auth_roles.go +++ b/server/storage/schema/auth_roles.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "go.etcd.io/etcd/api/v3/authpb" diff --git a/server/storage/buckets/auth_test.go b/server/storage/schema/auth_test.go similarity index 99% rename from server/storage/buckets/auth_test.go rename to server/storage/schema/auth_test.go index 3390ac0ff..92b219406 100644 --- a/server/storage/buckets/auth_test.go +++ b/server/storage/schema/auth_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "fmt" diff --git a/server/storage/buckets/auth_users.go b/server/storage/schema/auth_users.go similarity index 99% rename from server/storage/buckets/auth_users.go rename to server/storage/schema/auth_users.go index 4ae4b2d0b..334017457 100644 --- a/server/storage/buckets/auth_users.go +++ b/server/storage/schema/auth_users.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "go.etcd.io/etcd/api/v3/authpb" diff --git a/server/storage/buckets/bucket.go b/server/storage/schema/bucket.go similarity index 99% rename from server/storage/buckets/bucket.go rename to server/storage/schema/bucket.go index 3da963e1c..c39ed71e0 100644 --- a/server/storage/buckets/bucket.go +++ b/server/storage/schema/bucket.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "bytes" diff --git a/server/storage/buckets/cindex.go b/server/storage/schema/cindex.go similarity index 99% rename from server/storage/buckets/cindex.go rename to server/storage/schema/cindex.go index 619222207..5fb4d5bf6 100644 --- a/server/storage/buckets/cindex.go +++ b/server/storage/schema/cindex.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "encoding/binary" diff --git a/server/storage/buckets/confstate.go b/server/storage/schema/confstate.go similarity index 99% rename from server/storage/buckets/confstate.go rename to server/storage/schema/confstate.go index 37b2d0105..a0fdad163 100644 --- a/server/storage/buckets/confstate.go +++ b/server/storage/schema/confstate.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "encoding/json" diff --git a/server/storage/buckets/confstate_test.go b/server/storage/schema/confstate_test.go similarity index 99% rename from server/storage/buckets/confstate_test.go rename to server/storage/schema/confstate_test.go index 87f66b925..d4134ab35 100644 --- a/server/storage/buckets/confstate_test.go +++ b/server/storage/schema/confstate_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "testing" diff --git a/server/storage/buckets/lease.go b/server/storage/schema/lease.go similarity index 99% rename from server/storage/buckets/lease.go rename to server/storage/schema/lease.go index 8ba412d59..43a0c06e4 100644 --- a/server/storage/buckets/lease.go +++ b/server/storage/schema/lease.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "encoding/binary" diff --git a/server/storage/buckets/membership.go b/server/storage/schema/membership.go similarity index 99% rename from server/storage/buckets/membership.go rename to server/storage/schema/membership.go index 8fdfa33dc..b188ef7d9 100644 --- a/server/storage/buckets/membership.go +++ b/server/storage/schema/membership.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "encoding/json" diff --git a/server/storage/buckets/version.go b/server/storage/schema/version.go similarity index 99% rename from server/storage/buckets/version.go rename to server/storage/schema/version.go index 32a9bc1a5..4b8738a77 100644 --- a/server/storage/buckets/version.go +++ b/server/storage/schema/version.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "github.com/coreos/go-semver/semver" diff --git a/server/storage/buckets/version_test.go b/server/storage/schema/version_test.go similarity index 99% rename from server/storage/buckets/version_test.go rename to server/storage/schema/version_test.go index b9ea7bb9e..60e7cf776 100644 --- a/server/storage/buckets/version_test.go +++ b/server/storage/schema/version_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buckets +package schema import ( "testing" diff --git a/server/verify/verify.go b/server/verify/verify.go index df55aab20..fe34f7a2c 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -21,7 +21,7 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/datadir" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" wal2 "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" @@ -109,7 +109,7 @@ func MustVerifyIfEnabled(cfg Config) { func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { tx := be.BatchTx() - index, term := buckets.ReadConsistentIndex(tx) + index, term := schema.ReadConsistentIndex(tx) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) } diff --git a/tools/etcd-dump-db/backend.go b/tools/etcd-dump-db/backend.go index d53d86358..a0e14bc67 100644 --- a/tools/etcd-dump-db/backend.go +++ b/tools/etcd-dump-db/backend.go @@ -20,7 +20,7 @@ import ( "path/filepath" "go.etcd.io/etcd/api/v3/authpb" - "go.etcd.io/etcd/server/v3/storage/buckets" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/server/v3/lease/leasepb" @@ -163,7 +163,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error) func getHash(dbPath string) (hash uint32, err error) { b := backend.NewDefaultBackend(dbPath) - return b.Hash(buckets.DefaultIgnores) + return b.Hash(schema.DefaultIgnores) } // TODO: revert by revision and find specified hash value