From 30811a06aa2651d8e90c4e61463cb3767c63ff51 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Thu, 8 Oct 2020 22:29:02 +0200
Subject: [PATCH] etcdctl, clientv3/snapshot: move snapshot (Restore, Status) code out of client

The "snapshot" Restore/Status code was the only remaining dependency of the client on 'server' code, and it is used solely by etcdctl.

Long-term, the snapshot code should be migrated to an 'etcdadm'-style tool, so that we can distinguish tools that depend solely on the networking API from tools that operate on etcd files directly.

We left the snapshot.Save() code in the clientv3/snapshot package, so that clients can benefit from the automated download-and-save-to-file snapshot functionality over the wire.
---
 CHANGELOG-3.5.md                              |   4 +-
 clientv3/snapshot/v3_snapshot.go              | 451 +----------------
 etcdctl/ctlv3/command/printer.go              |   2 +-
 etcdctl/ctlv3/command/printer_fields.go       |   2 +-
 etcdctl/ctlv3/command/printer_json.go         |   2 +-
 etcdctl/ctlv3/command/printer_simple.go       |   2 +-
 etcdctl/ctlv3/command/printer_table.go        |   2 +-
 etcdctl/ctlv3/command/snapshot_command.go     |   2 +-
 etcdctl/snapshot/doc.go                       |  16 +
 {clientv3 => etcdctl}/snapshot/util.go        |   0
 etcdctl/snapshot/v3_snapshot.go               | 471 ++++++++++++++++++
 tests/e2e/ctl_v3_snapshot_test.go             |   2 +-
 tests/functional/rpcpb/member.go              |   2 +-
 .../clientv3/snapshot/v3_snapshot_test.go     | 123 +++++
 tests/integration/snapshot/member_test.go     |   2 +-
 .../integration/snapshot/v3_snapshot_test.go  |  24 +-
 16 files changed, 639 insertions(+), 468 deletions(-)
 create mode 100644 etcdctl/snapshot/doc.go
 rename {clientv3 => etcdctl}/snapshot/util.go (100%)
 create mode 100644 etcdctl/snapshot/v3_snapshot.go
 create mode 100644 tests/integration/clientv3/snapshot/v3_snapshot_test.go

diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md
index 001d148aa..a3dcd9968 100644
--- a/CHANGELOG-3.5.md
+++ b/CHANGELOG-3.5.md
@@ -21,7 +21,9 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
 
 ### Breaking Changes
 
-- `go.etcd.io/etcd` Go packages have moved to `go.etcd.io/etcd/v3` to follow the [Go modules](https://github.com/golang/go/wiki/Modules) conventions
+- `go.etcd.io/etcd` Go packages have moved to `go.etcd.io/etcd/{api,pkg,raft,client,etcdctl,server,raft,tests}/v3` to follow the [Go modules](https://github.com/golang/go/wiki/Modules) conventions
+- `go.etcd.io/clientv3/snapshot` SnapshotManager class has moved to `go.etcd.io/clientv3/etcdctl`.
+  The method `snapshot.Save` to download a snapshot from the remote server was preserved in `go.etcd.io/clientv3/snapshot`.
 - Changed behavior of clienv3 API [MemberList](https://github.com/etcd-io/etcd/pull/11639).
 - Previously, it is directly served with server's local data, which could be stale.
 - Now, it is served with linearizable guarantee. If the server is disconnected from quorum, `MemberList` call will fail.
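Usage sketch (illustrative only, not part of the applied diff): after this split, downloading a snapshot over the wire stays in `clientv3/snapshot`, while inspecting or restoring a snapshot file goes through the `etcdctl/snapshot` Manager. The endpoint and peer URLs, member name, cluster token, and file paths below are placeholder assumptions; import aliases are needed because both packages are named `snapshot`.

```go
package main

import (
	"context"
	"log"

	"go.etcd.io/etcd/v3/clientv3"
	clientsnapshot "go.etcd.io/etcd/v3/clientv3/snapshot"
	ctlsnapshot "go.etcd.io/etcd/v3/etcdctl/snapshot"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	// Downloading a snapshot stays in clientv3/snapshot and only needs the
	// networking API. Exactly one endpoint must be configured.
	cfg := clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}} // placeholder endpoint
	if err := clientsnapshot.Save(context.Background(), lg, cfg, "snapshot.db"); err != nil {
		log.Fatal(err)
	}

	// Inspecting and restoring the snapshot file now goes through the
	// etcdctl/snapshot Manager, which depends on server-side packages.
	m := ctlsnapshot.NewV3(lg)

	status, err := m.Status("snapshot.db")
	if err != nil {
		log.Fatal(err)
	}
	lg.Info("snapshot status",
		zap.Uint32("hash", status.Hash),
		zap.Int64("revision", status.Revision),
		zap.Int("total-keys", status.TotalKey),
	)

	if err := m.Restore(ctlsnapshot.RestoreConfig{
		SnapshotPath:        "snapshot.db",
		Name:                "m1",                        // placeholder member name
		OutputDataDir:       "m1.etcd",                   // must not already exist
		PeerURLs:            []string{"http://127.0.0.1:2380"},
		InitialCluster:      "m1=http://127.0.0.1:2380",
		InitialClusterToken: "etcd-cluster-1",
	}); err != nil {
		log.Fatal(err)
	}
}
```

This mirrors the split described in the CHANGELOG entry above: only `Save` keeps a pure networking dependency, while `Status` and `Restore` pull in etcdserver, mvcc, and WAL packages and therefore now live under etcdctl.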
diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index 4b1196042..c7aa5b629 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -17,79 +17,17 @@ package snapshot import ( "context" "crypto/sha256" - "encoding/json" "fmt" - "hash/crc32" "io" - "math" "os" - "path/filepath" - "reflect" - "strings" "time" "github.com/dustin/go-humanize" - bolt "go.etcd.io/bbolt" - "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/etcdserver" - "go.etcd.io/etcd/v3/etcdserver/api/membership" - "go.etcd.io/etcd/v3/etcdserver/api/snap" - "go.etcd.io/etcd/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/v3/etcdserver/cindex" - "go.etcd.io/etcd/v3/lease" - "go.etcd.io/etcd/v3/mvcc" - "go.etcd.io/etcd/v3/mvcc/backend" "go.etcd.io/etcd/v3/pkg/fileutil" - "go.etcd.io/etcd/v3/pkg/traceutil" - "go.etcd.io/etcd/v3/pkg/types" - "go.etcd.io/etcd/v3/raft" - "go.etcd.io/etcd/v3/raft/raftpb" - "go.etcd.io/etcd/v3/wal" - "go.etcd.io/etcd/v3/wal/walpb" "go.uber.org/zap" ) -// Manager defines snapshot methods. -type Manager interface { - // Save fetches snapshot from remote etcd server and saves data - // to target path. If the context "ctx" is canceled or timed out, - // snapshot save stream will error out (e.g. context.Canceled, - // context.DeadlineExceeded). Make sure to specify only one endpoint - // in client configuration. Snapshot API must be requested to a - // selected node, and saved snapshot is the point-in-time state of - // the selected node. - Save(ctx context.Context, cfg clientv3.Config, dbPath string) error - - // Status returns the snapshot file information. - Status(dbPath string) (Status, error) - - // Restore restores a new etcd data directory from given snapshot - // file. It returns an error if specified data directory already - // exists, to prevent unintended data directory overwrites. - Restore(cfg RestoreConfig) error -} - -// NewV3 returns a new snapshot Manager for v3.x snapshot. -func NewV3(lg *zap.Logger) Manager { - if lg == nil { - lg = zap.NewExample() - } - return &v3Manager{lg: lg} -} - -type v3Manager struct { - lg *zap.Logger - - name string - dbPath string - walDir string - snapDir string - cl *membership.RaftCluster - - skipHashCheck bool -} - // hasChecksum returns "true" if the file size "n" // has appended sha256 hash digest. func hasChecksum(n int64) bool { @@ -98,8 +36,17 @@ func hasChecksum(n int64) bool { return (n % 512) == sha256.Size } -// Save fetches snapshot from remote etcd server and saves data to target path. -func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error { +// Save fetches snapshot from remote etcd server and saves data +// to target path. If the context "ctx" is canceled or timed out, +// snapshot save stream will error out (e.g. context.Canceled, +// context.DeadlineExceeded). Make sure to specify only one endpoint +// in client configuration. Snapshot API must be requested to a +// selected node, and saved snapshot is the point-in-time state of +// the selected node. 
+func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) error { + if lg == nil { + lg = zap.NewExample() + } if len(cfg.Endpoints) != 1 { return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints) } @@ -117,7 +64,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err != nil { return fmt.Errorf("could not open %s (%v)", partpath, err) } - s.lg.Info("created temporary db file", zap.String("path", partpath)) + lg.Info("created temporary db file", zap.String("path", partpath)) now := time.Now() var rd io.ReadCloser @@ -125,7 +72,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err != nil { return err } - s.lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0])) + lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0])) var size int64 size, err = io.Copy(f, rd) if err != nil { @@ -140,7 +87,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err = f.Close(); err != nil { return err } - s.lg.Info("fetched snapshot", + lg.Info("fetched snapshot", zap.String("endpoint", cfg.Endpoints[0]), zap.String("size", humanize.Bytes(uint64(size))), zap.String("took", humanize.Time(now)), @@ -149,374 +96,6 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err = os.Rename(partpath, dbPath); err != nil { return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err) } - s.lg.Info("saved", zap.String("path", dbPath)) + lg.Info("saved", zap.String("path", dbPath)) return nil } - -// Status is the snapshot file status. -type Status struct { - Hash uint32 `json:"hash"` - Revision int64 `json:"revision"` - TotalKey int `json:"totalKey"` - TotalSize int64 `json:"totalSize"` -} - -// Status returns the snapshot file information. -func (s *v3Manager) Status(dbPath string) (ds Status, err error) { - if _, err = os.Stat(dbPath); err != nil { - return ds, err - } - - db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true}) - if err != nil { - return ds, err - } - defer db.Close() - - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - if err = db.View(func(tx *bolt.Tx) error { - // check snapshot file integrity first - var dbErrStrings []string - for dbErr := range tx.Check() { - dbErrStrings = append(dbErrStrings, dbErr.Error()) - } - if len(dbErrStrings) > 0 { - return fmt.Errorf("snapshot file integrity check failed. 
%d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings)) - } - ds.TotalSize = tx.Size() - c := tx.Cursor() - for next, _ := c.First(); next != nil; next, _ = c.Next() { - b := tx.Bucket(next) - if b == nil { - return fmt.Errorf("cannot get hash of bucket %s", string(next)) - } - if _, err := h.Write(next); err != nil { - return fmt.Errorf("cannot write bucket %s : %v", string(next), err) - } - iskeyb := (string(next) == "key") - if err := b.ForEach(func(k, v []byte) error { - if _, err := h.Write(k); err != nil { - return fmt.Errorf("cannot write to bucket %s", err.Error()) - } - if _, err := h.Write(v); err != nil { - return fmt.Errorf("cannot write to bucket %s", err.Error()) - } - if iskeyb { - rev := bytesToRev(k) - ds.Revision = rev.main - } - ds.TotalKey++ - return nil - }); err != nil { - return fmt.Errorf("cannot write bucket %s : %v", string(next), err) - } - } - return nil - }); err != nil { - return ds, err - } - - ds.Hash = h.Sum32() - return ds, nil -} - -// RestoreConfig configures snapshot restore operation. -type RestoreConfig struct { - // SnapshotPath is the path of snapshot file to restore from. - SnapshotPath string - - // Name is the human-readable name of this member. - Name string - - // OutputDataDir is the target data directory to save restored data. - // OutputDataDir should not conflict with existing etcd data directory. - // If OutputDataDir already exists, it will return an error to prevent - // unintended data directory overwrites. - // If empty, defaults to "[Name].etcd" if not given. - OutputDataDir string - // OutputWALDir is the target WAL data directory. - // If empty, defaults to "[OutputDataDir]/member/wal" if not given. - OutputWALDir string - - // PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster. - PeerURLs []string - - // InitialCluster is the initial cluster configuration for restore bootstrap. - InitialCluster string - // InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap. - InitialClusterToken string - - // SkipHashCheck is "true" to ignore snapshot integrity hash value - // (required if copied from data directory). - SkipHashCheck bool -} - -// Restore restores a new etcd data directory from given snapshot file. 
-func (s *v3Manager) Restore(cfg RestoreConfig) error { - pURLs, err := types.NewURLs(cfg.PeerURLs) - if err != nil { - return err - } - var ics types.URLsMap - ics, err = types.NewURLsMap(cfg.InitialCluster) - if err != nil { - return err - } - - srv := etcdserver.ServerConfig{ - Logger: s.lg, - Name: cfg.Name, - PeerURLs: pURLs, - InitialPeerURLsMap: ics, - InitialClusterToken: cfg.InitialClusterToken, - } - if err = srv.VerifyBootstrap(); err != nil { - return err - } - - s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics) - if err != nil { - return err - } - - dataDir := cfg.OutputDataDir - if dataDir == "" { - dataDir = cfg.Name + ".etcd" - } - if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) { - return fmt.Errorf("data-dir %q not empty or could not be read", dataDir) - } - - walDir := cfg.OutputWALDir - if walDir == "" { - walDir = filepath.Join(dataDir, "member", "wal") - } else if fileutil.Exist(walDir) { - return fmt.Errorf("wal-dir %q exists", walDir) - } - - s.name = cfg.Name - s.dbPath = cfg.SnapshotPath - s.walDir = walDir - s.snapDir = filepath.Join(dataDir, "member", "snap") - s.skipHashCheck = cfg.SkipHashCheck - - s.lg.Info( - "restoring snapshot", - zap.String("path", s.dbPath), - zap.String("wal-dir", s.walDir), - zap.String("data-dir", dataDir), - zap.String("snap-dir", s.snapDir), - ) - if err = s.saveDB(); err != nil { - return err - } - if err = s.saveWALAndSnap(); err != nil { - return err - } - s.lg.Info( - "restored snapshot", - zap.String("path", s.dbPath), - zap.String("wal-dir", s.walDir), - zap.String("data-dir", dataDir), - zap.String("snap-dir", s.snapDir), - ) - - return nil -} - -// saveDB copies the database snapshot to the snapshot directory -func (s *v3Manager) saveDB() error { - f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600) - if ferr != nil { - return ferr - } - defer f.Close() - - // get snapshot integrity hash - if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { - return err - } - sha := make([]byte, sha256.Size) - if _, err := f.Read(sha); err != nil { - return err - } - if _, err := f.Seek(0, io.SeekStart); err != nil { - return err - } - - if err := fileutil.CreateDirAll(s.snapDir); err != nil { - return err - } - - dbpath := filepath.Join(s.snapDir, "db") - db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) - if dberr != nil { - return dberr - } - dbClosed := false - defer func() { - if !dbClosed { - db.Close() - dbClosed = true - } - }() - if _, err := io.Copy(db, f); err != nil { - return err - } - - // truncate away integrity hash, if any. 
- off, serr := db.Seek(0, io.SeekEnd) - if serr != nil { - return serr - } - hasHash := hasChecksum(off) - if hasHash { - if err := db.Truncate(off - sha256.Size); err != nil { - return err - } - } - - if !hasHash && !s.skipHashCheck { - return fmt.Errorf("snapshot missing hash but --skip-hash-check=false") - } - - if hasHash && !s.skipHashCheck { - // check for match - if _, err := db.Seek(0, io.SeekStart); err != nil { - return err - } - h := sha256.New() - if _, err := io.Copy(h, db); err != nil { - return err - } - dbsha := h.Sum(nil) - if !reflect.DeepEqual(sha, dbsha) { - return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) - } - } - - // db hash is OK, can now modify DB so it can be part of a new cluster - db.Close() - dbClosed = true - - commit := len(s.cl.Members()) - - // update consistentIndex so applies go through on etcdserver despite - // having a new raft instance - be := backend.NewDefaultBackend(dbpath) - - ci := cindex.NewConsistentIndex(be.BatchTx()) - ci.SetConsistentIndex(uint64(commit)) - - // a lessor never timeouts leases - lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci) - - mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) - txn := mvs.Write(traceutil.TODO()) - btx := be.BatchTx() - del := func(k, v []byte) error { - txn.DeleteRange(k, nil) - return nil - } - - // delete stored members from old cluster since using new members - btx.UnsafeForEach([]byte("members"), del) - - // todo: add back new members when we start to deprecate old snap file. - btx.UnsafeForEach([]byte("members_removed"), del) - - // trigger write-out of new consistent index - txn.End() - - mvs.Commit() - mvs.Close() - be.Close() - - return nil -} - -// saveWALAndSnap creates a WAL for the initial cluster -func (s *v3Manager) saveWALAndSnap() error { - if err := fileutil.CreateDirAll(s.walDir); err != nil { - return err - } - - // add members again to persist them to the store we create. 
- st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) - s.cl.SetStore(st) - for _, m := range s.cl.Members() { - s.cl.AddMember(m) - } - - m := s.cl.MemberByName(s.name) - md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())} - metadata, merr := md.Marshal() - if merr != nil { - return merr - } - w, walerr := wal.Create(s.lg, s.walDir, metadata) - if walerr != nil { - return walerr - } - defer w.Close() - - peers := make([]raft.Peer, len(s.cl.MemberIDs())) - for i, id := range s.cl.MemberIDs() { - ctx, err := json.Marshal((*s.cl).Member(id)) - if err != nil { - return err - } - peers[i] = raft.Peer{ID: uint64(id), Context: ctx} - } - - ents := make([]raftpb.Entry, len(peers)) - nodeIDs := make([]uint64, len(peers)) - for i, p := range peers { - nodeIDs[i] = p.ID - cc := raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: p.ID, - Context: p.Context, - } - d, err := cc.Marshal() - if err != nil { - return err - } - ents[i] = raftpb.Entry{ - Type: raftpb.EntryConfChange, - Term: 1, - Index: uint64(i + 1), - Data: d, - } - } - - commit, term := uint64(len(ents)), uint64(1) - if err := w.Save(raftpb.HardState{ - Term: term, - Vote: peers[0].ID, - Commit: commit, - }, ents); err != nil { - return err - } - - b, berr := st.Save() - if berr != nil { - return berr - } - raftSnap := raftpb.Snapshot{ - Data: b, - Metadata: raftpb.SnapshotMetadata{ - Index: commit, - Term: term, - ConfState: raftpb.ConfState{ - Voters: nodeIDs, - }, - }, - } - sn := snap.New(s.lg, s.snapDir) - if err := sn.SaveSnap(raftSnap); err != nil { - return err - } - return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) -} diff --git a/etcdctl/ctlv3/command/printer.go b/etcdctl/ctlv3/command/printer.go index 7ad9676de..fab26eb0d 100644 --- a/etcdctl/ctlv3/command/printer.go +++ b/etcdctl/ctlv3/command/printer.go @@ -21,7 +21,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" v3 "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "github.com/dustin/go-humanize" ) diff --git a/etcdctl/ctlv3/command/printer_fields.go b/etcdctl/ctlv3/command/printer_fields.go index 6941636fc..c52e02982 100644 --- a/etcdctl/ctlv3/command/printer_fields.go +++ b/etcdctl/ctlv3/command/printer_fields.go @@ -20,7 +20,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" spb "go.etcd.io/etcd/api/v3/mvccpb" v3 "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" ) type fieldsPrinter struct{ printer } diff --git a/etcdctl/ctlv3/command/printer_json.go b/etcdctl/ctlv3/command/printer_json.go index a368d3c84..560621375 100644 --- a/etcdctl/ctlv3/command/printer_json.go +++ b/etcdctl/ctlv3/command/printer_json.go @@ -22,7 +22,7 @@ import ( "strconv" "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" ) type jsonPrinter struct { diff --git a/etcdctl/ctlv3/command/printer_simple.go b/etcdctl/ctlv3/command/printer_simple.go index e0213a2a2..04b9c0bfc 100644 --- a/etcdctl/ctlv3/command/printer_simple.go +++ b/etcdctl/ctlv3/command/printer_simple.go @@ -21,7 +21,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" v3 "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "go.etcd.io/etcd/v3/pkg/types" ) diff --git a/etcdctl/ctlv3/command/printer_table.go b/etcdctl/ctlv3/command/printer_table.go index c58024393..9478892ad 100644 --- 
a/etcdctl/ctlv3/command/printer_table.go +++ b/etcdctl/ctlv3/command/printer_table.go @@ -18,7 +18,7 @@ import ( "os" v3 "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "github.com/olekukonko/tablewriter" ) diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index 06eb33112..3cf2cbf12 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -20,7 +20,7 @@ import ( "path/filepath" "strings" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "github.com/spf13/cobra" "go.uber.org/zap" diff --git a/etcdctl/snapshot/doc.go b/etcdctl/snapshot/doc.go new file mode 100644 index 000000000..1c761be70 --- /dev/null +++ b/etcdctl/snapshot/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package snapshot implements utilities around etcd snapshot. +package snapshot diff --git a/clientv3/snapshot/util.go b/etcdctl/snapshot/util.go similarity index 100% rename from clientv3/snapshot/util.go rename to etcdctl/snapshot/util.go diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go new file mode 100644 index 000000000..982acccf9 --- /dev/null +++ b/etcdctl/snapshot/v3_snapshot.go @@ -0,0 +1,471 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapshot + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "hash/crc32" + "io" + "math" + "os" + "path/filepath" + "reflect" + "strings" + + bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/v3/clientv3" + "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdserver" + "go.etcd.io/etcd/v3/etcdserver/api/membership" + "go.etcd.io/etcd/v3/etcdserver/api/snap" + "go.etcd.io/etcd/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/v3/etcdserver/cindex" + "go.etcd.io/etcd/v3/lease" + "go.etcd.io/etcd/v3/mvcc" + "go.etcd.io/etcd/v3/mvcc/backend" + "go.etcd.io/etcd/v3/pkg/fileutil" + "go.etcd.io/etcd/v3/pkg/traceutil" + "go.etcd.io/etcd/v3/pkg/types" + "go.etcd.io/etcd/v3/raft" + "go.etcd.io/etcd/v3/raft/raftpb" + "go.etcd.io/etcd/v3/wal" + "go.etcd.io/etcd/v3/wal/walpb" + "go.uber.org/zap" +) + +// Manager defines snapshot methods. +type Manager interface { + // Save fetches snapshot from remote etcd server and saves data + // to target path. 
If the context "ctx" is canceled or timed out, + // snapshot save stream will error out (e.g. context.Canceled, + // context.DeadlineExceeded). Make sure to specify only one endpoint + // in client configuration. Snapshot API must be requested to a + // selected node, and saved snapshot is the point-in-time state of + // the selected node. + Save(ctx context.Context, cfg clientv3.Config, dbPath string) error + + // Status returns the snapshot file information. + Status(dbPath string) (Status, error) + + // Restore restores a new etcd data directory from given snapshot + // file. It returns an error if specified data directory already + // exists, to prevent unintended data directory overwrites. + Restore(cfg RestoreConfig) error +} + +// NewV3 returns a new snapshot Manager for v3.x snapshot. +func NewV3(lg *zap.Logger) Manager { + if lg == nil { + lg = zap.NewExample() + } + return &v3Manager{lg: lg} +} + +type v3Manager struct { + lg *zap.Logger + + name string + dbPath string + walDir string + snapDir string + cl *membership.RaftCluster + + skipHashCheck bool +} + +// hasChecksum returns "true" if the file size "n" +// has appended sha256 hash digest. +func hasChecksum(n int64) bool { + // 512 is chosen because it's a minimum disk sector size + // smaller than (and multiplies to) OS page size in most systems + return (n % 512) == sha256.Size +} + +// Save fetches snapshot from remote etcd server and saves data to target path. +func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error { + return snapshot.Save(ctx, s.lg, cfg, dbPath) +} + +// Status is the snapshot file status. +type Status struct { + Hash uint32 `json:"hash"` + Revision int64 `json:"revision"` + TotalKey int `json:"totalKey"` + TotalSize int64 `json:"totalSize"` +} + +// Status returns the snapshot file information. +func (s *v3Manager) Status(dbPath string) (ds Status, err error) { + if _, err = os.Stat(dbPath); err != nil { + return ds, err + } + + db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true}) + if err != nil { + return ds, err + } + defer db.Close() + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + + if err = db.View(func(tx *bolt.Tx) error { + // check snapshot file integrity first + var dbErrStrings []string + for dbErr := range tx.Check() { + dbErrStrings = append(dbErrStrings, dbErr.Error()) + } + if len(dbErrStrings) > 0 { + return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings)) + } + ds.TotalSize = tx.Size() + c := tx.Cursor() + for next, _ := c.First(); next != nil; next, _ = c.Next() { + b := tx.Bucket(next) + if b == nil { + return fmt.Errorf("cannot get hash of bucket %s", string(next)) + } + if _, err := h.Write(next); err != nil { + return fmt.Errorf("cannot write bucket %s : %v", string(next), err) + } + iskeyb := (string(next) == "key") + if err := b.ForEach(func(k, v []byte) error { + if _, err := h.Write(k); err != nil { + return fmt.Errorf("cannot write to bucket %s", err.Error()) + } + if _, err := h.Write(v); err != nil { + return fmt.Errorf("cannot write to bucket %s", err.Error()) + } + if iskeyb { + rev := bytesToRev(k) + ds.Revision = rev.main + } + ds.TotalKey++ + return nil + }); err != nil { + return fmt.Errorf("cannot write bucket %s : %v", string(next), err) + } + } + return nil + }); err != nil { + return ds, err + } + + ds.Hash = h.Sum32() + return ds, nil +} + +// RestoreConfig configures snapshot restore operation. 
+type RestoreConfig struct { + // SnapshotPath is the path of snapshot file to restore from. + SnapshotPath string + + // Name is the human-readable name of this member. + Name string + + // OutputDataDir is the target data directory to save restored data. + // OutputDataDir should not conflict with existing etcd data directory. + // If OutputDataDir already exists, it will return an error to prevent + // unintended data directory overwrites. + // If empty, defaults to "[Name].etcd" if not given. + OutputDataDir string + // OutputWALDir is the target WAL data directory. + // If empty, defaults to "[OutputDataDir]/member/wal" if not given. + OutputWALDir string + + // PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster. + PeerURLs []string + + // InitialCluster is the initial cluster configuration for restore bootstrap. + InitialCluster string + // InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap. + InitialClusterToken string + + // SkipHashCheck is "true" to ignore snapshot integrity hash value + // (required if copied from data directory). + SkipHashCheck bool +} + +// Restore restores a new etcd data directory from given snapshot file. +func (s *v3Manager) Restore(cfg RestoreConfig) error { + pURLs, err := types.NewURLs(cfg.PeerURLs) + if err != nil { + return err + } + var ics types.URLsMap + ics, err = types.NewURLsMap(cfg.InitialCluster) + if err != nil { + return err + } + + srv := etcdserver.ServerConfig{ + Logger: s.lg, + Name: cfg.Name, + PeerURLs: pURLs, + InitialPeerURLsMap: ics, + InitialClusterToken: cfg.InitialClusterToken, + } + if err = srv.VerifyBootstrap(); err != nil { + return err + } + + s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics) + if err != nil { + return err + } + + dataDir := cfg.OutputDataDir + if dataDir == "" { + dataDir = cfg.Name + ".etcd" + } + if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) { + return fmt.Errorf("data-dir %q not empty or could not be read", dataDir) + } + + walDir := cfg.OutputWALDir + if walDir == "" { + walDir = filepath.Join(dataDir, "member", "wal") + } else if fileutil.Exist(walDir) { + return fmt.Errorf("wal-dir %q exists", walDir) + } + + s.name = cfg.Name + s.dbPath = cfg.SnapshotPath + s.walDir = walDir + s.snapDir = filepath.Join(dataDir, "member", "snap") + s.skipHashCheck = cfg.SkipHashCheck + + s.lg.Info( + "restoring snapshot", + zap.String("path", s.dbPath), + zap.String("wal-dir", s.walDir), + zap.String("data-dir", dataDir), + zap.String("snap-dir", s.snapDir), + ) + if err = s.saveDB(); err != nil { + return err + } + if err = s.saveWALAndSnap(); err != nil { + return err + } + s.lg.Info( + "restored snapshot", + zap.String("path", s.dbPath), + zap.String("wal-dir", s.walDir), + zap.String("data-dir", dataDir), + zap.String("snap-dir", s.snapDir), + ) + + return nil +} + +// saveDB copies the database snapshot to the snapshot directory +func (s *v3Manager) saveDB() error { + f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600) + if ferr != nil { + return ferr + } + defer f.Close() + + // get snapshot integrity hash + if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { + return err + } + sha := make([]byte, sha256.Size) + if _, err := f.Read(sha); err != nil { + return err + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + return err + } + + if err := fileutil.CreateDirAll(s.snapDir); err != nil { + return err + } + + dbpath := filepath.Join(s.snapDir, "db") + db, dberr := os.OpenFile(dbpath, 
os.O_RDWR|os.O_CREATE, 0600) + if dberr != nil { + return dberr + } + dbClosed := false + defer func() { + if !dbClosed { + db.Close() + dbClosed = true + } + }() + if _, err := io.Copy(db, f); err != nil { + return err + } + + // truncate away integrity hash, if any. + off, serr := db.Seek(0, io.SeekEnd) + if serr != nil { + return serr + } + hasHash := hasChecksum(off) + if hasHash { + if err := db.Truncate(off - sha256.Size); err != nil { + return err + } + } + + if !hasHash && !s.skipHashCheck { + return fmt.Errorf("snapshot missing hash but --skip-hash-check=false") + } + + if hasHash && !s.skipHashCheck { + // check for match + if _, err := db.Seek(0, io.SeekStart); err != nil { + return err + } + h := sha256.New() + if _, err := io.Copy(h, db); err != nil { + return err + } + dbsha := h.Sum(nil) + if !reflect.DeepEqual(sha, dbsha) { + return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) + } + } + + // db hash is OK, can now modify DB so it can be part of a new cluster + db.Close() + dbClosed = true + + commit := len(s.cl.Members()) + + // update consistentIndex so applies go through on etcdserver despite + // having a new raft instance + be := backend.NewDefaultBackend(dbpath) + + ci := cindex.NewConsistentIndex(be.BatchTx()) + ci.SetConsistentIndex(uint64(commit)) + + // a lessor never timeouts leases + lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci) + + mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) + txn := mvs.Write(traceutil.TODO()) + btx := be.BatchTx() + del := func(k, v []byte) error { + txn.DeleteRange(k, nil) + return nil + } + + // delete stored members from old cluster since using new members + btx.UnsafeForEach([]byte("members"), del) + + // todo: add back new members when we start to deprecate old snap file. + btx.UnsafeForEach([]byte("members_removed"), del) + + // trigger write-out of new consistent index + txn.End() + + mvs.Commit() + mvs.Close() + be.Close() + + return nil +} + +// saveWALAndSnap creates a WAL for the initial cluster +func (s *v3Manager) saveWALAndSnap() error { + if err := fileutil.CreateDirAll(s.walDir); err != nil { + return err + } + + // add members again to persist them to the store we create. 
+ st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) + s.cl.SetStore(st) + for _, m := range s.cl.Members() { + s.cl.AddMember(m) + } + + m := s.cl.MemberByName(s.name) + md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())} + metadata, merr := md.Marshal() + if merr != nil { + return merr + } + w, walerr := wal.Create(s.lg, s.walDir, metadata) + if walerr != nil { + return walerr + } + defer w.Close() + + peers := make([]raft.Peer, len(s.cl.MemberIDs())) + for i, id := range s.cl.MemberIDs() { + ctx, err := json.Marshal((*s.cl).Member(id)) + if err != nil { + return err + } + peers[i] = raft.Peer{ID: uint64(id), Context: ctx} + } + + ents := make([]raftpb.Entry, len(peers)) + nodeIDs := make([]uint64, len(peers)) + for i, p := range peers { + nodeIDs[i] = p.ID + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: p.ID, + Context: p.Context, + } + d, err := cc.Marshal() + if err != nil { + return err + } + ents[i] = raftpb.Entry{ + Type: raftpb.EntryConfChange, + Term: 1, + Index: uint64(i + 1), + Data: d, + } + } + + commit, term := uint64(len(ents)), uint64(1) + if err := w.Save(raftpb.HardState{ + Term: term, + Vote: peers[0].ID, + Commit: commit, + }, ents); err != nil { + return err + } + + b, berr := st.Save() + if berr != nil { + return berr + } + raftSnap := raftpb.Snapshot{ + Data: b, + Metadata: raftpb.SnapshotMetadata{ + Index: commit, + Term: term, + ConfState: raftpb.ConfState{ + Voters: nodeIDs, + }, + }, + } + sn := snap.New(s.lg, s.snapDir) + if err := sn.SaveSnap(raftSnap); err != nil { + return err + } + return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) +} diff --git a/tests/e2e/ctl_v3_snapshot_test.go b/tests/e2e/ctl_v3_snapshot_test.go index 93a52c243..c3c8cd9be 100644 --- a/tests/e2e/ctl_v3_snapshot_test.go +++ b/tests/e2e/ctl_v3_snapshot_test.go @@ -25,7 +25,7 @@ import ( "testing" "time" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "go.etcd.io/etcd/v3/pkg/expect" "go.etcd.io/etcd/v3/pkg/testutil" ) diff --git a/tests/functional/rpcpb/member.go b/tests/functional/rpcpb/member.go index 47bd9438d..208fb88f7 100644 --- a/tests/functional/rpcpb/member.go +++ b/tests/functional/rpcpb/member.go @@ -24,7 +24,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "go.etcd.io/etcd/v3/pkg/logutil" "go.etcd.io/etcd/v3/pkg/transport" diff --git a/tests/integration/clientv3/snapshot/v3_snapshot_test.go b/tests/integration/clientv3/snapshot/v3_snapshot_test.go new file mode 100644 index 000000000..8429cf89c --- /dev/null +++ b/tests/integration/clientv3/snapshot/v3_snapshot_test.go @@ -0,0 +1,123 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package snapshot_test + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "go.etcd.io/etcd/v3/clientv3" + "go.etcd.io/etcd/v3/clientv3/snapshot" + "go.etcd.io/etcd/v3/embed" + "go.etcd.io/etcd/v3/pkg/fileutil" + "go.etcd.io/etcd/v3/pkg/testutil" + + "go.uber.org/zap" +) + +// TestSaveSnapshotFilePermissions ensures that the snapshot is saved with +// the correct file permissions. +func TestSaveSnapshotFilePermissions(t *testing.T) { + expectedFileMode := os.FileMode(fileutil.PrivateFileMode) + kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} + dbPath := createSnapshotFile(t, kvs) + defer os.RemoveAll(dbPath) + + dbInfo, err := os.Stat(dbPath) + if err != nil { + t.Fatalf("failed to get test snapshot file status: %v", err) + } + actualFileMode := dbInfo.Mode() + + if expectedFileMode != actualFileMode { + t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode) + } +} + +type kv struct { + k, v string +} + +// creates a snapshot file and returns the file path. +func createSnapshotFile(t *testing.T, kvs []kv) string { + testutil.SkipTestIfShortMode(t, + "Snapshot creation tests are depending on embedded etcServer so are integration-level tests.") + clusterN := 1 + urls := newEmbedURLs(clusterN * 2) + cURLs, pURLs := urls[:clusterN], urls[clusterN:] + + cfg := embed.NewConfig() + cfg.Logger = "zap" + cfg.LogOutputs = []string{"/dev/null"} + cfg.Name = "default" + cfg.ClusterState = "new" + cfg.LCUrls, cfg.ACUrls = cURLs, cURLs + cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) + cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) + srv, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + os.RemoveAll(cfg.Dir) + srv.Close() + }() + select { + case <-srv.Server.ReadyNotify(): + case <-time.After(3 * time.Second): + t.Fatalf("failed to start embed.Etcd for creating snapshots") + } + + ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + cli, err := clientv3.New(ccfg) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + for i := range kvs { + ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout) + _, err = cli.Put(ctx, kvs[i].k, kvs[i].v) + cancel() + if err != nil { + t.Fatal(err) + } + } + + dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond())) + if err = snapshot.Save(context.Background(), zap.NewExample(), ccfg, dpPath); err != nil { + t.Fatal(err) + } + + os.RemoveAll(cfg.Dir) + srv.Close() + return dpPath +} + +func newEmbedURLs(n int) (urls []url.URL) { + urls = make([]url.URL, n) + for i := 0; i < n; i++ { + rand.Seed(int64(time.Now().Nanosecond())) + u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000))) + urls[i] = *u + } + return urls +} diff --git a/tests/integration/snapshot/member_test.go b/tests/integration/snapshot/member_test.go index e6fefdcdb..40e2c997a 100644 --- a/tests/integration/snapshot/member_test.go +++ b/tests/integration/snapshot/member_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package snapshot +package snapshot_test import ( "context" diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index ebb4eef21..c2310152b 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package snapshot +package snapshot_test import ( "context" @@ -26,9 +26,8 @@ import ( "time" "go.etcd.io/etcd/v3/clientv3" - "go.etcd.io/etcd/v3/clientv3/snapshot" "go.etcd.io/etcd/v3/embed" - "go.etcd.io/etcd/v3/pkg/fileutil" + "go.etcd.io/etcd/v3/etcdctl/snapshot" "go.etcd.io/etcd/v3/pkg/testutil" "go.uber.org/zap" @@ -143,25 +142,6 @@ func TestSnapshotV3RestoreMulti(t *testing.T) { } } -// TestSnapshotFilePermissions ensures that the snapshot is saved with -// the correct file permissions. -func TestSnapshotFilePermissions(t *testing.T) { - expectedFileMode := os.FileMode(fileutil.PrivateFileMode) - kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} - dbPath := createSnapshotFile(t, kvs) - defer os.RemoveAll(dbPath) - - dbInfo, err := os.Stat(dbPath) - if err != nil { - t.Fatalf("failed to get test snapshot file status: %v", err) - } - actualFileMode := dbInfo.Mode() - - if expectedFileMode != actualFileMode { - t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode) - } -} - // TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file. func TestCorruptedBackupFileCheck(t *testing.T) { dbPath := "testdata/corrupted_backup.db"
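Illustrative follow-up (not part of the diff): the relocated `Status` method runs bbolt's `tx.Check()` over the snapshot, so a damaged backup surfaces as a non-nil error rather than a bogus Status; that is what the (truncated) TestCorruptedBackupFileCheck above exercises. A minimal sketch, with the file path as a placeholder:

```go
package main

import (
	"fmt"
	"log"

	"go.etcd.io/etcd/v3/etcdctl/snapshot"
	"go.uber.org/zap"
)

func main() {
	// Status performs the integrity check; a corrupted snapshot returns an error.
	m := snapshot.NewV3(zap.NewExample())
	if _, err := m.Status("testdata/corrupted_backup.db"); err != nil {
		fmt.Println("integrity check failed as expected:", err)
		return
	}
	log.Fatal("expected an integrity-check error for the corrupted snapshot file")
}
```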