mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Guarantee order of requested progress notifications
Progress notifications requested using ProgressRequest were sent directly using the ctrlStream, which means that they could race against watch responses in the watchStream. This would especially happen when the stream was not synced - e.g. if you requested a progress notification on a freshly created unsynced watcher, the notification would typically arrive indicating a revision for which not all watch responses had been sent. This changes the behaviour so that v3rpc always goes through the watch stream, using a new RequestProgressAll function that closely matches the behaviour of the v3rpc code - i.e. 1. Generate a message with WatchId -1, indicating the revision for *all* watchers in the stream 2. Guarantee that a response is (eventually) sent The latter might require us to defer the response until all watchers are synced, which is likely as it should be. Note that we do *not* guarantee that the number of progress notifications matches the number of requests, only that eventually at least one gets sent. Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
@@ -22,10 +22,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TestWatcherWatchID tests that each watcher provides unique watchID,
|
||||
@@ -342,6 +345,58 @@ func TestWatcherRequestProgress(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcherRequestProgressAll(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
|
||||
// manually create watchableStore instead of newWatchableStore
|
||||
// because newWatchableStore automatically calls syncWatchers
|
||||
// method to sync watchers in unsynced map. We want to keep watchers
|
||||
// in unsynced to test if syncWatchers works as expected.
|
||||
s := &watchableStore{
|
||||
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
|
||||
unsynced: newWatcherGroup(),
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
s.store.Close()
|
||||
os.Remove(tmpPath)
|
||||
}()
|
||||
|
||||
testKey := []byte("foo")
|
||||
notTestKey := []byte("bad")
|
||||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
// Create watch stream with watcher. We will not actually get
|
||||
// any notifications on it specifically, but there needs to be
|
||||
// at least one Watch for progress notifications to get
|
||||
// generated.
|
||||
w := s.NewWatchStream()
|
||||
w.Watch(0, notTestKey, nil, 1)
|
||||
|
||||
w.RequestProgressAll()
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
t.Fatalf("unexpected %+v", resp)
|
||||
default:
|
||||
}
|
||||
|
||||
s.syncWatchers()
|
||||
|
||||
w.RequestProgressAll()
|
||||
wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
if !reflect.DeepEqual(resp, wrs) {
|
||||
t.Fatalf("got %+v, expect %+v", resp, wrs)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive progress")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcherWatchWithFilter(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
|
||||
Reference in New Issue
Block a user