From e0bcd4d5161435082aa3aecbb6cff94b1038a911 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Mon, 5 Dec 2016 19:22:15 +0200 Subject: [PATCH 1/3] clientv3: return error from KeepAlive if corresponding loop exits. After recvKeepAliveLoop exits, a client might call KeepAlive, adding a request channel that will never be closed. This fix makes sure that recvKeepAliveLoop is still running before adding the request to the lessor's list, and returns an error otherwise. Fixes #6922 --- clientv3/lease.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/clientv3/lease.go b/clientv3/lease.go index d3b587e8f..987961f85 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -15,6 +15,7 @@ package clientv3 import ( + "errors" "sync" "time" @@ -69,6 +70,8 @@ const ( NoLease LeaseID = 0 ) +var ErrLeaseHalted = errors.New("etcdclient: leases halted") + type Lease interface { // Grant creates a new lease. Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) @@ -216,6 +219,14 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) l.mu.Lock() + // ensure that recvKeepAliveLoop is still running + select { + case <-l.donec: + l.mu.Unlock() + close(ch) + return ch, ErrLeaseHalted + default: + } ka, ok := l.keepAlives[id] if !ok { // create fresh keep alive From 5183ce01187a80593a26cff2ee039158d81bf5d5 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Thu, 15 Dec 2016 02:43:41 +0200 Subject: [PATCH 2/3] clientv3: add test for keep alive loop exit case --- clientv3/integration/lease_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 6429eef54..06bec7b84 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -553,3 +553,23 @@ func TestLeaseRenewLostQuorum(t *testing.T) { t.Fatalf("timed out waiting for keepalive") } } + +func TestLeaseKeepAliveLoopExit(t 
*testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx := context.Background() + cli := clus.Client(0) + + resp, err := cli.Grant(ctx, 5) + if err != nil { + t.Fatal(err) + } + cli.Lease.Close() + + if _, err := cli.KeepAlive(ctx, resp.ID); err != clientv3.ErrLeaseHalted { + t.Fatalf("expected %v, got %v", clientv3.ErrLeaseHalted, err) + } +} From b126e31132758b7ea9171464dd7d38b998ad6118 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Thu, 15 Dec 2016 16:06:27 +0200 Subject: [PATCH 3/3] clientv3: better error message for keep alive loop halt --- clientv3/integration/lease_test.go | 5 +++-- clientv3/lease.go | 30 +++++++++++++++++++++++------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 06bec7b84..82cd8a767 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -569,7 +569,8 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) { } cli.Lease.Close() - if _, err := cli.KeepAlive(ctx, resp.ID); err != clientv3.ErrLeaseHalted { - t.Fatalf("expected %v, got %v", clientv3.ErrLeaseHalted, err) + _, err = cli.KeepAlive(ctx, resp.ID) + if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok { + t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err) } } diff --git a/clientv3/lease.go b/clientv3/lease.go index 987961f85..81c9c6072 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -15,7 +15,6 @@ package clientv3 import ( - "errors" "sync" "time" @@ -70,7 +69,20 @@ const ( NoLease LeaseID = 0 ) -var ErrLeaseHalted = errors.New("etcdclient: leases halted") +// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error. +// +// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected. 
+type ErrKeepAliveHalted struct { + Reason error +} + +func (e ErrKeepAliveHalted) Error() string { + s := "etcdclient: leases keep alive halted" + if e.Reason != nil { + s += ": " + e.Reason.Error() + } + return s +} type Lease interface { // Grant creates a new lease. @@ -97,8 +109,9 @@ type Lease interface { type lessor struct { mu sync.Mutex // guards all fields - // donec is closed when recvKeepAliveLoop stops - donec chan struct{} + // donec is closed and loopErr is set when recvKeepAliveLoop stops + donec chan struct{} + loopErr error remote pb.LeaseClient @@ -222,9 +235,10 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl // ensure that recvKeepAliveLoop is still running select { case <-l.donec: + err := l.loopErr l.mu.Unlock() close(ch) - return ch, ErrLeaseHalted + return ch, ErrKeepAliveHalted{Reason: err} default: } ka, ok := l.keepAlives[id] @@ -338,10 +352,11 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive return karesp, nil } -func (l *lessor) recvKeepAliveLoop() { +func (l *lessor) recvKeepAliveLoop() (gerr error) { defer func() { l.mu.Lock() close(l.donec) + l.loopErr = gerr for _, ka := range l.keepAlives { ka.Close() } @@ -354,13 +369,14 @@ func (l *lessor) recvKeepAliveLoop() { resp, err := stream.Recv() if err != nil { if isHaltErr(l.stopCtx, err) { - return + return err } stream, serr = l.resetRecv() continue } l.recvKeepAlive(resp) } + return serr } // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests