From fd0e68d16bd999a27fd40be9fb965dd1ff497395 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 10:57:54 -0800 Subject: [PATCH] clientv3/integration: test lease keepalive --- clientv3/integration/lease_test.go | 111 ++++++++++++++++++++++++++++- clientv3/lease.go | 6 ++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index c44fafcbd..ee6ab1c9b 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -16,6 +16,7 @@ package integration import ( "testing" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" @@ -50,7 +51,7 @@ func TestLeaseCreate(t *testing.T) { func TestLeaseRevoke(t *testing.T) { defer testutil.AfterTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) lapi := clientv3.NewLease(clus.RandClient()) @@ -73,3 +74,111 @@ func TestLeaseRevoke(t *testing.T) { t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound) } } + +func TestLeaseKeepAliveOnce(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lapi := clientv3.NewLease(clus.RandClient()) + defer lapi.Close() + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + _, err = lapi.KeepAliveOnce(context.Background(), lease.LeaseID(resp.ID)) + if err != nil { + t.Errorf("failed to keepalive lease", err) + } +} + +func TestLeaseKeepAlive(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lapi := clientv3.NewLease(clus.RandClient()) + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID)) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } + + kresp, ok := <-rc + if !ok { + t.Errorf("chan is closed, want not closed") + } + + if kresp.ID != resp.ID { + t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) + } + + lapi.Close() + + _, ok = <-rc + if ok { + t.Errorf("chan is not closed, want lease Close() closes chan") + } +} + +// TODO: add a client that can connect to all the members of cluster via unix sock. +// TODO: test handle more complicated failures. +func TestLeaseKeepAliveHandleFailure(t *testing.T) { + t.Skip("test it when we have a cluster client") + + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // TODO: change this line to get a cluster client + lapi := clientv3.NewLease(clus.RandClient()) + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID)) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } + + kresp := <-rc + if kresp.ID != resp.ID { + t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) + } + + // restart the connected member. + clus.Members[0].Stop(t) + + select { + case <-rc: + t.Fatalf("unexpected keepalive") + case <-time.After(10*time.Second/3 + 1): + } + + // recover the member. + clus.Members[0].Restart(t) + + kresp = <-rc + if kresp.ID != resp.ID { + t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) + } + + lapi.Close() + + _, ok := <-rc + if ok { + t.Errorf("chan is not closed, want lease Close() closes chan") + } +} diff --git a/clientv3/lease.go b/clientv3/lease.go index 071f6f7b8..53f779fd2 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -206,12 +206,18 @@ func (l *lessor) recvKeepAliveLoop() { defer func() { l.stopCancel() close(l.donec) + for _, ch := range l.keepAlives { + close(ch) + } }() stream, serr := l.resetRecv() for { resp, err := stream.Recv() if err != nil { + if isRPCError(err) { + return + } if stream, serr = l.resetRecv(); serr != nil { return }