diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index c4cebe7cd..03c541837 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -139,7 +139,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { for kv == nil { wr, ok := <-wch - if !ok || len(wr.Events) == 0 { + if !ok || wr.Err() != nil { cancel() return } diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index 0a1a93078..2249854f7 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -20,7 +20,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" v3 "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/storage/storagepb" ) @@ -52,10 +51,7 @@ func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.O if !ok { return ctx.Err() } - if len(wresp.Events) == 0 { - return v3rpc.ErrCompacted - } - return nil + return wresp.Err() } func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 32a6b6896..8c71028a0 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" storagepb "github.com/coreos/etcd/storage/storagepb" @@ -347,8 +348,8 @@ func TestWatchInvalidFutureRevision(t *testing.T) { if !ok { t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok) } - if !wresp.Canceled { - t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled) + if wresp.Err() != v3rpc.ErrFutureRev { + t.Fatalf("wresp.Err() expected ErrFutureRev, but got %v", wresp.Err()) } _, ok = <-rch // ensure the channel is closed @@ -356,3 +357,42 @@ func TestWatchInvalidFutureRevision(t *testing.T) { t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok) } } + +// TestWatchCompactRevision ensures the CompactRevision error is given on a +// compaction event ahead of a watcher. +func TestWatchCompactRevision(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // set some keys + kv := clientv3.NewKV(clus.RandClient()) + for i := 0; i < 5; i++ { + if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil { + t.Fatal(err) + } + } + + w := clientv3.NewWatcher(clus.RandClient()) + defer w.Close() + + if err := kv.Compact(context.TODO(), 4); err != nil { + t.Fatal(err) + } + wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2)) + + // get compacted error message + wresp, ok := <-wch + if !ok { + t.Fatalf("expected wresp, but got closed channel") + } + if wresp.Err() != v3rpc.ErrCompacted { + t.Fatalf("wresp.Err() expected ErrCompacteed, but got %v", wresp.Err()) + } + + // ensure the channel is closed + if wresp, ok = <-wch; ok { + t.Fatalf("expected closed channel, but got %v", wresp) + } +} diff --git a/clientv3/watch.go b/clientv3/watch.go index 63a52aa2d..a1896b6ff 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -20,6 +20,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" storagepb "github.com/coreos/etcd/storage/storagepb" ) @@ -41,14 +42,25 @@ type Watcher interface { type WatchResponse struct { Header pb.ResponseHeader Events []*storagepb.Event - // CompactRevision is set to the compaction revision that - // caused the watcher to cancel. + + // CompactRevision is the minimum revision the watcher may receive. CompactRevision int64 - // Canceled is 'true' when it has received wrong watch start revision. + // Canceled is set to indicate the channel is about to close. Canceled bool } +// Err is the error value if this WatchResponse holds an error. +func (wr *WatchResponse) Err() error { + if wr.CompactRevision != 0 { + return v3rpc.ErrCompacted + } + if wr.Canceled { + return v3rpc.ErrFutureRev + } + return nil +} + // watcher implements the Watcher interface type watcher struct { c *Client @@ -179,12 +191,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { return } if resp.Canceled || resp.CompactRevision != 0 { - // compaction after start revision + // a cancel at id creation time means the start revision has + // been compacted out of the store ret := make(chan WatchResponse, 1) ret <- WatchResponse{ Header: *resp.Header, CompactRevision: resp.CompactRevision, - Canceled: resp.Canceled} + Canceled: true} close(ret) pendingReq.retc <- ret return @@ -375,8 +388,7 @@ func (w *watcher) serveStream(ws *watcherStream) { } select { case outc <- *curWr: - if len(wrs[0].Events) == 0 { - // compaction message + if wrs[0].Err() != nil { closing = true break } diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go index 7931d8adb..7c82eb785 100644 --- a/etcdctlv3/command/snapshot_command.go +++ b/etcdctlv3/command/snapshot_command.go @@ -53,7 +53,7 @@ func snapshotCommandFunc(cmd *cobra.Command, args []string) { func snapshotToStdout(c *clientv3.Client) { // must explicitly fetch first revision since no retry on stdout wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) - if len(wr.Events) > 0 { + if wr.Err() == nil { wr.CompactRevision = 1 } if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 { @@ -111,7 +111,7 @@ func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 { wc := s.SyncUpdates(context.TODO()) for wr := range wc { - if len(wr.Events) == 0 { + if wr.Err() != nil { return wr.CompactRevision } for _, ev := range wr.Events {