mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
f34b8d334b
@ -72,6 +72,10 @@ type Lessor interface {
|
|||||||
// If the lease does not exist, an error will be returned.
|
// If the lease does not exist, an error will be returned.
|
||||||
Attach(id LeaseID, items []LeaseItem) error
|
Attach(id LeaseID, items []LeaseItem) error
|
||||||
|
|
||||||
|
// Detach detaches given leaseItem from the lease with given LeaseID.
|
||||||
|
// If the lease does not exist, an error will be returned.
|
||||||
|
Detach(id LeaseID, items []LeaseItem) error
|
||||||
|
|
||||||
// Promote promotes the lessor to be the primary lessor. Primary lessor manages
|
// Promote promotes the lessor to be the primary lessor. Primary lessor manages
|
||||||
// the expiration and renew of leases.
|
// the expiration and renew of leases.
|
||||||
Promote()
|
Promote()
|
||||||
@ -194,12 +198,14 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
|
|||||||
|
|
||||||
func (le *lessor) Revoke(id LeaseID) error {
|
func (le *lessor) Revoke(id LeaseID) error {
|
||||||
le.mu.Lock()
|
le.mu.Lock()
|
||||||
defer le.mu.Unlock()
|
|
||||||
|
|
||||||
l := le.leaseMap[id]
|
l := le.leaseMap[id]
|
||||||
if l == nil {
|
if l == nil {
|
||||||
|
le.mu.Unlock()
|
||||||
return ErrLeaseNotFound
|
return ErrLeaseNotFound
|
||||||
}
|
}
|
||||||
|
// unlock before doing external work
|
||||||
|
le.mu.Unlock()
|
||||||
|
|
||||||
if le.rd != nil {
|
if le.rd != nil {
|
||||||
for item := range l.itemSet {
|
for item := range l.itemSet {
|
||||||
@ -207,6 +213,8 @@ func (le *lessor) Revoke(id LeaseID) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
le.mu.Lock()
|
||||||
|
defer le.mu.Unlock()
|
||||||
delete(le.leaseMap, l.ID)
|
delete(le.leaseMap, l.ID)
|
||||||
l.removeFrom(le.b)
|
l.removeFrom(le.b)
|
||||||
|
|
||||||
@ -284,6 +292,23 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Detach detaches items from the lease with given ID.
|
||||||
|
// If the given lease does not exist, an error will be returned.
|
||||||
|
func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
|
||||||
|
le.mu.Lock()
|
||||||
|
defer le.mu.Unlock()
|
||||||
|
|
||||||
|
l := le.leaseMap[id]
|
||||||
|
if l == nil {
|
||||||
|
return ErrLeaseNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, it := range items {
|
||||||
|
delete(l.itemSet, it)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
|
func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
|
||||||
le.mu.Lock()
|
le.mu.Lock()
|
||||||
defer le.mu.Unlock()
|
defer le.mu.Unlock()
|
||||||
@ -462,6 +487,8 @@ func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
|
|||||||
|
|
||||||
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
|
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
|
||||||
|
|
||||||
|
func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }
|
||||||
|
|
||||||
func (fl *FakeLessor) Promote() {}
|
func (fl *FakeLessor) Promote() {}
|
||||||
|
|
||||||
func (fl *FakeLessor) Demote() {}
|
func (fl *FakeLessor) Demote() {}
|
||||||
|
@ -151,6 +151,45 @@ func TestLessorRenew(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLessorDetach(t *testing.T) {
|
||||||
|
dir, be := NewTestBackend(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer be.Close()
|
||||||
|
|
||||||
|
fd := &fakeDeleter{}
|
||||||
|
|
||||||
|
le := newLessor(be)
|
||||||
|
le.SetRangeDeleter(fd)
|
||||||
|
|
||||||
|
// grant a lease with long term (100 seconds) to
|
||||||
|
// avoid early termination during the test.
|
||||||
|
l, err := le.Grant(1, 100)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
items := []LeaseItem{
|
||||||
|
{"foo"},
|
||||||
|
{"bar"},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := le.Attach(l.ID, items); err != nil {
|
||||||
|
t.Fatalf("failed to attach items to the lease: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := le.Detach(l.ID, items[0:1]); err != nil {
|
||||||
|
t.Fatalf("failed to de-attach items to the lease: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l = le.Lookup(l.ID)
|
||||||
|
if len(l.itemSet) != 1 {
|
||||||
|
t.Fatalf("len(l.itemSet) = %d, failed to de-attach items", len(l.itemSet))
|
||||||
|
}
|
||||||
|
if _, ok := l.itemSet[LeaseItem{"bar"}]; !ok {
|
||||||
|
t.Fatalf("de-attached wrong item, want %q exists", "bar")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestLessorRecover ensures Lessor recovers leases from
|
// TestLessorRecover ensures Lessor recovers leases from
|
||||||
// persist backend.
|
// persist backend.
|
||||||
func TestLessorRecover(t *testing.T) {
|
func TestLessorRecover(t *testing.T) {
|
||||||
|
@ -312,8 +312,13 @@ func (s *store) restore() error {
|
|||||||
// restore index
|
// restore index
|
||||||
switch {
|
switch {
|
||||||
case isTombstone(key):
|
case isTombstone(key):
|
||||||
// TODO: De-attach keys from lease if necessary
|
|
||||||
s.kvindex.Tombstone(kv.Key, rev)
|
s.kvindex.Tombstone(kv.Key, rev)
|
||||||
|
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||||
|
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||||
|
if err != nil && err != lease.ErrLeaseNotFound {
|
||||||
|
log.Fatalf("storage: unexpected Detach error %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
|
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
|
||||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||||
@ -413,11 +418,21 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
|
|||||||
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||||
rev := s.currentRev.main + 1
|
rev := s.currentRev.main + 1
|
||||||
c := rev
|
c := rev
|
||||||
|
oldLease := lease.NoLease
|
||||||
|
|
||||||
// if the key exists before, use its previous created
|
// if the key exists before, use its previous created and
|
||||||
_, created, ver, err := s.kvindex.Get(key, rev)
|
// get its previous leaseID
|
||||||
|
grev, created, ver, err := s.kvindex.Get(key, rev)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c = created.main
|
c = created.main
|
||||||
|
ibytes := newRevBytes()
|
||||||
|
revToBytes(grev, ibytes)
|
||||||
|
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
|
||||||
|
var kv storagepb.KeyValue
|
||||||
|
if err := kv.Unmarshal(vs[0]); err != nil {
|
||||||
|
log.Fatalf("storage: cannot unmarshal value: %v", err)
|
||||||
|
}
|
||||||
|
oldLease = lease.LeaseID(kv.Lease)
|
||||||
}
|
}
|
||||||
|
|
||||||
ibytes := newRevBytes()
|
ibytes := newRevBytes()
|
||||||
@ -443,15 +458,22 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
|||||||
s.changes = append(s.changes, kv)
|
s.changes = append(s.changes, kv)
|
||||||
s.currentRev.sub += 1
|
s.currentRev.sub += 1
|
||||||
|
|
||||||
|
if oldLease != lease.NoLease {
|
||||||
|
if s.le == nil {
|
||||||
|
panic("no lessor to detach lease")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||||
|
if err != nil {
|
||||||
|
panic("unexpected error from lease detach")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if leaseID != lease.NoLease {
|
if leaseID != lease.NoLease {
|
||||||
if s.le == nil {
|
if s.le == nil {
|
||||||
panic("no lessor to attach lease")
|
panic("no lessor to attach lease")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: validate the existence of lease before call Attach.
|
|
||||||
// We need to ensure put always successful since we do not want
|
|
||||||
// to handle abortion for txn request. We need to ensure all requests
|
|
||||||
// inside the txn can execute without error before executing them.
|
|
||||||
err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
|
err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("unexpected error from lease Attach")
|
panic("unexpected error from lease Attach")
|
||||||
@ -464,19 +486,19 @@ func (s *store) deleteRange(key, end []byte) int64 {
|
|||||||
if s.currentRev.sub > 0 {
|
if s.currentRev.sub > 0 {
|
||||||
rrev += 1
|
rrev += 1
|
||||||
}
|
}
|
||||||
keys, _ := s.kvindex.Range(key, end, rrev)
|
keys, revs := s.kvindex.Range(key, end, rrev)
|
||||||
|
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range keys {
|
for i, key := range keys {
|
||||||
s.delete(key)
|
s.delete(key, revs[i])
|
||||||
}
|
}
|
||||||
return int64(len(keys))
|
return int64(len(keys))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) delete(key []byte) {
|
func (s *store) delete(key []byte, rev revision) {
|
||||||
mainrev := s.currentRev.main + 1
|
mainrev := s.currentRev.main + 1
|
||||||
|
|
||||||
ibytes := newRevBytes()
|
ibytes := newRevBytes()
|
||||||
@ -500,7 +522,21 @@ func (s *store) delete(key []byte) {
|
|||||||
s.changes = append(s.changes, kv)
|
s.changes = append(s.changes, kv)
|
||||||
s.currentRev.sub += 1
|
s.currentRev.sub += 1
|
||||||
|
|
||||||
// TODO: De-attach keys from lease if necessary
|
ibytes = newRevBytes()
|
||||||
|
revToBytes(rev, ibytes)
|
||||||
|
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
|
||||||
|
|
||||||
|
kv.Reset()
|
||||||
|
if err := kv.Unmarshal(vs[0]); err != nil {
|
||||||
|
log.Fatalf("storage: cannot unmarshal value: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||||
|
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("storage: cannot detach %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) getChanges() []storagepb.KeyValue {
|
func (s *store) getChanges() []storagepb.KeyValue {
|
||||||
|
@ -45,9 +45,22 @@ func TestStoreRev(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStorePut(t *testing.T) {
|
func TestStorePut(t *testing.T) {
|
||||||
|
kv := storagepb.KeyValue{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
CreateRevision: 1,
|
||||||
|
ModRevision: 2,
|
||||||
|
Version: 1,
|
||||||
|
}
|
||||||
|
kvb, err := kv.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
rev revision
|
rev revision
|
||||||
r indexGetResp
|
r indexGetResp
|
||||||
|
rr *rangeResp
|
||||||
|
|
||||||
wrev revision
|
wrev revision
|
||||||
wkey []byte
|
wkey []byte
|
||||||
@ -57,6 +70,8 @@ func TestStorePut(t *testing.T) {
|
|||||||
{
|
{
|
||||||
revision{1, 0},
|
revision{1, 0},
|
||||||
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
|
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
|
||||||
|
nil,
|
||||||
|
|
||||||
revision{1, 1},
|
revision{1, 1},
|
||||||
newTestKeyBytes(revision{2, 0}, false),
|
newTestKeyBytes(revision{2, 0}, false),
|
||||||
storagepb.KeyValue{
|
storagepb.KeyValue{
|
||||||
@ -72,6 +87,8 @@ func TestStorePut(t *testing.T) {
|
|||||||
{
|
{
|
||||||
revision{1, 1},
|
revision{1, 1},
|
||||||
indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
|
indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
|
||||||
|
&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
|
||||||
|
|
||||||
revision{1, 2},
|
revision{1, 2},
|
||||||
newTestKeyBytes(revision{2, 1}, false),
|
newTestKeyBytes(revision{2, 1}, false),
|
||||||
storagepb.KeyValue{
|
storagepb.KeyValue{
|
||||||
@ -87,6 +104,8 @@ func TestStorePut(t *testing.T) {
|
|||||||
{
|
{
|
||||||
revision{2, 0},
|
revision{2, 0},
|
||||||
indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
|
indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
|
||||||
|
&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
|
||||||
|
|
||||||
revision{2, 1},
|
revision{2, 1},
|
||||||
newTestKeyBytes(revision{3, 0}, false),
|
newTestKeyBytes(revision{3, 0}, false),
|
||||||
storagepb.KeyValue{
|
storagepb.KeyValue{
|
||||||
@ -108,6 +127,9 @@ func TestStorePut(t *testing.T) {
|
|||||||
s.currentRev = tt.rev
|
s.currentRev = tt.rev
|
||||||
s.tx = b.BatchTx()
|
s.tx = b.BatchTx()
|
||||||
fi.indexGetRespc <- tt.r
|
fi.indexGetRespc <- tt.r
|
||||||
|
if tt.rr != nil {
|
||||||
|
b.tx.rangeRespc <- *tt.rr
|
||||||
|
}
|
||||||
|
|
||||||
s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
|
s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
|
||||||
|
|
||||||
@ -115,9 +137,18 @@ func TestStorePut(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: marshal err = %v, want nil", i, err)
|
t.Errorf("#%d: marshal err = %v, want nil", i, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
wact := []testutil.Action{
|
wact := []testutil.Action{
|
||||||
{"put", []interface{}{keyBucketName, tt.wkey, data}},
|
{"put", []interface{}{keyBucketName, tt.wkey, data}},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tt.rr != nil {
|
||||||
|
wact = []testutil.Action{
|
||||||
|
{"range", []interface{}{keyBucketName, newTestKeyBytes(tt.r.rev, false), []byte(nil), int64(0)}},
|
||||||
|
{"put", []interface{}{keyBucketName, tt.wkey, data}},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
||||||
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
|
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
|
||||||
}
|
}
|
||||||
@ -208,9 +239,23 @@ func TestStoreRange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStoreDeleteRange(t *testing.T) {
|
func TestStoreDeleteRange(t *testing.T) {
|
||||||
|
key := newTestKeyBytes(revision{2, 0}, false)
|
||||||
|
kv := storagepb.KeyValue{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
CreateRevision: 1,
|
||||||
|
ModRevision: 2,
|
||||||
|
Version: 1,
|
||||||
|
}
|
||||||
|
kvb, err := kv.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
rev revision
|
rev revision
|
||||||
r indexRangeResp
|
r indexRangeResp
|
||||||
|
rr rangeResp
|
||||||
|
|
||||||
wkey []byte
|
wkey []byte
|
||||||
wrev revision
|
wrev revision
|
||||||
@ -220,6 +265,8 @@ func TestStoreDeleteRange(t *testing.T) {
|
|||||||
{
|
{
|
||||||
revision{2, 0},
|
revision{2, 0},
|
||||||
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
|
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
|
||||||
|
rangeResp{[][]byte{key}, [][]byte{kvb}},
|
||||||
|
|
||||||
newTestKeyBytes(revision{3, 0}, true),
|
newTestKeyBytes(revision{3, 0}, true),
|
||||||
revision{2, 1},
|
revision{2, 1},
|
||||||
2,
|
2,
|
||||||
@ -228,6 +275,8 @@ func TestStoreDeleteRange(t *testing.T) {
|
|||||||
{
|
{
|
||||||
revision{2, 1},
|
revision{2, 1},
|
||||||
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
|
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
|
||||||
|
rangeResp{[][]byte{key}, [][]byte{kvb}},
|
||||||
|
|
||||||
newTestKeyBytes(revision{3, 1}, true),
|
newTestKeyBytes(revision{3, 1}, true),
|
||||||
revision{2, 2},
|
revision{2, 2},
|
||||||
3,
|
3,
|
||||||
@ -242,6 +291,7 @@ func TestStoreDeleteRange(t *testing.T) {
|
|||||||
s.currentRev = tt.rev
|
s.currentRev = tt.rev
|
||||||
s.tx = b.BatchTx()
|
s.tx = b.BatchTx()
|
||||||
fi.indexRangeRespc <- tt.r
|
fi.indexRangeRespc <- tt.r
|
||||||
|
b.tx.rangeRespc <- tt.rr
|
||||||
|
|
||||||
n := s.deleteRange([]byte("foo"), []byte("goo"))
|
n := s.deleteRange([]byte("foo"), []byte("goo"))
|
||||||
if n != 1 {
|
if n != 1 {
|
||||||
@ -256,6 +306,7 @@ func TestStoreDeleteRange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
wact := []testutil.Action{
|
wact := []testutil.Action{
|
||||||
{"put", []interface{}{keyBucketName, tt.wkey, data}},
|
{"put", []interface{}{keyBucketName, tt.wkey, data}},
|
||||||
|
{"range", []interface{}{keyBucketName, newTestKeyBytes(revision{2, 0}, false), []byte(nil), int64(0)}},
|
||||||
}
|
}
|
||||||
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
||||||
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
|
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user