Merge pull request #4523 from xiang90/syncer

*: move sync logic to clientv3/sync
This commit is contained in:
Xiang Li 2016-02-14 23:24:39 -08:00
commit fe5500228a
2 changed files with 175 additions and 34 deletions

150
clientv3/sync/syncer.go Normal file
View File

@ -0,0 +1,150 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
const (
batchLimit = 1000
)
// Syncer syncs with the key-value state of an etcd cluster.
type Syncer interface {
// SyncBase syncs the base state of the key-value state.
// The key-value state are sent through the returned chan.
SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)
// SyncBase syncs the updates of the key-value state.
// The update events are sent through the returned chan.
SyncUpdates(ctx context.Context) clientv3.WatchChan
}
// NewSyncer creates a Syncer.
func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
return &syncer{c: c, prefix: prefix, rev: rev}
}
type syncer struct {
c *clientv3.Client
rev int64
prefix string
}
func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
respchan := make(chan clientv3.GetResponse, 1024)
errchan := make(chan error, 1)
kapi := clientv3.NewKV(s.c)
// if rev is not specified, we will choose the most recent revision.
if s.rev == 0 {
resp, err := kapi.Get(ctx, "")
if err != nil {
errchan <- err
close(respchan)
close(errchan)
return respchan, errchan
}
s.rev = resp.Header.Revision
}
go func() {
defer close(respchan)
defer close(errchan)
var key, end string
opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}
if len(s.prefix) == 0 {
// If len(s.prefix) == 0, we will sync the entire key-value space.
// We then range from the smallest key (0x00) to the end.
opts = append(opts, clientv3.WithFromKey())
key = "\x00"
} else {
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
// We then range from the prefix to the next prefix if exists. Or we will
// range from the prefix to the end if the next prefix does not exists.
// (For example, when the given prefix is 0xffff, the next prefix does not
// exist).
key = s.prefix
end = string(incr([]byte(s.prefix)))
if len(end) == 0 {
opts = append(opts, clientv3.WithFromKey())
} else {
opts = append(opts, clientv3.WithRange(string(end)))
}
}
for {
resp, err := kapi.Get(ctx, key, opts...)
if err != nil {
errchan <- err
return
}
respchan <- (clientv3.GetResponse)(*resp)
if !resp.More {
return
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}
}()
return respchan, errchan
}
func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
if s.rev == 0 {
panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
}
respchan := make(chan clientv3.WatchResponse, 1024)
go func() {
wapi := clientv3.NewWatcher(s.c)
defer wapi.Close()
defer close(respchan)
// get all events since revision (or get non-compacted revision, if
// rev is too far behind)
wch := wapi.WatchPrefix(ctx, s.prefix, s.rev)
for wr := range wch {
respchan <- wr
}
}()
return respchan
}
func incr(bs []byte) []byte {
c := int8(1)
for i := range bs {
j := len(bs) - i - 1
n := int8(bs[j])
n += c
bs[j] = byte(n)
if n == 0 {
c = 1
} else {
c = 0
return bs
}
}
return nil
}

View File

@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/sync"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
)
@ -57,10 +58,11 @@ func snapshotToStdout(c *clientv3.Client) {
if len(wr.Events) > 0 {
wr.CompactRevision = 1
}
if rev := snapshot(os.Stdout, c, wr.CompactRevision); rev != 0 {
if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 {
err := fmt.Errorf("snapshot interrupted by compaction %v", rev)
ExitWithError(ExitInterrupted, err)
}
os.Stdout.Sync()
}
// snapshotToFile atomically writes a snapshot to a file
@ -88,13 +90,29 @@ func snapshotToFile(c *clientv3.Client, path string) {
// snapshot reads all of a watcher; returns compaction revision if incomplete
// TODO: stabilize snapshot format
func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
wapi := clientv3.NewWatcher(c)
defer wapi.Close()
s := sync.NewSyncer(c, "", rev)
// get all events since revision (or get non-compacted revision, if
// rev is too far behind)
wch := wapi.WatchPrefix(context.TODO(), "", rev)
for wr := range wch {
rc, errc := s.SyncBase(context.TODO())
for r := range rc {
for _, kv := range r.Kvs {
fmt.Fprintln(w, kv)
}
}
err := <-errc
if err != nil {
if err == v3rpc.ErrCompacted {
// will get correct compact revision on retry
return rev + 1
}
// failed for some unknown reason, retry on same revision
return rev
}
wc := s.SyncUpdates(context.TODO())
for wr := range wc {
if len(wr.Events) == 0 {
return wr.CompactRevision
}
@ -107,32 +125,5 @@ func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
}
}
// get base state at rev
kapi := clientv3.NewKV(c)
key := "\x00"
for {
kvs, err := kapi.Get(
context.TODO(),
key,
clientv3.WithFromKey(),
clientv3.WithRev(rev+1),
clientv3.WithLimit(1000))
if err == v3rpc.ErrCompacted {
// will get correct compact revision on retry
return rev + 1
} else if err != nil {
// failed for some unknown reason, retry on same revision
return rev
}
for _, kv := range kvs.Kvs {
fmt.Fprintln(w, kv)
}
if !kvs.More {
break
}
// move to next key
key = string(append(kvs.Kvs[len(kvs.Kvs)-1].Key, 0))
}
return 0
}