mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Fix progress notification for watch that doesn't get any events
When implementing the fix for progress notifications (https://github.com/etcd-io/etcd/pull/15237) we made a incorrect assumption that that unsynched watches will always get at least one event. Unsynched watches include not only slow watchers, but also newly created watches that requested current or older revision. In case that non of the events match watch filter, those newly created watches might become synched without any event going through. Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
358e3bae75
commit
9399dd1628
@ -145,10 +145,6 @@ type serverWatchStream struct {
|
||||
// records fragmented watch IDs
|
||||
fragment map[mvcc.WatchID]bool
|
||||
|
||||
// indicates whether we have an outstanding global progress
|
||||
// notification to send
|
||||
deferredProgress bool
|
||||
|
||||
// closec indicates the stream is closed.
|
||||
closec chan struct{}
|
||||
|
||||
@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
prevKV: make(map[mvcc.WatchID]bool),
|
||||
fragment: make(map[mvcc.WatchID]bool),
|
||||
|
||||
deferredProgress: false,
|
||||
|
||||
closec: make(chan struct{}),
|
||||
}
|
||||
|
||||
@ -375,14 +369,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
case *pb.WatchRequest_ProgressRequest:
|
||||
if uv.ProgressRequest != nil {
|
||||
sws.mu.Lock()
|
||||
// Ignore if deferred progress notification is already in progress
|
||||
if !sws.deferredProgress {
|
||||
// Request progress for all watchers,
|
||||
// force generation of a response
|
||||
if !sws.watchStream.RequestProgressAll() {
|
||||
sws.deferredProgress = true
|
||||
}
|
||||
}
|
||||
sws.watchStream.RequestProgressAll()
|
||||
sws.mu.Unlock()
|
||||
}
|
||||
default:
|
||||
@ -498,11 +485,6 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
// elide next progress update if sent a key update
|
||||
sws.progress[wresp.WatchID] = false
|
||||
}
|
||||
if sws.deferredProgress {
|
||||
if sws.watchStream.RequestProgressAll() {
|
||||
sws.deferredProgress = false
|
||||
}
|
||||
}
|
||||
sws.mu.Unlock()
|
||||
|
||||
case c, ok := <-sws.ctrlStream:
|
||||
|
@ -1391,8 +1391,8 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
||||
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))
|
||||
|
||||
// Immediately request a progress notification. As the client
|
||||
// is unsynchronised, the server will have to defer the
|
||||
// notification internally.
|
||||
// is unsynchronised, the server will not sent any notification,
|
||||
//as client can infer progress from events.
|
||||
err := client.RequestProgress(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -1412,8 +1412,9 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
||||
}
|
||||
event_count += len(wr.Events)
|
||||
}
|
||||
|
||||
// ... followed by the requested progress notification
|
||||
// client needs to request progress notification again
|
||||
err = client.RequestProgress(ctx)
|
||||
require.NoError(t, err)
|
||||
wr2 := <-wch
|
||||
if wr2.Err() != nil {
|
||||
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
|
||||
@ -1425,3 +1426,46 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
||||
t.Fatal("Wrong revision in progress notification!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
|
||||
if ThroughProxy {
|
||||
t.Skip("grpc proxy currently does not support requesting progress notifications")
|
||||
}
|
||||
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
client := clus.RandClient()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.Put(ctx, "bar", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision))
|
||||
// Request the progress notification on newly created watch that was not yet synced.
|
||||
err = client.RequestProgress(ctx)
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
require.NoError(t, err)
|
||||
gotProgressNotification := false
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err := client.RequestProgress(ctx)
|
||||
require.NoError(t, err)
|
||||
case resp := <-wch:
|
||||
if resp.Err() != nil {
|
||||
t.Fatal(fmt.Errorf("watch error: %w", resp.Err()))
|
||||
}
|
||||
if resp.IsProgressNotify() {
|
||||
gotProgressNotification = true
|
||||
}
|
||||
}
|
||||
if gotProgressNotification {
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, gotProgressNotification, "Expected to get progress notification")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user