mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
code clean: make rangeKeys close to storeTxnRead.
This commit is contained in:
parent
28b9089de9
commit
5b09de33a5
@ -62,6 +62,61 @@ func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOpti
|
|||||||
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
|
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||||
|
rev := ro.Rev
|
||||||
|
if rev > curRev {
|
||||||
|
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
||||||
|
}
|
||||||
|
if rev <= 0 {
|
||||||
|
rev = curRev
|
||||||
|
}
|
||||||
|
if rev < tr.s.compactMainRev {
|
||||||
|
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
|
||||||
|
}
|
||||||
|
if ro.Count {
|
||||||
|
total := tr.s.kvindex.CountRevisions(key, end, rev)
|
||||||
|
tr.trace.Step("count revisions from in-memory index tree")
|
||||||
|
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||||
|
}
|
||||||
|
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
|
||||||
|
tr.trace.Step("range keys from in-memory index tree")
|
||||||
|
if len(revpairs) == 0 {
|
||||||
|
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
limit := int(ro.Limit)
|
||||||
|
if limit <= 0 || limit > len(revpairs) {
|
||||||
|
limit = len(revpairs)
|
||||||
|
}
|
||||||
|
|
||||||
|
kvs := make([]mvccpb.KeyValue, limit)
|
||||||
|
revBytes := newRevBytes()
|
||||||
|
for i, revpair := range revpairs[:len(kvs)] {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
revToBytes(revpair, revBytes)
|
||||||
|
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
|
||||||
|
if len(vs) != 1 {
|
||||||
|
tr.s.lg.Fatal(
|
||||||
|
"range failed to find revision pair",
|
||||||
|
zap.Int64("revision-main", revpair.main),
|
||||||
|
zap.Int64("revision-sub", revpair.sub),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if err := kvs[i].Unmarshal(vs[0]); err != nil {
|
||||||
|
tr.s.lg.Fatal(
|
||||||
|
"failed to unmarshal mvccpb.KeyValue",
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tr.trace.Step("range keys from bolt db")
|
||||||
|
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (tr *storeTxnRead) End() {
|
func (tr *storeTxnRead) End() {
|
||||||
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
|
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
|
||||||
tr.s.mu.RUnlock()
|
tr.s.mu.RUnlock()
|
||||||
@ -124,61 +179,6 @@ func (tw *storeTxnWrite) End() {
|
|||||||
tw.s.mu.RUnlock()
|
tw.s.mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
|
||||||
rev := ro.Rev
|
|
||||||
if rev > curRev {
|
|
||||||
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
|
||||||
}
|
|
||||||
if rev <= 0 {
|
|
||||||
rev = curRev
|
|
||||||
}
|
|
||||||
if rev < tr.s.compactMainRev {
|
|
||||||
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
|
|
||||||
}
|
|
||||||
if ro.Count {
|
|
||||||
total := tr.s.kvindex.CountRevisions(key, end, rev)
|
|
||||||
tr.trace.Step("count revisions from in-memory index tree")
|
|
||||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
|
||||||
}
|
|
||||||
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
|
|
||||||
tr.trace.Step("range keys from in-memory index tree")
|
|
||||||
if len(revpairs) == 0 {
|
|
||||||
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
limit := int(ro.Limit)
|
|
||||||
if limit <= 0 || limit > len(revpairs) {
|
|
||||||
limit = len(revpairs)
|
|
||||||
}
|
|
||||||
|
|
||||||
kvs := make([]mvccpb.KeyValue, limit)
|
|
||||||
revBytes := newRevBytes()
|
|
||||||
for i, revpair := range revpairs[:len(kvs)] {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
revToBytes(revpair, revBytes)
|
|
||||||
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
|
|
||||||
if len(vs) != 1 {
|
|
||||||
tr.s.lg.Fatal(
|
|
||||||
"range failed to find revision pair",
|
|
||||||
zap.Int64("revision-main", revpair.main),
|
|
||||||
zap.Int64("revision-sub", revpair.sub),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if err := kvs[i].Unmarshal(vs[0]); err != nil {
|
|
||||||
tr.s.lg.Fatal(
|
|
||||||
"failed to unmarshal mvccpb.KeyValue",
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tr.trace.Step("range keys from bolt db")
|
|
||||||
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
|
||||||
rev := tw.beginRev + 1
|
rev := tw.beginRev + 1
|
||||||
c := rev
|
c := rev
|
||||||
|
Loading…
x
Reference in New Issue
Block a user