mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: move leaseID typedef to lease pkg
This commit is contained in:
parent
25f82b25f7
commit
09b420f08c
@ -21,6 +21,7 @@ import (
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/lease"
|
||||
dstorage "github.com/coreos/etcd/storage"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -143,12 +144,12 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
|
||||
err error
|
||||
)
|
||||
if txnID != noTxn {
|
||||
rev, err = kv.TxnPut(txnID, p.Key, p.Value, dstorage.LeaseID(p.Lease))
|
||||
rev, err = kv.TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
rev = kv.Put(p.Key, p.Value, dstorage.LeaseID(p.Lease))
|
||||
rev = kv.Put(p.Key, p.Value, lease.LeaseID(p.Lease))
|
||||
}
|
||||
resp.Header.Revision = rev
|
||||
return resp, nil
|
||||
|
@ -31,6 +31,8 @@ var (
|
||||
leaseBucketName = []byte("lease")
|
||||
)
|
||||
|
||||
type LeaseID int64
|
||||
|
||||
// DeleteableRange defines an interface with DeleteRange method.
|
||||
// We define this interface only for lessor to limit the number
|
||||
// of methods of storage.KV to what lessor actually needs.
|
||||
@ -51,7 +53,7 @@ type lessor struct {
|
||||
// We want to make Grant, Revoke, and FindExpired all O(logN) and
|
||||
// Renew O(1).
|
||||
// FindExpired and Renew should be the most frequent operations.
|
||||
leaseMap map[int64]*lease
|
||||
leaseMap map[LeaseID]*lease
|
||||
|
||||
// A DeleteableRange the lessor operates on.
|
||||
// When a lease expires, the lessor will delete the
|
||||
@ -67,7 +69,7 @@ type lessor struct {
|
||||
|
||||
func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
|
||||
l := &lessor{
|
||||
leaseMap: make(map[int64]*lease),
|
||||
leaseMap: make(map[LeaseID]*lease),
|
||||
b: b,
|
||||
dr: dr,
|
||||
idgen: idutil.NewGenerator(lessorID, time.Now()),
|
||||
@ -92,7 +94,7 @@ func (le *lessor) Grant(ttl int64) *lease {
|
||||
expiry := time.Now().Add(time.Duration(ttl) * time.Second)
|
||||
expiry = minExpiry(time.Now(), expiry)
|
||||
|
||||
id := int64(le.idgen.Next())
|
||||
id := LeaseID(le.idgen.Next())
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
@ -111,7 +113,7 @@ func (le *lessor) Grant(ttl int64) *lease {
|
||||
// Revoke revokes a lease with given ID. The item attached to the
|
||||
// given lease will be removed. If the ID does not exist, an error
|
||||
// will be returned.
|
||||
func (le *lessor) Revoke(id int64) error {
|
||||
func (le *lessor) Revoke(id LeaseID) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -133,7 +135,7 @@ func (le *lessor) Revoke(id int64) error {
|
||||
// Renew renews an existing lease. If the given lease does not exist or
|
||||
// has expired, an error will be returned.
|
||||
// TODO: return new TTL?
|
||||
func (le *lessor) Renew(id int64) error {
|
||||
func (le *lessor) Renew(id LeaseID) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -150,7 +152,7 @@ func (le *lessor) Renew(id int64) error {
|
||||
// Attach attaches items to the lease with given ID. When the lease
|
||||
// expires, the attached items will be automatically removed.
|
||||
// If the given lease does not exist, an error will be returned.
|
||||
func (le *lessor) Attach(id int64, items []leaseItem) error {
|
||||
func (le *lessor) Attach(id LeaseID, items []leaseItem) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -185,7 +187,7 @@ func (le *lessor) findExpiredLeases() []*lease {
|
||||
|
||||
// get gets the lease with given id.
|
||||
// get is a helper fucntion for testing, at least for now.
|
||||
func (le *lessor) get(id int64) *lease {
|
||||
func (le *lessor) get(id LeaseID) *lease {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
@ -193,7 +195,7 @@ func (le *lessor) get(id int64) *lease {
|
||||
}
|
||||
|
||||
type lease struct {
|
||||
id int64
|
||||
id LeaseID
|
||||
ttl int64 // time to live in seconds
|
||||
|
||||
itemSet map[leaseItem]struct{}
|
||||
@ -202,9 +204,9 @@ type lease struct {
|
||||
}
|
||||
|
||||
func (l lease) persistTo(b backend.Backend) {
|
||||
key := int64ToBytes(l.id)
|
||||
key := int64ToBytes(int64(l.id))
|
||||
|
||||
lpb := leasepb.Lease{ID: l.id, TTL: int64(l.ttl)}
|
||||
lpb := leasepb.Lease{ID: int64(l.id), TTL: int64(l.ttl)}
|
||||
val, err := lpb.Marshal()
|
||||
if err != nil {
|
||||
panic("failed to marshal lease proto item")
|
||||
@ -216,7 +218,7 @@ func (l lease) persistTo(b backend.Backend) {
|
||||
}
|
||||
|
||||
func (l lease) removeFrom(b backend.Backend) {
|
||||
key := int64ToBytes(l.id)
|
||||
key := int64ToBytes(int64(l.id))
|
||||
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
||||
|
@ -51,7 +51,7 @@ func TestLessorGrant(t *testing.T) {
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0)
|
||||
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
|
||||
if len(vs) != 1 {
|
||||
t.Errorf("len(vs) = %d, want 1", len(vs))
|
||||
}
|
||||
@ -100,7 +100,7 @@ func TestLessorRevoke(t *testing.T) {
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(l.id), nil, 0)
|
||||
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.id)), nil, 0)
|
||||
if len(vs) != 0 {
|
||||
t.Errorf("len(vs) = %d, want 0", len(vs))
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"encoding/binary"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
@ -59,7 +60,7 @@ func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consist
|
||||
}
|
||||
}
|
||||
|
||||
func (s *consistentWatchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
|
||||
func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
id := s.TxnBegin()
|
||||
rev, err := s.TxnPut(id, key, value, lease)
|
||||
if err != nil {
|
||||
@ -109,7 +110,7 @@ func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit,
|
||||
return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
|
||||
}
|
||||
|
||||
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
|
||||
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
if s.skip {
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -25,8 +26,6 @@ type CancelFunc func()
|
||||
|
||||
type Snapshot backend.Snapshot
|
||||
|
||||
type LeaseID int64
|
||||
|
||||
type KV interface {
|
||||
// Rev returns the current revision of the KV.
|
||||
Rev() int64
|
||||
@ -43,7 +42,7 @@ type KV interface {
|
||||
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
|
||||
// id.
|
||||
// A put also increases the rev of the store, and generates one event in the event history.
|
||||
Put(key, value []byte, lease LeaseID) (rev int64)
|
||||
Put(key, value []byte, lease lease.LeaseID) (rev int64)
|
||||
|
||||
// DeleteRange deletes the given range from the store.
|
||||
// A deleteRange increases the rev of the store if any key in the range exists.
|
||||
@ -61,7 +60,7 @@ type KV interface {
|
||||
// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
|
||||
TxnEnd(txnID int64) error
|
||||
TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
|
||||
TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error)
|
||||
TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
|
||||
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
|
||||
|
||||
Compact(rev int64) error
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -34,7 +35,7 @@ import (
|
||||
|
||||
type (
|
||||
rangeFunc func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error)
|
||||
putFunc func(kv KV, key, value []byte, lease LeaseID) int64
|
||||
putFunc func(kv KV, key, value []byte, lease lease.LeaseID) int64
|
||||
deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64)
|
||||
)
|
||||
|
||||
@ -48,10 +49,10 @@ var (
|
||||
return kv.TxnRange(id, key, end, limit, rangeRev)
|
||||
}
|
||||
|
||||
normalPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
|
||||
normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
||||
return kv.Put(key, value, lease)
|
||||
}
|
||||
txnPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
|
||||
txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
||||
id := kv.TxnBegin()
|
||||
defer kv.TxnEnd(id)
|
||||
rev, err := kv.TxnPut(id, key, value, lease)
|
||||
@ -280,7 +281,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
|
||||
for i := 0; i < 10; i++ {
|
||||
base := int64(i + 1)
|
||||
|
||||
rev := f(s, []byte("foo"), []byte("bar"), LeaseID(base))
|
||||
rev := f(s, []byte("foo"), []byte("bar"), lease.LeaseID(base))
|
||||
if rev != base {
|
||||
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
@ -39,7 +40,7 @@ var (
|
||||
markBytePosition = markedRevBytesLen - 1
|
||||
markTombstone byte = 't'
|
||||
|
||||
NoLease = LeaseID(0)
|
||||
NoLease = lease.LeaseID(0)
|
||||
|
||||
scheduledCompactKeyName = []byte("scheduledCompactRev")
|
||||
finishedCompactKeyName = []byte("finishedCompactRev")
|
||||
@ -97,7 +98,7 @@ func (s *store) Rev() int64 {
|
||||
return s.currentRev.main
|
||||
}
|
||||
|
||||
func (s *store) Put(key, value []byte, lease LeaseID) int64 {
|
||||
func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
|
||||
id := s.TxnBegin()
|
||||
s.put(key, value, lease)
|
||||
s.txnEnd(id)
|
||||
@ -172,7 +173,7 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k
|
||||
return s.rangeKeys(key, end, limit, rangeRev)
|
||||
}
|
||||
|
||||
func (s *store) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
|
||||
func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
if txnID != s.txnID {
|
||||
return 0, ErrTxnIDMismatch
|
||||
}
|
||||
@ -353,7 +354,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
|
||||
return kvs, rev, nil
|
||||
}
|
||||
|
||||
func (s *store) put(key, value []byte, lease LeaseID) {
|
||||
func (s *store) put(key, value []byte, lease lease.LeaseID) {
|
||||
rev := s.currentRev.main + 1
|
||||
c := rev
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
@ -105,7 +106,7 @@ func TestStorePut(t *testing.T) {
|
||||
s.tx = b.BatchTx()
|
||||
fi.indexGetRespc <- tt.r
|
||||
|
||||
s.put([]byte("foo"), []byte("bar"), LeaseID(i+1))
|
||||
s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
|
||||
|
||||
data, err := tt.wkv.Marshal()
|
||||
if err != nil {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
@ -65,7 +66,7 @@ func newWatchableStore(path string) *watchableStore {
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *watchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
|
||||
func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -111,7 +112,7 @@ func (s *watchableStore) TxnBegin() int64 {
|
||||
return s.store.TxnBegin()
|
||||
}
|
||||
|
||||
func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
|
||||
func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
|
||||
rev, err = s.store.TxnPut(txnID, key, value, lease)
|
||||
if err == nil {
|
||||
s.tx.put(string(key))
|
||||
|
Loading…
x
Reference in New Issue
Block a user