mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #15137 from fuweid/backport-11990-to-3.4
[3.4] mvcc: push down RangeOptions.limit argv into index tree to reduce memory overhead
This commit is contained in:
commit
e4b154231c
@ -1287,6 +1287,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
|
|
||||||
wresps [][]string
|
wresps [][]string
|
||||||
wmores []bool
|
wmores []bool
|
||||||
|
wcounts []int64
|
||||||
}{
|
}{
|
||||||
// single key
|
// single key
|
||||||
{
|
{
|
||||||
@ -1303,6 +1304,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
{},
|
{},
|
||||||
},
|
},
|
||||||
[]bool{false, false},
|
[]bool{false, false},
|
||||||
|
[]int64{1, 0},
|
||||||
},
|
},
|
||||||
// multi-key
|
// multi-key
|
||||||
{
|
{
|
||||||
@ -1331,6 +1333,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
{"a", "b", "c", "d", "e"},
|
{"a", "b", "c", "d", "e"},
|
||||||
},
|
},
|
||||||
[]bool{false, false, false, false, false, false},
|
[]bool{false, false, false, false, false, false},
|
||||||
|
[]int64{5, 2, 0, 0, 0, 5},
|
||||||
},
|
},
|
||||||
// revision
|
// revision
|
||||||
{
|
{
|
||||||
@ -1349,22 +1352,30 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
{"a", "b"},
|
{"a", "b"},
|
||||||
},
|
},
|
||||||
[]bool{false, false, false, false},
|
[]bool{false, false, false, false},
|
||||||
|
[]int64{5, 0, 1, 2},
|
||||||
},
|
},
|
||||||
// limit
|
// limit
|
||||||
{
|
{
|
||||||
[]string{"foo", "bar"},
|
[]string{"a", "b", "c"},
|
||||||
[]pb.RangeRequest{
|
[]pb.RangeRequest{
|
||||||
// more
|
// more
|
||||||
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
|
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
|
||||||
// no more
|
// half
|
||||||
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
|
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
|
||||||
|
// no more
|
||||||
|
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 3},
|
||||||
|
// limit over
|
||||||
|
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 4},
|
||||||
},
|
},
|
||||||
|
|
||||||
[][]string{
|
[][]string{
|
||||||
{"bar"},
|
{"a"},
|
||||||
{"bar", "foo"},
|
{"a", "b"},
|
||||||
|
{"a", "b", "c"},
|
||||||
|
{"a", "b", "c"},
|
||||||
},
|
},
|
||||||
[]bool{true, false},
|
[]bool{true, true, false, false},
|
||||||
|
[]int64{3, 3, 3, 3},
|
||||||
},
|
},
|
||||||
// sort
|
// sort
|
||||||
{
|
{
|
||||||
@ -1417,6 +1428,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
{"b", "a", "c", "d"},
|
{"b", "a", "c", "d"},
|
||||||
},
|
},
|
||||||
[]bool{true, true, true, true, false, false},
|
[]bool{true, true, true, true, false, false},
|
||||||
|
[]int64{4, 4, 4, 4, 0, 4},
|
||||||
},
|
},
|
||||||
// min/max mod rev
|
// min/max mod rev
|
||||||
{
|
{
|
||||||
@ -1448,6 +1460,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
{"rev2", "rev3", "rev4", "rev5", "rev6"},
|
{"rev2", "rev3", "rev4", "rev5", "rev6"},
|
||||||
},
|
},
|
||||||
[]bool{false, false, false, false},
|
[]bool{false, false, false, false},
|
||||||
|
[]int64{5, 5, 5, 5},
|
||||||
},
|
},
|
||||||
// min/max create rev
|
// min/max create rev
|
||||||
{
|
{
|
||||||
@ -1479,6 +1492,7 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
{"rev2", "rev3", "rev6"},
|
{"rev2", "rev3", "rev6"},
|
||||||
},
|
},
|
||||||
[]bool{false, false, false, false},
|
[]bool{false, false, false, false},
|
||||||
|
[]int64{3, 3, 3, 3},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1512,6 +1526,9 @@ func TestV3RangeRequest(t *testing.T) {
|
|||||||
if resp.More != tt.wmores[j] {
|
if resp.More != tt.wmores[j] {
|
||||||
t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
|
t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
|
||||||
}
|
}
|
||||||
|
if resp.GetCount() != tt.wcounts[j] {
|
||||||
|
t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
|
||||||
|
}
|
||||||
wrev := int64(len(tt.putKeys) + 1)
|
wrev := int64(len(tt.putKeys) + 1)
|
||||||
if resp.Header.Revision != wrev {
|
if resp.Header.Revision != wrev {
|
||||||
t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
|
t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
type index interface {
|
type index interface {
|
||||||
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
|
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
|
||||||
Range(key, end []byte, atRev int64) ([][]byte, []revision)
|
Range(key, end []byte, atRev int64) ([][]byte, []revision)
|
||||||
Revisions(key, end []byte, atRev int64) []revision
|
Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
|
||||||
CountRevisions(key, end []byte, atRev int64) int
|
CountRevisions(key, end []byte, atRev int64) int
|
||||||
Put(key []byte, rev revision)
|
Put(key []byte, rev revision)
|
||||||
Tombstone(key []byte, rev revision) error
|
Tombstone(key []byte, rev revision) error
|
||||||
@ -89,7 +89,7 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
|
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex) bool) {
|
||||||
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
|
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
|
||||||
|
|
||||||
ti.RLock()
|
ti.RLock()
|
||||||
@ -99,25 +99,31 @@ func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
|
|||||||
if len(endi.key) > 0 && !item.Less(endi) {
|
if len(endi.key) > 0 && !item.Less(endi) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
f(item.(*keyIndex))
|
if !f(item.(*keyIndex)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
|
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
|
||||||
if end == nil {
|
if end == nil {
|
||||||
rev, _, _, err := ti.Get(key, atRev)
|
rev, _, _, err := ti.Get(key, atRev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil, 0
|
||||||
}
|
}
|
||||||
return []revision{rev}
|
return []revision{rev}, 1
|
||||||
}
|
}
|
||||||
ti.visit(key, end, func(ki *keyIndex) {
|
ti.visit(key, end, func(ki *keyIndex) bool {
|
||||||
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
||||||
|
if limit <= 0 || len(revs) < limit {
|
||||||
revs = append(revs, rev)
|
revs = append(revs, rev)
|
||||||
}
|
}
|
||||||
|
total++
|
||||||
|
}
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return revs
|
return revs, total
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
||||||
@ -129,10 +135,11 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
|||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
total := 0
|
total := 0
|
||||||
ti.visit(key, end, func(ki *keyIndex) {
|
ti.visit(key, end, func(ki *keyIndex) bool {
|
||||||
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
||||||
total++
|
total++
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return total
|
return total
|
||||||
}
|
}
|
||||||
@ -145,11 +152,12 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
|
|||||||
}
|
}
|
||||||
return [][]byte{key}, []revision{rev}
|
return [][]byte{key}, []revision{rev}
|
||||||
}
|
}
|
||||||
ti.visit(key, end, func(ki *keyIndex) {
|
ti.visit(key, end, func(ki *keyIndex) bool {
|
||||||
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
|
||||||
revs = append(revs, rev)
|
revs = append(revs, rev)
|
||||||
keys = append(keys, ki.key)
|
keys = append(keys, ki.key)
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return keys, revs
|
return keys, revs
|
||||||
}
|
}
|
||||||
|
@ -206,60 +206,80 @@ func TestIndexRevision(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
key, end []byte
|
key, end []byte
|
||||||
atRev int64
|
atRev int64
|
||||||
|
limit int
|
||||||
wrevs []revision
|
wrevs []revision
|
||||||
wcounts int
|
wcounts int
|
||||||
}{
|
}{
|
||||||
// single key that not found
|
// single key that not found
|
||||||
{
|
{
|
||||||
[]byte("bar"), nil, 6, nil, 0,
|
[]byte("bar"), nil, 6, 0, nil, 0,
|
||||||
},
|
},
|
||||||
// single key that found
|
// single key that found
|
||||||
{
|
{
|
||||||
[]byte("foo"), nil, 6, []revision{{main: 6}}, 1,
|
[]byte("foo"), nil, 6, 0, []revision{{main: 6}}, 1,
|
||||||
},
|
},
|
||||||
// various range keys, fixed atRev
|
// various range keys, fixed atRev, unlimited
|
||||||
{
|
{
|
||||||
[]byte("foo"), []byte("foo1"), 6, []revision{{main: 6}}, 1,
|
[]byte("foo"), []byte("foo1"), 6, 0, []revision{{main: 6}}, 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo"), []byte("foo2"), 6, []revision{{main: 6}, {main: 5}}, 2,
|
[]byte("foo"), []byte("foo2"), 6, 0, []revision{{main: 6}, {main: 5}}, 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo"), []byte("fop"), 6, []revision{{main: 6}, {main: 5}, {main: 4}}, 3,
|
[]byte("foo"), []byte("fop"), 6, 0, []revision{{main: 6}, {main: 5}, {main: 4}}, 3,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 6, []revision{{main: 5}, {main: 4}}, 2,
|
[]byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo2"), []byte("fop"), 6, []revision{{main: 4}}, 1,
|
[]byte("foo2"), []byte("fop"), 6, 0, []revision{{main: 4}}, 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo3"), []byte("fop"), 6, nil, 0,
|
[]byte("foo3"), []byte("fop"), 6, 0, nil, 0,
|
||||||
},
|
},
|
||||||
// fixed range keys, various atRev
|
// fixed range keys, various atRev, unlimited
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 1, nil, 0,
|
[]byte("foo1"), []byte("fop"), 1, 0, nil, 0,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 2, []revision{{main: 2}}, 1,
|
[]byte("foo1"), []byte("fop"), 2, 0, []revision{{main: 2}}, 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 3, []revision{{main: 2}, {main: 3}}, 2,
|
[]byte("foo1"), []byte("fop"), 3, 0, []revision{{main: 2}, {main: 3}}, 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 4, []revision{{main: 2}, {main: 4}}, 2,
|
[]byte("foo1"), []byte("fop"), 4, 0, []revision{{main: 2}, {main: 4}}, 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 5, []revision{{main: 5}, {main: 4}}, 2,
|
[]byte("foo1"), []byte("fop"), 5, 0, []revision{{main: 5}, {main: 4}}, 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]byte("foo1"), []byte("fop"), 6, []revision{{main: 5}, {main: 4}}, 2,
|
[]byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2,
|
||||||
|
},
|
||||||
|
// fixed range keys, fixed atRev, various limit
|
||||||
|
{
|
||||||
|
[]byte("foo"), []byte("fop"), 6, 1, []revision{{main: 6}}, 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]byte("foo"), []byte("fop"), 6, 2, []revision{{main: 6}, {main: 5}}, 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]byte("foo"), []byte("fop"), 6, 3, []revision{{main: 6}, {main: 5}, {main: 4}}, 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]byte("foo"), []byte("fop"), 3, 1, []revision{{main: 1}}, 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]byte("foo"), []byte("fop"), 3, 2, []revision{{main: 1}, {main: 2}}, 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
[]byte("foo"), []byte("fop"), 3, 3, []revision{{main: 1}, {main: 2}, {main: 3}}, 3,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
revs := ti.Revisions(tt.key, tt.end, tt.atRev)
|
revs, _ := ti.Revisions(tt.key, tt.end, tt.atRev, tt.limit)
|
||||||
if !reflect.DeepEqual(revs, tt.wrevs) {
|
if !reflect.DeepEqual(revs, tt.wrevs) {
|
||||||
t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs)
|
t.Errorf("#%d limit %d: revs = %+v, want %+v", i, tt.limit, revs, tt.wrevs)
|
||||||
}
|
}
|
||||||
count := ti.CountRevisions(tt.key, tt.end, tt.atRev)
|
count := ti.CountRevisions(tt.key, tt.end, tt.atRev)
|
||||||
if count != tt.wcounts {
|
if count != tt.wcounts {
|
||||||
|
@ -220,16 +220,17 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
|
|||||||
wrev := int64(4)
|
wrev := int64(4)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
limit int64
|
limit int64
|
||||||
|
wcounts int64
|
||||||
wkvs []mvccpb.KeyValue
|
wkvs []mvccpb.KeyValue
|
||||||
}{
|
}{
|
||||||
// no limit
|
// no limit
|
||||||
{-1, kvs},
|
{-1, 3, kvs},
|
||||||
// no limit
|
// no limit
|
||||||
{0, kvs},
|
{0, 3, kvs},
|
||||||
{1, kvs[:1]},
|
{1, 3, kvs[:1]},
|
||||||
{2, kvs[:2]},
|
{2, 3, kvs[:2]},
|
||||||
{3, kvs},
|
{3, 3, kvs},
|
||||||
{100, kvs},
|
{100, 3, kvs},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit})
|
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit})
|
||||||
@ -242,9 +243,13 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
|
|||||||
if r.Rev != wrev {
|
if r.Rev != wrev {
|
||||||
t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
|
t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
|
||||||
}
|
}
|
||||||
|
if tt.limit <= 0 || int(tt.limit) > len(kvs) {
|
||||||
if r.Count != len(kvs) {
|
if r.Count != len(kvs) {
|
||||||
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
|
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
|
||||||
}
|
}
|
||||||
|
} else if r.Count != int(tt.wcounts) {
|
||||||
|
t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -936,9 +936,12 @@ type fakeIndex struct {
|
|||||||
indexCompactRespc chan map[revision]struct{}
|
indexCompactRespc chan map[revision]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *fakeIndex) Revisions(key, end []byte, atRev int64) []revision {
|
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) {
|
||||||
_, rev := i.Range(key, end, atRev)
|
_, rev := i.Range(key, end, atRev)
|
||||||
return rev
|
if len(rev) >= limit {
|
||||||
|
rev = rev[:limit]
|
||||||
|
}
|
||||||
|
return rev, len(rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
|
||||||
|
@ -130,10 +130,10 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
|||||||
tr.trace.Step("count revisions from in-memory index tree")
|
tr.trace.Step("count revisions from in-memory index tree")
|
||||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
revpairs := tr.s.kvindex.Revisions(key, end, rev)
|
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
|
||||||
tr.trace.Step("range keys from in-memory index tree")
|
tr.trace.Step("range keys from in-memory index tree")
|
||||||
if len(revpairs) == 0 {
|
if len(revpairs) == 0 {
|
||||||
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
limit := int(ro.Limit)
|
limit := int(ro.Limit)
|
||||||
@ -176,7 +176,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
tr.trace.Step("range keys from bolt db")
|
tr.trace.Step("range keys from bolt db")
|
||||||
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
|
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user