Merge pull request #13060 from serathius/limit

etcdserver:  Fix invalid count returned on Range with Limit
This commit is contained in:
Piotr Tabor 2021-06-01 15:56:29 +02:00 committed by GitHub
commit 0edb69ae55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 36 deletions

View File

@ -25,8 +25,8 @@ import (
type index interface { type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision) Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64, limit int) []revision Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
CountRevisions(key, end []byte, atRev int64, limit int) int CountRevisions(key, end []byte, atRev int64) int
Put(key []byte, rev revision) Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision RangeSince(key, end []byte, rev int64) []revision
@ -106,27 +106,27 @@ func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex) bool) {
}) })
} }
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision) { func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
if end == nil { if end == nil {
rev, _, _, err := ti.Get(key, atRev) rev, _, _, err := ti.Get(key, atRev)
if err != nil { if err != nil {
return nil return nil, 0
} }
return []revision{rev} return []revision{rev}, 1
} }
ti.visit(key, end, func(ki *keyIndex) bool { ti.visit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev) if limit <= 0 || len(revs) < limit {
if len(revs) == limit { revs = append(revs, rev)
return false
} }
total++
} }
return true return true
}) })
return revs return revs, total
} }
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int { func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
if end == nil { if end == nil {
_, _, _, err := ti.Get(key, atRev) _, _, _, err := ti.Get(key, atRev)
if err != nil { if err != nil {
@ -138,9 +138,6 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int
ti.visit(key, end, func(ki *keyIndex) bool { ti.visit(key, end, func(ki *keyIndex) bool {
if _, _, _, err := ki.get(ti.lg, atRev); err == nil { if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
total++ total++
if total == limit {
return false
}
} }
return true return true
}) })

View File

@ -221,17 +221,18 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
wrev := int64(4) wrev := int64(4)
tests := []struct { tests := []struct {
limit int64 limit int64
wkvs []mvccpb.KeyValue wcounts int64
wkvs []mvccpb.KeyValue
}{ }{
// no limit // no limit
{-1, kvs}, {-1, 3, kvs},
// no limit // no limit
{0, kvs}, {0, 3, kvs},
{1, kvs[:1]}, {1, 3, kvs[:1]},
{2, kvs[:2]}, {2, 3, kvs[:2]},
{3, kvs}, {3, 3, kvs},
{100, kvs}, {100, 3, kvs},
} }
for i, tt := range tests { for i, tt := range tests {
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit}) r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit})
@ -248,7 +249,7 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
if r.Count != len(kvs) { if r.Count != len(kvs) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs)) t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
} }
} else if r.Count != int(tt.limit) { } else if r.Count != int(tt.wcounts) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit) t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
} }
} }

View File

@ -937,12 +937,15 @@ type fakeIndex struct {
indexCompactRespc chan map[revision]struct{} indexCompactRespc chan map[revision]struct{}
} }
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) []revision { func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) {
_, rev := i.Range(key, end, atRev) _, rev := i.Range(key, end, atRev)
return rev if len(rev) >= limit {
rev = rev[:limit]
}
return rev, len(rev)
} }
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int { func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
_, rev := i.Range(key, end, atRev) _, rev := i.Range(key, end, atRev)
return len(rev) return len(rev)
} }

View File

@ -136,14 +136,14 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
} }
if ro.Count { if ro.Count {
total := tr.s.kvindex.CountRevisions(key, end, rev, int(ro.Limit)) total := tr.s.kvindex.CountRevisions(key, end, rev)
tr.trace.Step("count revisions from in-memory index tree") tr.trace.Step("count revisions from in-memory index tree")
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
} }
revpairs := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit)) revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
tr.trace.Step("range keys from in-memory index tree") tr.trace.Step("range keys from in-memory index tree")
if len(revpairs) == 0 { if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
} }
limit := int(ro.Limit) limit := int(ro.Limit)
@ -176,7 +176,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
} }
} }
tr.trace.Step("range keys from bolt db") tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
} }
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {

View File

@ -1289,8 +1289,9 @@ func TestV3RangeRequest(t *testing.T) {
putKeys []string putKeys []string
reqs []pb.RangeRequest reqs []pb.RangeRequest
wresps [][]string wresps [][]string
wmores []bool wmores []bool
wcounts []int64
}{ }{
// single key // single key
{ {
@ -1307,6 +1308,7 @@ func TestV3RangeRequest(t *testing.T) {
{}, {},
}, },
[]bool{false, false}, []bool{false, false},
[]int64{1, 0},
}, },
// multi-key // multi-key
{ {
@ -1335,6 +1337,7 @@ func TestV3RangeRequest(t *testing.T) {
{"a", "b", "c", "d", "e"}, {"a", "b", "c", "d", "e"},
}, },
[]bool{false, false, false, false, false, false}, []bool{false, false, false, false, false, false},
[]int64{5, 2, 0, 0, 0, 5},
}, },
// revision // revision
{ {
@ -1353,22 +1356,30 @@ func TestV3RangeRequest(t *testing.T) {
{"a", "b"}, {"a", "b"},
}, },
[]bool{false, false, false, false}, []bool{false, false, false, false},
[]int64{5, 0, 1, 2},
}, },
// limit // limit
{ {
[]string{"foo", "bar"}, []string{"a", "b", "c"},
[]pb.RangeRequest{ []pb.RangeRequest{
// more // more
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1}, {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
// no more // half
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2}, {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
// no more
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 3},
// limit over
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 4},
}, },
[][]string{ [][]string{
{"bar"}, {"a"},
{"bar", "foo"}, {"a", "b"},
{"a", "b", "c"},
{"a", "b", "c"},
}, },
[]bool{true, false}, []bool{true, true, false, false},
[]int64{3, 3, 3, 3},
}, },
// sort // sort
{ {
@ -1421,6 +1432,7 @@ func TestV3RangeRequest(t *testing.T) {
{"b", "a", "c", "d"}, {"b", "a", "c", "d"},
}, },
[]bool{true, true, true, true, false, false}, []bool{true, true, true, true, false, false},
[]int64{4, 4, 4, 4, 0, 4},
}, },
// min/max mod rev // min/max mod rev
{ {
@ -1452,6 +1464,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev4", "rev5", "rev6"}, {"rev2", "rev3", "rev4", "rev5", "rev6"},
}, },
[]bool{false, false, false, false}, []bool{false, false, false, false},
[]int64{5, 5, 5, 5},
}, },
// min/max create rev // min/max create rev
{ {
@ -1483,6 +1496,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev6"}, {"rev2", "rev3", "rev6"},
}, },
[]bool{false, false, false, false}, []bool{false, false, false, false},
[]int64{3, 3, 3, 3},
}, },
} }
@ -1516,6 +1530,9 @@ func TestV3RangeRequest(t *testing.T) {
if resp.More != tt.wmores[j] { if resp.More != tt.wmores[j] {
t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j]) t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
} }
if resp.GetCount() != tt.wcounts[j] {
t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
}
wrev := int64(len(tt.putKeys) + 1) wrev := int64(len(tt.putKeys) + 1)
if resp.Header.Revision != wrev { if resp.Header.Revision != wrev {
t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev) t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)