mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
v2 etcdctl backup: producing consistent state of membership
This commit is contained in:
parent
a70386a1a4
commit
067521981e
@ -15,7 +15,6 @@
|
|||||||
package command
|
package command
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
@ -25,11 +24,15 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||||
"go.etcd.io/etcd/pkg/v3/idutil"
|
"go.etcd.io/etcd/pkg/v3/idutil"
|
||||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||||
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||||
|
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
"go.etcd.io/etcd/server/v3/wal"
|
"go.etcd.io/etcd/server/v3/wal"
|
||||||
"go.etcd.io/etcd/server/v3/wal/walpb"
|
"go.etcd.io/etcd/server/v3/wal/walpb"
|
||||||
|
|
||||||
@ -54,11 +57,41 @@ func NewBackupCommand() cli.Command {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type desiredCluster struct {
|
||||||
|
clusterId types.ID
|
||||||
|
nodeId types.ID
|
||||||
|
members []*membership.Member
|
||||||
|
confState raftpb.ConfState
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDesiredCluster() desiredCluster {
|
||||||
|
idgen := idutil.NewGenerator(0, time.Now())
|
||||||
|
nodeID := idgen.Next()
|
||||||
|
clusterID := idgen.Next()
|
||||||
|
|
||||||
|
return desiredCluster{
|
||||||
|
clusterId: types.ID(clusterID),
|
||||||
|
nodeId: types.ID(nodeID),
|
||||||
|
members: []*membership.Member{
|
||||||
|
{
|
||||||
|
ID: types.ID(nodeID),
|
||||||
|
Attributes: membership.Attributes{
|
||||||
|
Name: "etcdctl-v2-backup",
|
||||||
|
},
|
||||||
|
RaftAttributes: membership.RaftAttributes{
|
||||||
|
PeerURLs: []string{"http://use-flag--force-new-cluster:2080"},
|
||||||
|
}}},
|
||||||
|
confState: raftpb.ConfState{Voters: []uint64{nodeID}},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// handleBackup handles a request that intends to do a backup.
|
// handleBackup handles a request that intends to do a backup.
|
||||||
func handleBackup(c *cli.Context) error {
|
func handleBackup(c *cli.Context) error {
|
||||||
var srcWAL string
|
var srcWAL string
|
||||||
var destWAL string
|
var destWAL string
|
||||||
|
|
||||||
|
lg := zap.NewExample()
|
||||||
|
|
||||||
withV3 := c.Bool("with-v3")
|
withV3 := c.Bool("with-v3")
|
||||||
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
|
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
|
||||||
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
|
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
|
||||||
@ -79,13 +112,12 @@ func handleBackup(c *cli.Context) error {
|
|||||||
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
|
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
walsnap := saveSnap(destSnap, srcSnap)
|
desired := newDesiredCluster()
|
||||||
metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
|
|
||||||
saveDB(filepath.Join(destSnap, "db"), filepath.Join(srcSnap, "db"), state.Commit, withV3)
|
|
||||||
|
|
||||||
idgen := idutil.NewGenerator(0, time.Now())
|
walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
|
||||||
metadata.NodeID = idgen.Next()
|
metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
|
||||||
metadata.ClusterID = idgen.Next()
|
destDbPath := filepath.Join(destSnap, "db")
|
||||||
|
saveDB(lg, destDbPath, filepath.Join(srcSnap, "db"), state.Commit, &desired, withV3)
|
||||||
|
|
||||||
neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata))
|
neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -102,15 +134,17 @@ func handleBackup(c *cli.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
|
func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster) (walsnap walpb.Snapshot) {
|
||||||
ss := snap.New(zap.NewExample(), srcSnap)
|
ss := snap.New(lg, srcSnap)
|
||||||
snapshot, err := ss.Load()
|
snapshot, err := ss.Load()
|
||||||
if err != nil && err != snap.ErrNoSnapshot {
|
if err != nil && err != snap.ErrNoSnapshot {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState
|
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState
|
||||||
newss := snap.New(zap.NewExample(), destSnap)
|
newss := snap.New(lg, destSnap)
|
||||||
|
snapshot.Metadata.ConfState = desired.confState
|
||||||
|
snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired)
|
||||||
if err = newss.SaveSnap(*snapshot); err != nil {
|
if err = newss.SaveSnap(*snapshot); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -118,6 +152,26 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
|
|||||||
return walsnap
|
return walsnap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// mustTranslateV2store processes storeData such that they match 'desiredCluster'.
|
||||||
|
// In particular the method overrides membership information.
|
||||||
|
func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredCluster) []byte {
|
||||||
|
st := v2store.New()
|
||||||
|
if err := st.Recovery(storeData); err != nil {
|
||||||
|
lg.Panic("cannot translate v2store", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
|
||||||
|
raftCluster.SetID(desired.nodeId, desired.clusterId)
|
||||||
|
raftCluster.SetStore(st)
|
||||||
|
raftCluster.PushMembershipToStorage()
|
||||||
|
|
||||||
|
outputData, err := st.Save()
|
||||||
|
if err != nil {
|
||||||
|
lg.Panic("cannot save v2store", zap.Error(err))
|
||||||
|
}
|
||||||
|
return outputData
|
||||||
|
}
|
||||||
|
|
||||||
func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
|
func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
|
||||||
w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap)
|
w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -190,7 +244,8 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
|
|||||||
}
|
}
|
||||||
|
|
||||||
// saveDB copies the v3 backend and strips cluster information.
|
// saveDB copies the v3 backend and strips cluster information.
|
||||||
func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
|
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
|
||||||
|
|
||||||
// open src db to safely copy db state
|
// open src db to safely copy db state
|
||||||
if v3 {
|
if v3 {
|
||||||
var src *bolt.DB
|
var src *bolt.DB
|
||||||
@ -229,37 +284,26 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := bolt.Open(destDB, 0644, &bolt.Options{})
|
be := backend.NewDefaultBackend(destDB)
|
||||||
if err != nil {
|
defer be.Close()
|
||||||
log.Fatal(err)
|
|
||||||
}
|
if err := membership.TrimClusterFromBackend(be); err != nil {
|
||||||
tx, err := db.Begin(true)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove membership information; should be clobbered by --force-new-cluster
|
raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
|
||||||
// TODO: Consider refactoring to use backend.Backend instead of bolt
|
raftCluster.SetID(desired.nodeId, desired.clusterId)
|
||||||
// and membership.TrimMembershipFromBackend.
|
raftCluster.SetBackend(be)
|
||||||
for _, bucket := range []string{"members", "members_removed", "cluster"} {
|
raftCluster.PushMembershipToStorage()
|
||||||
tx.DeleteBucket([]byte(bucket))
|
|
||||||
}
|
|
||||||
|
|
||||||
// update consistent index to match hard state
|
|
||||||
if !v3 {
|
if !v3 {
|
||||||
idxBytes := make([]byte, 8)
|
tx := be.BatchTx()
|
||||||
binary.BigEndian.PutUint64(idxBytes, idx)
|
tx.Lock()
|
||||||
b, err := tx.CreateBucketIfNotExists([]byte("meta"))
|
defer tx.Unlock()
|
||||||
if err != nil {
|
tx.UnsafeCreateBucket([]byte("meta"))
|
||||||
log.Fatal(err)
|
ci := cindex.NewConsistentIndex(tx)
|
||||||
}
|
ci.SetConsistentIndex(idx)
|
||||||
b.Put([]byte("consistent_index"), idxBytes)
|
ci.UnsafeSave(tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
if err := db.Close(); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,6 @@ import (
|
|||||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||||
"go.etcd.io/etcd/client/v3"
|
"go.etcd.io/etcd/client/v3"
|
||||||
"go.etcd.io/etcd/client/v3/snapshot"
|
"go.etcd.io/etcd/client/v3/snapshot"
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
|
||||||
"go.etcd.io/etcd/raft/v3"
|
"go.etcd.io/etcd/raft/v3"
|
||||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||||
"go.etcd.io/etcd/server/v3/config"
|
"go.etcd.io/etcd/server/v3/config"
|
||||||
@ -40,6 +39,7 @@ import (
|
|||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||||
|
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
"go.etcd.io/etcd/server/v3/wal"
|
"go.etcd.io/etcd/server/v3/wal"
|
||||||
"go.etcd.io/etcd/server/v3/wal/walpb"
|
"go.etcd.io/etcd/server/v3/wal/walpb"
|
||||||
@ -255,12 +255,19 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
|
|||||||
zap.String("snap-dir", s.snapDir),
|
zap.String("snap-dir", s.snapDir),
|
||||||
zap.Stack("stack"),
|
zap.Stack("stack"),
|
||||||
)
|
)
|
||||||
|
|
||||||
if err = s.saveDB(); err != nil {
|
if err = s.saveDB(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = s.saveWALAndSnap(); err != nil {
|
hardstate, err := s.saveWALAndSnap()
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.updateCIndex(hardstate.Commit); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s.lg.Info(
|
s.lg.Info(
|
||||||
"restored snapshot",
|
"restored snapshot",
|
||||||
zap.String("path", s.srcDbPath),
|
zap.String("path", s.srcDbPath),
|
||||||
@ -295,7 +302,7 @@ func (s *v3Manager) saveDB() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *v3Manager) copyAndVerifyDB() error {
|
func (s *v3Manager) copyAndVerifyDB() error {
|
||||||
srcf, ferr := os.OpenFile(s.srcDbPath, os.O_RDONLY, 0600)
|
srcf, ferr := os.Open(s.srcDbPath)
|
||||||
if ferr != nil {
|
if ferr != nil {
|
||||||
return ferr
|
return ferr
|
||||||
}
|
}
|
||||||
@ -373,9 +380,9 @@ func (s *v3Manager) copyAndVerifyDB() error {
|
|||||||
// saveWALAndSnap creates a WAL for the initial cluster
|
// saveWALAndSnap creates a WAL for the initial cluster
|
||||||
//
|
//
|
||||||
// TODO: This code ignores learners !!!
|
// TODO: This code ignores learners !!!
|
||||||
func (s *v3Manager) saveWALAndSnap() error {
|
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
|
||||||
if err := fileutil.CreateDirAll(s.walDir); err != nil {
|
if err := fileutil.CreateDirAll(s.walDir); err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// add members again to persist them to the store we create.
|
// add members again to persist them to the store we create.
|
||||||
@ -392,11 +399,11 @@ func (s *v3Manager) saveWALAndSnap() error {
|
|||||||
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
|
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
|
||||||
metadata, merr := md.Marshal()
|
metadata, merr := md.Marshal()
|
||||||
if merr != nil {
|
if merr != nil {
|
||||||
return merr
|
return nil, merr
|
||||||
}
|
}
|
||||||
w, walerr := wal.Create(s.lg, s.walDir, metadata)
|
w, walerr := wal.Create(s.lg, s.walDir, metadata)
|
||||||
if walerr != nil {
|
if walerr != nil {
|
||||||
return walerr
|
return nil, walerr
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
@ -404,7 +411,7 @@ func (s *v3Manager) saveWALAndSnap() error {
|
|||||||
for i, id := range s.cl.MemberIDs() {
|
for i, id := range s.cl.MemberIDs() {
|
||||||
ctx, err := json.Marshal((*s.cl).Member(id))
|
ctx, err := json.Marshal((*s.cl).Member(id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
||||||
}
|
}
|
||||||
@ -420,7 +427,7 @@ func (s *v3Manager) saveWALAndSnap() error {
|
|||||||
}
|
}
|
||||||
d, err := cc.Marshal()
|
d, err := cc.Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
ents[i] = raftpb.Entry{
|
ents[i] = raftpb.Entry{
|
||||||
Type: raftpb.EntryConfChange,
|
Type: raftpb.EntryConfChange,
|
||||||
@ -431,17 +438,18 @@ func (s *v3Manager) saveWALAndSnap() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
commit, term := uint64(len(ents)), uint64(1)
|
commit, term := uint64(len(ents)), uint64(1)
|
||||||
if err := w.Save(raftpb.HardState{
|
hardState := raftpb.HardState{
|
||||||
Term: term,
|
Term: term,
|
||||||
Vote: peers[0].ID,
|
Vote: peers[0].ID,
|
||||||
Commit: commit,
|
Commit: commit,
|
||||||
}, ents); err != nil {
|
}
|
||||||
return err
|
if err := w.Save(hardState, ents); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, berr := st.Save()
|
b, berr := st.Save()
|
||||||
if berr != nil {
|
if berr != nil {
|
||||||
return berr
|
return nil, berr
|
||||||
}
|
}
|
||||||
confState := raftpb.ConfState{
|
confState := raftpb.ConfState{
|
||||||
Voters: nodeIDs,
|
Voters: nodeIDs,
|
||||||
@ -456,7 +464,17 @@ func (s *v3Manager) saveWALAndSnap() error {
|
|||||||
}
|
}
|
||||||
sn := snap.New(s.lg, s.snapDir)
|
sn := snap.New(s.lg, s.snapDir)
|
||||||
if err := sn.SaveSnap(raftSnap); err != nil {
|
if err := sn.SaveSnap(raftSnap); err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState})
|
snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
|
||||||
|
return &hardState, w.SaveSnapshot(snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *v3Manager) updateCIndex(commit uint64) error {
|
||||||
|
be := backend.NewDefaultBackend(s.outDbPath())
|
||||||
|
defer be.Close()
|
||||||
|
ci := cindex.NewConsistentIndex(be.BatchTx())
|
||||||
|
ci.SetConsistentIndex(commit)
|
||||||
|
ci.UnsafeSave(be.BatchTx())
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -859,3 +859,20 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
|
|||||||
sort.Sort(types.IDSlice(ids))
|
sort.Sort(types.IDSlice(ids))
|
||||||
return ids
|
return ids
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PushMembershipToStorage is overriding storage information about cluster's
|
||||||
|
// members, such that they fully reflect internal RaftCluster's storage.
|
||||||
|
func (c *RaftCluster) PushMembershipToStorage() {
|
||||||
|
if c.be != nil {
|
||||||
|
TrimMembershipFromBackend(c.lg, c.be)
|
||||||
|
for _, m := range c.members {
|
||||||
|
mustSaveMemberToBackend(c.lg, c.be, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if c.v2store != nil {
|
||||||
|
TrimMembershipFromV2Store(c.lg, c.v2store)
|
||||||
|
for _, m := range c.members {
|
||||||
|
mustSaveMemberToStore(c.lg, c.v2store, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||||
@ -61,12 +62,10 @@ func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, no
|
|||||||
}
|
}
|
||||||
|
|
||||||
func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
|
func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
|
||||||
var b []byte
|
|
||||||
peerURLstrs := peerURLs.StringSlice()
|
peerURLstrs := peerURLs.StringSlice()
|
||||||
sort.Strings(peerURLstrs)
|
sort.Strings(peerURLstrs)
|
||||||
for _, p := range peerURLstrs {
|
joinedPeerUrls := strings.Join(peerURLstrs, "")
|
||||||
b = append(b, []byte(p)...)
|
b := []byte(joinedPeerUrls)
|
||||||
}
|
|
||||||
|
|
||||||
b = append(b, []byte(clusterName)...)
|
b = append(b, []byte(clusterName)...)
|
||||||
if now != nil {
|
if now != nil {
|
||||||
|
@ -57,6 +57,8 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
|
|||||||
tx.UnsafePut(membersBucketName, mkey, mvalue)
|
tx.UnsafePut(membersBucketName, mkey, mvalue)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TrimClusterFromBackend removes all information about cluster (versions)
|
||||||
|
// from the v3 backend.
|
||||||
func TrimClusterFromBackend(be backend.Backend) error {
|
func TrimClusterFromBackend(be backend.Backend) error {
|
||||||
tx := be.BatchTx()
|
tx := be.BatchTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
@ -83,7 +85,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
|
|||||||
tx.RLock()
|
tx.RLock()
|
||||||
defer tx.RUnlock()
|
defer tx.RUnlock()
|
||||||
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
|
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
|
||||||
memberId := MustParseMemberIDFromBytes(lg, k)
|
memberId := mustParseMemberIDFromBytes(lg, k)
|
||||||
m := &Member{ID: memberId}
|
m := &Member{ID: memberId}
|
||||||
if err := json.Unmarshal(v, &m); err != nil {
|
if err := json.Unmarshal(v, &m); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -96,7 +98,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
|
err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
|
||||||
memberId := MustParseMemberIDFromBytes(lg, k)
|
memberId := mustParseMemberIDFromBytes(lg, k)
|
||||||
removed[memberId] = true
|
removed[memberId] = true
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -114,24 +116,48 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
|
|||||||
return members, removed
|
return members, removed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TrimMembershipFromBackend removes all information about members &
|
||||||
|
// removed_members from the v3 backend.
|
||||||
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
|
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
|
||||||
|
lg.Info("Trimming membership information from the backend...")
|
||||||
tx := be.BatchTx()
|
tx := be.BatchTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
|
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
|
||||||
tx.UnsafeDelete(membersBucketName, k)
|
tx.UnsafeDelete(membersBucketName, k)
|
||||||
|
lg.Debug("Removed member from the backend",
|
||||||
|
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
|
return tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
|
||||||
tx.UnsafeDelete(membersRemovedBucketName, k)
|
tx.UnsafeDelete(membersRemovedBucketName, k)
|
||||||
|
lg.Debug("Removed removed_member from the backend",
|
||||||
|
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrimMembershipFromV2Store removes all information about members &
|
||||||
|
// removed_members from the v2 store.
|
||||||
|
func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
|
||||||
|
members, removed := membersFromStore(lg, s)
|
||||||
|
|
||||||
|
for mID := range members {
|
||||||
|
_, err := s.Delete(MemberStoreKey(mID), true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
for mID := range removed {
|
||||||
|
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -289,7 +315,7 @@ func MemberAttributesStorePath(id types.ID) string {
|
|||||||
return path.Join(MemberStoreKey(id), attributesSuffix)
|
return path.Join(MemberStoreKey(id), attributesSuffix)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
|
func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
|
||||||
id, err := types.IDFromString(string(key))
|
id, err := types.IDFromString(string(key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Panic("failed to parse member id from key", zap.Error(err))
|
lg.Panic("failed to parse member id from key", zap.Error(err))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user