mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #11990 from tangcong/add-limit-for-revisions
mvcc: push down RangeOptions.limit argv into index tree to reduce memory overhead
This commit is contained in:
commit
1c1029ecba
@ -121,6 +121,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
|
||||
- Add [missing CRC checksum check in WAL validate method otherwise causes panic](https://github.com/etcd-io/etcd/pull/11924).
|
||||
- See https://github.com/etcd-io/etcd/issues/11918.
|
||||
- Improve logging around snapshot send and receive.
|
||||
- [Push down RangeOptions.limit argv into index tree to reduce memory overhead](https://github.com/etcd-io/etcd/pull/11990).
|
||||
|
||||
### Package `embed`
|
||||
|
||||
|
@ -25,8 +25,8 @@ import (
|
||||
type index interface {
|
||||
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
|
||||
Range(key, end []byte, atRev int64) ([][]byte, []revision)
|
||||
Revisions(key, end []byte, atRev int64) []revision
|
||||
CountRevisions(key, end []byte, atRev int64) int
|
||||
Revisions(key, end []byte, atRev int64, limit int) []revision
|
||||
CountRevisions(key, end []byte, atRev int64, limit int) int
|
||||
Put(key []byte, rev revision)
|
||||
Tombstone(key []byte, rev revision) error
|
||||
RangeSince(key, end []byte, rev int64) []revision
|
||||
@ -89,7 +89,7 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
|
||||
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex) bool) {
|
||||
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
|
||||
|
||||
ti.RLock()
|
||||
@ -99,12 +99,14 @@ func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
|
||||
if len(endi.key) > 0 && !item.Less(endi) {
|
||||
return false
|
||||
}
|
||||
f(item.(*keyIndex))
|
||||
if !f(item.(*keyIndex)) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
|
||||
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision) {
|
||||
if end == nil {
|
||||
rev, _, _, err := ti.Get(key, atRev)
|
||||
if err != nil {
|
||||
@ -112,15 +114,19 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
|
||||
}
|
||||
return []revision{rev}
|
||||
}
|
||||
ti.visit(key, end, func(ki *keyIndex) {
|
||||
ti.visit(key, end, func(ki *keyIndex) bool {
|
||||
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
||||
revs = append(revs, rev)
|
||||
if len(revs) == limit {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return revs
|
||||
}
|
||||
|
||||
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
||||
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int {
|
||||
if end == nil {
|
||||
_, _, _, err := ti.Get(key, atRev)
|
||||
if err != nil {
|
||||
@ -129,10 +135,14 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
||||
return 1
|
||||
}
|
||||
total := 0
|
||||
ti.visit(key, end, func(ki *keyIndex) {
|
||||
ti.visit(key, end, func(ki *keyIndex) bool {
|
||||
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
||||
total++
|
||||
if total == limit {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return total
|
||||
}
|
||||
@ -145,11 +155,12 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
|
||||
}
|
||||
return [][]byte{key}, []revision{rev}
|
||||
}
|
||||
ti.visit(key, end, func(ki *keyIndex) {
|
||||
ti.visit(key, end, func(ki *keyIndex) bool {
|
||||
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
||||
revs = append(revs, rev)
|
||||
keys = append(keys, ki.key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
return keys, revs
|
||||
}
|
||||
|
@ -242,8 +242,12 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
|
||||
if r.Rev != wrev {
|
||||
t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
|
||||
}
|
||||
if r.Count != len(kvs) {
|
||||
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
|
||||
if tt.limit <= 0 || int(tt.limit) > len(kvs) {
|
||||
if r.Count != len(kvs) {
|
||||
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
|
||||
}
|
||||
} else if r.Count != int(tt.limit) {
|
||||
t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -936,12 +936,12 @@ type fakeIndex struct {
|
||||
indexCompactRespc chan map[revision]struct{}
|
||||
}
|
||||
|
||||
func (i *fakeIndex) Revisions(key, end []byte, atRev int64) []revision {
|
||||
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) []revision {
|
||||
_, rev := i.Range(key, end, atRev)
|
||||
return rev
|
||||
}
|
||||
|
||||
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
||||
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int {
|
||||
_, rev := i.Range(key, end, atRev)
|
||||
return len(rev)
|
||||
}
|
||||
|
@ -126,11 +126,11 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
|
||||
}
|
||||
if ro.Count {
|
||||
total := tr.s.kvindex.CountRevisions(key, end, rev)
|
||||
total := tr.s.kvindex.CountRevisions(key, end, rev, int(ro.Limit))
|
||||
tr.trace.Step("count revisions from in-memory index tree")
|
||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
revpairs := tr.s.kvindex.Revisions(key, end, rev)
|
||||
revpairs := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
|
||||
tr.trace.Step("range keys from in-memory index tree")
|
||||
if len(revpairs) == 0 {
|
||||
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user