From 25f82b25f7ca7e97fb4c258ea7ae17fda72d9dbf Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 4 Jan 2016 22:53:11 -0800 Subject: [PATCH] lease: modify API and persist lease to disk --- lease/leasepb/lease.pb.go | 282 ++++++++++++++++++++++++++++++++++++++ lease/leasepb/lease.proto | 15 ++ lease/lessor.go | 102 ++++++++++---- lease/lessor_test.go | 70 ++++++++-- scripts/genproto.sh | 2 +- 5 files changed, 432 insertions(+), 39 deletions(-) create mode 100644 lease/leasepb/lease.pb.go create mode 100644 lease/leasepb/lease.proto diff --git a/lease/leasepb/lease.pb.go b/lease/leasepb/lease.pb.go new file mode 100644 index 000000000..3d14027b8 --- /dev/null +++ b/lease/leasepb/lease.pb.go @@ -0,0 +1,282 @@ +// Code generated by protoc-gen-gogo. +// source: lease.proto +// DO NOT EDIT! + +/* + Package leasepb is a generated protocol buffer package. + + It is generated from these files: + lease.proto + + It has these top-level messages: + Lease +*/ +package leasepb + +import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + +// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto" + +import io "io" +import fmt "fmt" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal + +type Lease struct { + ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"` + TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"` +} + +func (m *Lease) Reset() { *m = Lease{} } +func (m *Lease) String() string { return proto.CompactTextString(m) } +func (*Lease) ProtoMessage() {} + +func (m *Lease) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Lease) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.ID != 0 { + data[i] = 0x8 + i++ + i = encodeVarintLease(data, i, uint64(m.ID)) + } + if m.TTL != 0 { + data[i] = 0x10 + i++ + i = encodeVarintLease(data, i, uint64(m.TTL)) + } + return i, nil +} + +func encodeFixed64Lease(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Lease(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLease(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *Lease) Size() (n int) { + var l int + _ = l + if m.ID != 0 { + n += 1 + sovLease(uint64(m.ID)) + } + if m.TTL != 0 { + n += 1 + sovLease(uint64(m.TTL)) + } + return n +} + +func sovLease(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLease(x uint64) (n int) { + return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Lease) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + m.ID = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) + } + m.TTL = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.TTL |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipLease(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLease + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func skipLease(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthLease + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLease(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling") +) diff --git a/lease/leasepb/lease.proto b/lease/leasepb/lease.proto new file mode 100644 index 000000000..f2c996c8a --- /dev/null +++ b/lease/leasepb/lease.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package leasepb; + +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; + +message Lease { + int64 ID = 1; + int64 TTL = 2; +} diff --git a/lease/lessor.go b/lease/lessor.go index 73717dbe6..ab70dc152 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -15,15 +15,20 @@ package lease import ( + "encoding/binary" "fmt" "sync" "time" + "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/pkg/idutil" + "github.com/coreos/etcd/storage/backend" ) var ( minLeaseTerm = 5 * time.Second + + leaseBucketName = []byte("lease") ) // DeleteableRange defines an interface with DeleteRange method. @@ -37,7 +42,6 @@ type DeleteableRange interface { // a lessor is the owner of leases. It can grant, revoke, // renew and modify leases for lessee. -// TODO: persist lease on to stable backend for failure recovery. // TODO: use clockwork for testability. type lessor struct { mu sync.Mutex @@ -47,49 +51,67 @@ type lessor struct { // We want to make Grant, Revoke, and FindExpired all O(logN) and // Renew O(1). // FindExpired and Renew should be the most frequent operations. - leaseMap map[uint64]*lease + leaseMap map[int64]*lease // A DeleteableRange the lessor operates on. // When a lease expires, the lessor will delete the // leased range (or key) from the DeleteableRange. dr DeleteableRange + // backend to persist leases. We only persist lease ID and expiry for now. + // The leased items can be recovered by iterating all the keys in kv. + b backend.Backend + idgen *idutil.Generator } -func NewLessor(lessorID uint8, dr DeleteableRange) *lessor { - return &lessor{ - leaseMap: make(map[uint64]*lease), +func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { + l := &lessor{ + leaseMap: make(map[int64]*lease), + b: b, dr: dr, idgen: idutil.NewGenerator(lessorID, time.Now()), } + + tx := l.b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(leaseBucketName) + tx.Unlock() + l.b.ForceCommit() + + // TODO: recover from previous state in backend. + + return l } -// Grant grants a lease that expires at least at the given expiry -// time. -// TODO: when leassor is under highload, it should give out lease -// with longer term to reduce renew load. -func (le *lessor) Grant(expiry time.Time) *lease { +// Grant grants a lease that expires at least after TTL seconds. +// TODO: when lessor is under high load, it should give out lease +// with longer TTL to reduce renew load. +func (le *lessor) Grant(ttl int64) *lease { + // TODO: define max TTL + expiry := time.Now().Add(time.Duration(ttl) * time.Second) expiry = minExpiry(time.Now(), expiry) - id := le.idgen.Next() + id := int64(le.idgen.Next()) le.mu.Lock() defer le.mu.Unlock() - l := &lease{id: id, expiry: expiry, itemSet: make(map[leaseItem]struct{})} + l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})} if _, ok := le.leaseMap[id]; ok { panic("lease: unexpected duplicate ID!") } le.leaseMap[id] = l + l.persistTo(le.b) + return l } // Revoke revokes a lease with given ID. The item attached to the // given lease will be removed. If the ID does not exist, an error // will be returned. -func (le *lessor) Revoke(id uint64) error { +func (le *lessor) Revoke(id int64) error { le.mu.Lock() defer le.mu.Unlock() @@ -98,19 +120,20 @@ func (le *lessor) Revoke(id uint64) error { return fmt.Errorf("lease: cannot find lease %x", id) } - delete(le.leaseMap, l.id) - for item := range l.itemSet { - le.dr.DeleteRange([]byte(item.key), []byte(item.endRange)) + le.dr.DeleteRange([]byte(item.key), nil) } + delete(le.leaseMap, l.id) + l.removeFrom(le.b) + return nil } -// Renew renews an existing lease with at least the given expiry. -// If the given lease does not exist or has expired, an error will -// be returned. -func (le *lessor) Renew(id uint64, expiry time.Time) error { +// Renew renews an existing lease. If the given lease does not exist or +// has expired, an error will be returned. +// TODO: return new TTL? +func (le *lessor) Renew(id int64) error { le.mu.Lock() defer le.mu.Unlock() @@ -119,6 +142,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error { return fmt.Errorf("lease: cannot find lease %x", id) } + expiry := time.Now().Add(time.Duration(l.ttl) * time.Second) l.expiry = minExpiry(time.Now(), expiry) return nil } @@ -126,7 +150,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error { // Attach attaches items to the lease with given ID. When the lease // expires, the attached items will be automatically removed. // If the given lease does not exist, an error will be returned. -func (le *lessor) Attach(id uint64, items []leaseItem) error { +func (le *lessor) Attach(id int64, items []leaseItem) error { le.mu.Lock() defer le.mu.Unlock() @@ -161,7 +185,7 @@ func (le *lessor) findExpiredLeases() []*lease { // get gets the lease with given id. // get is a helper fucntion for testing, at least for now. -func (le *lessor) get(id uint64) *lease { +func (le *lessor) get(id int64) *lease { le.mu.Lock() defer le.mu.Unlock() @@ -169,16 +193,38 @@ func (le *lessor) get(id uint64) *lease { } type lease struct { - id uint64 + id int64 + ttl int64 // time to live in seconds itemSet map[leaseItem]struct{} // expiry time in unixnano expiry time.Time } +func (l lease) persistTo(b backend.Backend) { + key := int64ToBytes(l.id) + + lpb := leasepb.Lease{ID: l.id, TTL: int64(l.ttl)} + val, err := lpb.Marshal() + if err != nil { + panic("failed to marshal lease proto item") + } + + b.BatchTx().Lock() + b.BatchTx().UnsafePut(leaseBucketName, key, val) + b.BatchTx().Unlock() +} + +func (l lease) removeFrom(b backend.Backend) { + key := int64ToBytes(l.id) + + b.BatchTx().Lock() + b.BatchTx().UnsafeDelete(leaseBucketName, key) + b.BatchTx().Unlock() +} + type leaseItem struct { - key string - endRange string + key string } // minExpiry returns a minimal expiry. A minimal expiry is the larger on @@ -190,3 +236,9 @@ func minExpiry(now time.Time, expectedExpiry time.Time) time.Time { } return expectedExpiry } + +func int64ToBytes(n int64) []byte { + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, uint64(n)) + return bytes +} diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 548c57c79..e6078b1bc 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -15,18 +15,27 @@ package lease import ( + "io/ioutil" + "os" + "path" "reflect" "testing" "time" + + "github.com/coreos/etcd/storage/backend" ) // TestLessorGrant ensures Lessor can grant wanted lease. // The granted lease should have a unique ID with a term // that is greater than minLeaseTerm. func TestLessorGrant(t *testing.T) { - le := NewLessor(1, &fakeDeleteable{}) + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() - l := le.Grant(time.Now().Add(time.Second)) + le := NewLessor(1, be, &fakeDeleteable{}) + + l := le.Grant(1) gl := le.get(l.id) if !reflect.DeepEqual(gl, l) { @@ -36,10 +45,17 @@ func TestLessorGrant(t *testing.T) { t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), minLeaseTerm-time.Second) } - nl := le.Grant(time.Now().Add(time.Second)) + nl := le.Grant(1) if nl.id == l.id { t.Errorf("new lease.id = %x, want != %x", nl.id, l.id) } + + be.BatchTx().Lock() + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0) + if len(vs) != 1 { + t.Errorf("len(vs) = %d, want 1", len(vs)) + } + be.BatchTx().Unlock() } // TestLessorRevoke ensures Lessor can revoke a lease. @@ -47,16 +63,21 @@ func TestLessorGrant(t *testing.T) { // the DeleteableKV. // The revoked lease cannot be got from Lessor again. func TestLessorRevoke(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + fd := &fakeDeleteable{} - le := NewLessor(1, fd) + + le := NewLessor(1, be, fd) // grant a lease with long term (100 seconds) to // avoid early termination during the test. - l := le.Grant(time.Now().Add(100 * time.Second)) + l := le.Grant(100) items := []leaseItem{ - {"foo", ""}, - {"bar", "zar"}, + {"foo"}, + {"bar"}, } err := le.Attach(l.id, items) @@ -73,22 +94,36 @@ func TestLessorRevoke(t *testing.T) { t.Errorf("got revoked lease %x", l.id) } - wdeleted := []string{"foo_", "bar_zar"} + wdeleted := []string{"foo_", "bar_"} if !reflect.DeepEqual(fd.deleted, wdeleted) { t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted) } + + be.BatchTx().Lock() + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0) + if len(vs) != 0 { + t.Errorf("len(vs) = %d, want 0", len(vs)) + } + be.BatchTx().Unlock() } // TestLessorRenew ensures Lessor can renew an existing lease. func TestLessorRenew(t *testing.T) { - le := NewLessor(1, &fakeDeleteable{}) - l := le.Grant(time.Now().Add(5 * time.Second)) + dir, be := NewTestBackend(t) + defer be.Close() + defer os.RemoveAll(dir) - le.Renew(l.id, time.Now().Add(100*time.Second)) + le := NewLessor(1, be, &fakeDeleteable{}) + l := le.Grant(5) + + // manually change the ttl field + l.ttl = 10 + + le.Renew(l.id) l = le.get(l.id) - if l.expiry.Sub(time.Now()) < 95*time.Second { - t.Errorf("failed to renew the lease for 100 seconds") + if l.expiry.Sub(time.Now()) < 9*time.Second { + t.Errorf("failed to renew the lease") } } @@ -100,3 +135,12 @@ func (fd *fakeDeleteable) DeleteRange(key, end []byte) (int64, int64) { fd.deleted = append(fd.deleted, string(key)+"_"+string(end)) return 0, 0 } + +func NewTestBackend(t *testing.T) (string, backend.Backend) { + tmpPath, err := ioutil.TempDir("", "lease") + if err != nil { + t.Fatalf("failed to create tmpdir (%v)", err) + } + + return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000) +} diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 1176473ba..327a45a1e 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -20,7 +20,7 @@ PREFIX="github.com/coreos/etcd/Godeps/_workspace/src" ESCAPED_PREFIX=$(echo $PREFIX | sed -e 's/[\/&]/\\&/g') # directories containing protos to be built -DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb" +DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb ./lease/leasepb" # exact version of protoc-gen-gogo to build SHA="932b70afa8b0bf4a8e167fdf0c3367cebba45903"