Merge pull request #4253 from heyitsanthony/v3-lease-grant-consistency

lease: grant consistent lease IDs
This commit is contained in:
Anthony Romano 2016-01-22 11:10:12 -08:00
commit 2b54c5a977
8 changed files with 191 additions and 43 deletions

View File

@ -498,6 +498,8 @@ func (m *WatchResponse) GetEvents() []*storagepb.Event {
type LeaseCreateRequest struct {
// advisory ttl in seconds
TTL int64 `protobuf:"varint,1,opt,proto3" json:"TTL,omitempty"`
// requested ID to create; 0 lets lessor choose
ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"`
}
func (m *LeaseCreateRequest) Reset() { *m = LeaseCreateRequest{} }
@ -1814,6 +1816,11 @@ func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) {
i++
i = encodeVarintRpc(data, i, uint64(m.TTL))
}
if m.ID != 0 {
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.ID))
}
return i, nil
}
@ -2335,6 +2342,9 @@ func (m *LeaseCreateRequest) Size() (n int) {
if m.TTL != 0 {
n += 1 + sovRpc(uint64(m.TTL))
}
if m.ID != 0 {
n += 1 + sovRpc(uint64(m.ID))
}
return n
}
@ -4471,6 +4481,22 @@ func (m *LeaseCreateRequest) Unmarshal(data []byte) error {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {

View File

@ -263,6 +263,8 @@ message WatchResponse {
message LeaseCreateRequest {
// advisory ttl in seconds
int64 TTL = 1;
// requested ID to create; 0 lets lessor choose
int64 ID = 2;
}
message LeaseCreateResponse {

View File

@ -361,7 +361,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if cfg.V3demo {
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.lessor = lease.NewLessor(uint8(id), srv.be)
srv.lessor = lease.NewLessor(srv.be)
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
}

View File

@ -87,11 +87,20 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
}
func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
// no id given? choose one
for r.ID == int64(lease.NoLease) {
// only use positive int64 id's
r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseCreate: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseCreateResponse), result.err
resp := result.resp.(*pb.LeaseCreateResponse)
if result.err != nil {
resp.Error = result.err.Error()
}
return resp, nil
}
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
@ -462,9 +471,13 @@ func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) {
}
func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
l := le.Grant(lc.TTL)
return &pb.LeaseCreateResponse{ID: int64(l.ID), TTL: l.TTL}, nil
l, err := le.Grant(lease.LeaseID(lc.ID), lc.TTL)
resp := &pb.LeaseCreateResponse{}
if err == nil {
resp.ID = int64(l.ID)
resp.TTL = l.TTL
}
return resp, err
}
func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {

View File

@ -812,6 +812,7 @@ func (m *member) Launch() error {
m.grpcServer = grpc.NewServer()
etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
go m.grpcServer.Serve(m.grpcListener)
}
return nil

View File

@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/storage/storagepb"
)
@ -1006,3 +1007,97 @@ func TestV3RangeRequest(t *testing.T) {
clus.Terminate(t)
}
}
// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
func TestV3LeaseRevoke(t *testing.T) {
testLeaseRemoveLeasedKey(t, func(lc pb.LeaseClient, leaseID int64) error {
_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
return err
})
}
// TestV3LeaseCreateById ensures leases may be created by a given id.
func TestV3LeaseCreateByID(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
// create fixed lease
lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
if err != nil {
t.Errorf("could not create lease 1 (%v)", err)
}
if lresp.ID != 1 {
t.Errorf("got id %v, wanted id %v", lresp.ID)
}
// create duplicate fixed lease
lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
if err != nil {
t.Error(err)
}
if lresp.ID != 0 || lresp.Error != lease.ErrLeaseExists.Error() {
t.Errorf("got id %v, wanted id 0 (%v)", lresp.ID, lresp.Error)
}
// create fresh fixed lease
lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{ID: 2, TTL: 1})
if err != nil {
t.Errorf("could not create lease 2 (%v)", err)
}
if lresp.ID != 2 {
t.Errorf("got id %v, wanted id %v", lresp.ID)
}
}
// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
// create lease
lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{TTL: 1})
if err != nil {
return 0, err
}
if lresp.Error != "" {
return 0, fmt.Errorf(lresp.Error)
}
// attach to key
put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
if _, err := pb.NewKVClient(clus.RandConn()).Put(context.TODO(), put); err != nil {
return 0, err
}
return lresp.ID, nil
}
// testLeaseRemoveLeasedKey performs some action while holding a lease with an
// attached key "foo", then confirms the key is gone.
func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) error) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
leaseID, err := acquireLeaseAndKey(clus, "foo")
if err != nil {
t.Fatal(err)
}
if err := act(pb.NewLeaseClient(clus.RandConn()), leaseID); err != nil {
t.Fatal(err)
}
// confirm no key
rreq := &pb.RangeRequest{Key: []byte("foo")}
rresp, err := pb.NewKVClient(clus.RandConn()).Range(context.TODO(), rreq)
if err != nil {
t.Fatal(err)
}
if len(rresp.Kvs) != 0 {
t.Fatalf("lease removed but key remains")
}
}

