diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go
index 029765ee8..ee1402d5f 100644
--- a/clientv3/integration/lease_test.go
+++ b/clientv3/integration/lease_test.go
@@ -682,3 +682,61 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
 	mkReqs(4)
 	wg.Wait()
 }
+
+// TestLeaseWithRequireLeader checks that the keep-alive channel closes when there is no leader.
+func TestLeaseWithRequireLeader(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+	defer clus.Terminate(t)
+
+	c := clus.Client(0)
+	lid1, err1 := c.Grant(context.TODO(), 60)
+	if err1 != nil {
+		t.Fatal(err1)
+	}
+	lid2, err2 := c.Grant(context.TODO(), 60)
+	if err2 != nil {
+		t.Fatal(err2)
+	}
+	// kaReqLeader closes if the leader is lost
+	kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
+	if kerr1 != nil {
+		t.Fatal(kerr1)
+	}
+	// kaWait will wait even if the leader is lost
+	kaWait, kerr2 := c.KeepAlive(context.TODO(), lid2.ID)
+	if kerr2 != nil {
+		t.Fatal(kerr2)
+	}
+
+	select {
+	case <-kaReqLeader:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("require leader first keep-alive timed out")
+	}
+	select {
+	case <-kaWait:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("leader not required first keep-alive timed out")
+	}
+
+	clus.Members[1].Stop(t)
+
+	select {
+	case resp, ok := <-kaReqLeader:
+		if ok {
+			t.Fatalf("expected closed require leader channel, got response %+v", resp)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("keepalive with require leader took too long to close")
+	}
+	select {
+	case _, ok := <-kaWait:
+		if !ok {
+			t.Fatalf("got closed channel with no require leader, expected open channel")
+		}
+	case <-time.After(10 * time.Millisecond):
+		// wait briefly to catch any close happening soon after kaReqLeader closes
+	}
+}
diff --git a/clientv3/lease.go b/clientv3/lease.go
index 900769386..a6494ceee 100644
--- a/clientv3/lease.go
+++ b/clientv3/lease.go
@@ -22,6 +22,7 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
 )
 
 type (
@@ -67,6 +68,9 @@ const (
 	leaseResponseChSize = 16
 	// NoLease is a lease ID for the absence of a lease.
 	NoLease LeaseID = 0
+
+	// retryConnWait is how long to wait before retrying on a lost leader
+	retryConnWait = 500 * time.Millisecond
 )
 
 // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
@@ -157,7 +161,8 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
 	}
-	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
+	reqLeaderCtx := WithRequireLeader(context.Background())
+	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
 	return l
 }
 
@@ -309,6 +314,47 @@ func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-cha
 	}
 }
 
+// closeRequireLeader scans all keep-alives for contexts that require a leader
+// and closes the associated channels.
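+// It is called from recvKeepAliveLoop when the keep-alive stream fails with
+// ErrNoLeader so that WithRequireLeader listeners observe leader loss promptly.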
+func (l *lessor) closeRequireLeader() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, ka := range l.keepAlives {
+		reqIdxs := 0
+		// find all require-leader channels, close them, and mark as nil
+		for i, ctx := range ka.ctxs {
+			md, ok := metadata.FromContext(ctx)
+			if !ok {
+				continue
+			}
+			ks := md[rpctypes.MetadataRequireLeaderKey]
+			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
+				continue
+			}
+			close(ka.chs[i])
+			ka.chs[i] = nil
+			reqIdxs++
+		}
+		if reqIdxs == 0 {
+			continue
+		}
+		// remove all channels that required a leader from the keepalive
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newCtxs := make([]context.Context, len(newChs))
+		newIdx := 0
+		for i := range ka.chs {
+			if ka.chs[i] == nil {
+				continue
+			}
+			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
+			newIdx++
+		}
+		ka.chs, ka.ctxs = newChs, newCtxs
+	}
+}
+
 func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -351,14 +397,22 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
-		if err != nil {
-			if isHaltErr(l.stopCtx, err) {
-				return err
-			}
-			stream, serr = l.resetRecv()
+		if err == nil {
+			l.recvKeepAlive(resp)
 			continue
 		}
-		l.recvKeepAlive(resp)
+		err = toErr(l.stopCtx, err)
+		if err == rpctypes.ErrNoLeader {
+			l.closeRequireLeader()
+			select {
+			case <-time.After(retryConnWait):
+			case <-l.stopCtx.Done():
+				return err
+			}
+		} else if isHaltErr(l.stopCtx, err) {
+			return err
+		}
+		stream, serr = l.resetRecv()
 	}
 	return serr
 }
@@ -375,7 +429,6 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	if l.stream != nil && l.streamCancel != nil {
-		l.stream.CloseSend()
 		l.streamCancel()
 	}
 
diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go
index 2922b3be9..a25d0ce6a 100644
--- a/etcdserver/api/v3rpc/lease.go
+++ b/etcdserver/api/v3rpc/lease.go
@@ -18,6 +18,7 @@ import (
 	"io"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"golang.org/x/net/context"
@@ -67,7 +68,26 @@ func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLi
 	return resp, nil
 }
 
-func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
+func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
+	errc := make(chan error, 1)
+	go func() {
+		errc <- ls.leaseKeepAlive(stream)
+	}()
+	select {
+	case err = <-errc:
+	case <-stream.Context().Done():
+		// the only server-side cancellation is noleader for now.
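+		// (the v3rpc stream interceptor is expected to cancel require-leader
+		// streams when this member loses its leader)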
+		err = stream.Context().Err()
+		if err == context.Canceled {
+			err = rpctypes.ErrGRPCNoLeader
+		}
+	}
+	return err
+}
+
+func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
 	for {
 		req, err := stream.Recv()
 		if err == io.EOF {
diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go
index 9281df78a..a1599f6dc 100644
--- a/integration/v3_lease_test.go
+++ b/integration/v3_lease_test.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
@@ -490,6 +491,46 @@ func TestV3LeaseFailover(t *testing.T) {
 	}
 }
 
+// TestV3LeaseRequireLeader ensures that a Recv will get a leader
+// loss error if there is no leader.
+func TestV3LeaseRequireLeader(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	lc := toGRPC(clus.Client(0)).Lease
+	clus.Members[1].Stop(t)
+	clus.Members[2].Stop(t)
+
+	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+	mctx := metadata.NewContext(context.Background(), md)
+	ctx, cancel := context.WithCancel(mctx)
+	defer cancel()
+	lac, err := lc.LeaseKeepAlive(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		resp, err := lac.Recv()
+		if err == nil {
+			t.Errorf("got response %+v, expected error", resp)
+			return
+		}
+		if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
+			t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
+		}
+	}()
+	select {
+	case <-time.After(time.Duration(5*electionTicks) * tickDuration):
+		t.Fatalf("did not receive leader loss error")
+	case <-donec:
+	}
+}
+
 const fiveMinTTL int64 = 300
 
 // TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key.