diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 66bec68c1..0fbf7d72a 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -311,9 +311,11 @@ func TestKVCompact(t *testing.T) { defer wc.Close() wchan := wc.Watch(ctx, "foo", 3) - _, ok := <-wchan - if ok { - t.Fatalf("wchan ok got %v, want false", ok) + if wr := <-wchan; wr.CompactRevision != 7 { + t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision) + } + if wr, ok := <-wchan; ok { + t.Fatalf("wchan got %v, expected closed", wr) } err = kv.Compact(ctx, 1000) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index d33723cf5..dcaa0619f 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -35,7 +35,7 @@ type watchctx struct { w clientv3.Watcher wclient *clientv3.Client kv clientv3.KV - ch <-chan clientv3.WatchResponse + ch clientv3.WatchChan } func runWatchTest(t *testing.T, f watcherTest) { diff --git a/clientv3/kv.go b/clientv3/kv.go index 9efc2dc23..0c8cf4e77 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -38,8 +38,8 @@ type KV interface { // Get retrieves keys. // By default, Get will return the value for "key", if any. - // When passed WithRange(end), Get will return the keys in the range [key, end) if - // end is non-empty, otherwise it returns keys greater than or equal to key. + // When passed WithRange(end), Get will return the keys in the range [key, end). + // When passed WithFromKey(), Get returns keys greater than or equal to key. // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; // if the required revision is compacted, the request will fail with ErrCompacted . // When passed WithLimit(limit), the number of returned keys is bounded by limit. 
diff --git a/clientv3/op.go b/clientv3/op.go index f3458e1d9..616f86438 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -132,6 +132,8 @@ func WithSort(tgt SortTarget, order SortOrder) OpOption { func WithRange(endKey string) OpOption { return func(op *Op) { op.end = []byte(endKey) } } +// WithFromKey selects every key greater than or equal to the request key; it is shorthand for WithRange("\x00"). +func WithFromKey() OpOption { return WithRange("\x00") } func WithSerializable() OpOption { return func(op *Op) { op.serializable = true } } diff --git a/clientv3/watch.go b/clientv3/watch.go index 79b78a433..46960b4a8 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -24,18 +24,20 @@ import ( storagepb "github.com/coreos/etcd/storage/storagepb" ) +type WatchChan <-chan WatchResponse + type Watcher interface { // Watch watches on a single key. The watched events will be returned // through the returned channel. // If the watch is slow or the required rev is compacted, the watch request // might be canceled from the server-side and the chan will be closed. - Watch(cxt context.Context, key string, rev int64) <-chan WatchResponse + Watch(ctx context.Context, key string, rev int64) WatchChan - // Watch watches on a prefix. The watched events will be returned + // WatchPrefix watches on a prefix. The watched events will be returned // through the returned channel. // If the watch is slow or the required rev is compacted, the watch request // might be canceled from the server-side and the chan will be closed. - WatchPrefix(cxt context.Context, prefix string, rev int64) <-chan WatchResponse + WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan // Close closes the watcher and cancels all watch requests. Close() error @@ -44,6 +46,9 @@ type Watcher interface { type WatchResponse struct { Header pb.ResponseHeader Events []*storagepb.Event + // CompactRevision is set to the compaction revision that + // caused the watcher to cancel.
+ CompactRevision int64 } // watcher implements the Watcher interface @@ -122,11 +127,11 @@ func NewWatcher(c *Client) Watcher { return w } -func (w *watcher) Watch(ctx context.Context, key string, rev int64) <-chan WatchResponse { +func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan { return w.watch(ctx, key, "", rev) } -func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) <-chan WatchResponse { +func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan { return w.watch(ctx, "", prefix, rev) } @@ -140,7 +145,7 @@ func (w *watcher) Close() error { } // watch posts a watch request to run() and waits for a new watcher channel -func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) <-chan WatchResponse { +func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan { retc := make(chan chan WatchResponse, 1) wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc} // submit request @@ -166,7 +171,18 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { if pendingReq == nil { // no pending request; ignore return - } else if resp.WatchId == -1 || resp.Compacted { + } + if resp.CompactRevision != 0 { + // compaction after start revision + ret := make(chan WatchResponse, 1) + ret <- WatchResponse{ + Header: *resp.Header, + CompactRevision: resp.CompactRevision} + close(ret) + pendingReq.retc <- ret + return + } + if resp.WatchId == -1 { // failed; no channel pendingReq.retc <- nil return @@ -238,12 +254,6 @@ func (w *watcher) run() { switch { case pbresp.Canceled: delete(cancelSet, pbresp.WatchId) - case pbresp.Compacted: - w.mu.Lock() - if ws, ok := w.streams[pbresp.WatchId]; ok { - w.closeStream(ws) - } - w.mu.Unlock() case pbresp.Created: // response to pending req, try to add w.addStream(pbresp, pendingReq) @@ -305,7 +315,10 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { defer 
w.mu.RUnlock() ws, ok := w.streams[pbresp.WatchId] if ok { - wr := &WatchResponse{*pbresp.Header, pbresp.Events} + wr := &WatchResponse{ + Header: *pbresp.Header, + Events: pbresp.Events, + CompactRevision: pbresp.CompactRevision} ws.recvc <- wr } return ok @@ -346,6 +359,11 @@ func (w *watcher) serveStream(ws *watcherStream) { } select { case outc <- *curWr: + if len(wrs[0].Events) == 0 { + // compaction message + closing = true + break + } newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision if newRev != ws.lastRev { ws.lastRev = newRev diff --git a/etcdctlv3/command/error.go b/etcdctlv3/command/error.go index 61b6f3e00..264332928 100644 --- a/etcdctlv3/command/error.go +++ b/etcdctlv3/command/error.go @@ -28,7 +28,9 @@ const ( ExitBadConnection ExitInvalidInput // for txn, watch command ExitBadFeature // provided a valid flag with an unsupported value - ExitBadArgs = 128 + ExitInterrupted + ExitIO + ExitBadArgs = 128 ) func ExitWithError(code int, err error) { diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go new file mode 100644 index 000000000..27ab3610b --- /dev/null +++ b/etcdctlv3/command/snapshot_command.go @@ -0,0 +1,138 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package command + +import ( + "fmt" + "io" + "os" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc" +) + +// NewSnapshotCommand returns the cobra command for "snapshot". +func NewSnapshotCommand() *cobra.Command { + return &cobra.Command{ + Use: "snapshot [filename]", + Short: "Snapshot streams a point-in-time snapshot of the store", + Run: snapshotCommandFunc, + } +} + +// snapshotCommandFunc streams a snapshot of the store to stdout when no +// filename is given, or writes it to the named file otherwise. +func snapshotCommandFunc(cmd *cobra.Command, args []string) { + switch { + case len(args) == 0: + snapshotToStdout(mustClient(cmd)) + case len(args) == 1: + snapshotToFile(mustClient(cmd), args[0]) + default: + err := fmt.Errorf("snapshot takes at most one argument") + ExitWithError(ExitBadArgs, err) + } +} + +// snapshotToStdout streams a snapshot over stdout +func snapshotToStdout(c *clientv3.Client) { + // must explicitly fetch first revision since no retry on stdout + wapi := clientv3.NewWatcher(c) + defer wapi.Close() + wr := <-wapi.WatchPrefix(context.TODO(), "", 1) + if len(wr.Events) > 0 { + wr.CompactRevision = 1 + } + if rev := snapshot(os.Stdout, c, wr.CompactRevision); rev != 0 { + err := fmt.Errorf("snapshot interrupted by compaction %v", rev) + ExitWithError(ExitInterrupted, err) + } +} + +// snapshotToFile writes a snapshot to path+".part" and renames it to path once complete +func snapshotToFile(c *clientv3.Client, path string) { + partpath := path + ".part" + f, err := os.Create(partpath) + if err != nil { + exiterr := fmt.Errorf("could not open %s (%v)", partpath, err) + ExitWithError(ExitBadArgs, exiterr) + } + defer f.Close() // registered only after the Create error has been checked + rev := int64(1) + for rev != 0 { + f.Seek(0, 0) + f.Truncate(0) + rev = snapshot(f, c, rev) + } + f.Sync() + if err := os.Rename(partpath, path); err != nil { + exiterr := fmt.Errorf("could not rename %s to %s
(%v)", partpath, path, err) + ExitWithError(ExitIO, exiterr) + } +} + +// snapshot writes the events since rev and then the keys to w; a nonzero return is the revision to retry from +// TODO: stabilize snapshot format +func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 { + wapi := clientv3.NewWatcher(c) + defer wapi.Close() + + // get all events since revision (or get non-compacted revision, if + // rev is too far behind) + wch := wapi.WatchPrefix(context.TODO(), "", rev) + for wr := range wch { + if len(wr.Events) == 0 { + return wr.CompactRevision + } + for _, ev := range wr.Events { + fmt.Fprintln(w, ev) + } + lastRev := wr.Events[len(wr.Events)-1].Kv.ModRevision + if lastRev >= wr.Header.Revision { + break + } + } + + // get base state at rev + kapi := clientv3.NewKV(c) + key := "\x00" + for { + kvs, err := kapi.Get( + context.TODO(), + key, + clientv3.WithFromKey(), + clientv3.WithRev(rev+1), + clientv3.WithLimit(1000)) + if err == v3rpc.ErrCompacted { + // will get correct compact revision on retry + return rev + 1 + } else if err != nil { + // failed for some unknown reason, retry on same revision + return rev + } + for _, kv := range kvs.Kvs { + fmt.Fprintln(w, kv) + } + if !kvs.More { + break + } + // move to next key + key = string(append(kvs.Kvs[len(kvs.Kvs)-1].Key, 0)) + } + + return 0 +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 59de85273..2fc662aa2 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -57,6 +57,7 @@ func init() { command.NewVersionCommand(), command.NewLeaseCommand(), command.NewMemberCommand(), + command.NewSnapshotCommand(), ) } diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index df70f556a..408590993 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -146,10 +146,10 @@ func (sws *serverWatchStream) sendLoop() { } err := sws.gRPCStream.Send(&pb.WatchResponse{ - Header: sws.newResponseHeader(wresp.Revision), - WatchId: int64(wresp.WatchID), - Events: events, - Compacted:
wresp.Compacted, + Header: sws.newResponseHeader(wresp.Revision), + WatchId: int64(wresp.WatchID), + Events: events, + CompactRevision: wresp.CompactRevision, }) storage.ReportEventReceived() if err != nil { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 2c2cff20e..f271abcf5 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -890,15 +890,16 @@ type WatchResponse struct { // If the response is for a cancel watch request, cancel is set to true. // No further events will be sent to the canceled watching. Canceled bool `protobuf:"varint,4,opt,name=canceled,proto3" json:"canceled,omitempty"` - // If a watching tries to watch at a compacted index, compacted will be set to true. + // CompactRevision is set to the minimum index if a watching tries to watch + // at a compacted index. // // This happens when creating a watching at a compacted revision or the watching cannot // catch up with the progress of the KV. // // Client should treat the watching as canceled and should not try to create any // watching with same start_revision again. 
- Compacted bool `protobuf:"varint,5,opt,name=compacted,proto3" json:"compacted,omitempty"` - Events []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"` + CompactRevision int64 `protobuf:"varint,5,opt,name=compact_revision,proto3" json:"compact_revision,omitempty"` + Events []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"` } func (m *WatchResponse) Reset() { *m = WatchResponse{} } @@ -2651,15 +2652,10 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) { } i++ } - if m.Compacted { + if m.CompactRevision != 0 { data[i] = 0x28 i++ - if m.Compacted { - data[i] = 1 - } else { - data[i] = 0 - } - i++ + i = encodeVarintRpc(data, i, uint64(m.CompactRevision)) } if len(m.Events) > 0 { for _, msg := range m.Events { @@ -3601,8 +3597,8 @@ func (m *WatchResponse) Size() (n int) { if m.Canceled { n += 2 } - if m.Compacted { - n += 2 + if m.CompactRevision != 0 { + n += 1 + sovRpc(uint64(m.CompactRevision)) } if len(m.Events) > 0 { for _, e := range m.Events { @@ -6185,9 +6181,9 @@ func (m *WatchResponse) Unmarshal(data []byte) error { m.Canceled = bool(v != 0) case 5: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Compacted", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field CompactRevision", wireType) } - var v int + m.CompactRevision = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowRpc @@ -6197,12 +6193,11 @@ func (m *WatchResponse) Unmarshal(data []byte) error { } b := data[iNdEx] iNdEx++ - v |= (int(b) & 0x7F) << shift + m.CompactRevision |= (int64(b) & 0x7F) << shift if b < 0x80 { break } } - m.Compacted = bool(v != 0) case 11: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType) diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 55fa1df58..dd0f06ada 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -282,14 
+282,15 @@ message WatchResponse { // If the response is for a cancel watch request, cancel is set to true. // No further events will be sent to the canceled watching. bool canceled = 4; - // If a watching tries to watch at a compacted index, compacted will be set to true. + // CompactRevision is set to the minimum index if a watching tries to watch + // at a compacted index. // // This happens when creating a watching at a compacted revision or the watching cannot // catch up with the progress of the KV. // // Client should treat the watching as canceled and should not try to create any // watching with same start_revision again. - bool compacted = 5; + int64 compact_revision = 5; repeated storagepb.Event events = 11; } diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index b52d0010d..ed434f66b 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -311,6 +311,12 @@ func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeRespo err error ) + // grpc sends empty byte strings as nils, so use a '\0' to indicate + // wanting a >= query + if len(r.RangeEnd) == 1 && r.RangeEnd[0] == 0 { + r.RangeEnd = []byte{} + } + limit := r.Limit if r.SortOrder != pb.RangeRequest_NONE { // fetch everything; sort and truncate afterwards diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 2142a1a85..90efa5ca3 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -488,6 +488,8 @@ func TestV3RangeRequest(t *testing.T) { {Key: []byte("c"), RangeEnd: []byte("c")}, // [d, b) = empty {Key: []byte("d"), RangeEnd: []byte("b")}, + // ["\0", "\0") => all in range + {Key: []byte{0}, RangeEnd: []byte{0}}, }, [][]string{ @@ -496,8 +498,9 @@ func TestV3RangeRequest(t *testing.T) { {}, {}, {}, + {"a", "b", "c", "d", "e"}, }, - []bool{false, false, false, false, false}, + []bool{false, false, false, false, false, false}, }, // revision { diff --git a/storage/watchable_store.go 
b/storage/watchable_store.go index 6816c9fab..28729e453 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -300,7 +300,7 @@ func (s *watchableStore) syncWatchers() { if w.cur < compactionRev { select { - case w.ch <- WatchResponse{WatchID: w.id, Compacted: true}: + case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactionRev}: s.unsynced.delete(w) default: // retry next time diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index db61b3e7b..f3988683e 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -247,8 +247,8 @@ func TestWatchCompacted(t *testing.T) { if resp.WatchID != wt { t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt) } - if resp.Compacted != true { - t.Errorf("resp.Compacted = %v, want %v", resp.Compacted, true) + if resp.CompactRevision == 0 { + t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, compactRev) } case <-time.After(1 * time.Second): t.Fatalf("failed to receive response (timeout)") diff --git a/storage/watcher.go b/storage/watcher.go index 0f099a84a..a29e36490 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -68,8 +68,8 @@ type WatchResponse struct { // inside Events. Revision int64 - // Compacted is set when the watcher is cancelled due to compaction. - Compacted bool + // CompactRevision is set when the watcher is cancelled due to compaction. + CompactRevision int64 } // watchStream contains a collection of watchers that share