View File

@ -22,7 +22,6 @@ import (
"time"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/storage/backend"
)
@ -41,6 +40,7 @@ var (
ErrNotPrimary = errors.New("not a primary lessor")
ErrLeaseNotFound = errors.New("lease not found")
ErrLeaseExists = errors.New("lease already exists")
)
type LeaseID int64
@ -62,7 +62,7 @@ type Lessor interface {
SetRangeDeleter(dr RangeDeleter)
// Grant grants a lease that expires at least after TTL seconds.
Grant(ttl int64) *Lease
Grant(id LeaseID, ttl int64) (*Lease, error)
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
@ -132,21 +132,13 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}
idgen *idutil.Generator
}
func NewLessor(lessorID uint8, b backend.Backend) Lessor {
return newLessor(lessorID, b)
func NewLessor(b backend.Backend) Lessor {
return newLessor(b)
}
func newLessor(lessorID uint8, b backend.Backend) *lessor {
// ensure the most significant bit of lessorID is 0.
// so all the IDs generated by id generator will be greater than 0.
if int8(lessorID) < 0 {
lessorID = uint8(-int8(lessorID))
}
func newLessor(b backend.Backend) *lessor {
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
b: b,
@ -154,7 +146,6 @@ func newLessor(lessorID uint8, b backend.Backend) *lessor {
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
idgen: idutil.NewGenerator(lessorID, time.Now()),
}
l.initAndRecover()
@ -172,13 +163,19 @@ func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
func (le *lessor) Grant(ttl int64) *Lease {
id := LeaseID(le.idgen.Next())
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
if id == NoLease {
return nil, ErrLeaseNotFound
}
l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
le.mu.Lock()
defer le.mu.Unlock()
l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
if _, ok := le.leaseMap[id]; ok {
return nil, ErrLeaseExists
}
if le.primary {
l.refresh()
@ -186,14 +183,10 @@ func (le *lessor) Grant(ttl int64) *Lease {
l.forever()
}
if _, ok := le.leaseMap[id]; ok {
panic("lease: unexpected duplicate ID!")
}
le.leaseMap[id] = l
l.persistTo(le.b)
return l
return l, nil
}
func (le *lessor) Revoke(id LeaseID) error {
@ -450,7 +443,7 @@ type FakeLessor struct {
func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
func (fl *FakeLessor) Grant(ttl int64) *Lease { return nil }
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }
func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }

View File

@ -33,10 +33,13 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(1, be)
le := newLessor(be)
le.Promote()
l := le.Grant(1)
l, err := le.Grant(1, 1)
if err != nil {
t.Fatalf("could not grant lease 1 (%v)", err)
}
gl := le.get(l.ID)
if !reflect.DeepEqual(gl, l) {
@ -46,7 +49,15 @@ func TestLessorGrant(t *testing.T) {
t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), time.Duration(minLeaseTTL)*time.Second-time.Second)
}
nl := le.Grant(1)
nl, err := le.Grant(1, 1)
if err == nil {
t.Errorf("allocated the same lease")
}
nl, err = le.Grant(2, 1)
if err != nil {
t.Errorf("could not grant lease 2 (%v)", err)
}
if nl.ID == l.ID {
t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID)
}
@ -70,25 +81,26 @@ func TestLessorRevoke(t *testing.T) {
fd := &fakeDeleter{}
le := newLessor(1, be)
le := newLessor(be)
le.SetRangeDeleter(fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l := le.Grant(100)
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}
items := []LeaseItem{
{"foo"},
{"bar"},
}
err := le.Attach(l.ID, items)
if err != nil {
if err := le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}
err = le.Revoke(l.ID)
if err != nil {
if err = le.Revoke(l.ID); err != nil {
t.Fatal("failed to revoke lease:", err)
}
@ -115,10 +127,13 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(1, be)
le := newLessor(be)
le.Promote()
l := le.Grant(5)
l, err := le.Grant(1, 5)
if err != nil {
t.Fatalf("failed to grant lease (%v)", err)
}
// manually change the ttl field
l.TTL = 10
@ -143,12 +158,15 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(1, be)
l1 := le.Grant(10)
l2 := le.Grant(20)
le := newLessor(be)
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
if err1 != nil || err2 != nil {
t.Fatalf("could not grant initial leases (%v, %v)", err1, err2)
}
// Create a new lessor with the same backend
nle := newLessor(1, be)
nle := newLessor(be)
nl1 := nle.get(l1.ID)
if nl1 == nil || nl1.TTL != l1.TTL {
t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)