From 25f82b25f7ca7e97fb4c258ea7ae17fda72d9dbf Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 4 Jan 2016 22:53:11 -0800 Subject: [PATCH 1/2] lease: modify API and persist lease to disk --- lease/leasepb/lease.pb.go | 282 ++++++++++++++++++++++++++++++++++++++ lease/leasepb/lease.proto | 15 ++ lease/lessor.go | 102 ++++++++++---- lease/lessor_test.go | 70 ++++++++-- scripts/genproto.sh | 2 +- 5 files changed, 432 insertions(+), 39 deletions(-) create mode 100644 lease/leasepb/lease.pb.go create mode 100644 lease/leasepb/lease.proto diff --git a/lease/leasepb/lease.pb.go b/lease/leasepb/lease.pb.go new file mode 100644 index 000000000..3d14027b8 --- /dev/null +++ b/lease/leasepb/lease.pb.go @@ -0,0 +1,282 @@ +// Code generated by protoc-gen-gogo. +// source: lease.proto +// DO NOT EDIT! + +/* + Package leasepb is a generated protocol buffer package. + + It is generated from these files: + lease.proto + + It has these top-level messages: + Lease +*/ +package leasepb + +import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + +// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto" + +import io "io" +import fmt "fmt" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal + +type Lease struct { + ID int64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"` + TTL int64 `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"` +} + +func (m *Lease) Reset() { *m = Lease{} } +func (m *Lease) String() string { return proto.CompactTextString(m) } +func (*Lease) ProtoMessage() {} + +func (m *Lease) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Lease) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.ID != 0 { + data[i] = 0x8 + i++ + i = encodeVarintLease(data, i, uint64(m.ID)) + } + if m.TTL != 0 { + data[i] = 0x10 + i++ + i = encodeVarintLease(data, i, uint64(m.TTL)) + } + return i, nil +} + +func encodeFixed64Lease(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Lease(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLease(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *Lease) Size() (n int) { + var l int + _ = l + if m.ID != 0 { + n += 1 + sovLease(uint64(m.ID)) + } + if m.TTL != 0 { + n += 1 + sovLease(uint64(m.TTL)) + } + return n +} + +func sovLease(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLease(x uint64) (n int) { + return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Lease) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + m.ID = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) + } + m.TTL = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.TTL |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipLease(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLease + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func skipLease(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthLease + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLease(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling") +) diff --git a/lease/leasepb/lease.proto b/lease/leasepb/lease.proto new file mode 100644 index 000000000..f2c996c8a --- /dev/null +++ b/lease/leasepb/lease.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package leasepb; + +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; + +message Lease { + int64 ID = 1; + int64 TTL = 2; +} diff --git a/lease/lessor.go b/lease/lessor.go index 73717dbe6..ab70dc152 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -15,15 +15,20 @@ package lease import ( + "encoding/binary" "fmt" "sync" "time" + "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/pkg/idutil" + "github.com/coreos/etcd/storage/backend" ) var ( minLeaseTerm = 5 * time.Second + + leaseBucketName = []byte("lease") ) // DeleteableRange defines an interface with DeleteRange method. @@ -37,7 +42,6 @@ type DeleteableRange interface { // a lessor is the owner of leases. It can grant, revoke, // renew and modify leases for lessee. -// TODO: persist lease on to stable backend for failure recovery. // TODO: use clockwork for testability. type lessor struct { mu sync.Mutex @@ -47,49 +51,67 @@ type lessor struct { // We want to make Grant, Revoke, and FindExpired all O(logN) and // Renew O(1). // FindExpired and Renew should be the most frequent operations. - leaseMap map[uint64]*lease + leaseMap map[int64]*lease // A DeleteableRange the lessor operates on. // When a lease expires, the lessor will delete the // leased range (or key) from the DeleteableRange. dr DeleteableRange + // backend to persist leases. We only persist lease ID and expiry for now. + // The leased items can be recovered by iterating all the keys in kv. + b backend.Backend + idgen *idutil.Generator } -func NewLessor(lessorID uint8, dr DeleteableRange) *lessor { - return &lessor{ - leaseMap: make(map[uint64]*lease), +func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { + l := &lessor{ + leaseMap: make(map[int64]*lease), + b: b, dr: dr, idgen: idutil.NewGenerator(lessorID, time.Now()), } + + tx := l.b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(leaseBucketName) + tx.Unlock() + l.b.ForceCommit() + + // TODO: recover from previous state in backend. + + return l } -// Grant grants a lease that expires at least at the given expiry -// time. -// TODO: when leassor is under highload, it should give out lease -// with longer term to reduce renew load. -func (le *lessor) Grant(expiry time.Time) *lease { +// Grant grants a lease that expires at least after TTL seconds. +// TODO: when lessor is under high load, it should give out lease +// with longer TTL to reduce renew load. +func (le *lessor) Grant(ttl int64) *lease { + // TODO: define max TTL + expiry := time.Now().Add(time.Duration(ttl) * time.Second) expiry = minExpiry(time.Now(), expiry) - id := le.idgen.Next() + id := int64(le.idgen.Next()) le.mu.Lock() defer le.mu.Unlock() - l := &lease{id: id, expiry: expiry, itemSet: make(map[leaseItem]struct{})} + l := &lease{id: id, ttl: ttl, expiry: expiry, itemSet: make(map[leaseItem]struct{})} if _, ok := le.leaseMap[id]; ok { panic("lease: unexpected duplicate ID!") } le.leaseMap[id] = l + l.persistTo(le.b) + return l } // Revoke revokes a lease with given ID. The item attached to the // given lease will be removed. If the ID does not exist, an error // will be returned. -func (le *lessor) Revoke(id uint64) error { +func (le *lessor) Revoke(id int64) error { le.mu.Lock() defer le.mu.Unlock() @@ -98,19 +120,20 @@ func (le *lessor) Revoke(id uint64) error { return fmt.Errorf("lease: cannot find lease %x", id) } - delete(le.leaseMap, l.id) - for item := range l.itemSet { - le.dr.DeleteRange([]byte(item.key), []byte(item.endRange)) + le.dr.DeleteRange([]byte(item.key), nil) } + delete(le.leaseMap, l.id) + l.removeFrom(le.b) + return nil } -// Renew renews an existing lease with at least the given expiry. -// If the given lease does not exist or has expired, an error will -// be returned. -func (le *lessor) Renew(id uint64, expiry time.Time) error { +// Renew renews an existing lease. If the given lease does not exist or +// has expired, an error will be returned. +// TODO: return new TTL? +func (le *lessor) Renew(id int64) error { le.mu.Lock() defer le.mu.Unlock() @@ -119,6 +142,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error { return fmt.Errorf("lease: cannot find lease %x", id) } + expiry := time.Now().Add(time.Duration(l.ttl) * time.Second) l.expiry = minExpiry(time.Now(), expiry) return nil } @@ -126,7 +150,7 @@ func (le *lessor) Renew(id uint64, expiry time.Time) error { // Attach attaches items to the lease with given ID. When the lease // expires, the attached items will be automatically removed. // If the given lease does not exist, an error will be returned. -func (le *lessor) Attach(id uint64, items []leaseItem) error { +func (le *lessor) Attach(id int64, items []leaseItem) error { le.mu.Lock() defer le.mu.Unlock() @@ -161,7 +185,7 @@ func (le *lessor) findExpiredLeases() []*lease { // get gets the lease with given id. // get is a helper fucntion for testing, at least for now. -func (le *lessor) get(id uint64) *lease { +func (le *lessor) get(id int64) *lease { le.mu.Lock() defer le.mu.Unlock() @@ -169,16 +193,38 @@ func (le *lessor) get(id uint64) *lease { } type lease struct { - id uint64 + id int64 + ttl int64 // time to live in seconds itemSet map[leaseItem]struct{} // expiry time in unixnano expiry time.Time } +func (l lease) persistTo(b backend.Backend) { + key := int64ToBytes(l.id) + + lpb := leasepb.Lease{ID: l.id, TTL: int64(l.ttl)} + val, err := lpb.Marshal() + if err != nil { + panic("failed to marshal lease proto item") + } + + b.BatchTx().Lock() + b.BatchTx().UnsafePut(leaseBucketName, key, val) + b.BatchTx().Unlock() +} + +func (l lease) removeFrom(b backend.Backend) { + key := int64ToBytes(l.id) + + b.BatchTx().Lock() + b.BatchTx().UnsafeDelete(leaseBucketName, key) + b.BatchTx().Unlock() +} + type leaseItem struct { - key string - endRange string + key string } // minExpiry returns a minimal expiry. A minimal expiry is the larger on @@ -190,3 +236,9 @@ func minExpiry(now time.Time, expectedExpiry time.Time) time.Time { } return expectedExpiry } + +func int64ToBytes(n int64) []byte { + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, uint64(n)) + return bytes +} diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 548c57c79..e6078b1bc 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -15,18 +15,27 @@ package lease import ( + "io/ioutil" + "os" + "path" "reflect" "testing" "time" + + "github.com/coreos/etcd/storage/backend" ) // TestLessorGrant ensures Lessor can grant wanted lease. // The granted lease should have a unique ID with a term // that is greater than minLeaseTerm. func TestLessorGrant(t *testing.T) { - le := NewLessor(1, &fakeDeleteable{}) + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() - l := le.Grant(time.Now().Add(time.Second)) + le := NewLessor(1, be, &fakeDeleteable{}) + + l := le.Grant(1) gl := le.get(l.id) if !reflect.DeepEqual(gl, l) { @@ -36,10 +45,17 @@ func TestLessorGrant(t *testing.T) { t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), minLeaseTerm-time.Second) } - nl := le.Grant(time.Now().Add(time.Second)) + nl := le.Grant(1) if nl.id == l.id { t.Errorf("new lease.id = %x, want != %x", nl.id, l.id) } + + be.BatchTx().Lock() + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0) + if len(vs) != 1 { + t.Errorf("len(vs) = %d, want 1", len(vs)) + } + be.BatchTx().Unlock() } // TestLessorRevoke ensures Lessor can revoke a lease. @@ -47,16 +63,21 @@ func TestLessorGrant(t *testing.T) { // the DeleteableKV. // The revoked lease cannot be got from Lessor again. func TestLessorRevoke(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + fd := &fakeDeleteable{} - le := NewLessor(1, fd) + + le := NewLessor(1, be, fd) // grant a lease with long term (100 seconds) to // avoid early termination during the test. - l := le.Grant(time.Now().Add(100 * time.Second)) + l := le.Grant(100) items := []leaseItem{ - {"foo", ""}, - {"bar", "zar"}, + {"foo"}, + {"bar"}, } err := le.Attach(l.id, items) @@ -73,22 +94,36 @@ func TestLessorRevoke(t *testing.T) { t.Errorf("got revoked lease %x", l.id) } - wdeleted := []string{"foo_", "bar_zar"} + wdeleted := []string{"foo_", "bar_"} if !reflect.DeepEqual(fd.deleted, wdeleted) { t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted) } + + be.BatchTx().Lock() + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0) + if len(vs) != 0 { + t.Errorf("len(vs) = %d, want 0", len(vs)) + } + be.BatchTx().Unlock() } // TestLessorRenew ensures Lessor can renew an existing lease. func TestLessorRenew(t *testing.T) { - le := NewLessor(1, &fakeDeleteable{}) - l := le.Grant(time.Now().Add(5 * time.Second)) + dir, be := NewTestBackend(t) + defer be.Close() + defer os.RemoveAll(dir) - le.Renew(l.id, time.Now().Add(100*time.Second)) + le := NewLessor(1, be, &fakeDeleteable{}) + l := le.Grant(5) + + // manually change the ttl field + l.ttl = 10 + + le.Renew(l.id) l = le.get(l.id) - if l.expiry.Sub(time.Now()) < 95*time.Second { - t.Errorf("failed to renew the lease for 100 seconds") + if l.expiry.Sub(time.Now()) < 9*time.Second { + t.Errorf("failed to renew the lease") } } @@ -100,3 +135,12 @@ func (fd *fakeDeleteable) DeleteRange(key, end []byte) (int64, int64) { fd.deleted = append(fd.deleted, string(key)+"_"+string(end)) return 0, 0 } + +func NewTestBackend(t *testing.T) (string, backend.Backend) { + tmpPath, err := ioutil.TempDir("", "lease") + if err != nil { + t.Fatalf("failed to create tmpdir (%v)", err) + } + + return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000) +} diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 1176473ba..327a45a1e 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -20,7 +20,7 @@ PREFIX="github.com/coreos/etcd/Godeps/_workspace/src" ESCAPED_PREFIX=$(echo $PREFIX | sed -e 's/[\/&]/\\&/g') # directories containing protos to be built -DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb" +DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb ./lease/leasepb" # exact version of protoc-gen-gogo to build SHA="932b70afa8b0bf4a8e167fdf0c3367cebba45903" From 09b420f08c3780fa8eb0225cee1431c0876c12e0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 5 Jan 2016 10:16:50 -0800 Subject: [PATCH 2/2] *: move leaseID typedef to lease pkg --- etcdserver/v3demo_server.go | 5 +++-- lease/lessor.go | 24 +++++++++++++----------- lease/lessor_test.go | 4 ++-- storage/consistent_watchable_store.go | 5 +++-- storage/kv.go | 7 +++---- storage/kv_test.go | 9 +++++---- storage/kvstore.go | 9 +++++---- storage/kvstore_test.go | 3 ++- storage/watchable_store.go | 5 +++-- 9 files changed, 39 insertions(+), 32 deletions(-) diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index a1632ccef..0f3c4d65d 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/lease" dstorage "github.com/coreos/etcd/storage" "github.com/coreos/etcd/storage/storagepb" ) @@ -143,12 +144,12 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e err error ) if txnID != noTxn { - rev, err = kv.TxnPut(txnID, p.Key, p.Value, dstorage.LeaseID(p.Lease)) + rev, err = kv.TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) if err != nil { return nil, err } } else { - rev = kv.Put(p.Key, p.Value, dstorage.LeaseID(p.Lease)) + rev = kv.Put(p.Key, p.Value, lease.LeaseID(p.Lease)) } resp.Header.Revision = rev return resp, nil diff --git a/lease/lessor.go b/lease/lessor.go index ab70dc152..4306518ec 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -31,6 +31,8 @@ var ( leaseBucketName = []byte("lease") ) +type LeaseID int64 + // DeleteableRange defines an interface with DeleteRange method. // We define this interface only for lessor to limit the number // of methods of storage.KV to what lessor actually needs. @@ -51,7 +53,7 @@ type lessor struct { // We want to make Grant, Revoke, and FindExpired all O(logN) and // Renew O(1). // FindExpired and Renew should be the most frequent operations. - leaseMap map[int64]*lease + leaseMap map[LeaseID]*lease // A DeleteableRange the lessor operates on. // When a lease expires, the lessor will delete the @@ -67,7 +69,7 @@ type lessor struct { func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { l := &lessor{ - leaseMap: make(map[int64]*lease), + leaseMap: make(map[LeaseID]*lease), b: b, dr: dr, idgen: idutil.NewGenerator(lessorID, time.Now()), @@ -92,7 +94,7 @@ func (le *lessor) Grant(ttl int64) *lease { expiry := time.Now().Add(time.Duration(ttl) * time.Second) expiry = minExpiry(time.Now(), expiry) - id := int64(le.idgen.Next()) + id := LeaseID(le.idgen.Next()) le.mu.Lock() defer le.mu.Unlock() @@ -111,7 +113,7 @@ func (le *lessor) Grant(ttl int64) *lease { // Revoke revokes a lease with given ID. The item attached to the // given lease will be removed. If the ID does not exist, an error // will be returned. -func (le *lessor) Revoke(id int64) error { +func (le *lessor) Revoke(id LeaseID) error { le.mu.Lock() defer le.mu.Unlock() @@ -133,7 +135,7 @@ func (le *lessor) Revoke(id int64) error { // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. // TODO: return new TTL? -func (le *lessor) Renew(id int64) error { +func (le *lessor) Renew(id LeaseID) error { le.mu.Lock() defer le.mu.Unlock() @@ -150,7 +152,7 @@ func (le *lessor) Renew(id int64) error { // Attach attaches items to the lease with given ID. When the lease // expires, the attached items will be automatically removed. // If the given lease does not exist, an error will be returned. -func (le *lessor) Attach(id int64, items []leaseItem) error { +func (le *lessor) Attach(id LeaseID, items []leaseItem) error { le.mu.Lock() defer le.mu.Unlock() @@ -185,7 +187,7 @@ func (le *lessor) findExpiredLeases() []*lease { // get gets the lease with given id. // get is a helper fucntion for testing, at least for now. -func (le *lessor) get(id int64) *lease { +func (le *lessor) get(id LeaseID) *lease { le.mu.Lock() defer le.mu.Unlock() @@ -193,7 +195,7 @@ func (le *lessor) get(id int64) *lease { } type lease struct { - id int64 + id LeaseID ttl int64 // time to live in seconds itemSet map[leaseItem]struct{} @@ -202,9 +204,9 @@ type lease struct { } func (l lease) persistTo(b backend.Backend) { - key := int64ToBytes(l.id) + key := int64ToBytes(int64(l.id)) - lpb := leasepb.Lease{ID: l.id, TTL: int64(l.ttl)} + lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)} val, err := lpb.Marshal() if err != nil { panic("failed to marshal lease proto item") @@ -216,7 +218,7 @@ func (l lease) persistTo(b backend.Backend) { } func (l lease) removeFrom(b backend.Backend) { - key := int64ToBytes(l.id) + key := int64ToBytes(int64(l.id)) b.BatchTx().Lock() b.BatchTx().UnsafeDelete(leaseBucketName, key) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index e6078b1bc..eebd72923 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -51,7 +51,7 @@ func TestLessorGrant(t *testing.T) { } be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0) + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0) if len(vs) != 1 { t.Errorf("len(vs) = %d, want 1", len(vs)) } @@ -100,7 +100,7 @@ func TestLessorRevoke(t *testing.T) { } be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0) + _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0) if len(vs) != 0 { t.Errorf("len(vs) = %d, want 0", len(vs)) } diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go index 78002eef2..688959bff 100644 --- a/storage/consistent_watchable_store.go +++ b/storage/consistent_watchable_store.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "log" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/storagepb" ) @@ -59,7 +60,7 @@ func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consist } } -func (s *consistentWatchableStore) Put(key, value []byte, lease LeaseID) (rev int64) { +func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) { id := s.TxnBegin() rev, err := s.TxnPut(id, key, value, lease) if err != nil { @@ -109,7 +110,7 @@ func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev) } -func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) { +func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { if s.skip { return 0, nil } diff --git a/storage/kv.go b/storage/kv.go index 8069e1a56..1bbd2088b 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -15,6 +15,7 @@ package storage import ( + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -25,8 +26,6 @@ type CancelFunc func() type Snapshot backend.Snapshot -type LeaseID int64 - type KV interface { // Rev returns the current revision of the KV. Rev() int64 @@ -43,7 +42,7 @@ type KV interface { // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease // id. // A put also increases the rev of the store, and generates one event in the event history. - Put(key, value []byte, lease LeaseID) (rev int64) + Put(key, value []byte, lease lease.LeaseID) (rev int64) // DeleteRange deletes the given range from the store. // A deleteRange increases the rev of the store if any key in the range exists. @@ -61,7 +60,7 @@ type KV interface { // TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned. TxnEnd(txnID int64) error TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) - TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) + TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) Compact(rev int64) error diff --git a/storage/kv_test.go b/storage/kv_test.go index b6e29d38b..cb717a769 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/storage/storagepb" ) @@ -34,7 +35,7 @@ import ( type ( rangeFunc func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error) - putFunc func(kv KV, key, value []byte, lease LeaseID) int64 + putFunc func(kv KV, key, value []byte, lease lease.LeaseID) int64 deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64) ) @@ -48,10 +49,10 @@ var ( return kv.TxnRange(id, key, end, limit, rangeRev) } - normalPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 { + normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { return kv.Put(key, value, lease) } - txnPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 { + txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { id := kv.TxnBegin() defer kv.TxnEnd(id) rev, err := kv.TxnPut(id, key, value, lease) @@ -280,7 +281,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) { for i := 0; i < 10; i++ { base := int64(i + 1) - rev := f(s, []byte("foo"), []byte("bar"), LeaseID(base)) + rev := f(s, []byte("foo"), []byte("bar"), lease.LeaseID(base)) if rev != base { t.Errorf("#%d: rev = %d, want %d", i, rev, base) } diff --git a/storage/kvstore.go b/storage/kvstore.go index 1da2df82e..d2c34b862 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -39,7 +40,7 @@ var ( markBytePosition = markedRevBytesLen - 1 markTombstone byte = 't' - NoLease = LeaseID(0) + NoLease = lease.LeaseID(0) scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -97,7 +98,7 @@ func (s *store) Rev() int64 { return s.currentRev.main } -func (s *store) Put(key, value []byte, lease LeaseID) int64 { +func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 { id := s.TxnBegin() s.put(key, value, lease) s.txnEnd(id) @@ -172,7 +173,7 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k return s.rangeKeys(key, end, limit, rangeRev) } -func (s *store) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) { +func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { if txnID != s.txnID { return 0, ErrTxnIDMismatch } @@ -353,7 +354,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage return kvs, rev, nil } -func (s *store) put(key, value []byte, lease LeaseID) { +func (s *store) put(key, value []byte, lease lease.LeaseID) { rev := s.currentRev.main + 1 c := rev diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 4700d2707..3e282c42e 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" @@ -105,7 +106,7 @@ func TestStorePut(t *testing.T) { s.tx = b.BatchTx() fi.indexGetRespc <- tt.r - s.put([]byte("foo"), []byte("bar"), LeaseID(i+1)) + s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) data, err := tt.wkv.Marshal() if err != nil { diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 0a9d523d1..9e881f13f 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/storagepb" ) @@ -65,7 +66,7 @@ func newWatchableStore(path string) *watchableStore { return s } -func (s *watchableStore) Put(key, value []byte, lease LeaseID) (rev int64) { +func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) { s.mu.Lock() defer s.mu.Unlock() @@ -111,7 +112,7 @@ func (s *watchableStore) TxnBegin() int64 { return s.store.TxnBegin() } -func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) { +func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { rev, err = s.store.TxnPut(txnID, key, value, lease) if err == nil { s.tx.put(string(key))