diff --git a/etcdserver/api/v3rpc/error.go b/etcdserver/api/v3rpc/error.go index ab01edf75..ace2f578e 100644 --- a/etcdserver/api/v3rpc/error.go +++ b/etcdserver/api/v3rpc/error.go @@ -21,12 +21,14 @@ import ( ) var ( - ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided") - ErrTooManyOps = grpc.Errorf(codes.InvalidArgument, "too many operations in txn request") - ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "duplicate key given in txn request") - ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error()) - ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error()) + ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided") + ErrTooManyOps = grpc.Errorf(codes.InvalidArgument, "too many operations in txn request") + ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "duplicate key given in txn request") + ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error()) + ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error()) + ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "requested lease not found") + ErrLeaseExist = grpc.Errorf(codes.FailedPrecondition, "lease already exists") ErrMemberExist = grpc.Errorf(codes.FailedPrecondition, "member ID already exist") ErrPeerURLExist = grpc.Errorf(codes.FailedPrecondition, "Peer URLs already exists") diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index cd9db152c..5e2aedfaf 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -32,7 +32,11 @@ func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer { } func (ls *LeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { - return ls.le.LeaseCreate(ctx, cr) + resp, err := ls.le.LeaseCreate(ctx, cr) + if err == lease.ErrLeaseExists { + return nil, ErrLeaseExist + } + return resp, err } func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { @@ -54,6 +58,10 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro } ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID)) + if err == lease.ErrLeaseNotFound { + return ErrLeaseNotFound + } + if err != nil && err != lease.ErrLeaseNotFound { return err } diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 0d54812d3..8b1296856 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -98,11 +98,7 @@ func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) if err != nil { return nil, err } - resp := result.resp.(*pb.LeaseCreateResponse) - if result.err != nil { - resp.Error = result.err.Error() - } - return resp, nil + return result.resp.(*pb.LeaseCreateResponse), result.err } func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { diff --git a/integration/cluster.go b/integration/cluster.go index 75c18f3b1..666ffab86 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -289,7 +289,7 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { return } -func (c *cluster) waitLeader(t *testing.T, membs []*member) { +func (c *cluster) waitLeader(t *testing.T, membs []*member) int { possibleLead := make(map[uint64]bool) var lead uint64 for _, m := range membs { @@ -307,6 +307,14 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) { } time.Sleep(10 * tickDuration) } + + for i, m := range membs { + if uint64(m.s.ID()) == lead { + return i + } + } + + return -1 } func (c *cluster) waitVersion() { diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 1c9a6ba7d..73e215cdc 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -26,7 +26,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/storage/storagepb" ) @@ -1270,224 +1269,6 @@ func TestV3RangeRequest(t *testing.T) { } } -// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked. -func TestV3LeaseRevoke(t *testing.T) { - defer testutil.AfterTest(t) - testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - lc := clus.RandClient().Lease - _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) - return err - }) -} - -// TestV3LeaseCreateById ensures leases may be created by a given id. -func TestV3LeaseCreateByID(t *testing.T) { - defer testutil.AfterTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - defer clus.Terminate(t) - - // create fixed lease - lresp, err := clus.RandClient().Lease.LeaseCreate( - context.TODO(), - &pb.LeaseCreateRequest{ID: 1, TTL: 1}) - if err != nil { - t.Errorf("could not create lease 1 (%v)", err) - } - if lresp.ID != 1 { - t.Errorf("got id %v, wanted id %v", lresp.ID, 1) - } - - // create duplicate fixed lease - lresp, err = clus.RandClient().Lease.LeaseCreate( - context.TODO(), - &pb.LeaseCreateRequest{ID: 1, TTL: 1}) - if err != nil { - t.Error(err) - } - if lresp.ID != 0 || lresp.Error != lease.ErrLeaseExists.Error() { - t.Errorf("got id %v, wanted id 0 (%v)", lresp.ID, lresp.Error) - } - - // create fresh fixed lease - lresp, err = clus.RandClient().Lease.LeaseCreate( - context.TODO(), - &pb.LeaseCreateRequest{ID: 2, TTL: 1}) - if err != nil { - t.Errorf("could not create lease 2 (%v)", err) - } - if lresp.ID != 2 { - t.Errorf("got id %v, wanted id %v", lresp.ID, 2) - } - -} - -// TestV3LeaseExpire ensures a key is deleted once a key expires. -func TestV3LeaseExpire(t *testing.T) { - defer testutil.AfterTest(t) - testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - // let lease lapse; wait for deleted key - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - wStream, err := clus.RandClient().Watch.Watch(ctx) - if err != nil { - return err - } - - wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ - CreateRequest: &pb.WatchCreateRequest{ - Key: []byte("foo"), StartRevision: 1}}} - if err := wStream.Send(wreq); err != nil { - return err - } - if _, err := wStream.Recv(); err != nil { - // the 'created' message - return err - } - if _, err := wStream.Recv(); err != nil { - // the 'put' message - return err - } - - errc := make(chan error, 1) - go func() { - resp, err := wStream.Recv() - switch { - case err != nil: - errc <- err - case len(resp.Events) != 1: - fallthrough - case resp.Events[0].Type != storagepb.DELETE: - errc <- fmt.Errorf("expected key delete, got %v", resp) - default: - errc <- nil - } - }() - - select { - case <-time.After(15 * time.Second): - return fmt.Errorf("lease expiration too slow") - case err := <-errc: - return err - } - }) -} - -// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive. -func TestV3LeaseKeepAlive(t *testing.T) { - defer testutil.AfterTest(t) - testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { - lc := clus.RandClient().Lease - lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - lac, err := lc.LeaseKeepAlive(ctx) - if err != nil { - return err - } - defer lac.CloseSend() - - // renew long enough so lease would've expired otherwise - for i := 0; i < 3; i++ { - if err = lac.Send(lreq); err != nil { - return err - } - lresp, rxerr := lac.Recv() - if rxerr != nil { - return rxerr - } - if lresp.ID != leaseID { - return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID) - } - time.Sleep(time.Duration(lresp.TTL/2) * time.Second) - } - _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) - return err - }) -} - -// TestV3LeaseExists creates a lease on a random client, then sends a keepalive on another -// client to confirm it's visible to the whole cluster. -func TestV3LeaseExists(t *testing.T) { - defer testutil.AfterTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - defer clus.Terminate(t) - - // create lease - ctx0, cancel0 := context.WithCancel(context.Background()) - defer cancel0() - lresp, err := clus.RandClient().Lease.LeaseCreate( - ctx0, - &pb.LeaseCreateRequest{TTL: 30}) - if err != nil { - t.Fatal(err) - } - if lresp.Error != "" { - t.Fatal(lresp.Error) - } - - // confirm keepalive - ctx1, cancel1 := context.WithCancel(context.Background()) - defer cancel1() - lac, err := clus.RandClient().Lease.LeaseKeepAlive(ctx1) - if err != nil { - t.Fatal(err) - } - defer lac.CloseSend() - if err = lac.Send(&pb.LeaseKeepAliveRequest{ID: lresp.ID}); err != nil { - t.Fatal(err) - } - if _, err = lac.Recv(); err != nil { - t.Fatal(err) - } -} - -// acquireLeaseAndKey creates a new lease and creates an attached key. -func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { - // create lease - lresp, err := clus.RandClient().Lease.LeaseCreate( - context.TODO(), - &pb.LeaseCreateRequest{TTL: 1}) - if err != nil { - return 0, err - } - if lresp.Error != "" { - return 0, fmt.Errorf(lresp.Error) - } - // attach to key - put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID} - if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil { - return 0, err - } - return lresp.ID, nil -} - -// testLeaseRemoveLeasedKey performs some action while holding a lease with an -// attached key "foo", then confirms the key is gone. -func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - defer clus.Terminate(t) - - leaseID, err := acquireLeaseAndKey(clus, "foo") - if err != nil { - t.Fatal(err) - } - - if err = act(clus, leaseID); err != nil { - t.Fatal(err) - } - - // confirm no key - rreq := &pb.RangeRequest{Key: []byte("foo")} - rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq) - if err != nil { - t.Fatal(err) - } - if len(rresp.Kvs) != 0 { - t.Fatalf("lease removed but key remains") - } -} - func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 { cfg.UseV3 = true cfg.UseGRPC = true diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go new file mode 100644 index 000000000..2274095be --- /dev/null +++ b/integration/v3_lease_test.go @@ -0,0 +1,297 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "fmt" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver/api/v3rpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/storage/storagepb" +) + +// TestV3LeasePrmote ensures the newly elected leader can promote itself +// to the primary lessor, refresh the leases and start to manage leases. +// TODO: use customized clock to make this test go faster? +func TestV3LeasePrmote(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // create lease + lresp, err := clus.RandClient().Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + + // wait until the lease is going to expire. + time.Sleep(time.Duration(lresp.TTL-1) * time.Second) + + // kill the current leader, all leases should be refreshed. + toStop := clus.waitLeader(t, clus.Members) + clus.Members[toStop].Stop(t) + + var toWait []*member + for i, m := range clus.Members { + if i != toStop { + toWait = append(toWait, m) + } + } + clus.waitLeader(t, toWait) + clus.Members[toStop].Restart(t) + clus.waitLeader(t, clus.Members) + + // ensure lease is refreshed by waiting for a "long" time. + // it was going to expire anyway. + time.Sleep(3 * time.Second) + + if !leaseExist(t, clus, lresp.ID) { + t.Error("unexpected lease not exists") + } + + // let lease expires. total lease = 5 seconds and we already + // waits for 3 seconds, so 3 seconds more is enough. + time.Sleep(3 * time.Second) + if leaseExist(t, clus, lresp.ID) { + t.Error("unexpected lease exists") + } +} + +// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked. +func TestV3LeaseRevoke(t *testing.T) { + defer testutil.AfterTest(t) + testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { + lc := clus.RandClient().Lease + _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) + return err + }) +} + +// TestV3LeaseCreateById ensures leases may be created by a given id. +func TestV3LeaseCreateByID(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // create fixed lease + lresp, err := clus.RandClient().Lease.LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{ID: 1, TTL: 1}) + if err != nil { + t.Errorf("could not create lease 1 (%v)", err) + } + if lresp.ID != 1 { + t.Errorf("got id %v, wanted id %v", lresp.ID, 1) + } + + // create duplicate fixed lease + lresp, err = clus.RandClient().Lease.LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{ID: 1, TTL: 1}) + if err != v3rpc.ErrLeaseExist { + t.Error(err) + } + + // create fresh fixed lease + lresp, err = clus.RandClient().Lease.LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{ID: 2, TTL: 1}) + if err != nil { + t.Errorf("could not create lease 2 (%v)", err) + } + if lresp.ID != 2 { + t.Errorf("got id %v, wanted id %v", lresp.ID, 2) + } +} + +// TestV3LeaseExpire ensures a key is deleted once a key expires. +func TestV3LeaseExpire(t *testing.T) { + defer testutil.AfterTest(t) + testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { + // let lease lapse; wait for deleted key + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wStream, err := clus.RandClient().Watch.Watch(ctx) + if err != nil { + return err + } + + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), StartRevision: 1}}} + if err := wStream.Send(wreq); err != nil { + return err + } + if _, err := wStream.Recv(); err != nil { + // the 'created' message + return err + } + if _, err := wStream.Recv(); err != nil { + // the 'put' message + return err + } + + errc := make(chan error, 1) + go func() { + resp, err := wStream.Recv() + switch { + case err != nil: + errc <- err + case len(resp.Events) != 1: + fallthrough + case resp.Events[0].Type != storagepb.DELETE: + errc <- fmt.Errorf("expected key delete, got %v", resp) + default: + errc <- nil + } + }() + + select { + case <-time.After(15 * time.Second): + return fmt.Errorf("lease expiration too slow") + case err := <-errc: + return err + } + }) +} + +// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive. +func TestV3LeaseKeepAlive(t *testing.T) { + defer testutil.AfterTest(t) + testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { + lc := clus.RandClient().Lease + lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + lac, err := lc.LeaseKeepAlive(ctx) + if err != nil { + return err + } + defer lac.CloseSend() + + // renew long enough so lease would've expired otherwise + for i := 0; i < 3; i++ { + if err = lac.Send(lreq); err != nil { + return err + } + lresp, rxerr := lac.Recv() + if rxerr != nil { + return rxerr + } + if lresp.ID != leaseID { + return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID) + } + time.Sleep(time.Duration(lresp.TTL/2) * time.Second) + } + _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) + return err + }) +} + +// TestV3LeaseExists creates a lease on a random client and confirms it exists in the cluster. +func TestV3LeaseExists(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // create lease + ctx0, cancel0 := context.WithCancel(context.Background()) + defer cancel0() + lresp, err := clus.RandClient().Lease.LeaseCreate( + ctx0, + &pb.LeaseCreateRequest{TTL: 30}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + + if !leaseExist(t, clus, lresp.ID) { + t.Error("unexpected lease not exists") + } +} + +// acquireLeaseAndKey creates a new lease and creates an attached key. +func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { + // create lease + lresp, err := clus.RandClient().Lease.LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{TTL: 1}) + if err != nil { + return 0, err + } + if lresp.Error != "" { + return 0, fmt.Errorf(lresp.Error) + } + // attach to key + put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID} + if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil { + return 0, err + } + return lresp.ID, nil +} + +// testLeaseRemoveLeasedKey performs some action while holding a lease with an +// attached key "foo", then confirms the key is gone. +func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + leaseID, err := acquireLeaseAndKey(clus, "foo") + if err != nil { + t.Fatal(err) + } + + if err = act(clus, leaseID); err != nil { + t.Fatal(err) + } + + // confirm no key + rreq := &pb.RangeRequest{Key: []byte("foo")} + rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 0 { + t.Fatalf("lease removed but key remains") + } +} + +func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool { + l := clus.RandClient().Lease + + _, err := l.LeaseCreate(context.Background(), &pb.LeaseCreateRequest{ID: leaseID, TTL: 5}) + if err == nil { + _, err = l.LeaseRevoke(context.Background(), &pb.LeaseRevokeRequest{ID: leaseID}) + if err != nil { + t.Fatalf("failed to check lease %v", err) + } + return false + } + + if err == v3rpc.ErrLeaseExist { + return true + } + t.Fatalf("unexpecter error %v", err) + + return true +} diff --git a/lease/leasehttp/http.go b/lease/leasehttp/http.go index dfac713f9..4cc1d1832 100644 --- a/lease/leasehttp/http.go +++ b/lease/leasehttp/http.go @@ -52,6 +52,11 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ttl, err := h.l.Renew(lease.LeaseID(lreq.ID)) if err != nil { + if err == lease.ErrLeaseNotFound { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -88,8 +93,12 @@ func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time. return -1, err } + if resp.StatusCode == http.StatusNotFound { + return -1, lease.ErrLeaseNotFound + } + if resp.StatusCode != http.StatusOK { - return -1, fmt.Errorf("lease: %s", string(b)) + return -1, fmt.Errorf("lease: unknown error(%s)", string(b)) } lresp := &pb.LeaseKeepAliveResponse{} @@ -99,5 +108,5 @@ func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time. if lresp.ID != int64(id) { return -1, fmt.Errorf("lease: renew id mismatch") } - return lresp.TTL, err + return lresp.TTL, nil }