mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17557 from serathius/progressrequest-new-watch
Fix progress notification for watch that doesn't get any events
This commit is contained in:
commit
ddf54715bf
@ -145,10 +145,6 @@ type serverWatchStream struct {
|
|||||||
// records fragmented watch IDs
|
// records fragmented watch IDs
|
||||||
fragment map[mvcc.WatchID]bool
|
fragment map[mvcc.WatchID]bool
|
||||||
|
|
||||||
// indicates whether we have an outstanding global progress
|
|
||||||
// notification to send
|
|
||||||
deferredProgress bool
|
|
||||||
|
|
||||||
// closec indicates the stream is closed.
|
// closec indicates the stream is closed.
|
||||||
closec chan struct{}
|
closec chan struct{}
|
||||||
|
|
||||||
@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
|||||||
prevKV: make(map[mvcc.WatchID]bool),
|
prevKV: make(map[mvcc.WatchID]bool),
|
||||||
fragment: make(map[mvcc.WatchID]bool),
|
fragment: make(map[mvcc.WatchID]bool),
|
||||||
|
|
||||||
deferredProgress: false,
|
|
||||||
|
|
||||||
closec: make(chan struct{}),
|
closec: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -367,14 +361,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||||||
case *pb.WatchRequest_ProgressRequest:
|
case *pb.WatchRequest_ProgressRequest:
|
||||||
if uv.ProgressRequest != nil {
|
if uv.ProgressRequest != nil {
|
||||||
sws.mu.Lock()
|
sws.mu.Lock()
|
||||||
// Ignore if deferred progress notification is already in progress
|
sws.watchStream.RequestProgressAll()
|
||||||
if !sws.deferredProgress {
|
|
||||||
// Request progress for all watchers,
|
|
||||||
// force generation of a response
|
|
||||||
if !sws.watchStream.RequestProgressAll() {
|
|
||||||
sws.deferredProgress = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sws.mu.Unlock()
|
sws.mu.Unlock()
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -483,11 +470,6 @@ func (sws *serverWatchStream) sendLoop() {
|
|||||||
// elide next progress update if sent a key update
|
// elide next progress update if sent a key update
|
||||||
sws.progress[wresp.WatchID] = false
|
sws.progress[wresp.WatchID] = false
|
||||||
}
|
}
|
||||||
if sws.deferredProgress {
|
|
||||||
if sws.watchStream.RequestProgressAll() {
|
|
||||||
sws.deferredProgress = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sws.mu.Unlock()
|
sws.mu.Unlock()
|
||||||
|
|
||||||
case c, ok := <-sws.ctrlStream:
|
case c, ok := <-sws.ctrlStream:
|
||||||
|
@ -1433,8 +1433,8 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
|||||||
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))
|
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))
|
||||||
|
|
||||||
// Immediately request a progress notification. As the client
|
// Immediately request a progress notification. As the client
|
||||||
// is unsynchronised, the server will have to defer the
|
// is unsynchronised, the server will not sent any notification,
|
||||||
// notification internally.
|
//as client can infer progress from events.
|
||||||
err := client.RequestProgress(ctx)
|
err := client.RequestProgress(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -1454,8 +1454,9 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
|||||||
}
|
}
|
||||||
event_count += len(wr.Events)
|
event_count += len(wr.Events)
|
||||||
}
|
}
|
||||||
|
// client needs to request progress notification again
|
||||||
// ... followed by the requested progress notification
|
err = client.RequestProgress(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
wr2 := <-wch
|
wr2 := <-wch
|
||||||
if wr2.Err() != nil {
|
if wr2.Err() != nil {
|
||||||
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
|
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
|
||||||
@ -1467,3 +1468,47 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
|||||||
t.Fatal("Wrong revision in progress notification!")
|
t.Fatal("Wrong revision in progress notification!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
|
||||||
|
if integration.ThroughProxy {
|
||||||
|
t.Skip("grpc proxy currently does not support requesting progress notifications")
|
||||||
|
}
|
||||||
|
integration.BeforeTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
client := clus.RandClient()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, err := client.Put(ctx, "bar", "1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision))
|
||||||
|
// Request the progress notification on newly created watch that was not yet synced.
|
||||||
|
err = client.RequestProgress(ctx)
|
||||||
|
ticker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
gotProgressNotification := false
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
err := client.RequestProgress(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
case resp := <-wch:
|
||||||
|
if resp.Err() != nil {
|
||||||
|
t.Fatal(fmt.Errorf("watch error: %w", resp.Err()))
|
||||||
|
}
|
||||||
|
if resp.IsProgressNotify() {
|
||||||
|
gotProgressNotification = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if gotProgressNotification {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.True(t, gotProgressNotification, "Expected to get progress notification")
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user