From 8431801814fecac48793da6a5d87cb61daeedef9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Feb 2016 12:07:34 -0800 Subject: [PATCH 1/2] lease: fix lease init race --- clientv3/lease.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index ddfc63c05..129043eb5 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -57,8 +57,10 @@ type Lease interface { type lessor struct { c *Client - mu sync.Mutex // guards all fields - conn *grpc.ClientConn // conn in-use + mu sync.Mutex // guards all fields + conn *grpc.ClientConn // conn in-use + initedc chan bool + remote pb.LeaseClient stream pb.Lease_LeaseKeepAliveClient @@ -76,12 +78,17 @@ func NewLease(c *Client) Lease { c: c, conn: c.ActiveConnection(), + initedc: make(chan bool, 1), + keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse), deadlines: make(map[lease.LeaseID]time.Time), } l.remote = pb.NewLeaseClient(l.conn) l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) + + l.initedc <- false + go l.recvKeepAliveLoop() go l.sendKeepAliveLoop() @@ -103,7 +110,6 @@ func (l *lessor) Create(ctx context.Context, ttl int64) (*LeaseCreateResponse, e if isRPCError(err) { return nil, err } - if nerr := l.switchRemoteAndStream(err); nerr != nil { return nil, nerr } @@ -354,14 +360,17 @@ func (l *lessor) newStream() error { } func (l *lessor) initStream() bool { - if l.getKeepAliveStream() != nil { + ok := <-l.initedc + if ok { return true } err := l.switchRemoteAndStream(nil) if err == nil { + l.initedc <- true return true } + l.initedc <- false return false } From bef7887c0d86667faee1358f9e359aa2f5739ce4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Feb 2016 12:08:17 -0800 Subject: [PATCH 2/2] clientv3/integration: add basic lease test --- .../{client_test.go => kv_test.go} | 0 clientv3/integration/lease_test.go | 75 +++++++++++++++++++ etcdserver/api/v3rpc/key.go | 3 + 3 files changed, 78 insertions(+) rename clientv3/integration/{client_test.go => kv_test.go} (100%) create mode 100644 clientv3/integration/lease_test.go diff --git a/clientv3/integration/client_test.go b/clientv3/integration/kv_test.go similarity index 100% rename from clientv3/integration/client_test.go rename to clientv3/integration/kv_test.go diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go new file mode 100644 index 000000000..c44fafcbd --- /dev/null +++ b/clientv3/integration/lease_test.go @@ -0,0 +1,75 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "testing" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestLeaseCreate(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lapi := clientv3.NewLease(clus.RandClient()) + defer lapi.Close() + + kv := clientv3.NewKV(clus.RandClient()) + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + _, err = kv.Put("foo", "bar", lease.LeaseID(resp.ID)) + if err != nil { + t.Fatalf("failed to create key with lease %v", err) + } +} + +func TestLeaseRevoke(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lapi := clientv3.NewLease(clus.RandClient()) + defer lapi.Close() + + kv := clientv3.NewKV(clus.RandClient()) + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + _, err = lapi.Revoke(context.Background(), lease.LeaseID(resp.ID)) + if err != nil { + t.Errorf("failed to revoke lease", err) + } + + _, err = kv.Put("foo", "bar", lease.LeaseID(resp.ID)) + if err != v3rpc.ErrLeaseNotFound { + t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound) + } +} diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index afe0a446e..52a8bb564 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage" ) @@ -213,6 +214,8 @@ func togRPCError(err error) error { return ErrCompacted case storage.ErrFutureRev: return ErrFutureRev + case lease.ErrLeaseNotFound: + return ErrLeaseNotFound // TODO: handle error from raft and timeout default: return grpc.Errorf(codes.Internal, err.Error())