Merge pull request #15697 from ahrtr/3.4_request_progress_20230411
[3.4] etcdserver: guarantee order of requested progress notification
commit 1d759fc8bd
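The change below guarantees that a manually requested progress notification is never delivered ahead of the events of a watcher that is still catching up: the server now defers the notification until every watcher on the stream is synced. For context, here is a minimal clientv3 sketch of the behaviour from the client's point of view; the endpoint, key name and timeout are illustrative assumptions, not part of this change.

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

func main() {
    // Assumed endpoint; adjust for your cluster.
    cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Watch from revision 1 so the watcher starts unsynced whenever
    // the key already has history to replay.
    wch := cli.Watch(ctx, "foo", clientv3.WithRev(1))

    // Ask for a progress notification right away. With this fix the
    // server defers it until the watcher has caught up, so it cannot
    // overtake the pending events.
    if err := cli.RequestProgress(ctx); err != nil {
        panic(err)
    }

    for wr := range wch {
        if wr.IsProgressNotify() {
            fmt.Println("progress notification at revision", wr.Header.Revision)
            return
        }
        fmt.Println("received", len(wr.Events), "event(s)")
    }
}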
@@ -145,6 +145,10 @@ type serverWatchStream struct {
 	// records fragmented watch IDs
 	fragment map[mvcc.WatchID]bool
 
+	// indicates whether we have an outstanding global progress
+	// notification to send
+	deferredProgress bool
+
 	// closec indicates the stream is closed.
 	closec chan struct{}
 
@@ -174,6 +178,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 		prevKV:   make(map[mvcc.WatchID]bool),
 		fragment: make(map[mvcc.WatchID]bool),
 
+		deferredProgress: false,
+
 		closec: make(chan struct{}),
 	}
 
@@ -368,10 +374,16 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 		case *pb.WatchRequest_ProgressRequest:
 			if uv.ProgressRequest != nil {
-				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
-					WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
-				}
+				sws.mu.Lock()
+				// Ignore if deferred progress notification is already in progress
+				if !sws.deferredProgress {
+					// Request progress for all watchers,
+					// force generation of a response
+					if !sws.watchStream.RequestProgressAll() {
+						sws.deferredProgress = true
+					}
+				}
+				sws.mu.Unlock()
 			}
 		default:
 			// we probably should not shutdown the entire stream when
@@ -439,11 +451,15 @@ func (sws *serverWatchStream) sendLoop() {
 				Canceled: canceled,
 			}
 
-			if _, okID := ids[wresp.WatchID]; !okID {
-				// buffer if id not yet announced
-				wrs := append(pending[wresp.WatchID], wr)
-				pending[wresp.WatchID] = wrs
-				continue
-			}
+			// Progress notifications can have WatchID -1
+			// if they announce on behalf of multiple watchers
+			if wresp.WatchID != clientv3.InvalidWatchID {
+				if _, okID := ids[wresp.WatchID]; !okID {
+					// buffer if id not yet announced
+					wrs := append(pending[wresp.WatchID], wr)
+					pending[wresp.WatchID] = wrs
+					continue
+				}
+			}
 
 			mvcc.ReportEventReceived(len(evs))
@@ -482,6 +498,11 @@ func (sws *serverWatchStream) sendLoop() {
 				// elide next progress update if sent a key update
 				sws.progress[wresp.WatchID] = false
 			}
+			if sws.deferredProgress {
+				if sws.watchStream.RequestProgressAll() {
+					sws.deferredProgress = false
+				}
+			}
 			sws.mu.Unlock()
 
 		case c, ok := <-sws.ctrlStream:
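Taken together, the recvLoop and sendLoop hunks above implement a small deferral pattern: if a requested stream-wide progress notification cannot be produced because some watcher is still unsynced, the request is remembered in deferredProgress and retried after the next batch of events has been sent. The following self-contained sketch models that pattern; the type and method names are illustrative stand-ins, not the serverWatchStream code.

package main

import "fmt"

// stream is a toy stand-in for a watch stream with one global
// "all watchers synced" flag and the deferred-progress marker.
type stream struct {
    allSynced        bool
    deferredProgress bool
}

// requestProgressAll succeeds only when every watcher is synced,
// mirroring the boolean result of WatchStream.RequestProgressAll.
func (s *stream) requestProgressAll() bool {
    if !s.allSynced {
        return false
    }
    fmt.Println("progress notification sent")
    return true
}

// onProgressRequest mirrors the recvLoop change: instead of answering
// immediately, defer when the notification cannot be generated yet.
func (s *stream) onProgressRequest() {
    if !s.deferredProgress {
        if !s.requestProgressAll() {
            s.deferredProgress = true
        }
    }
}

// onEventsSent mirrors the sendLoop change: after flushing events,
// retry a deferred notification.
func (s *stream) onEventsSent() {
    if s.deferredProgress {
        if s.requestProgressAll() {
            s.deferredProgress = false
        }
    }
}

func main() {
    s := &stream{}
    s.onProgressRequest() // watcher still catching up: deferred
    s.allSynced = true    // pretend the watcher finished replaying history
    s.onEventsSent()      // events flushed: the deferred notification goes out
}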
@@ -1360,3 +1360,68 @@ func TestV3WatchCancellation(t *testing.T) {
 		t.Fatalf("expected %s watch, got %s", expected, minWatches)
 	}
 }
+
+// TestV3WatchProgressWaitsForSync checks that progress notifications
+// don't get sent until the watcher is synchronised
+func TestV3WatchProgressWaitsForSync(t *testing.T) {
+	// Disable for gRPC proxy, as it does not support requesting
+	// progress notifications
+	if ThroughProxy {
+		t.Skip("grpc proxy currently does not support requesting progress notifications")
+	}
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	client := clus.RandClient()
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// Write a couple values into key to make sure there's a
+	// non-trivial amount of history.
+	count := 1001
+	t.Logf("Writing key 'foo' %d times", count)
+	for i := 0; i < count; i++ {
+		_, err := client.Put(ctx, "foo", fmt.Sprintf("bar%d", i))
+		require.NoError(t, err)
+	}
+
+	// Create watch channel starting at revision 1 (i.e. it starts
+	// unsynced because of the update above)
+	wch := client.Watch(ctx, "foo", clientv3.WithRev(1))
+
+	// Immediately request a progress notification. As the client
+	// is unsynchronised, the server will have to defer the
+	// notification internally.
+	err := client.RequestProgress(ctx)
+	require.NoError(t, err)
+
+	// Verify that we get the watch responses first. Note that
+	// events might be spread across multiple packets.
+	var event_count = 0
+	for event_count < count {
+		wr := <-wch
+		if wr.Err() != nil {
+			t.Fatal(fmt.Errorf("watch error: %w", wr.Err()))
+		}
+		if wr.IsProgressNotify() {
+			t.Fatal("Progress notification from unsynced client!")
+		}
+		if wr.Header.Revision != int64(count+1) {
+			t.Fatal("Incomplete watch response!")
+		}
+		event_count += len(wr.Events)
+	}
+
+	// ... followed by the requested progress notification
+	wr2 := <-wch
+	if wr2.Err() != nil {
+		t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
+	}
+	if !wr2.IsProgressNotify() {
+		t.Fatal("Did not receive progress notification!")
+	}
+	if wr2.Header.Revision != int64(count+1) {
+		t.Fatal("Wrong revision in progress notification!")
+	}
+}
@@ -16,6 +16,7 @@ package mvcc
 
 import (
 	"go.etcd.io/etcd/auth"
+	"go.etcd.io/etcd/clientv3"
 	"sync"
 	"time"
 
@@ -40,6 +41,7 @@ var (
 type watchable interface {
 	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
 	progress(w *watcher)
+	progressAll(watchers map[WatchID]*watcher) bool
 	rev() int64
 }
 
@@ -489,14 +491,34 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
 func (s *watchableStore) progress(w *watcher) {
+	s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
+}
+
+func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
+	return s.progressIfSync(watchers, clientv3.InvalidWatchID)
+}
+
+func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 
-	if _, ok := s.synced.watchers[w]; ok {
-		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
-		// If the ch is full, this watcher is receiving events.
-		// We do not need to send progress at all.
+	// Any watcher unsynced?
+	for _, w := range watchers {
+		if _, ok := s.synced.watchers[w]; !ok {
+			return false
+		}
 	}
+
+	// If all watchers are synchronised, send out progress
+	// notification on first watcher. Note that all watchers
+	// should have the same underlying stream, and the progress
+	// notification will be broadcasted client-side if required
+	// (see dispatchEvent in client/v3/watch.go)
+	for _, w := range watchers {
+		w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
+		return true
+	}
+	return true
 }
 
 type watcher struct {
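The comments above describe the rule progressIfSync enforces: no notification at all while any watcher in the set is unsynced, and a single response tagged with watch ID -1 sent through an arbitrary watcher once all of them are synced. A standalone sketch of that decision, using simplified stand-in types rather than the mvcc package's own:

package main

import "fmt"

// invalidWatchID mirrors the -1 sentinel used for stream-wide notifications.
const invalidWatchID = -1

type watcher struct {
    id     int64
    synced bool
}

// progressIfAllSynced sends nothing and returns false while any watcher
// still has history to replay; otherwise it emits exactly one notification
// at the current revision through an arbitrary watcher and returns true.
func progressIfAllSynced(watchers []*watcher, rev int64) bool {
    for _, w := range watchers {
        if !w.synced {
            return false
        }
    }
    for _, w := range watchers {
        fmt.Printf("watcher %d carries progress: watchID=%d rev=%d\n", w.id, invalidWatchID, rev)
        return true
    }
    return true
}

func main() {
    ws := []*watcher{{id: 0, synced: true}, {id: 1, synced: false}}
    fmt.Println(progressIfAllSynced(ws, 7)) // false: watcher 1 is unsynced
    ws[1].synced = true
    fmt.Println(progressIfAllSynced(ws, 7)) // true: one notification emitted
}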
@@ -58,6 +58,13 @@ type WatchStream interface {
 	// of the watchers since the watcher is currently synced.
 	RequestProgress(id WatchID)
 
+	// RequestProgressAll requests a progress notification for all
+	// watchers sharing the stream. If all watchers are synced, a
+	// progress notification with watch ID -1 will be sent to an
+	// arbitrary watcher of this stream, and the function returns
+	// true.
+	RequestProgressAll() bool
+
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
 	Cancel(id WatchID) error
@@ -188,3 +195,9 @@ func (ws *watchStream) RequestProgress(id WatchID) {
 	}
 	ws.watchable.progress(w)
 }
+
+func (ws *watchStream) RequestProgressAll() bool {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	return ws.watchable.progressAll(ws.watchers)
+}
@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
 
+	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/lease"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/mvcc/mvccpb"
@@ -342,6 +343,58 @@ func TestWatcherRequestProgress(t *testing.T) {
 	}
 }
 
+func TestWatcherRequestProgressAll(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+
+	// manually create watchableStore instead of newWatchableStore
+	// because newWatchableStore automatically calls syncWatchers
+	// method to sync watchers in unsynced map. We want to keep watchers
+	// in unsynced to test if syncWatchers works as expected.
+	s := &watchableStore{
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
+		unsynced: newWatcherGroup(),
+		synced:   newWatcherGroup(),
+		stopc:    make(chan struct{}),
+	}
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+
+	testKey := []byte("foo")
+	notTestKey := []byte("bad")
+	testValue := []byte("bar")
+	s.Put(testKey, testValue, lease.NoLease)
+
+	// Create watch stream with watcher. We will not actually get
+	// any notifications on it specifically, but there needs to be
+	// at least one Watch for progress notifications to get
+	// generated.
+	w := s.NewWatchStream()
+	w.Watch(0, notTestKey, nil, 1)
+
+	w.RequestProgressAll()
+	select {
+	case resp := <-w.Chan():
+		t.Fatalf("unexpected %+v", resp)
+	default:
+	}
+
+	s.syncWatchers()
+
+	w.RequestProgressAll()
+	wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
+	select {
+	case resp := <-w.Chan():
+		if !reflect.DeepEqual(resp, wrs) {
+			t.Fatalf("got %+v, expect %+v", resp, wrs)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("failed to receive progress")
+	}
+}
+
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))