added a new flag --rev to make-mirror command to support incremental mirror

This commit is contained in:
ahrtr 2021-12-07 10:57:10 +08:00
parent 69279532f4
commit 661e0a91ef
3 changed files with 43 additions and 18 deletions

View File

@ -34,6 +34,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.
- Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
- Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror.
- Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435)
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399)

View File

@ -43,6 +43,7 @@ var (
mmuser string
mmpassword string
mmnodestprefix bool
mmrev int64
)
// NewMakeMirrorCommand returns the cobra command for "makeMirror".
@ -54,6 +55,7 @@ func NewMakeMirrorCommand() *cobra.Command {
}
c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror")
c.Flags().Int64Var(&mmrev, "rev", 0, "Specify the kv revision to start to mirror")
c.Flags().StringVar(&mmdestprefix, "dest-prefix", "", "destination prefix to mirror a prefix to a different prefix in the destination cluster")
c.Flags().BoolVar(&mmnodestprefix, "no-dest-prefix", false, "mirror key-values to the root of the destination cluster")
c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster")
@ -142,28 +144,37 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
}
}()
s := mirror.NewSyncer(c, mmprefix, 0)
rc, errc := s.SyncBase(ctx)
// if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix
if !mmnodestprefix && len(mmdestprefix) == 0 {
mmdestprefix = mmprefix
startRev := mmrev - 1
if startRev < 0 {
startRev = 0
}
for r := range rc {
for _, kv := range r.Kvs {
_, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value))
if err != nil {
return err
}
atomic.AddInt64(&total, 1)
s := mirror.NewSyncer(c, mmprefix, startRev)
// If a rev is provided, then do not sync the whole key space.
// Instead, just start watching the key space starting from the rev
if startRev == 0 {
rc, errc := s.SyncBase(ctx)
// if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix
if !mmnodestprefix && len(mmdestprefix) == 0 {
mmdestprefix = mmprefix
}
}
err := <-errc
if err != nil {
return err
for r := range rc {
for _, kv := range r.Kvs {
_, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value))
if err != nil {
return err
}
atomic.AddInt64(&total, 1)
}
}
err := <-errc
if err != nil {
return err
}
}
wc := s.SyncUpdates(ctx)

View File

@ -25,6 +25,7 @@ import (
func TestCtlV3MakeMirror(t *testing.T) { testCtl(t, makeMirrorTest) }
func TestCtlV3MakeMirrorModifyDestPrefix(t *testing.T) { testCtl(t, makeMirrorModifyDestPrefixTest) }
func TestCtlV3MakeMirrorNoDestPrefix(t *testing.T) { testCtl(t, makeMirrorNoDestPrefixTest) }
func TestCtlV3MakeMirrorWithWatchRev(t *testing.T) { testCtl(t, makeMirrorWithWatchRev) }
func makeMirrorTest(cx ctlCtx) {
var (
@ -59,6 +60,18 @@ func makeMirrorNoDestPrefixTest(cx ctlCtx) {
testMirrorCommand(cx, flags, kvs, kvs2, srcprefix, destprefix)
}
func makeMirrorWithWatchRev(cx ctlCtx) {
var (
flags = []string{"--prefix", "o_", "--no-dest-prefix", "--rev", "4"}
kvs = []kv{{"o_key1", "val1"}, {"o_key2", "val2"}, {"o_key3", "val3"}, {"o_key4", "val4"}}
kvs2 = []kvExec{{key: "key3", val: "val3"}, {key: "key4", val: "val4"}}
srcprefix = "o_"
destprefix = "key"
)
testMirrorCommand(cx, flags, kvs, kvs2, srcprefix, destprefix)
}
func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvExec, srcprefix, destprefix string) {
// set up another cluster to mirror with
mirrorcfg := e2e.NewConfigAutoTLS()