etcd/clientv3/integration/lease_test.go
Gyuho Lee fbc0510a4e clientv3: fix keepalive send interval when response queue is full
client should update next keepalive send time
even when lease keepalive response queue becomes full.

Otherwise, client sends keepalive request every 500ms
regardless of TTL when the send is only expected to happen
with the interval of TTL / 3 at minimum.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-23 08:51:18 -07:00

843 lines
21 KiB
Go

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"google.golang.org/grpc"
)
func TestLeaseNotFoundError(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
kv := clus.RandClient()
_, err := kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(500)))
if err != rpctypes.ErrLeaseNotFound {
t.Fatalf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
}
}
func TestLeaseGrant(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
lapi := clus.RandClient()
kv := clus.RandClient()
_, merr := lapi.Grant(context.Background(), clientv3.MaxLeaseTTL+1)
if merr != rpctypes.ErrLeaseTTLTooLarge {
t.Fatalf("err = %v, want %v", merr, rpctypes.ErrLeaseTTLTooLarge)
}
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
_, err = kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
t.Fatalf("failed to create key with lease %v", err)
}
}
func TestLeaseRevoke(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
lapi := clus.RandClient()
kv := clus.RandClient()
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
_, err = lapi.Revoke(context.Background(), clientv3.LeaseID(resp.ID))
if err != nil {
t.Errorf("failed to revoke lease %v", err)
}
_, err = kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != rpctypes.ErrLeaseNotFound {
t.Fatalf("err = %v, want %v", err, rpctypes.ErrLeaseNotFound)
}
}
func TestLeaseKeepAliveOnce(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
lapi := clus.RandClient()
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
_, err = lapi.KeepAliveOnce(context.Background(), resp.ID)
if err != nil {
t.Errorf("failed to keepalive lease %v", err)
}
_, err = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0))
if err != rpctypes.ErrLeaseNotFound {
t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
}
}
func TestLeaseKeepAlive(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
lapi := clus.Client(0)
clus.TakeClient(0)
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
kresp, ok := <-rc
if !ok {
t.Errorf("chan is closed, want not closed")
}
if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
lapi.Close()
_, ok = <-rc
if ok {
t.Errorf("chan is not closed, want lease Close() closes chan")
}
}
func TestLeaseKeepAliveOneSecond(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
resp, err := cli.Grant(context.Background(), 1)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
for i := 0; i < 3; i++ {
if _, ok := <-rc; !ok {
t.Errorf("chan is closed, want not closed")
}
}
}
// TODO: add a client that can connect to all the members of cluster via unix sock.
// TODO: test handle more complicated failures.
func TestLeaseKeepAliveHandleFailure(t *testing.T) {
t.Skip("test it when we have a cluster client")
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// TODO: change this line to get a cluster client
lapi := clus.RandClient()
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
kresp := <-rc
if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
// restart the connected member.
clus.Members[0].Stop(t)
select {
case <-rc:
t.Fatalf("unexpected keepalive")
case <-time.After(10*time.Second/3 + 1):
}
// recover the member.
clus.Members[0].Restart(t)
kresp = <-rc
if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
lapi.Close()
_, ok := <-rc
if ok {
t.Errorf("chan is not closed, want lease Close() closes chan")
}
}
type leaseCh struct {
lid clientv3.LeaseID
ch <-chan *clientv3.LeaseKeepAliveResponse
}
// TestLeaseKeepAliveNotFound ensures a revoked lease won't halt other leases.
func TestLeaseKeepAliveNotFound(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
lchs := []leaseCh{}
for i := 0; i < 3; i++ {
resp, rerr := cli.Grant(context.TODO(), 5)
if rerr != nil {
t.Fatal(rerr)
}
kach, kaerr := cli.KeepAlive(context.Background(), resp.ID)
if kaerr != nil {
t.Fatal(kaerr)
}
lchs = append(lchs, leaseCh{resp.ID, kach})
}
if _, err := cli.Revoke(context.TODO(), lchs[1].lid); err != nil {
t.Fatal(err)
}
<-lchs[0].ch
if _, ok := <-lchs[0].ch; !ok {
t.Fatalf("closed keepalive on wrong lease")
}
timec := time.After(5 * time.Second)
for range lchs[1].ch {
select {
case <-timec:
t.Fatalf("revoke did not close keep alive")
default:
}
}
}
func TestLeaseGrantErrConnClosed(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
clus.TakeClient(0)
donec := make(chan struct{})
go func() {
defer close(donec)
_, err := cli.Grant(context.TODO(), 5)
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled {
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
t.Fatalf("expected %v or %v, got %v", grpc.ErrClientConnClosing, context.Canceled, err)
}
}()
if err := cli.Close(); err != nil {
t.Fatal(err)
}
select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
case <-donec:
}
}
// TestLeaseKeepAliveFullResponseQueue ensures when response
// queue is full thus dropping keepalive response sends,
// keepalive request is sent with the same rate of TTL / 3.
func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
lapi := clus.Client(0)
// expect lease keepalive every 10-second
lresp, err := lapi.Grant(context.Background(), 30)
if err != nil {
t.Fatalf("failed to create lease %v", err)
}
id := lresp.ID
old := clientv3.LeaseResponseChSize
defer func() {
clientv3.LeaseResponseChSize = old
}()
clientv3.LeaseResponseChSize = 0
// never fetch from response queue, and let it become full
_, err = lapi.KeepAlive(context.Background(), id)
if err != nil {
t.Fatalf("failed to keepalive lease %v", err)
}
// TTL should not be refreshed after 3 seconds
// expect keepalive to be triggered after TTL/3
time.Sleep(3 * time.Second)
tr, terr := lapi.TimeToLive(context.Background(), id)
if terr != nil {
t.Fatalf("failed to get lease information %v", terr)
}
if tr.TTL >= 29 {
t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
}
}
func TestLeaseGrantNewAfterClose(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
clus.TakeClient(0)
if err := cli.Close(); err != nil {
t.Fatal(err)
}
donec := make(chan struct{})
go func() {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
case <-donec:
}
}
func TestLeaseRevokeNewAfterClose(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
t.Fatal(err)
}
leaseID := resp.ID
clus.TakeClient(0)
if err := cli.Close(); err != nil {
t.Fatal(err)
}
donec := make(chan struct{})
go func() {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Revoke took too long")
case <-donec:
}
}
// TestLeaseKeepAliveCloseAfterDisconnectRevoke ensures the keep alive channel is closed
// following a disconnection, lease revoke, then reconnect.
func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cli := clus.Client(0)
// setup lease and do a keepalive
resp, err := cli.Grant(context.Background(), 10)
if err != nil {
t.Fatal(err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Fatal(kerr)
}
kresp := <-rc
if kresp.ID != resp.ID {
t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
}
// keep client disconnected
clus.Members[0].Stop(t)
time.Sleep(time.Second)
clus.WaitLeader(t)
if _, err := clus.Client(1).Revoke(context.TODO(), resp.ID); err != nil {
t.Fatal(err)
}
clus.Members[0].Restart(t)
// some responses may still be buffered; drain until close
timer := time.After(time.Duration(kresp.TTL) * time.Second)
for kresp != nil {
select {
case kresp = <-rc:
case <-timer:
t.Fatalf("keepalive channel did not close")
}
}
}
// TestLeaseKeepAliveInitTimeout ensures the keep alive channel closes if
// the initial keep alive request never gets a response.
func TestLeaseKeepAliveInitTimeout(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
// setup lease and do a keepalive
resp, err := cli.Grant(context.Background(), 5)
if err != nil {
t.Fatal(err)
}
// keep client disconnected
clus.Members[0].Stop(t)
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Fatal(kerr)
}
select {
case ka, ok := <-rc:
if ok {
t.Fatalf("unexpected keepalive %v, expected closed channel", ka)
}
case <-time.After(10 * time.Second):
t.Fatalf("keepalive channel did not close")
}
clus.Members[0].Restart(t)
}
// TestLeaseKeepAliveInitTimeout ensures the keep alive channel closes if
// a keep alive request after the first never gets a response.
func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
// setup lease and do a keepalive
resp, err := cli.Grant(context.Background(), 5)
if err != nil {
t.Fatal(err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Fatal(kerr)
}
if kresp := <-rc; kresp.ID != resp.ID {
t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
}
// keep client disconnected
clus.Members[0].Stop(t)
select {
case ka, ok := <-rc:
if ok {
t.Fatalf("unexpected keepalive %v, expected closed channel", ka)
}
case <-time.After(10 * time.Second):
t.Fatalf("keepalive channel did not close")
}
clus.Members[0].Restart(t)
}
func TestLeaseTimeToLive(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
c := clus.RandClient()
lapi := c
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
kv := clus.RandClient()
keys := []string{"foo1", "foo2"}
for i := range keys {
if _, err = kv.Put(context.TODO(), keys[i], "bar", clientv3.WithLease(resp.ID)); err != nil {
t.Fatal(err)
}
}
// linearized read to ensure Puts propagated to server backing lapi
if _, err := c.Get(context.TODO(), "abc"); err != nil {
t.Fatal(err)
}
lresp, lerr := lapi.TimeToLive(context.Background(), resp.ID, clientv3.WithAttachedKeys())
if lerr != nil {
t.Fatal(lerr)
}
if lresp.ID != resp.ID {
t.Fatalf("leaseID expected %d, got %d", resp.ID, lresp.ID)
}
if lresp.GrantedTTL != int64(10) {
t.Fatalf("GrantedTTL expected %d, got %d", 10, lresp.GrantedTTL)
}
if lresp.TTL == 0 || lresp.TTL > lresp.GrantedTTL {
t.Fatalf("unexpected TTL %d (granted %d)", lresp.TTL, lresp.GrantedTTL)
}
ks := make([]string, len(lresp.Keys))
for i := range lresp.Keys {
ks[i] = string(lresp.Keys[i])
}
sort.Strings(ks)
if !reflect.DeepEqual(ks, keys) {
t.Fatalf("keys expected %v, got %v", keys, ks)
}
lresp, lerr = lapi.TimeToLive(context.Background(), resp.ID)
if lerr != nil {
t.Fatal(lerr)
}
if len(lresp.Keys) != 0 {
t.Fatalf("unexpected keys %+v", lresp.Keys)
}
}
func TestLeaseTimeToLiveLeaseNotFound(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
resp, err := cli.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
_, err = cli.Revoke(context.Background(), resp.ID)
if err != nil {
t.Errorf("failed to Revoke lease %v", err)
}
lresp, err := cli.TimeToLive(context.Background(), resp.ID)
// TimeToLive() should return a response with TTL=-1.
if err != nil {
t.Fatalf("expected err to be nil")
}
if lresp == nil {
t.Fatalf("expected lresp not to be nil")
}
if lresp.ResponseHeader == nil {
t.Fatalf("expected ResponseHeader not to be nil")
}
if lresp.ID != resp.ID {
t.Fatalf("expected Lease ID %v, but got %v", resp.ID, lresp.ID)
}
if lresp.TTL != -1 {
t.Fatalf("expected TTL %v, but got %v", lresp.TTL, lresp.TTL)
}
}
func TestLeaseLeases(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
ids := []clientv3.LeaseID{}
for i := 0; i < 5; i++ {
resp, err := cli.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
ids = append(ids, resp.ID)
}
resp, err := cli.Leases(context.Background())
if err != nil {
t.Fatal(err)
}
if len(resp.Leases) != 5 {
t.Fatalf("len(resp.Leases) expected 5, got %d", len(resp.Leases))
}
for i := range resp.Leases {
if ids[i] != resp.Leases[i].ID {
t.Fatalf("#%d: lease ID expected %d, got %d", i, ids[i], resp.Leases[i].ID)
}
}
}
// TestLeaseRenewLostQuorum ensures keepalives work after losing quorum
// for a while.
func TestLeaseRenewLostQuorum(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cli := clus.Client(0)
r, err := cli.Grant(context.TODO(), 4)
if err != nil {
t.Fatal(err)
}
kctx, kcancel := context.WithCancel(context.Background())
defer kcancel()
ka, err := cli.KeepAlive(kctx, r.ID)
if err != nil {
t.Fatal(err)
}
// consume first keepalive so next message sends when cluster is down
<-ka
lastKa := time.Now()
// force keepalive stream message to timeout
clus.Members[1].Stop(t)
clus.Members[2].Stop(t)
// Use TTL-2 since the client closes the keepalive channel if no
// keepalive arrives before the lease deadline; the client will
// try to resend a keepalive after TTL/3 seconds, so for a TTL of 4,
// sleeping for 2s should be sufficient time for issuing a retry.
// The cluster has two seconds to recover and reply to the keepalive.
time.Sleep(time.Duration(r.TTL-2) * time.Second)
clus.Members[1].Restart(t)
clus.Members[2].Restart(t)
if time.Since(lastKa) > time.Duration(r.TTL)*time.Second {
t.Skip("waited too long for server stop and restart")
}
select {
case _, ok := <-ka:
if !ok {
t.Fatalf("keepalive closed")
}
case <-time.After(time.Duration(r.TTL) * time.Second):
t.Fatalf("timed out waiting for keepalive")
}
}
func TestLeaseKeepAliveLoopExit(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
ctx := context.Background()
cli := clus.Client(0)
clus.TakeClient(0)
resp, err := cli.Grant(ctx, 5)
if err != nil {
t.Fatal(err)
}
cli.Close()
_, err = cli.KeepAlive(ctx, resp.ID)
if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {
t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
}
}
// TestV3LeaseFailureOverlap issues Grant and KeepAlive requests to a cluster
// before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
// transient cluster failure.
func TestV3LeaseFailureOverlap(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)
numReqs := 5
cli := clus.Client(0)
// bring up a session, tear it down
updown := func(i int) error {
sess, err := concurrency.NewSession(cli)
if err != nil {
return err
}
ch := make(chan struct{})
go func() {
defer close(ch)
sess.Close()
}()
select {
case <-ch:
case <-time.After(time.Minute / 4):
t.Fatalf("timeout %d", i)
}
return nil
}
var wg sync.WaitGroup
mkReqs := func(n int) {
wg.Add(numReqs)
for i := 0; i < numReqs; i++ {
go func() {
defer wg.Done()
err := updown(n)
if err == nil || err == rpctypes.ErrTimeoutDueToConnectionLost {
return
}
t.Fatal(err)
}()
}
}
mkReqs(1)
clus.Members[1].Stop(t)
mkReqs(2)
time.Sleep(time.Second)
mkReqs(3)
clus.Members[1].Restart(t)
mkReqs(4)
wg.Wait()
}
// TestLeaseWithRequireLeader checks keep-alive channel close when no leader.
func TestLeaseWithRequireLeader(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)
c := clus.Client(0)
lid1, err1 := c.Grant(context.TODO(), 60)
if err1 != nil {
t.Fatal(err1)
}
lid2, err2 := c.Grant(context.TODO(), 60)
if err2 != nil {
t.Fatal(err2)
}
// kaReqLeader close if the leader is lost
kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
if kerr1 != nil {
t.Fatal(kerr1)
}
// kaWait will wait even if the leader is lost
kaWait, kerr2 := c.KeepAlive(context.TODO(), lid2.ID)
if kerr2 != nil {
t.Fatal(kerr2)
}
select {
case <-kaReqLeader:
case <-time.After(5 * time.Second):
t.Fatalf("require leader first keep-alive timed out")
}
select {
case <-kaWait:
case <-time.After(5 * time.Second):
t.Fatalf("leader not required first keep-alive timed out")
}
clus.Members[1].Stop(t)
// kaReqLeader may issue multiple requests while waiting for the first
// response from proxy server; drain any stray keepalive responses
time.Sleep(100 * time.Millisecond)
for len(kaReqLeader) > 0 {
<-kaReqLeader
}
select {
case resp, ok := <-kaReqLeader:
if ok {
t.Fatalf("expected closed require leader, got response %+v", resp)
}
case <-time.After(5 * time.Second):
t.Fatal("keepalive with require leader took too long to close")
}
select {
case _, ok := <-kaWait:
if !ok {
t.Fatalf("got closed channel with no require leader, expected non-closed")
}
case <-time.After(10 * time.Millisecond):
// wait some to detect any closes happening soon after kaReqLeader closing
}
}