mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: proper request cancellation for range
This commit is contained in:
parent
6e800b9b01
commit
0558e379c3
@ -383,7 +383,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|||||||
events[i] = &evs[i]
|
events[i] = &evs[i]
|
||||||
if needPrevKV {
|
if needPrevKV {
|
||||||
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
|
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
|
||||||
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
|
r, err := sws.watchable.Range(context.TODO(), evs[i].Kv.Key, nil, opt)
|
||||||
if err == nil && len(r.KVs) != 0 {
|
if err == nil && len(r.KVs) != 0 {
|
||||||
events[i].PrevKv = &(r.KVs[0])
|
events[i].PrevKv = &(r.KVs[0])
|
||||||
}
|
}
|
||||||
|
|||||||
@ -230,7 +230,7 @@ func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.Put
|
|||||||
var rr *mvcc.RangeResult
|
var rr *mvcc.RangeResult
|
||||||
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
||||||
trace.StepWithFunction(func() {
|
trace.StepWithFunction(func() {
|
||||||
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
|
rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
|
||||||
}, "get previous kv pair")
|
}, "get previous kv pair")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -271,7 +271,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
|
|||||||
}
|
}
|
||||||
|
|
||||||
if dr.PrevKv {
|
if dr.PrevKv {
|
||||||
rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{})
|
rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -316,7 +316,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
|
|||||||
Count: r.CountOnly,
|
Count: r.CountOnly,
|
||||||
}
|
}
|
||||||
|
|
||||||
rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
|
rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -503,7 +503,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
|
|||||||
// * rewrite rules for common patterns:
|
// * rewrite rules for common patterns:
|
||||||
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
|
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
|
||||||
// * caching
|
// * caching
|
||||||
rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
|
rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -631,7 +631,7 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com
|
|||||||
return nil, ch, nil, err
|
return nil, ch, nil, err
|
||||||
}
|
}
|
||||||
// get the current revision. which key to get is not important.
|
// get the current revision. which key to get is not important.
|
||||||
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
|
rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{})
|
||||||
resp.Header.Revision = rr.Rev
|
resp.Header.Revision = rr.Rev
|
||||||
return resp, ch, trace, err
|
return resp, ch, trace, err
|
||||||
}
|
}
|
||||||
@ -999,7 +999,7 @@ func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp
|
|||||||
req := tv.RequestPut
|
req := tv.RequestPut
|
||||||
if req.IgnoreValue || req.IgnoreLease {
|
if req.IgnoreValue || req.IgnoreLease {
|
||||||
// expects previous key-value, error if not exist
|
// expects previous key-value, error if not exist
|
||||||
rr, err := rv.Range(req.Key, nil, mvcc.RangeOptions{})
|
rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||||
"go.etcd.io/etcd/server/v3/lease"
|
"go.etcd.io/etcd/server/v3/lease"
|
||||||
@ -50,7 +52,7 @@ type ReadView interface {
|
|||||||
// If `end` is not nil and empty, it gets the keys greater than or equal to key.
|
// If `end` is not nil and empty, it gets the keys greater than or equal to key.
|
||||||
// Limit limits the number of keys returned.
|
// Limit limits the number of keys returned.
|
||||||
// If the required rev is compacted, ErrCompacted will be returned.
|
// If the required rev is compacted, ErrCompacted will be returned.
|
||||||
Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
|
Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TxnRead represents a read-only transaction with operations that will not
|
// TxnRead represents a read-only transaction with operations that will not
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -45,12 +46,12 @@ type (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
normalRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
normalRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||||
return kv.Range(key, end, ro)
|
return kv.Range(context.TODO(), key, end, ro)
|
||||||
}
|
}
|
||||||
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||||
txn := kv.Read(traceutil.TODO())
|
txn := kv.Read(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
return txn.Range(key, end, ro)
|
return txn.Range(context.TODO(), key, end, ro)
|
||||||
}
|
}
|
||||||
|
|
||||||
normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
|
||||||
@ -268,7 +269,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
|
|||||||
t.Errorf("#%d: rev = %d, want %d", i, rev, base+1)
|
t.Errorf("#%d: rev = %d, want %d", i, rev, base+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := s.Range([]byte("foo"), nil, RangeOptions{})
|
r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -372,7 +373,7 @@ func TestKVOperationInSequence(t *testing.T) {
|
|||||||
t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
|
t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
|
r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -392,7 +393,7 @@ func TestKVOperationInSequence(t *testing.T) {
|
|||||||
t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+2)
|
t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+2)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err = s.Range([]byte("foo"), nil, RangeOptions{Rev: base + 2})
|
r, err = s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -450,7 +451,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
|
|||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(donec)
|
defer close(donec)
|
||||||
s.Range([]byte("foo"), nil, RangeOptions{})
|
s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{})
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-donec:
|
case <-donec:
|
||||||
@ -475,7 +476,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
|
|||||||
t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
|
t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
|
r, err := txn.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -495,7 +496,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
|
|||||||
t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
|
t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
|
r, err = txn.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: range error (%v)", i, err)
|
t.Errorf("#%d: range error (%v)", i, err)
|
||||||
}
|
}
|
||||||
@ -554,7 +555,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: unexpect compact error %v", i, err)
|
t.Errorf("#%d: unexpect compact error %v", i, err)
|
||||||
}
|
}
|
||||||
r, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: tt.rev + 1})
|
r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: tt.rev + 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: unexpect range error %v", i, err)
|
t.Errorf("#%d: unexpect range error %v", i, err)
|
||||||
}
|
}
|
||||||
@ -641,7 +642,7 @@ func TestKVRestore(t *testing.T) {
|
|||||||
tt(s)
|
tt(s)
|
||||||
var kvss [][]mvccpb.KeyValue
|
var kvss [][]mvccpb.KeyValue
|
||||||
for k := int64(0); k < 10; k++ {
|
for k := int64(0); k < 10; k++ {
|
||||||
r, _ := s.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k})
|
r, _ := s.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k})
|
||||||
kvss = append(kvss, r.KVs)
|
kvss = append(kvss, r.KVs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -659,7 +660,7 @@ func TestKVRestore(t *testing.T) {
|
|||||||
testutil.WaitSchedule()
|
testutil.WaitSchedule()
|
||||||
var nkvss [][]mvccpb.KeyValue
|
var nkvss [][]mvccpb.KeyValue
|
||||||
for k := int64(0); k < 10; k++ {
|
for k := int64(0); k < 10; k++ {
|
||||||
r, _ := ns.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k})
|
r, _ := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k})
|
||||||
nkvss = append(nkvss, r.KVs)
|
nkvss = append(nkvss, r.KVs)
|
||||||
}
|
}
|
||||||
cleanup(ns, b, tmpPath)
|
cleanup(ns, b, tmpPath)
|
||||||
@ -703,7 +704,7 @@ func TestKVSnapshot(t *testing.T) {
|
|||||||
|
|
||||||
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
|
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
|
||||||
defer ns.Close()
|
defer ns.Close()
|
||||||
r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
|
r, err := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpect range error (%v)", err)
|
t.Errorf("unexpect range error (%v)", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||||
"go.etcd.io/etcd/server/v3/lease"
|
"go.etcd.io/etcd/server/v3/lease"
|
||||||
)
|
)
|
||||||
@ -33,10 +35,10 @@ func (rv *readView) Rev() int64 {
|
|||||||
return tr.Rev()
|
return tr.Rev()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||||
tr := rv.kv.Read(traceutil.TODO())
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.Range(key, end, ro)
|
return tr.Range(ctx, key, end, ro)
|
||||||
}
|
}
|
||||||
|
|
||||||
type writeView struct{ kv KV }
|
type writeView struct{ kv KV }
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||||
@ -67,7 +68,7 @@ func benchmarkStoreRange(b *testing.B, n int) {
|
|||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
s.Range(begin, end, RangeOptions{})
|
s.Range(context.TODO(), begin, end, RangeOptions{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
@ -130,7 +131,7 @@ func TestCompactAllAndRestore(t *testing.T) {
|
|||||||
if s1.Rev() != rev {
|
if s1.Rev() != rev {
|
||||||
t.Errorf("rev = %v, want %v", s1.Rev(), rev)
|
t.Errorf("rev = %v, want %v", s1.Rev(), rev)
|
||||||
}
|
}
|
||||||
_, err = s1.Range([]byte("foo"), nil, RangeOptions{})
|
_, err = s1.Range(context.TODO(), []byte("foo"), nil, RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpect range error %v", err)
|
t.Errorf("unexpect range error %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@ package mvcc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -213,7 +214,7 @@ func TestStoreRange(t *testing.T) {
|
|||||||
b.tx.rangeRespc <- tt.r
|
b.tx.rangeRespc <- tt.r
|
||||||
fi.indexRangeRespc <- tt.idxr
|
fi.indexRangeRespc <- tt.idxr
|
||||||
|
|
||||||
ret, err := s.Range([]byte("foo"), []byte("goo"), ro)
|
ret, err := s.Range(context.TODO(), []byte("foo"), []byte("goo"), ro)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: err = %v, want nil", i, err)
|
t.Errorf("#%d: err = %v, want nil", i, err)
|
||||||
}
|
}
|
||||||
@ -455,7 +456,7 @@ func TestRestoreDelete(t *testing.T) {
|
|||||||
defer s.Close()
|
defer s.Close()
|
||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 20; i++ {
|
||||||
ks := fmt.Sprintf("foo-%d", i)
|
ks := fmt.Sprintf("foo-%d", i)
|
||||||
r, err := s.Range([]byte(ks), nil, RangeOptions{})
|
r, err := s.Range(context.TODO(), []byte(ks), nil, RangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -502,7 +503,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
|
|||||||
// wait for scheduled compaction to be finished
|
// wait for scheduled compaction to be finished
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
if _, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
|
if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
|
||||||
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
|
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
|
||||||
}
|
}
|
||||||
// check the key in backend is deleted
|
// check the key in backend is deleted
|
||||||
@ -676,7 +677,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
|||||||
// readTx2 simulates a short read request
|
// readTx2 simulates a short read request
|
||||||
readTx2 := s.Read(traceutil.TODO())
|
readTx2 := s.Read(traceutil.TODO())
|
||||||
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
||||||
ret, err := readTx2.Range([]byte("foo"), nil, ro)
|
ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to range: %v", err)
|
t.Fatalf("failed to range: %v", err)
|
||||||
}
|
}
|
||||||
@ -693,7 +694,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
readTx2.End()
|
readTx2.End()
|
||||||
|
|
||||||
ret, err = readTx1.Range([]byte("foo"), nil, ro)
|
ret, err = readTx1.Range(context.TODO(), []byte("foo"), nil, ro)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to range: %v", err)
|
t.Fatalf("failed to range: %v", err)
|
||||||
}
|
}
|
||||||
@ -760,7 +761,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
|
|||||||
tx := s.Read(traceutil.TODO())
|
tx := s.Read(traceutil.TODO())
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
// get all keys in backend store, and compare with wKVs
|
// get all keys in backend store, and compare with wKVs
|
||||||
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
||||||
tx.End()
|
tx.End()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to range keys: %v", err)
|
t.Errorf("failed to range keys: %v", err)
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||||
"go.etcd.io/etcd/server/v3/lease"
|
"go.etcd.io/etcd/server/v3/lease"
|
||||||
@ -47,8 +49,8 @@ func (s *store) Read(trace *traceutil.Trace) TxnRead {
|
|||||||
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
|
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
|
||||||
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
|
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
|
||||||
|
|
||||||
func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||||
return tr.rangeKeys(key, end, tr.Rev(), ro)
|
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *storeTxnRead) End() {
|
func (tr *storeTxnRead) End() {
|
||||||
@ -79,12 +81,12 @@ func (s *store) Write(trace *traceutil.Trace) TxnWrite {
|
|||||||
|
|
||||||
func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
|
func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
|
||||||
|
|
||||||
func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||||
rev := tw.beginRev
|
rev := tw.beginRev
|
||||||
if len(tw.changes) > 0 {
|
if len(tw.changes) > 0 {
|
||||||
rev++
|
rev++
|
||||||
}
|
}
|
||||||
return tw.rangeKeys(key, end, rev, ro)
|
return tw.rangeKeys(ctx, key, end, rev, ro)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
|
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
|
||||||
@ -114,7 +116,7 @@ func (tw *storeTxnWrite) End() {
|
|||||||
tw.s.mu.RUnlock()
|
tw.s.mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
|
||||||
rev := ro.Rev
|
rev := ro.Rev
|
||||||
if rev > curRev {
|
if rev > curRev {
|
||||||
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
|
||||||
@ -144,6 +146,11 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
|||||||
kvs := make([]mvccpb.KeyValue, limit)
|
kvs := make([]mvccpb.KeyValue, limit)
|
||||||
revBytes := newRevBytes()
|
revBytes := newRevBytes()
|
||||||
for i, revpair := range revpairs[:len(kvs)] {
|
for i, revpair := range revpairs[:len(kvs)] {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
revToBytes(revpair, revBytes)
|
revToBytes(revpair, revBytes)
|
||||||
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
|
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
|
||||||
if len(vs) != 1 {
|
if len(vs) != 1 {
|
||||||
|
|||||||
@ -14,7 +14,11 @@
|
|||||||
|
|
||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import "go.etcd.io/etcd/server/v3/lease"
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/server/v3/lease"
|
||||||
|
)
|
||||||
|
|
||||||
type metricsTxnWrite struct {
|
type metricsTxnWrite struct {
|
||||||
TxnWrite
|
TxnWrite
|
||||||
@ -32,9 +36,9 @@ func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
|
|||||||
return &metricsTxnWrite{tw, 0, 0, 0, 0}
|
return &metricsTxnWrite{tw, 0, 0, 0, 0}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||||
tw.ranges++
|
tw.ranges++
|
||||||
return tw.TxnWrite.Range(key, end, ro)
|
return tw.TxnWrite.Range(ctx, key, end, ro)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
|
func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user