mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13561 from horizonzy/code-clean
Code clean: make rangeKeys close to storeTxnRead.
This commit is contained in:
commit
2d7bc2f59e
@ -62,6 +62,61 @@ func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOpti
|
||||
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||
rev := ro.Rev
|
||||
if rev > curRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
||||
}
|
||||
if rev <= 0 {
|
||||
rev = curRev
|
||||
}
|
||||
if rev < tr.s.compactMainRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
|
||||
}
|
||||
if ro.Count {
|
||||
total := tr.s.kvindex.CountRevisions(key, end, rev)
|
||||
tr.trace.Step("count revisions from in-memory index tree")
|
||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
|
||||
tr.trace.Step("range keys from in-memory index tree")
|
||||
if len(revpairs) == 0 {
|
||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
|
||||
limit := int(ro.Limit)
|
||||
if limit <= 0 || limit > len(revpairs) {
|
||||
limit = len(revpairs)
|
||||
}
|
||||
|
||||
kvs := make([]mvccpb.KeyValue, limit)
|
||||
revBytes := newRevBytes()
|
||||
for i, revpair := range revpairs[:len(kvs)] {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
revToBytes(revpair, revBytes)
|
||||
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
|
||||
if len(vs) != 1 {
|
||||
tr.s.lg.Fatal(
|
||||
"range failed to find revision pair",
|
||||
zap.Int64("revision-main", revpair.main),
|
||||
zap.Int64("revision-sub", revpair.sub),
|
||||
)
|
||||
}
|
||||
if err := kvs[i].Unmarshal(vs[0]); err != nil {
|
||||
tr.s.lg.Fatal(
|
||||
"failed to unmarshal mvccpb.KeyValue",
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
tr.trace.Step("range keys from bolt db")
|
||||
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) End() {
|
||||
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
|
||||
tr.s.mu.RUnlock()
|
||||
@ -124,61 +179,6 @@ func (tw *storeTxnWrite) End() {
|
||||
tw.s.mu.RUnlock()
|
||||
}
|
||||
|
||||
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||
rev := ro.Rev
|
||||
if rev > curRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
||||
}
|
||||
if rev <= 0 {
|
||||
rev = curRev
|
||||
}
|
||||
if rev < tr.s.compactMainRev {
|
||||
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
|
||||
}
|
||||
if ro.Count {
|
||||
total := tr.s.kvindex.CountRevisions(key, end, rev)
|
||||
tr.trace.Step("count revisions from in-memory index tree")
|
||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
|
||||
tr.trace.Step("range keys from in-memory index tree")
|
||||
if len(revpairs) == 0 {
|
||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
|
||||
limit := int(ro.Limit)
|
||||
if limit <= 0 || limit > len(revpairs) {
|
||||
limit = len(revpairs)
|
||||
}
|
||||
|
||||
kvs := make([]mvccpb.KeyValue, limit)
|
||||
revBytes := newRevBytes()
|
||||
for i, revpair := range revpairs[:len(kvs)] {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
revToBytes(revpair, revBytes)
|
||||
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
|
||||
if len(vs) != 1 {
|
||||
tr.s.lg.Fatal(
|
||||
"range failed to find revision pair",
|
||||
zap.Int64("revision-main", revpair.main),
|
||||
zap.Int64("revision-sub", revpair.sub),
|
||||
)
|
||||
}
|
||||
if err := kvs[i].Unmarshal(vs[0]); err != nil {
|
||||
tr.s.lg.Fatal(
|
||||
"failed to unmarshal mvccpb.KeyValue",
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
tr.trace.Step("range keys from bolt db")
|
||||
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
|
||||
}
|
||||
|
||||
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
rev := tw.beginRev + 1
|
||||
c := rev
|
||||
|
Loading…
x
Reference in New Issue
Block a user