Merge pull request #12271 from jingyih/add_watch_notify_interval_flag_in_testing

integration: add WatchProgressNotifyInterval in integration test
This commit is contained in:
Joe Betz 2020-09-09 08:56:48 -07:00 committed by GitHub
commit 10fa9614e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 34 deletions

View File

@ -584,6 +584,30 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
} }
} }
func TestConfigurableWatchProgressNotifyInterval(t *testing.T) {
progressInterval := 200 * time.Millisecond
clus := integration.NewClusterV3(t,
&integration.ClusterConfig{
Size: 3,
WatchProgressNotifyInterval: progressInterval,
})
defer clus.Terminate(t)
opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
rch := clus.RandClient().Watch(context.Background(), "foo", opts...)
timeout := 1 * time.Second // we expect to receive watch progress notify in 2 * progressInterval,
// but for CPU-starved situation it may take longer. So we use 1 second here for timeout.
select {
case resp := <-rch: // waiting for a watch progress notify response
if !resp.IsProgressNotify() {
t.Fatalf("expected resp.IsProgressNotify() == true")
}
case <-time.After(timeout):
t.Fatalf("timed out waiting for watch progress notify response in %v", timeout)
}
}
func TestWatchRequestProgress(t *testing.T) { func TestWatchRequestProgress(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string

View File

@ -152,6 +152,8 @@ type ClusterConfig struct {
EnableLeaseCheckpoint bool EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration LeaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
} }
type cluster struct { type cluster struct {
@ -279,23 +281,24 @@ func (c *cluster) HTTPMembers() []client.Member {
func (c *cluster) mustNewMember(t testing.TB) *member { func (c *cluster) mustNewMember(t testing.TB) *member {
m := mustNewMember(t, m := mustNewMember(t,
memberConfig{ memberConfig{
name: c.name(rand.Int()), name: c.name(rand.Int()),
authToken: c.cfg.AuthToken, authToken: c.cfg.AuthToken,
peerTLS: c.cfg.PeerTLS, peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS, clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes, quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps, maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes, maxRequestBytes: c.cfg.MaxRequestBytes,
snapshotCount: c.cfg.SnapshotCount, snapshotCount: c.cfg.SnapshotCount,
snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries, snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP, useIP: c.cfg.UseIP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
}) })
m.DiscoveryURL = c.cfg.DiscoveryURL m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC { if c.cfg.UseGRPC {
@ -568,23 +571,24 @@ type member struct {
func (m *member) GRPCAddr() string { return m.grpcAddr } func (m *member) GRPCAddr() string { return m.grpcAddr }
type memberConfig struct { type memberConfig struct {
name string name string
peerTLS *transport.TLSInfo peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo clientTLS *transport.TLSInfo
authToken string authToken string
quotaBackendBytes int64 quotaBackendBytes int64
maxTxnOps uint maxTxnOps uint
maxRequestBytes uint maxRequestBytes uint
snapshotCount uint64 snapshotCount uint64
snapshotCatchUpEntries uint64 snapshotCatchUpEntries uint64
grpcKeepAliveMinTime time.Duration grpcKeepAliveMinTime time.Duration
grpcKeepAliveInterval time.Duration grpcKeepAliveInterval time.Duration
grpcKeepAliveTimeout time.Duration grpcKeepAliveTimeout time.Duration
clientMaxCallSendMsgSize int clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int clientMaxCallRecvMsgSize int
useIP bool useIP bool
enableLeaseCheckpoint bool enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration leaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
} }
// mustNewMember return an inited member with the given name. If peerTLS is // mustNewMember return an inited member with the given name. If peerTLS is
@ -678,6 +682,8 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
m.InitialCorruptCheck = true m.InitialCorruptCheck = true
lcfg := logutil.DefaultZapLoggerConfig lcfg := logutil.DefaultZapLoggerConfig