mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #4134 from xiang90/lease
lease: modify API and persist lease to disk
This commit is contained in:
commit
1e61243fd7
@ -21,6 +21,7 @@ import (
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/lease"
|
||||
dstorage "github.com/coreos/etcd/storage"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -143,12 +144,12 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
|
||||
err error
|
||||
)
|
||||
if txnID != noTxn {
|
||||
rev, err = kv.TxnPut(txnID, p.Key, p.Value, dstorage.LeaseID(p.Lease))
|
||||
rev, err = kv.TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
rev = kv.Put(p.Key, p.Value, dstorage.LeaseID(p.Lease))
|
||||
rev = kv.Put(p.Key, p.Value, lease.LeaseID(p.Lease))
|
||||
}
|
||||
resp.Header.Revision = rev
|
||||
return resp, nil
|
||||
|
282
lease/leasepb/lease.pb.go
Normal file
282
lease/leasepb/lease.pb.go
Normal file
@ -0,0 +1,282 @@
|
||||
// Code generated by protoc-gen-gogo.
|
||||
// source: lease.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package leasepb is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
lease.proto
|
||||
|
||||
It has these top-level messages:
|
||||
Lease
|
||||
*/
|
||||
package leasepb
|
||||
|
||||
import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
|
||||
// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto"
|
||||
|
||||
import io "io"
|
||||
import fmt "fmt"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
|
||||
type Lease struct {
|
||||
ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
|
||||
TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"`
|
||||
}
|
||||
|
||||
func (m *Lease) Reset() { *m = Lease{} }
|
||||
func (m *Lease) String() string { return proto.CompactTextString(m) }
|
||||
func (*Lease) ProtoMessage() {}
|
||||
|
||||
func (m *Lease) Marshal() (data []byte, err error) {
|
||||
size := m.Size()
|
||||
data = make([]byte, size)
|
||||
n, err := m.MarshalTo(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data[:n], nil
|
||||
}
|
||||
|
||||
func (m *Lease) MarshalTo(data []byte) (int, error) {
|
||||
var i int
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.ID != 0 {
|
||||
data[i] = 0x8
|
||||
i++
|
||||
i = encodeVarintLease(data, i, uint64(m.ID))
|
||||
}
|
||||
if m.TTL != 0 {
|
||||
data[i] = 0x10
|
||||
i++
|
||||
i = encodeVarintLease(data, i, uint64(m.TTL))
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func encodeFixed64Lease(data []byte, offset int, v uint64) int {
|
||||
data[offset] = uint8(v)
|
||||
data[offset+1] = uint8(v >> 8)
|
||||
data[offset+2] = uint8(v >> 16)
|
||||
data[offset+3] = uint8(v >> 24)
|
||||
data[offset+4] = uint8(v >> 32)
|
||||
data[offset+5] = uint8(v >> 40)
|
||||
data[offset+6] = uint8(v >> 48)
|
||||
data[offset+7] = uint8(v >> 56)
|
||||
return offset + 8
|
||||
}
|
||||
func encodeFixed32Lease(data []byte, offset int, v uint32) int {
|
||||
data[offset] = uint8(v)
|
||||
data[offset+1] = uint8(v >> 8)
|
||||
data[offset+2] = uint8(v >> 16)
|
||||
data[offset+3] = uint8(v >> 24)
|
||||
return offset + 4
|
||||
}
|
||||
func encodeVarintLease(data []byte, offset int, v uint64) int {
|
||||
for v >= 1<<7 {
|
||||
data[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
data[offset] = uint8(v)
|
||||
return offset + 1
|
||||
}
|
||||
func (m *Lease) Size() (n int) {
|
||||
var l int
|
||||
_ = l
|
||||
if m.ID != 0 {
|
||||
n += 1 + sovLease(uint64(m.ID))
|
||||
}
|
||||
if m.TTL != 0 {
|
||||
n += 1 + sovLease(uint64(m.TTL))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovLease(x uint64) (n int) {
|
||||
for {
|
||||
n++
|
||||
x >>= 7
|
||||
if x == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
func sozLease(x uint64) (n int) {
|
||||
return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (m *Lease) Unmarshal(data []byte) error {
|
||||
l := len(data)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
|
||||
}
|
||||
m.ID = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
m.ID |= (int64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 2:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
|
||||
}
|
||||
m.TTL = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
m.TTL |= (int64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
default:
|
||||
var sizeOfWire int
|
||||
for {
|
||||
sizeOfWire++
|
||||
wire >>= 7
|
||||
if wire == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
iNdEx -= sizeOfWire
|
||||
skippy, err := skipLease(data[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthLease
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func skipLease(data []byte) (n int, err error) {
|
||||
l := len(data)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for {
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if data[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return iNdEx, nil
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
return iNdEx, nil
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
iNdEx += length
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthLease
|
||||
}
|
||||
return iNdEx, nil
|
||||
case 3:
|
||||
for {
|
||||
var innerWire uint64
|
||||
var start int = iNdEx
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
innerWire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
innerWireType := int(innerWire & 0x7)
|
||||
if innerWireType == 4 {
|
||||
break
|
||||
}
|
||||
next, err := skipLease(data[start:])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
iNdEx = start + next
|
||||
}
|
||||
return iNdEx, nil
|
||||
case 4:
|
||||
return iNdEx, nil
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
return iNdEx, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
)
|
15
lease/leasepb/lease.proto
Normal file
15
lease/leasepb/lease.proto
Normal file
@ -0,0 +1,15 @@
|
||||
syntax = "proto3";
|
||||
package leasepb;
|
||||
|
||||
import "gogoproto/gogo.proto";
|
||||
|
||||
option (gogoproto.marshaler_all) = true;
|
||||
option (gogoproto.sizer_all) = true;
|
||||
option (gogoproto.unmarshaler_all) = true;
|
||||
option (gogoproto.goproto_getters_all) = false;
|
||||
option (gogoproto.goproto_enum_prefix_all) = false;
|
||||
|
||||
message Lease {
|
||||
int64 ID = 1;
|
||||
int64 TTL = 2;
|
||||
}
|
104
lease/lessor.go
104
lease/lessor.go
@ -15,17 +15,24 @@
|
||||
package lease
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease/leasepb"
|
||||
"github.com/coreos/etcd/pkg/idutil"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
)
|
||||
|
||||
var (
|
||||
minLeaseTerm = 5 * time.Second
|
||||
|
||||
leaseBucketName = []byte("lease")
|
||||
)
|
||||
|
||||
type LeaseID int64
|
||||
|
||||
// DeleteableRange defines an interface with DeleteRange method.
|
||||
// We define this interface only for lessor to limit the number
|
||||
// of methods of storage.KV to what lessor actually needs.
|
||||
@ -37,7 +44,6 @@ type DeleteableRange interface {
|
||||
|
||||
// a lessor is the owner of leases. It can grant, revoke,
|
||||
// renew and modify leases for lessee.
|
||||
// TODO: persist lease on to stable backend for failure recovery.
|
||||
// TODO: use clockwork for testability.
|
||||
type lessor struct {
|
||||
mu sync.Mutex
|
||||
@ -47,49 +53,67 @@ type lessor struct {
|
||||
// We want to make Grant, Revoke, and FindExpired all O(logN) and
|
||||
// Renew O(1).
|
||||
// FindExpired and Renew should be the most frequent operations.
|
||||
leaseMap map[uint64]*lease
|
||||
leaseMap map[LeaseID]*lease
|
||||
|
||||
// A DeleteableRange the lessor operates on.
|
||||
// When a lease expires, the lessor will delete the
|
||||
// leased range (or key) from the DeleteableRange.
|
||||
dr DeleteableRange
|
||||
|
||||
// backend to persist leases. We only persist lease ID and expiry for now.
|
||||
// The leased items can be recovered by iterating all the keys in kv.
|
||||
b backend.Backend
|
||||
|
||||
idgen *idutil.Generator
|
||||
}
|
||||
|
||||
func NewLessor(lessorID uint8, dr DeleteableRange) *lessor {
|
||||
return &lessor{
|
||||
leaseMap: make(map[uint64]*lease),
|
||||
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
|
||||
l := &lessor{
|
||||
leaseMap: make(map[LeaseID]*lease),
|
||||
b: b,
|
||||
dr: dr,
|
||||
idgen: idutil.NewGenerator(lessorID, time.Now()),
|
||||
}
|
||||
|
||||
tx := l.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(leaseBucketName)
|
||||
tx.Unlock()
|
||||
l.b.ForceCommit()
|
||||
|
||||
// TODO: recover from previous state in backend.
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// Grant grants a lease that expires at least at the given expiry
|
||||
// time.
|
||||
// TODO: when leassor is under highload, it should give out lease
|
||||
// with longer term to reduce renew load.
|
||||
func (le *lessor) Grant(expiry time.Time) *lease {
|
||||
// Grant grants a lease that expires at least after TTL seconds.
|
||||
// TODO: when lessor is under high load, it should give out lease
|
||||
// with longer TTL to reduce renew load.
|
||||
func (le *lessor) Grant(ttl int64) *lease {
|
||||
// TODO: define max TTL
|
||||
expiry := time.Now().Add(time.Duration(ttl) * time.Second)
|
||||
expiry = minExpiry(time.Now(), expiry)
|
||||
|
||||
id := le.idgen.Next()
|
||||
id := LeaseID(le.idgen.Next())
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
l := &lease{id: id, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
|
||||
l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})}
|
||||
if _, ok := le.leaseMap[id]; ok {
|
||||
panic("lease: unexpected duplicate ID!")
|
||||
}
|
||||
|
||||
le.leaseMap[id] = l
|
||||
l.persistTo(le.b)
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// Revoke revokes a lease with given ID. The item attached to the
|
||||
// given lease will be removed. If the ID does not exist, an error
|
||||
// will be returned.
|
||||
func (le *lessor) Revoke(id uint64) error {
|
||||
func (le *lessor) Revoke(id LeaseID) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -98,19 +122,20 @@ func (le *lessor) Revoke(id uint64) error {
|
||||
return fmt.Errorf("lease: cannot find lease %x", id)
|
||||
}
|
||||
|
||||
delete(le.leaseMap, l.id)
|
||||
|
||||
for item := range l.itemSet {
|
||||
le.dr.DeleteRange([]byte(item.key), []byte(item.endRange))
|
||||
le.dr.DeleteRange([]byte(item.key), nil)
|
||||
}
|
||||
|
||||
delete(le.leaseMap, l.id)
|
||||
l.removeFrom(le.b)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Renew renews an existing lease with at least the given expiry.
|
||||
// If the given lease does not exist or has expired, an error will
|
||||
// be returned.
|
||||
func (le *lessor) Renew(id uint64, expiry time.Time) error {
|
||||
// Renew renews an existing lease. If the given lease does not exist or
|
||||
// has expired, an error will be returned.
|
||||
// TODO: return new TTL?
|
||||
func (le *lessor) Renew(id LeaseID) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -119,6 +144,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error {
|
||||
return fmt.Errorf("lease: cannot find lease %x", id)
|
||||
}
|
||||
|
||||
expiry := time.Now().Add(time.Duration(l.ttl) * time.Second)
|
||||
l.expiry = minExpiry(time.Now(), expiry)
|
||||
return nil
|
||||
}
|
||||
@ -126,7 +152,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error {
|
||||
// Attach attaches items to the lease with given ID. When the lease
|
||||
// expires, the attached items will be automatically removed.
|
||||
// If the given lease does not exist, an error will be returned.
|
||||
func (le *lessor) Attach(id uint64, items []leaseItem) error {
|
||||
func (le *lessor) Attach(id LeaseID, items []leaseItem) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -161,7 +187,7 @@ func (le *lessor) findExpiredLeases() []*lease {
|
||||
|
||||
// get gets the lease with given id.
|
||||
// get is a helper fucntion for testing, at least for now.
|
||||
func (le *lessor) get(id uint64) *lease {
|
||||
func (le *lessor) get(id LeaseID) *lease {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -169,16 +195,38 @@ func (le *lessor) get(id uint64) *lease {
|
||||
}
|
||||
|
||||
type lease struct {
|
||||
id uint64
|
||||
id LeaseID
|
||||
ttl int64 // time to live in seconds
|
||||
|
||||
itemSet map[leaseItem]struct{}
|
||||
// expiry time in unixnano
|
||||
expiry time.Time
|
||||
}
|
||||
|
||||
func (l lease) persistTo(b backend.Backend) {
|
||||
key := int64ToBytes(int64(l.id))
|
||||
|
||||
lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)}
|
||||
val, err := lpb.Marshal()
|
||||
if err != nil {
|
||||
panic("failed to marshal lease proto item")
|
||||
}
|
||||
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().UnsafePut(leaseBucketName, key, val)
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
func (l lease) removeFrom(b backend.Backend) {
|
||||
key := int64ToBytes(int64(l.id))
|
||||
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
type leaseItem struct {
|
||||
key string
|
||||
endRange string
|
||||
key string
|
||||
}
|
||||
|
||||
// minExpiry returns a minimal expiry. A minimal expiry is the larger on
|
||||
@ -190,3 +238,9 @@ func minExpiry(now time.Time, expectedExpiry time.Time) time.Time {
|
||||
}
|
||||
return expectedExpiry
|
||||
}
|
||||
|
||||
func int64ToBytes(n int64) []byte {
|
||||
bytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bytes, uint64(n))
|
||||
return bytes
|
||||
}
|
||||
|
@ -15,18 +15,27 @@
|
||||
package lease
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
)
|
||||
|
||||
// TestLessorGrant ensures Lessor can grant wanted lease.
|
||||
// The granted lease should have a unique ID with a term
|
||||
// that is greater than minLeaseTerm.
|
||||
func TestLessorGrant(t *testing.T) {
|
||||
le := NewLessor(1, &fakeDeleteable{})
|
||||
dir, be := NewTestBackend(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer be.Close()
|
||||
|
||||
l := le.Grant(time.Now().Add(time.Second))
|
||||
le := NewLessor(1, be, &fakeDeleteable{})
|
||||
|
||||
l := le.Grant(1)
|
||||
gl := le.get(l.id)
|
||||
|
||||
if !reflect.DeepEqual(gl, l) {
|
||||
@ -36,10 +45,17 @@ func TestLessorGrant(t *testing.T) {
|
||||
t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), minLeaseTerm-time.Second)
|
||||
}
|
||||
|
||||
nl := le.Grant(time.Now().Add(time.Second))
|
||||
nl := le.Grant(1)
|
||||
if nl.id == l.id {
|
||||
t.Errorf("new lease.id = %x, want != %x", nl.id, l.id)
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
|
||||
if len(vs) != 1 {
|
||||
t.Errorf("len(vs) = %d, want 1", len(vs))
|
||||
}
|
||||
be.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
// TestLessorRevoke ensures Lessor can revoke a lease.
|
||||
@ -47,16 +63,21 @@ func TestLessorGrant(t *testing.T) {
|
||||
// the DeleteableKV.
|
||||
// The revoked lease cannot be got from Lessor again.
|
||||
func TestLessorRevoke(t *testing.T) {
|
||||
dir, be := NewTestBackend(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer be.Close()
|
||||
|
||||
fd := &fakeDeleteable{}
|
||||
le := NewLessor(1, fd)
|
||||
|
||||
le := NewLessor(1, be, fd)
|
||||
|
||||
// grant a lease with long term (100 seconds) to
|
||||
// avoid early termination during the test.
|
||||
l := le.Grant(time.Now().Add(100 * time.Second))
|
||||
l := le.Grant(100)
|
||||
|
||||
items := []leaseItem{
|
||||
{"foo", ""},
|
||||
{"bar", "zar"},
|
||||
{"foo"},
|
||||
{"bar"},
|
||||
}
|
||||
|
||||
err := le.Attach(l.id, items)
|
||||
@ -73,22 +94,36 @@ func TestLessorRevoke(t *testing.T) {
|
||||
t.Errorf("got revoked lease %x", l.id)
|
||||
}
|
||||
|
||||
wdeleted := []string{"foo_", "bar_zar"}
|
||||
wdeleted := []string{"foo_", "bar_"}
|
||||
if !reflect.DeepEqual(fd.deleted, wdeleted) {
|
||||
t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
|
||||
if len(vs) != 0 {
|
||||
t.Errorf("len(vs) = %d, want 0", len(vs))
|
||||
}
|
||||
be.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
// TestLessorRenew ensures Lessor can renew an existing lease.
|
||||
func TestLessorRenew(t *testing.T) {
|
||||
le := NewLessor(1, &fakeDeleteable{})
|
||||
l := le.Grant(time.Now().Add(5 * time.Second))
|
||||
dir, be := NewTestBackend(t)
|
||||
defer be.Close()
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
le.Renew(l.id, time.Now().Add(100*time.Second))
|
||||
le := NewLessor(1, be, &fakeDeleteable{})
|
||||
l := le.Grant(5)
|
||||
|
||||
// manually change the ttl field
|
||||
l.ttl = 10
|
||||
|
||||
le.Renew(l.id)
|
||||
l = le.get(l.id)
|
||||
|
||||
if l.expiry.Sub(time.Now()) < 95*time.Second {
|
||||
t.Errorf("failed to renew the lease for 100 seconds")
|
||||
if l.expiry.Sub(time.Now()) < 9*time.Second {
|
||||
t.Errorf("failed to renew the lease")
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,3 +135,12 @@ func (fd *fakeDeleteable) DeleteRange(key, end []byte) (int64, int64) {
|
||||
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
||||
tmpPath, err := ioutil.TempDir("", "lease")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create tmpdir (%v)", err)
|
||||
}
|
||||
|
||||
return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000)
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ PREFIX="github.com/coreos/etcd/Godeps/_workspace/src"
|
||||
ESCAPED_PREFIX=$(echo $PREFIX | sed -e 's/[\/&]/\\&/g')
|
||||
|
||||
# directories containing protos to be built
|
||||
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb"
|
||||
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb ./lease/leasepb"
|
||||
|
||||
# exact version of protoc-gen-gogo to build
|
||||
SHA="932b70afa8b0bf4a8e167fdf0c3367cebba45903"
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"encoding/binary"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
@ -59,7 +60,7 @@ func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consist
|
||||
}
|
||||
}
|
||||
|
||||
func (s *consistentWatchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
|
||||
func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
id := s.TxnBegin()
|
||||
rev, err := s.TxnPut(id, key, value, lease)
|
||||
if err != nil {
|
||||
@ -109,7 +110,7 @@ func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit,
|
||||
return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
|
||||
}
|
||||
|
||||
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
|
||||
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
if s.skip {
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -15,14 +15,13 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
type Snapshot backend.Snapshot
|
||||
|
||||
type LeaseID int64
|
||||
|
||||
type KV interface {
|
||||
// Rev returns the current revision of the KV.
|
||||
Rev() int64
|
||||
@ -39,7 +38,7 @@ type KV interface {
|
||||
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
|
||||
// id.
|
||||
// A put also increases the rev of the store, and generates one event in the event history.
|
||||
Put(key, value []byte, lease LeaseID) (rev int64)
|
||||
Put(key, value []byte, lease lease.LeaseID) (rev int64)
|
||||
|
||||
// DeleteRange deletes the given range from the store.
|
||||
// A deleteRange increases the rev of the store if any key in the range exists.
|
||||
@ -57,7 +56,7 @@ type KV interface {
|
||||
// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
|
||||
TxnEnd(txnID int64) error
|
||||
TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
|
||||
TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error)
|
||||
TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
|
||||
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
|
||||
|
||||
Compact(rev int64) error
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -34,7 +35,7 @@ import (
|
||||
|
||||
type (
|
||||
rangeFunc func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error)
|
||||
putFunc func(kv KV, key, value []byte, lease LeaseID) int64
|
||||
putFunc func(kv KV, key, value []byte, lease lease.LeaseID) int64
|
||||
deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64)
|
||||
)
|
||||
|
||||
@ -48,10 +49,10 @@ var (
|
||||
return kv.TxnRange(id, key, end, limit, rangeRev)
|
||||
}
|
||||
|
||||
normalPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
|
||||
normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
||||
return kv.Put(key, value, lease)
|
||||
}
|
||||
txnPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
|
||||
txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
||||
id := kv.TxnBegin()
|
||||
defer kv.TxnEnd(id)
|
||||
rev, err := kv.TxnPut(id, key, value, lease)
|
||||
@ -280,7 +281,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
|
||||
for i := 0; i < 10; i++ {
|
||||
base := int64(i + 1)
|
||||
|
||||
rev := f(s, []byte("foo"), []byte("bar"), LeaseID(base))
|
||||
rev := f(s, []byte("foo"), []byte("bar"), lease.LeaseID(base))
|
||||
if rev != base {
|
||||
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -39,7 +40,7 @@ var (
|
||||
markBytePosition = markedRevBytesLen - 1
|
||||
markTombstone byte = 't'
|
||||
|
||||
NoLease = LeaseID(0)
|
||||
NoLease = lease.LeaseID(0)
|
||||
|
||||
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||
finishedCompactKeyName = []byte("finishedCompactRev")
|
||||
@ -97,7 +98,7 @@ func (s *store) Rev() int64 {
|
||||
return s.currentRev.main
|
||||
}
|
||||
|
||||
func (s *store) Put(key, value []byte, lease LeaseID) int64 {
|
||||
func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
|
||||
id := s.TxnBegin()
|
||||
s.put(key, value, lease)
|
||||
s.txnEnd(id)
|
||||
@ -172,7 +173,7 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k
|
||||
return s.rangeKeys(key, end, limit, rangeRev)
|
||||
}
|
||||
|
||||
func (s *store) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
|
||||
func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
if txnID != s.txnID {
|
||||
return 0, ErrTxnIDMismatch
|
||||
}
|
||||
@ -353,7 +354,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
|
||||
return kvs, rev, nil
|
||||
}
|
||||
|
||||
func (s *store) put(key, value []byte, lease LeaseID) {
|
||||
func (s *store) put(key, value []byte, lease lease.LeaseID) {
|
||||
rev := s.currentRev.main + 1
|
||||
c := rev
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
@ -105,7 +106,7 @@ func TestStorePut(t *testing.T) {
|
||||
s.tx = b.BatchTx()
|
||||
fi.indexGetRespc <- tt.r
|
||||
|
||||
s.put([]byte("foo"), []byte("bar"), LeaseID(i+1))
|
||||
s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
|
||||
|
||||
data, err := tt.wkv.Marshal()
|
||||
if err != nil {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
@ -69,7 +70,7 @@ func newWatchableStore(path string) *watchableStore {
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *watchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
|
||||
func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -115,7 +116,7 @@ func (s *watchableStore) TxnBegin() int64 {
|
||||
return s.store.TxnBegin()
|
||||
}
|
||||
|
||||
func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
|
||||
func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
rev, err = s.store.TxnPut(txnID, key, value, lease)
|
||||
if err == nil {
|
||||
s.tx.put(string(key))
|
||||
|
Loading…
x
Reference in New Issue
Block a user