From 24a6abaf59c60153bf68128fd96e0162834949c6 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 13 Feb 2016 22:33:46 -0800 Subject: [PATCH] *: move sync logic to clientv3/sync --- clientv3/sync/syncer.go | 150 ++++++++++++++++++++++++++ etcdctlv3/command/snapshot_command.go | 59 +++++----- 2 files changed, 175 insertions(+), 34 deletions(-) create mode 100644 clientv3/sync/syncer.go diff --git a/clientv3/sync/syncer.go b/clientv3/sync/syncer.go new file mode 100644 index 000000000..334c87b2d --- /dev/null +++ b/clientv3/sync/syncer.go @@ -0,0 +1,150 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/clientv3" +) + +const ( + batchLimit = 1000 +) + +// Syncer syncs with the key-value state of an etcd cluster. +type Syncer interface { + // SyncBase syncs the base state of the key-value state. + // The key-value state are sent through the returned chan. + SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) + // SyncBase syncs the updates of the key-value state. + // The update events are sent through the returned chan. + SyncUpdates(ctx context.Context) clientv3.WatchChan +} + +// NewSyncer creates a Syncer. +func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer { + return &syncer{c: c, prefix: prefix, rev: rev} +} + +type syncer struct { + c *clientv3.Client + rev int64 + prefix string +} + +func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) { + respchan := make(chan clientv3.GetResponse, 1024) + errchan := make(chan error, 1) + + kapi := clientv3.NewKV(s.c) + // if rev is not specified, we will choose the most recent revision. + if s.rev == 0 { + resp, err := kapi.Get(ctx, "") + if err != nil { + errchan <- err + close(respchan) + close(errchan) + return respchan, errchan + } + s.rev = resp.Header.Revision + } + + go func() { + defer close(respchan) + defer close(errchan) + + var key, end string + + opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)} + + if len(s.prefix) == 0 { + // If len(s.prefix) == 0, we will sync the entire key-value space. + // We then range from the smallest key (0x00) to the end. + opts = append(opts, clientv3.WithFromKey()) + key = "\x00" + } else { + // If len(s.prefix) != 0, we will sync key-value space with given prefix. + // We then range from the prefix to the next prefix if exists. Or we will + // range from the prefix to the end if the next prefix does not exists. + // (For example, when the given prefix is 0xffff, the next prefix does not + // exist). + key = s.prefix + end = string(incr([]byte(s.prefix))) + if len(end) == 0 { + opts = append(opts, clientv3.WithFromKey()) + } else { + opts = append(opts, clientv3.WithRange(string(end))) + } + } + + for { + resp, err := kapi.Get(ctx, key, opts...) + if err != nil { + errchan <- err + return + } + + respchan <- (clientv3.GetResponse)(*resp) + + if !resp.More { + return + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + }() + + return respchan, errchan +} + +func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan { + if s.rev == 0 { + panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?") + } + + respchan := make(chan clientv3.WatchResponse, 1024) + + go func() { + wapi := clientv3.NewWatcher(s.c) + defer wapi.Close() + defer close(respchan) + + // get all events since revision (or get non-compacted revision, if + // rev is too far behind) + wch := wapi.WatchPrefix(ctx, s.prefix, s.rev) + for wr := range wch { + respchan <- wr + } + }() + + return respchan +} + +func incr(bs []byte) []byte { + c := int8(1) + for i := range bs { + j := len(bs) - i - 1 + n := int8(bs[j]) + n += c + bs[j] = byte(n) + if n == 0 { + c = 1 + } else { + c = 0 + return bs + } + } + return nil +} diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go index 27ab3610b..247b338e8 100644 --- a/etcdctlv3/command/snapshot_command.go +++ b/etcdctlv3/command/snapshot_command.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/sync" "github.com/coreos/etcd/etcdserver/api/v3rpc" ) @@ -57,10 +58,11 @@ func snapshotToStdout(c *clientv3.Client) { if len(wr.Events) > 0 { wr.CompactRevision = 1 } - if rev := snapshot(os.Stdout, c, wr.CompactRevision); rev != 0 { + if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 { err := fmt.Errorf("snapshot interrupted by compaction %v", rev) ExitWithError(ExitInterrupted, err) } + os.Stdout.Sync() } // snapshotToFile atomically writes a snapshot to a file @@ -88,13 +90,29 @@ func snapshotToFile(c *clientv3.Client, path string) { // snapshot reads all of a watcher; returns compaction revision if incomplete // TODO: stabilize snapshot format func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 { - wapi := clientv3.NewWatcher(c) - defer wapi.Close() + s := sync.NewSyncer(c, "", rev) - // get all events since revision (or get non-compacted revision, if - // rev is too far behind) - wch := wapi.WatchPrefix(context.TODO(), "", rev) - for wr := range wch { + rc, errc := s.SyncBase(context.TODO()) + + for r := range rc { + for _, kv := range r.Kvs { + fmt.Fprintln(w, kv) + } + } + + err := <-errc + if err != nil { + if err == v3rpc.ErrCompacted { + // will get correct compact revision on retry + return rev + 1 + } + // failed for some unknown reason, retry on same revision + return rev + } + + wc := s.SyncUpdates(context.TODO()) + + for wr := range wc { if len(wr.Events) == 0 { return wr.CompactRevision } @@ -107,32 +125,5 @@ func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 { } } - // get base state at rev - kapi := clientv3.NewKV(c) - key := "\x00" - for { - kvs, err := kapi.Get( - context.TODO(), - key, - clientv3.WithFromKey(), - clientv3.WithRev(rev+1), - clientv3.WithLimit(1000)) - if err == v3rpc.ErrCompacted { - // will get correct compact revision on retry - return rev + 1 - } else if err != nil { - // failed for some unknown reason, retry on same revision - return rev - } - for _, kv := range kvs.Kvs { - fmt.Fprintln(w, kv) - } - if !kvs.More { - break - } - // move to next key - key = string(append(kvs.Kvs[len(kvs.Kvs)-1].Key, 0)) - } - return 0 }