mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #4703 from xiang90/future_watch
*: support watch from future revision
This commit is contained in:
commit
e9be0415fd
@ -119,27 +119,20 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
// support >= key queries
|
||||
creq.RangeEnd = []byte{}
|
||||
}
|
||||
|
||||
rev := creq.StartRevision
|
||||
wsrev := sws.watchStream.Rev()
|
||||
futureRev := rev > wsrev
|
||||
rev := creq.StartRevision
|
||||
if rev == 0 {
|
||||
// rev 0 watches past the current revision
|
||||
rev = wsrev + 1
|
||||
}
|
||||
// do not allow future watch revision
|
||||
id := storage.WatchID(-1)
|
||||
if !futureRev {
|
||||
id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
||||
if creq.ProgressNotify {
|
||||
sws.progress[id] = true
|
||||
}
|
||||
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
||||
if id != -1 && creq.ProgressNotify {
|
||||
sws.progress[id] = true
|
||||
}
|
||||
sws.ctrlStream <- &pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(wsrev),
|
||||
WatchId: int64(id),
|
||||
Created: true,
|
||||
Canceled: futureRev,
|
||||
Canceled: id == -1,
|
||||
}
|
||||
case *pb.WatchRequest_CancelRequest:
|
||||
if uv.CancelRequest != nil {
|
||||
|
@ -286,6 +286,70 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3WatchFutureRevision tests Watch APIs from a future revision.
|
||||
func TestV3WatchFutureRevision(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
wAPI := toGRPC(clus.RandClient()).Watch
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
wStream, err := wAPI.Watch(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("wAPI.Watch error: %v", err)
|
||||
}
|
||||
|
||||
wkey := []byte("foo")
|
||||
wrev := int64(10)
|
||||
req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
|
||||
CreateRequest: &pb.WatchCreateRequest{Key: wkey, StartRevision: wrev}}}
|
||||
err = wStream.Send(req)
|
||||
if err != nil {
|
||||
t.Fatalf("wStream.Send error: %v", err)
|
||||
}
|
||||
|
||||
// ensure watcher request created a new watcher
|
||||
cresp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Fatalf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if !cresp.Created {
|
||||
t.Fatal("create = %v, want %v", cresp.Created, true)
|
||||
}
|
||||
|
||||
// asynchronously create keys
|
||||
go func() {
|
||||
kvc := toGRPC(clus.RandClient()).KV
|
||||
for {
|
||||
req := &pb.PutRequest{Key: wkey, Value: []byte("bar")}
|
||||
resp, rerr := kvc.Put(context.TODO(), req)
|
||||
if rerr != nil {
|
||||
t.Fatalf("couldn't put key (%v)", rerr)
|
||||
}
|
||||
if resp.Header.Revision == wrev {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// ensure watcher request created a new watcher
|
||||
cresp, err = wStream.Recv()
|
||||
if err != nil {
|
||||
t.Fatalf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if cresp.Header.Revision != wrev {
|
||||
t.Fatalf("revision = %d, want %d", cresp.Header.Revision, wrev)
|
||||
}
|
||||
if len(cresp.Events) != 1 {
|
||||
t.Fatalf("failed to receive events")
|
||||
}
|
||||
if cresp.Events[0].Kv.ModRevision != wrev {
|
||||
t.Errorf("mod revision = %d, want %d", cresp.Events[0].Kv.ModRevision, wrev)
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
|
||||
func TestV3WatchCancelSynced(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
@ -859,36 +923,6 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// TestV3WatchFutureRevision ensures invalid future revision to Watch APIs
|
||||
// returns WatchResponse of true Created and true Canceled.
|
||||
func TestV3WatchInvalidFutureRevision(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||
if wErr != nil {
|
||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||
}
|
||||
|
||||
wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
|
||||
CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 100}}}
|
||||
if err := wStream.Send(wreq); err != nil {
|
||||
t.Fatalf("watch request failed (%v)", err)
|
||||
}
|
||||
|
||||
resp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Errorf("wStream.Recv error: %v", err)
|
||||
}
|
||||
if resp.WatchId != -1 || !resp.Created || !resp.Canceled || len(resp.Events) != 0 {
|
||||
t.Errorf("invalid start-rev expected -1, true, true, 0, but got %d, %v, %v, %d",
|
||||
resp.WatchId, resp.Created, resp.Canceled, len(resp.Events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchWithProgressNotify(t *testing.T) {
|
||||
testInterval := 3 * time.Second
|
||||
pi := v3rpc.ProgressReportInterval
|
||||
|
@ -189,12 +189,12 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
|
||||
synced := startRev > s.store.currentRev.main || startRev == 0
|
||||
if synced {
|
||||
wa.cur = s.store.currentRev.main + 1
|
||||
if startRev > wa.cur {
|
||||
wa.cur = startRev
|
||||
}
|
||||
}
|
||||
s.store.mu.Unlock()
|
||||
if synced {
|
||||
if startRev > wa.cur {
|
||||
panic("can't watch past sync revision")
|
||||
}
|
||||
s.synced.add(wa)
|
||||
} else {
|
||||
slowWatcherGauge.Inc()
|
||||
@ -368,9 +368,11 @@ type watcher struct {
|
||||
// end indicates the end of the range to watch.
|
||||
// If end is set, the watcher is on a range.
|
||||
end []byte
|
||||
// cur is the current watcher revision.
|
||||
// If cur is behind the current revision of the KV,
|
||||
// watcher is unsynced and needs to catch up.
|
||||
|
||||
// cur is the current watcher revision of a unsynced watcher.
|
||||
// cur will be updated for unsynced watcher while it is catching up.
|
||||
// cur is startRev of a synced watcher.
|
||||
// cur will not be updated for synced watcher.
|
||||
cur int64
|
||||
id WatchID
|
||||
|
||||
|
@ -255,6 +255,45 @@ func TestWatchCompacted(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchFutureRev(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := newWatchableStore(b, &lease.FakeLessor{})
|
||||
|
||||
defer func() {
|
||||
s.store.Close()
|
||||
os.Remove(tmpPath)
|
||||
}()
|
||||
|
||||
testKey := []byte("foo")
|
||||
testValue := []byte("bar")
|
||||
|
||||
w := s.NewWatchStream()
|
||||
wrev := int64(10)
|
||||
w.Watch(testKey, nil, wrev)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
||||
if rev >= wrev {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
if resp.Revision != wrev {
|
||||
t.Fatalf("rev = %d, want %d", resp.Revision, wrev)
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("failed to get events from the response")
|
||||
}
|
||||
if resp.Events[0].Kv.ModRevision != wrev {
|
||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, wrev)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive event in 1 second.")
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||
func TestWatchBatchUnsynced(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
|
Loading…
x
Reference in New Issue
Block a user