From bfa5e310a9819e758750e004746a10e642cc556f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 3 Feb 2016 15:03:54 -0800 Subject: [PATCH] *: detach keys from leases 1. deatch when a key is removed 2. deatch when the key's lease changes 3. potentially deatch when restroing a tombstone key --- lease/lessor.go | 29 +++++++++++++++++++- lease/lessor_test.go | 39 +++++++++++++++++++++++++++ storage/kvstore.go | 60 ++++++++++++++++++++++++++++++++--------- storage/kvstore_test.go | 51 +++++++++++++++++++++++++++++++++++ 4 files changed, 166 insertions(+), 13 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index 752f13bb5..8dd5f0fa5 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -72,6 +72,10 @@ type Lessor interface { // If the lease does not exist, an error will be returned. Attach(id LeaseID, items []LeaseItem) error + // Detach detaches given leaseItem from the lease with given LeaseID. + // If the lease does not exist, an error will be returned. + Detach(id LeaseID, items []LeaseItem) error + // Promote promotes the lessor to be the primary lessor. Primary lessor manages // the expiration and renew of leases. Promote() @@ -194,12 +198,14 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { func (le *lessor) Revoke(id LeaseID) error { le.mu.Lock() - defer le.mu.Unlock() l := le.leaseMap[id] if l == nil { + le.mu.Unlock() return ErrLeaseNotFound } + // unlock before doing external work + le.mu.Unlock() if le.rd != nil { for item := range l.itemSet { @@ -207,6 +213,8 @@ func (le *lessor) Revoke(id LeaseID) error { } } + le.mu.Lock() + defer le.mu.Unlock() delete(le.leaseMap, l.ID) l.removeFrom(le.b) @@ -284,6 +292,23 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error { return nil } +// Detach detaches items from the lease with given ID. +// If the given lease does not exist, an error will be returned. +func (le *lessor) Detach(id LeaseID, items []LeaseItem) error { + le.mu.Lock() + defer le.mu.Unlock() + + l := le.leaseMap[id] + if l == nil { + return ErrLeaseNotFound + } + + for _, it := range items { + delete(l.itemSet, it) + } + return nil +} + func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) { le.mu.Lock() defer le.mu.Unlock() @@ -462,6 +487,8 @@ func (fl *FakeLessor) Revoke(id LeaseID) error { return nil } func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil } +func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil } + func (fl *FakeLessor) Promote() {} func (fl *FakeLessor) Demote() {} diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 254fa772b..ef41c6452 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -151,6 +151,45 @@ func TestLessorRenew(t *testing.T) { } } +func TestLessorDetach(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + fd := &fakeDeleter{} + + le := newLessor(be) + le.SetRangeDeleter(fd) + + // grant a lease with long term (100 seconds) to + // avoid early termination during the test. + l, err := le.Grant(1, 100) + if err != nil { + t.Fatalf("could not grant lease for 100s ttl (%v)", err) + } + + items := []LeaseItem{ + {"foo"}, + {"bar"}, + } + + if err := le.Attach(l.ID, items); err != nil { + t.Fatalf("failed to attach items to the lease: %v", err) + } + + if err := le.Detach(l.ID, items[0:1]); err != nil { + t.Fatalf("failed to de-attach items to the lease: %v", err) + } + + l = le.Lookup(l.ID) + if len(l.itemSet) != 1 { + t.Fatalf("len(l.itemSet) = %d, failed to de-attach items", len(l.itemSet)) + } + if _, ok := l.itemSet[LeaseItem{"bar"}]; !ok { + t.Fatalf("de-attached wrong item, want %q exists", "bar") + } +} + // TestLessorRecover ensures Lessor recovers leases from // persist backend. func TestLessorRecover(t *testing.T) { diff --git a/storage/kvstore.go b/storage/kvstore.go index 626d9ca7f..200426f5c 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -312,8 +312,13 @@ func (s *store) restore() error { // restore index switch { case isTombstone(key): - // TODO: De-attach keys from lease if necessary s.kvindex.Tombstone(kv.Key, rev) + if lease.LeaseID(kv.Lease) != lease.NoLease { + err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) + if err != nil && err != lease.ErrLeaseNotFound { + log.Fatalf("storage: unexpected Detach error %v", err) + } + } default: s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) if lease.LeaseID(kv.Lease) != lease.NoLease { @@ -413,11 +418,21 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage func (s *store) put(key, value []byte, leaseID lease.LeaseID) { rev := s.currentRev.main + 1 c := rev + oldLease := lease.NoLease - // if the key exists before, use its previous created - _, created, ver, err := s.kvindex.Get(key, rev) + // if the key exists before, use its previous created and + // get its previous leaseID + grev, created, ver, err := s.kvindex.Get(key, rev) if err == nil { c = created.main + ibytes := newRevBytes() + revToBytes(grev, ibytes) + _, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0) + var kv storagepb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { + log.Fatalf("storage: cannot unmarshal value: %v", err) + } + oldLease = lease.LeaseID(kv.Lease) } ibytes := newRevBytes() @@ -443,15 +458,22 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { s.changes = append(s.changes, kv) s.currentRev.sub += 1 + if oldLease != lease.NoLease { + if s.le == nil { + panic("no lessor to detach lease") + } + + err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) + if err != nil { + panic("unexpected error from lease detach") + } + } + if leaseID != lease.NoLease { if s.le == nil { panic("no lessor to attach lease") } - // TODO: validate the existence of lease before call Attach. - // We need to ensure put always successful since we do not want - // to handle abortion for txn request. We need to ensure all requests - // inside the txn can execute without error before executing them. err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}}) if err != nil { panic("unexpected error from lease Attach") @@ -464,19 +486,19 @@ func (s *store) deleteRange(key, end []byte) int64 { if s.currentRev.sub > 0 { rrev += 1 } - keys, _ := s.kvindex.Range(key, end, rrev) + keys, revs := s.kvindex.Range(key, end, rrev) if len(keys) == 0 { return 0 } - for _, key := range keys { - s.delete(key) + for i, key := range keys { + s.delete(key, revs[i]) } return int64(len(keys)) } -func (s *store) delete(key []byte) { +func (s *store) delete(key []byte, rev revision) { mainrev := s.currentRev.main + 1 ibytes := newRevBytes() @@ -500,7 +522,21 @@ func (s *store) delete(key []byte) { s.changes = append(s.changes, kv) s.currentRev.sub += 1 - // TODO: De-attach keys from lease if necessary + ibytes = newRevBytes() + revToBytes(rev, ibytes) + _, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0) + + kv.Reset() + if err := kv.Unmarshal(vs[0]); err != nil { + log.Fatalf("storage: cannot unmarshal value: %v", err) + } + + if lease.LeaseID(kv.Lease) != lease.NoLease { + err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) + if err != nil { + log.Fatalf("storage: cannot detach %v", err) + } + } } func (s *store) getChanges() []storagepb.KeyValue { diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index b616545d7..41b3c076b 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -45,9 +45,22 @@ func TestStoreRev(t *testing.T) { } func TestStorePut(t *testing.T) { + kv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, + } + kvb, err := kv.Marshal() + if err != nil { + t.Fatal(err) + } + tests := []struct { rev revision r indexGetResp + rr *rangeResp wrev revision wkey []byte @@ -57,6 +70,8 @@ func TestStorePut(t *testing.T) { { revision{1, 0}, indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, + nil, + revision{1, 1}, newTestKeyBytes(revision{2, 0}, false), storagepb.KeyValue{ @@ -72,6 +87,8 @@ func TestStorePut(t *testing.T) { { revision{1, 1}, indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, + &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, + revision{1, 2}, newTestKeyBytes(revision{2, 1}, false), storagepb.KeyValue{ @@ -87,6 +104,8 @@ func TestStorePut(t *testing.T) { { revision{2, 0}, indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, + &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, + revision{2, 1}, newTestKeyBytes(revision{3, 0}, false), storagepb.KeyValue{ @@ -108,6 +127,9 @@ func TestStorePut(t *testing.T) { s.currentRev = tt.rev s.tx = b.BatchTx() fi.indexGetRespc <- tt.r + if tt.rr != nil { + b.tx.rangeRespc <- *tt.rr + } s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) @@ -115,9 +137,18 @@ func TestStorePut(t *testing.T) { if err != nil { t.Errorf("#%d: marshal err = %v, want nil", i, err) } + wact := []testutil.Action{ {"put", []interface{}{keyBucketName, tt.wkey, data}}, } + + if tt.rr != nil { + wact = []testutil.Action{ + {"range", []interface{}{keyBucketName, newTestKeyBytes(tt.r.rev, false), []byte(nil), int64(0)}}, + {"put", []interface{}{keyBucketName, tt.wkey, data}}, + } + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) } @@ -208,9 +239,23 @@ func TestStoreRange(t *testing.T) { } func TestStoreDeleteRange(t *testing.T) { + key := newTestKeyBytes(revision{2, 0}, false) + kv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, + } + kvb, err := kv.Marshal() + if err != nil { + t.Fatal(err) + } + tests := []struct { rev revision r indexRangeResp + rr rangeResp wkey []byte wrev revision @@ -220,6 +265,8 @@ func TestStoreDeleteRange(t *testing.T) { { revision{2, 0}, indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, + newTestKeyBytes(revision{3, 0}, true), revision{2, 1}, 2, @@ -228,6 +275,8 @@ func TestStoreDeleteRange(t *testing.T) { { revision{2, 1}, indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, + newTestKeyBytes(revision{3, 1}, true), revision{2, 2}, 3, @@ -242,6 +291,7 @@ func TestStoreDeleteRange(t *testing.T) { s.currentRev = tt.rev s.tx = b.BatchTx() fi.indexRangeRespc <- tt.r + b.tx.rangeRespc <- tt.rr n := s.deleteRange([]byte("foo"), []byte("goo")) if n != 1 { @@ -256,6 +306,7 @@ func TestStoreDeleteRange(t *testing.T) { } wact := []testutil.Action{ {"put", []interface{}{keyBucketName, tt.wkey, data}}, + {"range", []interface{}{keyBucketName, newTestKeyBytes(revision{2, 0}, false), []byte(nil), int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)