diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 01e981638..513418ac8 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -498,6 +498,8 @@ func (m *WatchResponse) GetEvents() []*storagepb.Event { type LeaseCreateRequest struct { // advisory ttl in seconds TTL int64 `protobuf:"varint,1,opt,proto3" json:"TTL,omitempty"` + // requested ID to create; 0 lets lessor choose + ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"` } func (m *LeaseCreateRequest) Reset() { *m = LeaseCreateRequest{} } @@ -1814,6 +1816,11 @@ func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintRpc(data, i, uint64(m.TTL)) } + if m.ID != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.ID)) + } return i, nil } @@ -2335,6 +2342,9 @@ func (m *LeaseCreateRequest) Size() (n int) { if m.TTL != 0 { n += 1 + sovRpc(uint64(m.TTL)) } + if m.ID != 0 { + n += 1 + sovRpc(uint64(m.ID)) + } return n } @@ -4471,6 +4481,22 @@ func (m *LeaseCreateRequest) Unmarshal(data []byte) error { break } } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + m.ID = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for { diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 715ed1b70..06988a997 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -263,6 +263,8 @@ message WatchResponse { message LeaseCreateRequest { // advisory ttl in seconds int64 TTL = 1; + // requested ID to create; 0 lets lessor choose + int64 ID = 2; } message LeaseCreateResponse { diff --git a/etcdserver/server.go b/etcdserver/server.go index 3d9683bed..f7085bee3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -361,7 +361,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if cfg.V3demo { srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) - srv.lessor = lease.NewLessor(uint8(id), srv.be) + srv.lessor = lease.NewLessor(srv.be) srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) } diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 9c2bd402a..9f6c646ad 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -87,11 +87,20 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. } func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { + // no id given? choose one + for r.ID == int64(lease.NoLease) { + // only use positive int64 id's + r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) + } result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseCreate: r}) if err != nil { return nil, err } - return result.resp.(*pb.LeaseCreateResponse), result.err + resp := result.resp.(*pb.LeaseCreateResponse) + if result.err != nil { + resp.Error = result.err.Error() + } + return resp, nil } func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { @@ -462,9 +471,13 @@ func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) { } func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { - l := le.Grant(lc.TTL) - - return &pb.LeaseCreateResponse{ID: int64(l.ID), TTL: l.TTL}, nil + l, err := le.Grant(lease.LeaseID(lc.ID), lc.TTL) + resp := &pb.LeaseCreateResponse{} + if err == nil { + resp.ID = int64(l.ID) + resp.TTL = l.TTL + } + return resp, err } func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 346911948..1c80bd9cf 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -812,6 +812,7 @@ func (m *member) Launch() error { m.grpcServer = grpc.NewServer() etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s)) etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s)) + etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s)) go m.grpcServer.Serve(m.grpcListener) } return nil diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index dc00f6afa..64438ff43 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -26,6 +26,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/storagepb" ) @@ -1006,3 +1007,97 @@ func TestV3RangeRequest(t *testing.T) { clus.Terminate(t) } } + +// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked. +func TestV3LeaseRevoke(t *testing.T) { + testLeaseRemoveLeasedKey(t, func(lc pb.LeaseClient, leaseID int64) error { + _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) + return err + }) +} + +// TestV3LeaseCreateById ensures leases may be created by a given id. +func TestV3LeaseCreateByID(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + // create fixed lease + lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{ID: 1, TTL: 1}) + if err != nil { + t.Errorf("could not create lease 1 (%v)", err) + } + if lresp.ID != 1 { + t.Errorf("got id %v, wanted id %v", lresp.ID) + } + + // create duplicate fixed lease + lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{ID: 1, TTL: 1}) + if err != nil { + t.Error(err) + } + if lresp.ID != 0 || lresp.Error != lease.ErrLeaseExists.Error() { + t.Errorf("got id %v, wanted id 0 (%v)", lresp.ID, lresp.Error) + } + + // create fresh fixed lease + lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{ID: 2, TTL: 1}) + if err != nil { + t.Errorf("could not create lease 2 (%v)", err) + } + if lresp.ID != 2 { + t.Errorf("got id %v, wanted id %v", lresp.ID) + } + +} + +// acquireLeaseAndKey creates a new lease and creates an attached key. +func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) { + // create lease + lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{TTL: 1}) + if err != nil { + return 0, err + } + if lresp.Error != "" { + return 0, fmt.Errorf(lresp.Error) + } + // attach to key + put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID} + if _, err := pb.NewKVClient(clus.RandConn()).Put(context.TODO(), put); err != nil { + return 0, err + } + return lresp.ID, nil +} + +// testLeaseRemoveLeasedKey performs some action while holding a lease with an +// attached key "foo", then confirms the key is gone. +func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) error) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + leaseID, err := acquireLeaseAndKey(clus, "foo") + if err != nil { + t.Fatal(err) + } + + if err := act(pb.NewLeaseClient(clus.RandConn()), leaseID); err != nil { + t.Fatal(err) + } + + // confirm no key + rreq := &pb.RangeRequest{Key: []byte("foo")} + rresp, err := pb.NewKVClient(clus.RandConn()).Range(context.TODO(), rreq) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 0 { + t.Fatalf("lease removed but key remains") + } +} diff --git a/lease/lessor.go b/lease/lessor.go index c71410f64..5898aee02 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -22,7 +22,6 @@ import ( "time" "github.com/coreos/etcd/lease/leasepb" - "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/storage/backend" ) @@ -41,6 +40,7 @@ var ( ErrNotPrimary = errors.New("not a primary lessor") ErrLeaseNotFound = errors.New("lease not found") + ErrLeaseExists = errors.New("lease already exists") ) type LeaseID int64 @@ -62,7 +62,7 @@ type Lessor interface { SetRangeDeleter(dr RangeDeleter) // Grant grants a lease that expires at least after TTL seconds. - Grant(ttl int64) *Lease + Grant(id LeaseID, ttl int64) (*Lease, error) // Revoke revokes a lease with given ID. The item attached to the // given lease will be removed. If the ID does not exist, an error // will be returned. @@ -132,21 +132,13 @@ type lessor struct { stopC chan struct{} // doneC is a channel whose closure indicates that the lessor is stopped. doneC chan struct{} - - idgen *idutil.Generator } -func NewLessor(lessorID uint8, b backend.Backend) Lessor { - return newLessor(lessorID, b) +func NewLessor(b backend.Backend) Lessor { + return newLessor(b) } -func newLessor(lessorID uint8, b backend.Backend) *lessor { - // ensure the most significant bit of lessorID is 0. - // so all the IDs generated by id generator will be greater than 0. - if int8(lessorID) < 0 { - lessorID = uint8(-int8(lessorID)) - } - +func newLessor(b backend.Backend) *lessor { l := &lessor{ leaseMap: make(map[LeaseID]*Lease), b: b, @@ -154,7 +146,6 @@ func newLessor(lessorID uint8, b backend.Backend) *lessor { expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), doneC: make(chan struct{}), - idgen: idutil.NewGenerator(lessorID, time.Now()), } l.initAndRecover() @@ -172,13 +163,19 @@ func (le *lessor) SetRangeDeleter(rd RangeDeleter) { // TODO: when lessor is under high load, it should give out lease // with longer TTL to reduce renew load. -func (le *lessor) Grant(ttl int64) *Lease { - id := LeaseID(le.idgen.Next()) +func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { + if id == NoLease { + return nil, ErrLeaseNotFound + } + + l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})} le.mu.Lock() defer le.mu.Unlock() - l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})} + if _, ok := le.leaseMap[id]; ok { + return nil, ErrLeaseExists + } if le.primary { l.refresh() @@ -186,14 +183,10 @@ func (le *lessor) Grant(ttl int64) *Lease { l.forever() } - if _, ok := le.leaseMap[id]; ok { - panic("lease: unexpected duplicate ID!") - } - le.leaseMap[id] = l l.persistTo(le.b) - return l + return l, nil } func (le *lessor) Revoke(id LeaseID) error { @@ -450,7 +443,7 @@ type FakeLessor struct { func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {} -func (fl *FakeLessor) Grant(ttl int64) *Lease { return nil } +func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil } func (fl *FakeLessor) Revoke(id LeaseID) error { return nil } diff --git a/lease/lessor_test.go b/lease/lessor_test.go index ebc0a4d9d..d1e44654c 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -33,10 +33,13 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(1, be) + le := newLessor(be) le.Promote() - l := le.Grant(1) + l, err := le.Grant(1, 1) + if err != nil { + t.Fatalf("could not grant lease 1 (%v)", err) + } gl := le.get(l.ID) if !reflect.DeepEqual(gl, l) { @@ -46,7 +49,15 @@ func TestLessorGrant(t *testing.T) { t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), time.Duration(minLeaseTTL)*time.Second-time.Second) } - nl := le.Grant(1) + nl, err := le.Grant(1, 1) + if err == nil { + t.Errorf("allocated the same lease") + } + + nl, err = le.Grant(2, 1) + if err != nil { + t.Errorf("could not grant lease 2 (%v)", err) + } if nl.ID == l.ID { t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID) } @@ -70,25 +81,26 @@ func TestLessorRevoke(t *testing.T) { fd := &fakeDeleter{} - le := newLessor(1, be) + le := newLessor(be) le.SetRangeDeleter(fd) // grant a lease with long term (100 seconds) to // avoid early termination during the test. - l := le.Grant(100) + l, err := le.Grant(1, 100) + if err != nil { + t.Fatalf("could not grant lease for 100s ttl (%v)", err) + } items := []LeaseItem{ {"foo"}, {"bar"}, } - err := le.Attach(l.ID, items) - if err != nil { + if err := le.Attach(l.ID, items); err != nil { t.Fatalf("failed to attach items to the lease: %v", err) } - err = le.Revoke(l.ID) - if err != nil { + if err = le.Revoke(l.ID); err != nil { t.Fatal("failed to revoke lease:", err) } @@ -115,10 +127,13 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(1, be) + le := newLessor(be) le.Promote() - l := le.Grant(5) + l, err := le.Grant(1, 5) + if err != nil { + t.Fatalf("failed to grant lease (%v)", err) + } // manually change the ttl field l.TTL = 10 @@ -143,12 +158,15 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(1, be) - l1 := le.Grant(10) - l2 := le.Grant(20) + le := newLessor(be) + l1, err1 := le.Grant(1, 10) + l2, err2 := le.Grant(2, 20) + if err1 != nil || err2 != nil { + t.Fatalf("could not grant initial leases (%v, %v)", err1, err2) + } // Create a new lessor with the same backend - nle := newLessor(1, be) + nle := newLessor(be) nl1 := nle.get(l1.ID) if nl1 == nil || nl1.TTL != l1.TTL { t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)