From 5abe521e77bdd1896f6a569f8b9cca3cb7dd5e5c Mon Sep 17 00:00:00 2001
From: Gyuho Lee
Date: Thu, 12 Apr 2018 12:43:35 -0700
Subject: [PATCH] snapshot: initial commit (for functional tests)

Signed-off-by: Gyuho Lee
---
 snapshot/doc.go              |  16 ++
 snapshot/member_test.go      | 125 +++++++++
 snapshot/util.go             |  35 +++
 snapshot/v3_snapshot.go      | 485 +++++++++++++++++++++++++++++++++++
 snapshot/v3_snapshot_test.go | 272 ++++++++++++++++++++
 5 files changed, 933 insertions(+)
 create mode 100644 snapshot/doc.go
 create mode 100644 snapshot/member_test.go
 create mode 100644 snapshot/util.go
 create mode 100644 snapshot/v3_snapshot.go
 create mode 100644 snapshot/v3_snapshot_test.go

diff --git a/snapshot/doc.go b/snapshot/doc.go
new file mode 100644
index 000000000..1c761be70
--- /dev/null
+++ b/snapshot/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package snapshot implements utilities around etcd snapshots.
+package snapshot
diff --git a/snapshot/member_test.go b/snapshot/member_test.go
new file mode 100644
index 000000000..23ca52af1
--- /dev/null
+++ b/snapshot/member_test.go
@@ -0,0 +1,125 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snapshot
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/embed"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+// TestSnapshotV3RestoreMultiMemberAdd ensures that multiple members
+// can boot into the same cluster after being restored from the same
+// snapshot file, and that another member can then be added to the cluster.
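+//
+// The test restores a 3-member cluster from the snapshot, waits for the
+// health interval and leader election, registers a 4th member through the
+// MemberAdd API, then starts that member with cluster state "existing" and
+// verifies both the 4-member MemberList and the restored key-value data.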
+func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { + kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} + dbPath := createSnapshotFile(t, kvs) + + clusterN := 3 + cURLs, pURLs, srvs := restoreCluster(t, clusterN, dbPath) + defer func() { + for i := 0; i < clusterN; i++ { + os.RemoveAll(srvs[i].Config().Dir) + srvs[i].Close() + } + }() + + // wait for health interval + leader election + time.Sleep(etcdserver.HealthInterval + 2*time.Second) + + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[0].String()}}) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + + urls := newEmbedURLs(2) + newCURLs, newPURLs := urls[:1], urls[1:] + if _, err = cli.MemberAdd(context.Background(), []string{newPURLs[0].String()}); err != nil { + t.Fatal(err) + } + + // wait for membership reconfiguration apply + time.Sleep(testutil.ApplyTimeout) + + cfg := embed.NewConfig() + cfg.Name = "3" + cfg.InitialClusterToken = testClusterTkn + cfg.ClusterState = "existing" + cfg.LCUrls, cfg.ACUrls = newCURLs, newCURLs + cfg.LPUrls, cfg.APUrls = newPURLs, newPURLs + cfg.InitialCluster = "" + for i := 0; i < clusterN; i++ { + cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) + } + cfg.InitialCluster = cfg.InitialCluster[1:] + cfg.InitialCluster += fmt.Sprintf(",%s=%s", cfg.Name, newPURLs[0].String()) + cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) + + srv, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + os.RemoveAll(cfg.Dir) + srv.Close() + }() + select { + case <-srv.Server.ReadyNotify(): + case <-time.After(10 * time.Second): + t.Fatalf("failed to start the newly added etcd member") + } + + cli2, err := clientv3.New(clientv3.Config{Endpoints: []string{newCURLs[0].String()}}) + if err != nil { + t.Fatal(err) + } + defer cli2.Close() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout) + mresp, err := cli2.MemberList(ctx) + cancel() + if err != nil { + t.Fatal(err) + } + if len(mresp.Members) != 4 { + t.Fatalf("expected 4 members, got %+v", mresp) + } + + // make sure restored cluster has kept all data on recovery + var gresp *clientv3.GetResponse + ctx, cancel = context.WithTimeout(context.Background(), testutil.RequestTimeout) + gresp, err = cli2.Get(ctx, "foo", clientv3.WithPrefix()) + cancel() + if err != nil { + t.Fatal(err) + } + for i := range gresp.Kvs { + if string(gresp.Kvs[i].Key) != kvs[i].k { + t.Fatalf("#%d: key expected %s, got %s", i, kvs[i].k, string(gresp.Kvs[i].Key)) + } + if string(gresp.Kvs[i].Value) != kvs[i].v { + t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[i].Value)) + } + } +} diff --git a/snapshot/util.go b/snapshot/util.go new file mode 100644 index 000000000..93ba70b6c --- /dev/null +++ b/snapshot/util.go @@ -0,0 +1,35 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package snapshot + +import "encoding/binary" + +type revision struct { + main int64 + sub int64 +} + +func bytesToRev(bytes []byte) revision { + return revision{ + main: int64(binary.BigEndian.Uint64(bytes[0:8])), + sub: int64(binary.BigEndian.Uint64(bytes[9:])), + } +} + +// initIndex implements ConsistentIndexGetter so the snapshot won't block +// the new raft instance by waiting for a future raft index. +type initIndex int + +func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) } diff --git a/snapshot/v3_snapshot.go b/snapshot/v3_snapshot.go new file mode 100644 index 000000000..87da3fb86 --- /dev/null +++ b/snapshot/v3_snapshot.go @@ -0,0 +1,485 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapshot + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "hash/crc32" + "io" + "math" + "os" + "path/filepath" + "reflect" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/store" + "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" + + bolt "github.com/coreos/bbolt" + "go.uber.org/zap" +) + +// Manager defines snapshot methods. +type Manager interface { + // Save fetches snapshot from remote etcd server and saves data + // to target path. If the context "ctx" is canceled or timed out, + // snapshot save stream will error out (e.g. context.Canceled, + // context.DeadlineExceeded). Make sure to specify only one endpoint + // in client configuration. Snapshot API must be requested to a + // selected node, and saved snapshot is the point-in-time state of + // the selected node. + Save(ctx context.Context, cfg clientv3.Config, dbPath string) error + + // Status returns the snapshot file information. + Status(dbPath string) (Status, error) + + // Restore restores a new etcd data directory from given snapshot + // file. It returns an error if specified data directory already + // exists, to prevent unintended data directory overwrites. + Restore(cfg RestoreConfig) error +} + +// NewV3 returns a new snapshot Manager for v3.x snapshot. +func NewV3(lg *zap.Logger) Manager { + if lg == nil { + lg = zap.NewExample() + } + return &v3Manager{lg: lg} +} + +type v3Manager struct { + lg *zap.Logger + + name string + dbPath string + walDir string + snapDir string + cl *membership.RaftCluster + + skipHashCheck bool +} + +// Save fetches snapshot from remote etcd server and saves data to target path. 
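+//
+// Illustrative use from a caller (the endpoint and target path are examples
+// only, not defaults):
+//
+//	sp := snapshot.NewV3(zap.NewExample())
+//	if err := sp.Save(context.Background(),
+//		clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}},
+//		"backup.db"); err != nil {
+//		// handle error
+//	}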
+func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error { + if len(cfg.Endpoints) != 1 { + return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints) + } + cli, err := clientv3.New(cfg) + if err != nil { + return err + } + defer cli.Close() + + partpath := dbPath + ".part" + defer os.RemoveAll(partpath) + + var f *os.File + f, err = os.Create(partpath) + if err != nil { + return fmt.Errorf("could not open %s (%v)", partpath, err) + } + s.lg.Info( + "created temporary db file", + zap.String("path", partpath), + ) + + now := time.Now() + var rd io.ReadCloser + rd, err = cli.Snapshot(ctx) + if err != nil { + return err + } + s.lg.Info( + "fetching snapshot", + zap.String("endpoint", cfg.Endpoints[0]), + ) + if _, err = io.Copy(f, rd); err != nil { + return err + } + if err = fileutil.Fsync(f); err != nil { + return err + } + if err = f.Close(); err != nil { + return err + } + s.lg.Info( + "fetched snapshot", + zap.String("endpoint", cfg.Endpoints[0]), + zap.Duration("took", time.Since(now)), + ) + + if err = os.Rename(partpath, dbPath); err != nil { + return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err) + } + s.lg.Info("saved", zap.String("path", dbPath)) + return nil +} + +// Status is the snapshot file status. +type Status struct { + Hash uint32 `json:"hash"` + Revision int64 `json:"revision"` + TotalKey int `json:"totalKey"` + TotalSize int64 `json:"totalSize"` +} + +// Status returns the snapshot file information. +func (s *v3Manager) Status(dbPath string) (ds Status, err error) { + if _, err = os.Stat(dbPath); err != nil { + return ds, err + } + + db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true}) + if err != nil { + return ds, err + } + defer db.Close() + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + + if err = db.View(func(tx *bolt.Tx) error { + ds.TotalSize = tx.Size() + c := tx.Cursor() + for next, _ := c.First(); next != nil; next, _ = c.Next() { + b := tx.Bucket(next) + if b == nil { + return fmt.Errorf("cannot get hash of bucket %s", string(next)) + } + h.Write(next) + iskeyb := (string(next) == "key") + b.ForEach(func(k, v []byte) error { + h.Write(k) + h.Write(v) + if iskeyb { + rev := bytesToRev(k) + ds.Revision = rev.main + } + ds.TotalKey++ + return nil + }) + } + return nil + }); err != nil { + return ds, err + } + + ds.Hash = h.Sum32() + return ds, nil +} + +// RestoreConfig configures snapshot restore operation. +type RestoreConfig struct { + // SnapshotPath is the path of snapshot file to restore from. + SnapshotPath string + + // Name is the human-readable name of this member. + Name string + + // OutputDataDir is the target data directory to save restored data. + // OutputDataDir should not conflict with existing etcd data directory. + // If OutputDataDir already exists, it will return an error to prevent + // unintended data directory overwrites. + // If empty, defaults to "[Name].etcd" if not given. + OutputDataDir string + // OutputWALDir is the target WAL data directory. + // If empty, defaults to "[OutputDataDir]/member/wal" if not given. + OutputWALDir string + + // PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster. + PeerURLs []string + + // InitialCluster is the initial cluster configuration for restore bootstrap. + InitialCluster string + // InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap. 
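+	// All members restored from the same snapshot file must be restored
+	// with identical InitialCluster and InitialClusterToken values (the
+	// package tests restore every member with the same values).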
+ InitialClusterToken string + + // SkipHashCheck is "true" to ignore snapshot integrity hash value + // (required if copied from data directory). + SkipHashCheck bool +} + +// Restore restores a new etcd data directory from given snapshot file. +func (s *v3Manager) Restore(cfg RestoreConfig) error { + pURLs, err := types.NewURLs(cfg.PeerURLs) + if err != nil { + return err + } + var ics types.URLsMap + ics, err = types.NewURLsMap(cfg.InitialCluster) + if err != nil { + return err + } + + srv := etcdserver.ServerConfig{ + Name: cfg.Name, + PeerURLs: pURLs, + InitialPeerURLsMap: ics, + InitialClusterToken: cfg.InitialClusterToken, + } + if err = srv.VerifyBootstrap(); err != nil { + return err + } + + s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, ics) + if err != nil { + return err + } + + dataDir := cfg.OutputDataDir + if dataDir == "" { + dataDir = cfg.Name + ".etcd" + } + if fileutil.Exist(dataDir) { + return fmt.Errorf("data-dir %q exists", dataDir) + } + + walDir := cfg.OutputWALDir + if walDir == "" { + walDir = filepath.Join(dataDir, "member", "wal") + } else if fileutil.Exist(walDir) { + return fmt.Errorf("wal-dir %q exists", walDir) + } + + s.name = cfg.Name + s.dbPath = cfg.SnapshotPath + s.walDir = walDir + s.snapDir = filepath.Join(dataDir, "member", "snap") + s.skipHashCheck = cfg.SkipHashCheck + + s.lg.Info( + "restoring snapshot", + zap.String("path", s.dbPath), + zap.String("wal-dir", s.walDir), + zap.String("data-dir", dataDir), + zap.String("snap-dir", s.snapDir), + ) + if err = s.saveDB(); err != nil { + return err + } + if err = s.saveWALAndSnap(); err != nil { + return err + } + s.lg.Info( + "restored snapshot", + zap.String("path", s.dbPath), + zap.String("wal-dir", s.walDir), + zap.String("data-dir", dataDir), + zap.String("snap-dir", s.snapDir), + ) + + return nil +} + +// saveDB copies the database snapshot to the snapshot directory +func (s *v3Manager) saveDB() error { + f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600) + if ferr != nil { + return ferr + } + defer f.Close() + + // get snapshot integrity hash + if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { + return err + } + sha := make([]byte, sha256.Size) + if _, err := f.Read(sha); err != nil { + return err + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + return err + } + + if err := fileutil.CreateDirAll(s.snapDir); err != nil { + return err + } + + dbpath := filepath.Join(s.snapDir, "db") + db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) + if dberr != nil { + return dberr + } + if _, err := io.Copy(db, f); err != nil { + return err + } + + // truncate away integrity hash, if any. 
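+	// (The client Snapshot API appends a sha256 hash of the db payload to
+	// the file; a boltdb file size is a multiple of 512 bytes, so a size
+	// sitting sha256.Size bytes past a 512-byte boundary means the hash is
+	// present. A db file copied straight out of a data directory has no
+	// appended hash, which is what SkipHashCheck is for.)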
+ off, serr := db.Seek(0, io.SeekEnd) + if serr != nil { + return serr + } + hasHash := (off % 512) == sha256.Size + if hasHash { + if err := db.Truncate(off - sha256.Size); err != nil { + return err + } + } + + if !hasHash && !s.skipHashCheck { + return fmt.Errorf("snapshot missing hash but --skip-hash-check=false") + } + + if hasHash && !s.skipHashCheck { + // check for match + if _, err := db.Seek(0, io.SeekStart); err != nil { + return err + } + h := sha256.New() + if _, err := io.Copy(h, db); err != nil { + return err + } + dbsha := h.Sum(nil) + if !reflect.DeepEqual(sha, dbsha) { + return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) + } + } + + // db hash is OK, can now modify DB so it can be part of a new cluster + db.Close() + + commit := len(s.cl.Members()) + + // update consistentIndex so applies go through on etcdserver despite + // having a new raft instance + be := backend.NewDefaultBackend(dbpath) + + // a lessor never timeouts leases + lessor := lease.NewLessor(be, math.MaxInt64) + + mvs := mvcc.NewStore(be, lessor, (*initIndex)(&commit)) + txn := mvs.Write() + btx := be.BatchTx() + del := func(k, v []byte) error { + txn.DeleteRange(k, nil) + return nil + } + + // delete stored members from old cluster since using new members + btx.UnsafeForEach([]byte("members"), del) + + // todo: add back new members when we start to deprecate old snap file. + btx.UnsafeForEach([]byte("members_removed"), del) + + // trigger write-out of new consistent index + txn.End() + + mvs.Commit() + mvs.Close() + be.Close() + + return nil +} + +// saveWALAndSnap creates a WAL for the initial cluster +func (s *v3Manager) saveWALAndSnap() error { + if err := fileutil.CreateDirAll(s.walDir); err != nil { + return err + } + + // add members again to persist them to the store we create. 
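+	// A fresh WAL is then created with one ConfChangeAddNode entry per
+	// member, a matching hard state, and a raft snapshot of this store, so
+	// the restored member boots as if the cluster had just been
+	// bootstrapped with this membership.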
+ st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) + s.cl.SetStore(st) + for _, m := range s.cl.Members() { + s.cl.AddMember(m) + } + + m := s.cl.MemberByName(s.name) + md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())} + metadata, merr := md.Marshal() + if merr != nil { + return merr + } + w, walerr := wal.Create(s.walDir, metadata) + if walerr != nil { + return walerr + } + defer w.Close() + + peers := make([]raft.Peer, len(s.cl.MemberIDs())) + for i, id := range s.cl.MemberIDs() { + ctx, err := json.Marshal((*s.cl).Member(id)) + if err != nil { + return err + } + peers[i] = raft.Peer{ID: uint64(id), Context: ctx} + } + + ents := make([]raftpb.Entry, len(peers)) + nodeIDs := make([]uint64, len(peers)) + for i, p := range peers { + nodeIDs[i] = p.ID + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: p.ID, + Context: p.Context, + } + d, err := cc.Marshal() + if err != nil { + return err + } + ents[i] = raftpb.Entry{ + Type: raftpb.EntryConfChange, + Term: 1, + Index: uint64(i + 1), + Data: d, + } + } + + commit, term := uint64(len(ents)), uint64(1) + if err := w.Save(raftpb.HardState{ + Term: term, + Vote: peers[0].ID, + Commit: commit, + }, ents); err != nil { + return err + } + + b, berr := st.Save() + if berr != nil { + return berr + } + raftSnap := raftpb.Snapshot{ + Data: b, + Metadata: raftpb.SnapshotMetadata{ + Index: commit, + Term: term, + ConfState: raftpb.ConfState{ + Nodes: nodeIDs, + }, + }, + } + sn := snap.New(s.snapDir) + if err := sn.SaveSnap(raftSnap); err != nil { + return err + } + + return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) +} diff --git a/snapshot/v3_snapshot_test.go b/snapshot/v3_snapshot_test.go new file mode 100644 index 000000000..2aed7b014 --- /dev/null +++ b/snapshot/v3_snapshot_test.go @@ -0,0 +1,272 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapshot + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/embed" + "github.com/coreos/etcd/pkg/testutil" + + "go.uber.org/zap" +) + +// TestSnapshotV3RestoreSingle tests single node cluster restoring +// from a snapshot file. 
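+//
+// The snapshot comes from a scratch single-member cluster; it is restored
+// into a fresh data directory with Restore, served by a new embedded member
+// started with cluster state "existing", and the key-value data read back
+// through clientv3 is compared against what was originally written.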
+func TestSnapshotV3RestoreSingle(t *testing.T) {
+	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
+	dbPath := createSnapshotFile(t, kvs)
+	defer os.RemoveAll(dbPath)
+
+	clusterN := 1
+	urls := newEmbedURLs(clusterN * 2)
+	cURLs, pURLs := urls[:clusterN], urls[clusterN:]
+
+	cfg := embed.NewConfig()
+	cfg.Name = "s1"
+	cfg.InitialClusterToken = testClusterTkn
+	cfg.ClusterState = "existing"
+	cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
+	cfg.LPUrls, cfg.APUrls = pURLs, pURLs
+	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
+	cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
+
+	sp := NewV3(zap.NewExample())
+	pss := make([]string, 0, len(pURLs))
+	for _, p := range pURLs {
+		pss = append(pss, p.String())
+	}
+	if err := sp.Restore(RestoreConfig{
+		SnapshotPath:        dbPath,
+		Name:                cfg.Name,
+		OutputDataDir:       cfg.Dir,
+		InitialCluster:      cfg.InitialCluster,
+		InitialClusterToken: cfg.InitialClusterToken,
+		PeerURLs:            pss,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	srv, err := embed.StartEtcd(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		os.RemoveAll(cfg.Dir)
+		srv.Close()
+	}()
+	select {
+	case <-srv.Server.ReadyNotify():
+	case <-time.After(3 * time.Second):
+		t.Fatalf("failed to start restored etcd member")
+	}
+
+	var cli *clientv3.Client
+	cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+	for i := range kvs {
+		var gresp *clientv3.GetResponse
+		gresp, err = cli.Get(context.Background(), kvs[i].k)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if string(gresp.Kvs[0].Value) != kvs[i].v {
+			t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
+		}
+	}
+}
+
+// TestSnapshotV3RestoreMulti ensures that multiple members
+// can boot into the same cluster after being restored from the same
+// snapshot file.
+func TestSnapshotV3RestoreMulti(t *testing.T) {
+	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
+	dbPath := createSnapshotFile(t, kvs)
+	defer os.RemoveAll(dbPath)
+
+	clusterN := 3
+	cURLs, _, srvs := restoreCluster(t, clusterN, dbPath)
+	defer func() {
+		for i := 0; i < clusterN; i++ {
+			os.RemoveAll(srvs[i].Config().Dir)
+			srvs[i].Close()
+		}
+	}()
+
+	// wait for leader election
+	time.Sleep(time.Second)
+
+	for i := 0; i < clusterN; i++ {
+		cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}})
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer cli.Close()
+		for i := range kvs {
+			var gresp *clientv3.GetResponse
+			gresp, err = cli.Get(context.Background(), kvs[i].k)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if string(gresp.Kvs[0].Value) != kvs[i].v {
+				t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
+			}
+		}
+	}
+}
+
+type kv struct {
+	k, v string
+}
+
+// createSnapshotFile creates a snapshot file populated with the given
+// key-value pairs and returns the file path.
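+// It starts a scratch single-member embed.Etcd, writes the pairs through
+// clientv3, saves the snapshot with Save into a temporary .db file, and
+// shuts the member down before returning.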
+func createSnapshotFile(t *testing.T, kvs []kv) string { + clusterN := 1 + urls := newEmbedURLs(clusterN * 2) + cURLs, pURLs := urls[:clusterN], urls[clusterN:] + + cfg := embed.NewConfig() + cfg.Name = "default" + cfg.ClusterState = "new" + cfg.LCUrls, cfg.ACUrls = cURLs, cURLs + cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) + cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) + srv, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + os.RemoveAll(cfg.Dir) + srv.Close() + }() + select { + case <-srv.Server.ReadyNotify(): + case <-time.After(3 * time.Second): + t.Fatalf("failed to start embed.Etcd for creating snapshots") + } + + ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + cli, err := clientv3.New(ccfg) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + for i := range kvs { + ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout) + _, err = cli.Put(ctx, kvs[i].k, kvs[i].v) + cancel() + if err != nil { + t.Fatal(err) + } + } + + sp := NewV3(zap.NewExample()) + dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond())) + if err = sp.Save(context.Background(), ccfg, dpPath); err != nil { + t.Fatal(err) + } + + os.RemoveAll(cfg.Dir) + srv.Close() + return dpPath +} + +const testClusterTkn = "tkn" + +func restoreCluster(t *testing.T, clusterN int, dbPath string) ( + cURLs []url.URL, + pURLs []url.URL, + srvs []*embed.Etcd) { + urls := newEmbedURLs(clusterN * 2) + cURLs, pURLs = urls[:clusterN], urls[clusterN:] + + ics := "" + for i := 0; i < clusterN; i++ { + ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) + } + ics = ics[1:] + + cfgs := make([]*embed.Config, clusterN) + for i := 0; i < clusterN; i++ { + cfg := embed.NewConfig() + cfg.Name = fmt.Sprintf("%d", i) + cfg.InitialClusterToken = testClusterTkn + cfg.ClusterState = "existing" + cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} + cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} + cfg.InitialCluster = ics + cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i)) + + sp := NewV3(zap.NewExample()) + if err := sp.Restore(RestoreConfig{ + SnapshotPath: dbPath, + Name: cfg.Name, + OutputDataDir: cfg.Dir, + PeerURLs: []string{pURLs[i].String()}, + InitialCluster: ics, + InitialClusterToken: cfg.InitialClusterToken, + }); err != nil { + t.Fatal(err) + } + cfgs[i] = cfg + } + + sch := make(chan *embed.Etcd) + for i := range cfgs { + go func(idx int) { + srv, err := embed.StartEtcd(cfgs[idx]) + if err != nil { + t.Fatal(err) + } + + <-srv.Server.ReadyNotify() + sch <- srv + }(i) + } + + srvs = make([]*embed.Etcd, clusterN) + for i := 0; i < clusterN; i++ { + select { + case srv := <-sch: + srvs[i] = srv + case <-time.After(5 * time.Second): + t.Fatalf("#%d: failed to start embed.Etcd", i) + } + } + return cURLs, pURLs, srvs +} + +// TODO: TLS +func newEmbedURLs(n int) (urls []url.URL) { + urls = make([]url.URL, n) + for i := 0; i < n; i++ { + rand.Seed(int64(time.Now().Nanosecond())) + u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000))) + urls[i] = *u + } + return urls +}
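
For reference, the three Manager operations introduced here compose into a
simple backup/restore flow. Below is a minimal caller sketch, assuming a
reachable endpoint at http://127.0.0.1:2379; the file paths, member name,
peer URL, and cluster token are illustrative values, not defaults from the
patch:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/snapshot"

	"go.uber.org/zap"
)

func main() {
	sp := snapshot.NewV3(zap.NewExample())

	// Save fetches a point-in-time snapshot from exactly one selected endpoint.
	ccfg := clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}}
	if err := sp.Save(context.Background(), ccfg, "/tmp/backup.db"); err != nil {
		log.Fatal(err)
	}

	// Status reports hash, revision, key count, and size of the snapshot file.
	st, err := sp.Status("/tmp/backup.db")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("hash=%x revision=%d keys=%d size=%d\n",
		st.Hash, st.Revision, st.TotalKey, st.TotalSize)

	// Restore writes a fresh data directory (which must not already exist)
	// for a new single-member cluster seeded from the snapshot.
	if err := sp.Restore(snapshot.RestoreConfig{
		SnapshotPath:        "/tmp/backup.db",
		Name:                "restored",
		OutputDataDir:       "/tmp/restored.etcd",
		PeerURLs:            []string{"http://127.0.0.1:2380"},
		InitialCluster:      "restored=http://127.0.0.1:2380",
		InitialClusterToken: "restore-example",
	}); err != nil {
		log.Fatal(err)
	}
}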