mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6620 from nekto0n/put_update_optimize
Optimize updating key by storing lease in lessor
This commit is contained in:
commit
cb9c77c4ba
@ -76,6 +76,10 @@ type Lessor interface {
|
||||
// If the lease does not exist, an error will be returned.
|
||||
Attach(id LeaseID, items []LeaseItem) error
|
||||
|
||||
// GetLease returns LeaseID for given item.
|
||||
// If no lease found, NoLease value will be returned.
|
||||
GetLease(item LeaseItem) LeaseID
|
||||
|
||||
// Detach detaches given leaseItem from the lease with given LeaseID.
|
||||
// If the lease does not exist, an error will be returned.
|
||||
Detach(id LeaseID, items []LeaseItem) error
|
||||
@ -123,6 +127,8 @@ type lessor struct {
|
||||
// findExpiredLeases and Renew should be the most frequent operations.
|
||||
leaseMap map[LeaseID]*Lease
|
||||
|
||||
itemMap map[LeaseItem]LeaseID
|
||||
|
||||
// When a lease expires, the lessor will delete the
|
||||
// leased range (or key) by the RangeDeleter.
|
||||
rd RangeDeleter
|
||||
@ -149,6 +155,7 @@ func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
|
||||
func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
|
||||
l := &lessor{
|
||||
leaseMap: make(map[LeaseID]*Lease),
|
||||
itemMap: make(map[LeaseItem]LeaseID),
|
||||
b: b,
|
||||
minLeaseTTL: minLeaseTTL,
|
||||
// expiredC is a small buffered chan to avoid unnecessary blocking.
|
||||
@ -361,10 +368,18 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
|
||||
|
||||
for _, it := range items {
|
||||
l.itemSet[it] = struct{}{}
|
||||
le.itemMap[it] = id
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (le *lessor) GetLease(item LeaseItem) LeaseID {
|
||||
le.mu.Lock()
|
||||
id := le.itemMap[item]
|
||||
le.mu.Unlock()
|
||||
return id
|
||||
}
|
||||
|
||||
// Detach detaches items from the lease with given ID.
|
||||
// If the given lease does not exist, an error will be returned.
|
||||
func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
|
||||
@ -378,6 +393,7 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
|
||||
|
||||
for _, it := range items {
|
||||
delete(l.itemSet, it)
|
||||
delete(le.itemMap, it)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -389,7 +405,7 @@ func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
|
||||
le.b = b
|
||||
le.rd = rd
|
||||
le.leaseMap = make(map[LeaseID]*Lease)
|
||||
|
||||
le.itemMap = make(map[LeaseItem]LeaseID)
|
||||
le.initAndRecover()
|
||||
}
|
||||
|
||||
@ -560,6 +576,7 @@ func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
|
||||
|
||||
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
|
||||
|
||||
func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 }
|
||||
func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }
|
||||
|
||||
func (fl *FakeLessor) Promote(extend time.Duration) {}
|
||||
|
@ -517,17 +517,10 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
|
||||
// if the key exists before, use its previous created and
|
||||
// get its previous leaseID
|
||||
grev, created, ver, err := s.kvindex.Get(key, rev)
|
||||
_, created, ver, err := s.kvindex.Get(key, rev)
|
||||
if err == nil {
|
||||
c = created.main
|
||||
ibytes := newRevBytes()
|
||||
revToBytes(grev, ibytes)
|
||||
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
|
||||
var kv mvccpb.KeyValue
|
||||
if err = kv.Unmarshal(vs[0]); err != nil {
|
||||
plog.Fatalf("cannot unmarshal value: %v", err)
|
||||
}
|
||||
oldLease = lease.LeaseID(kv.Lease)
|
||||
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
|
||||
}
|
||||
|
||||
ibytes := newRevBytes()
|
||||
@ -619,17 +612,11 @@ func (s *store) delete(key []byte, rev revision) {
|
||||
s.changes = append(s.changes, kv)
|
||||
s.currentRev.sub += 1
|
||||
|
||||
ibytes = newRevBytes()
|
||||
revToBytes(rev, ibytes)
|
||||
_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
|
||||
item := lease.LeaseItem{Key: string(key)}
|
||||
leaseID := s.le.GetLease(item)
|
||||
|
||||
kv.Reset()
|
||||
if err = kv.Unmarshal(vs[0]); err != nil {
|
||||
plog.Fatalf("cannot unmarshal value: %v", err)
|
||||
}
|
||||
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if leaseID != lease.NoLease {
|
||||
err = s.le.Detach(leaseID, []lease.LeaseItem{item})
|
||||
if err != nil {
|
||||
plog.Errorf("cannot detach %v", err)
|
||||
}
|
||||
|
@ -45,6 +45,23 @@ func BenchmarkStorePut(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkStoreTxnPutUpdate is same as above, but instead updates single key
|
||||
func BenchmarkStorePutUpdate(b *testing.B) {
|
||||
var i fakeConsistentIndex
|
||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := NewStore(be, &lease.FakeLessor{}, &i)
|
||||
defer cleanup(s, be, tmpPath)
|
||||
|
||||
// arbitrary number of bytes
|
||||
keys := createBytesSlice(64, 1)
|
||||
vals := createBytesSlice(1024, 1)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Put(keys[0], vals[0], lease.NoLease)
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkStoreTxnPut benchmarks the Put operation
|
||||
// with transaction begin and end, where transaction involves
|
||||
// some synchronization operations, such as mutex locking.
|
||||
|
@ -144,7 +144,6 @@ func TestStorePut(t *testing.T) {
|
||||
|
||||
if tt.rr != nil {
|
||||
wact = []testutil.Action{
|
||||
{"range", []interface{}{keyBucketName, newTestKeyBytes(tt.r.rev, false), []byte(nil), int64(0)}},
|
||||
{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
|
||||
}
|
||||
}
|
||||
@ -306,7 +305,6 @@ func TestStoreDeleteRange(t *testing.T) {
|
||||
}
|
||||
wact := []testutil.Action{
|
||||
{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
|
||||
{"range", []interface{}{keyBucketName, newTestKeyBytes(revision{2, 0}, false), []byte(nil), int64(0)}},
|
||||
}
|
||||
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
|
||||
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
|
||||
|
Loading…
x
Reference in New Issue
Block a user