lease: modify API and persist lease to disk

This commit is contained in:
Xiang Li 2016-01-04 22:53:11 -08:00
parent b3ad736d2a
commit 25f82b25f7
5 changed files with 432 additions and 39 deletions

282
lease/leasepb/lease.pb.go Normal file
View File

@ -0,0 +1,282 @@
// Code generated by protoc-gen-gogo.
// source: lease.proto
// DO NOT EDIT!
/*
Package leasepb is a generated protocol buffer package.
It is generated from these files:
lease.proto
It has these top-level messages:
Lease
*/
package leasepb
import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto"
import io "io"
import fmt "fmt"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
type Lease struct {
ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"`
}
func (m *Lease) Reset() { *m = Lease{} }
func (m *Lease) String() string { return proto.CompactTextString(m) }
func (*Lease) ProtoMessage() {}
func (m *Lease) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *Lease) MarshalTo(data []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.ID != 0 {
data[i] = 0x8
i++
i = encodeVarintLease(data, i, uint64(m.ID))
}
if m.TTL != 0 {
data[i] = 0x10
i++
i = encodeVarintLease(data, i, uint64(m.TTL))
}
return i, nil
}
func encodeFixed64Lease(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
data[offset+4] = uint8(v >> 32)
data[offset+5] = uint8(v >> 40)
data[offset+6] = uint8(v >> 48)
data[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Lease(data []byte, offset int, v uint32) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintLease(data []byte, offset int, v uint64) int {
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
return offset + 1
}
func (m *Lease) Size() (n int) {
var l int
_ = l
if m.ID != 0 {
n += 1 + sovLease(uint64(m.ID))
}
if m.TTL != 0 {
n += 1 + sovLease(uint64(m.TTL))
}
return n
}
func sovLease(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozLease(x uint64) (n int) {
return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Lease) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
m.ID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
}
m.TTL = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.TTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
iNdEx -= sizeOfWire
skippy, err := skipLease(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLease
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
return nil
}
func skipLease(data []byte) (n int, err error) {
l := len(data)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if data[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthLease
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipLease(data[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling")
)

15
lease/leasepb/lease.proto Normal file
View File

@ -0,0 +1,15 @@
syntax = "proto3";
package leasepb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
message Lease {
int64 ID = 1;
int64 TTL = 2;
}

View File

@ -15,15 +15,20 @@
package lease
import (
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/storage/backend"
)
var (
minLeaseTerm = 5 * time.Second
leaseBucketName = []byte("lease")
)
// DeleteableRange defines an interface with DeleteRange method.
@ -37,7 +42,6 @@ type DeleteableRange interface {
// a lessor is the owner of leases. It can grant, revoke,
// renew and modify leases for lessee.
// TODO: persist lease on to stable backend for failure recovery.
// TODO: use clockwork for testability.
type lessor struct {
mu sync.Mutex
@ -47,49 +51,67 @@ type lessor struct {
// We want to make Grant, Revoke, and FindExpired all O(logN) and
// Renew O(1).
// FindExpired and Renew should be the most frequent operations.
leaseMap map[uint64]*lease
leaseMap map[int64]*lease
// A DeleteableRange the lessor operates on.
// When a lease expires, the lessor will delete the
// leased range (or key) from the DeleteableRange.
dr DeleteableRange
// backend to persist leases. We only persist lease ID and expiry for now.
// The leased items can be recovered by iterating all the keys in kv.
b backend.Backend
idgen *idutil.Generator
}
func NewLessor(lessorID uint8, dr DeleteableRange) *lessor {
return &lessor{
leaseMap: make(map[uint64]*lease),
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
l := &lessor{
leaseMap: make(map[int64]*lease),
b: b,
dr: dr,
idgen: idutil.NewGenerator(lessorID, time.Now()),
}
tx := l.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(leaseBucketName)
tx.Unlock()
l.b.ForceCommit()
// TODO: recover from previous state in backend.
return l
}
// Grant grants a lease that expires at least at the given expiry
// time.
// TODO: when leassor is under highload, it should give out lease
// with longer term to reduce renew load.
func (le *lessor) Grant(expiry time.Time) *lease {
// Grant grants a lease that expires at least after TTL seconds.
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
func (le *lessor) Grant(ttl int64) *lease {
// TODO: define max TTL
expiry := time.Now().Add(time.Duration(ttl) * time.Second)
expiry = minExpiry(time.Now(), expiry)
id := le.idgen.Next()
id := int64(le.idgen.Next())
le.mu.Lock()
defer le.mu.Unlock()
l := &lease{id: id, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
if _, ok := le.leaseMap[id]; ok {
panic("lease: unexpected duplicate ID!")
}
le.leaseMap[id] = l
l.persistTo(le.b)
return l
}
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
func (le *lessor) Revoke(id uint64) error {
func (le *lessor) Revoke(id int64) error {
le.mu.Lock()
defer le.mu.Unlock()
@ -98,19 +120,20 @@ func (le *lessor) Revoke(id uint64) error {
return fmt.Errorf("lease: cannot find lease %x", id)
}
delete(le.leaseMap, l.id)
for item := range l.itemSet {
le.dr.DeleteRange([]byte(item.key), []byte(item.endRange))
le.dr.DeleteRange([]byte(item.key), nil)
}
delete(le.leaseMap, l.id)
l.removeFrom(le.b)
return nil
}
// Renew renews an existing lease with at least the given expiry.
// If the given lease does not exist or has expired, an error will
// be returned.
func (le *lessor) Renew(id uint64, expiry time.Time) error {
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
// TODO: return new TTL?
func (le *lessor) Renew(id int64) error {
le.mu.Lock()
defer le.mu.Unlock()
@ -119,6 +142,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error {
return fmt.Errorf("lease: cannot find lease %x", id)
}
expiry := time.Now().Add(time.Duration(l.ttl) * time.Second)
l.expiry = minExpiry(time.Now(), expiry)
return nil
}
@ -126,7 +150,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error {
// Attach attaches items to the lease with given ID. When the lease
// expires, the attached items will be automatically removed.
// If the given lease does not exist, an error will be returned.
func (le *lessor) Attach(id uint64, items []leaseItem) error {
func (le *lessor) Attach(id int64, items []leaseItem) error {
le.mu.Lock()
defer le.mu.Unlock()
@ -161,7 +185,7 @@ func (le *lessor) findExpiredLeases() []*lease {
// get gets the lease with given id.
// get is a helper fucntion for testing, at least for now.
func (le *lessor) get(id uint64) *lease {
func (le *lessor) get(id int64) *lease {
le.mu.Lock()
defer le.mu.Unlock()
@ -169,16 +193,38 @@ func (le *lessor) get(id uint64) *lease {
}
type lease struct {
id uint64
id int64
ttl int64 // time to live in seconds
itemSet map[leaseItem]struct{}
// expiry time in unixnano
expiry time.Time
}
func (l lease) persistTo(b backend.Backend) {
key := int64ToBytes(l.id)
lpb := leasepb.Lease{ID: l.id, TTL: int64(l.ttl)}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
b.BatchTx().Lock()
b.BatchTx().UnsafePut(leaseBucketName, key, val)
b.BatchTx().Unlock()
}
func (l lease) removeFrom(b backend.Backend) {
key := int64ToBytes(l.id)
b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(leaseBucketName, key)
b.BatchTx().Unlock()
}
type leaseItem struct {
key string
endRange string
key string
}
// minExpiry returns a minimal expiry. A minimal expiry is the larger on
@ -190,3 +236,9 @@ func minExpiry(now time.Time, expectedExpiry time.Time) time.Time {
}
return expectedExpiry
}
func int64ToBytes(n int64) []byte {
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, uint64(n))
return bytes
}

View File

@ -15,18 +15,27 @@
package lease
import (
"io/ioutil"
"os"
"path"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/storage/backend"
)
// TestLessorGrant ensures Lessor can grant wanted lease.
// The granted lease should have a unique ID with a term
// that is greater than minLeaseTerm.
func TestLessorGrant(t *testing.T) {
le := NewLessor(1, &fakeDeleteable{})
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
l := le.Grant(time.Now().Add(time.Second))
le := NewLessor(1, be, &fakeDeleteable{})
l := le.Grant(1)
gl := le.get(l.id)
if !reflect.DeepEqual(gl, l) {
@ -36,10 +45,17 @@ func TestLessorGrant(t *testing.T) {
t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), minLeaseTerm-time.Second)
}
nl := le.Grant(time.Now().Add(time.Second))
nl := le.Grant(1)
if nl.id == l.id {
t.Errorf("new lease.id = %x, want != %x", nl.id, l.id)
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0)
if len(vs) != 1 {
t.Errorf("len(vs) = %d, want 1", len(vs))
}
be.BatchTx().Unlock()
}
// TestLessorRevoke ensures Lessor can revoke a lease.
@ -47,16 +63,21 @@ func TestLessorGrant(t *testing.T) {
// the DeleteableKV.
// The revoked lease cannot be got from Lessor again.
func TestLessorRevoke(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
fd := &fakeDeleteable{}
le := NewLessor(1, fd)
le := NewLessor(1, be, fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l := le.Grant(time.Now().Add(100 * time.Second))
l := le.Grant(100)
items := []leaseItem{
{"foo", ""},
{"bar", "zar"},
{"foo"},
{"bar"},
}
err := le.Attach(l.id, items)
@ -73,22 +94,36 @@ func TestLessorRevoke(t *testing.T) {
t.Errorf("got revoked lease %x", l.id)
}
wdeleted := []string{"foo_", "bar_zar"}
wdeleted := []string{"foo_", "bar_"}
if !reflect.DeepEqual(fd.deleted, wdeleted) {
t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0)
if len(vs) != 0 {
t.Errorf("len(vs) = %d, want 0", len(vs))
}
be.BatchTx().Unlock()
}
// TestLessorRenew ensures Lessor can renew an existing lease.
func TestLessorRenew(t *testing.T) {
le := NewLessor(1, &fakeDeleteable{})
l := le.Grant(time.Now().Add(5 * time.Second))
dir, be := NewTestBackend(t)
defer be.Close()
defer os.RemoveAll(dir)
le.Renew(l.id, time.Now().Add(100*time.Second))
le := NewLessor(1, be, &fakeDeleteable{})
l := le.Grant(5)
// manually change the ttl field
l.ttl = 10
le.Renew(l.id)
l = le.get(l.id)
if l.expiry.Sub(time.Now()) < 95*time.Second {
t.Errorf("failed to renew the lease for 100 seconds")
if l.expiry.Sub(time.Now()) < 9*time.Second {
t.Errorf("failed to renew the lease")
}
}
@ -100,3 +135,12 @@ func (fd *fakeDeleteable) DeleteRange(key, end []byte) (int64, int64) {
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
return 0, 0
}
func NewTestBackend(t *testing.T) (string, backend.Backend) {
tmpPath, err := ioutil.TempDir("", "lease")
if err != nil {
t.Fatalf("failed to create tmpdir (%v)", err)
}
return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000)
}

View File

@ -20,7 +20,7 @@ PREFIX="github.com/coreos/etcd/Godeps/_workspace/src"
ESCAPED_PREFIX=$(echo $PREFIX | sed -e 's/[\/&]/\\&/g')
# directories containing protos to be built
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb"
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb ./lease/leasepb"
# exact version of protoc-gen-gogo to build
SHA="932b70afa8b0bf4a8e167fdf0c3367cebba45903"