From 0558e379c3bd3ca7c8dca6783d75f95eaabc0ce1 Mon Sep 17 00:00:00 2001 From: jingyih Date: Thu, 5 Nov 2020 08:18:31 -0800 Subject: [PATCH] server: proper request cancellation for range --- server/etcdserver/api/v3rpc/watch.go | 2 +- server/etcdserver/apply.go | 12 ++++++------ server/mvcc/kv.go | 4 +++- server/mvcc/kv_test.go | 25 +++++++++++++------------ server/mvcc/kv_view.go | 6 ++++-- server/mvcc/kvstore_bench_test.go | 3 ++- server/mvcc/kvstore_compaction_test.go | 3 ++- server/mvcc/kvstore_test.go | 13 +++++++------ server/mvcc/kvstore_txn.go | 17 ++++++++++++----- server/mvcc/metrics_txn.go | 10 +++++++--- 10 files changed, 57 insertions(+), 38 deletions(-) diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index dca865d8e..df876232c 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -383,7 +383,7 @@ func (sws *serverWatchStream) sendLoop() { events[i] = &evs[i] if needPrevKV { opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1} - r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt) + r, err := sws.watchable.Range(context.TODO(), evs[i].Kv.Key, nil, opt) if err == nil && len(r.KVs) != 0 { events[i].PrevKv = &(r.KVs[0]) } diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 179a4bd96..9ec6e4e84 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -230,7 +230,7 @@ func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.Put var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { trace.StepWithFunction(func() { - rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) + rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) }, "get previous kv pair") if err != nil { @@ -271,7 +271,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ } if dr.PrevKv { - rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{}) + rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) if err != nil { return nil, err } @@ -316,7 +316,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra Count: r.CountOnly, } - rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro) + rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) if err != nil { return nil, err } @@ -503,7 +503,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool { // * rewrite rules for common patterns: // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0" // * caching - rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{}) + rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{}) if err != nil { return false } @@ -631,7 +631,7 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com return nil, ch, nil, err } // get the current revision. which key to get is not important. - rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{}) + rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{}) resp.Header.Revision = rr.Rev return resp, ch, trace, err } @@ -999,7 +999,7 @@ func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp req := tv.RequestPut if req.IgnoreValue || req.IgnoreLease { // expects previous key-value, error if not exist - rr, err := rv.Range(req.Key, nil, mvcc.RangeOptions{}) + rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{}) if err != nil { return err } diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index 850183d75..b8cd982da 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -15,6 +15,8 @@ package mvcc import ( + "context" + "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" @@ -50,7 +52,7 @@ type ReadView interface { // If `end` is not nil and empty, it gets the keys greater than or equal to key. // Limit limits the number of keys returned. // If the required rev is compacted, ErrCompacted will be returned. - Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) + Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) } // TxnRead represents a read-only transaction with operations that will not diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index 326efc0aa..565944ad2 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -15,6 +15,7 @@ package mvcc import ( + "context" "fmt" "os" "reflect" @@ -45,12 +46,12 @@ type ( var ( normalRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - return kv.Range(key, end, ro) + return kv.Range(context.TODO(), key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { txn := kv.Read(traceutil.TODO()) defer txn.End() - return txn.Range(key, end, ro) + return txn.Range(context.TODO(), key, end, ro) } normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { @@ -268,7 +269,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) { t.Errorf("#%d: rev = %d, want %d", i, rev, base+1) } - r, err := s.Range([]byte("foo"), nil, RangeOptions{}) + r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{}) if err != nil { t.Fatal(err) } @@ -372,7 +373,7 @@ func TestKVOperationInSequence(t *testing.T) { t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1) } - r, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Fatal(err) } @@ -392,7 +393,7 @@ func TestKVOperationInSequence(t *testing.T) { t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+2) } - r, err = s.Range([]byte("foo"), nil, RangeOptions{Rev: base + 2}) + r, err = s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 2}) if err != nil { t.Fatal(err) } @@ -450,7 +451,7 @@ func TestKVTxnNonBlockRange(t *testing.T) { donec := make(chan struct{}) go func() { defer close(donec) - s.Range([]byte("foo"), nil, RangeOptions{}) + s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{}) }() select { case <-donec: @@ -475,7 +476,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1) } - r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err := txn.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Fatal(err) } @@ -495,7 +496,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1) } - r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err = txn.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Errorf("#%d: range error (%v)", i, err) } @@ -554,7 +555,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { if err != nil { t.Errorf("#%d: unexpect compact error %v", i, err) } - r, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: tt.rev + 1}) + r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: tt.rev + 1}) if err != nil { t.Errorf("#%d: unexpect range error %v", i, err) } @@ -641,7 +642,7 @@ func TestKVRestore(t *testing.T) { tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { - r, _ := s.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k}) + r, _ := s.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k}) kvss = append(kvss, r.KVs) } @@ -659,7 +660,7 @@ func TestKVRestore(t *testing.T) { testutil.WaitSchedule() var nkvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { - r, _ := ns.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k}) + r, _ := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k}) nkvss = append(nkvss, r.KVs) } cleanup(ns, b, tmpPath) @@ -703,7 +704,7 @@ func TestKVSnapshot(t *testing.T) { ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer ns.Close() - r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{}) + r, err := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{}) if err != nil { t.Errorf("unexpect range error (%v)", err) } diff --git a/server/mvcc/kv_view.go b/server/mvcc/kv_view.go index f25c2aaf3..29464c50e 100644 --- a/server/mvcc/kv_view.go +++ b/server/mvcc/kv_view.go @@ -15,6 +15,8 @@ package mvcc import ( + "context" + "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" ) @@ -33,10 +35,10 @@ func (rv *readView) Rev() int64 { return tr.Rev() } -func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { +func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { tr := rv.kv.Read(traceutil.TODO()) defer tr.End() - return tr.Range(key, end, ro) + return tr.Range(ctx, key, end, ro) } type writeView struct{ kv KV } diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index be7ca7790..ae1482e45 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -15,6 +15,7 @@ package mvcc import ( + "context" "testing" "go.etcd.io/etcd/pkg/v3/traceutil" @@ -67,7 +68,7 @@ func benchmarkStoreRange(b *testing.B, n int) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.Range(begin, end, RangeOptions{}) + s.Range(context.TODO(), begin, end, RangeOptions{}) } } diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go index 7f95724a0..ac9e794d2 100644 --- a/server/mvcc/kvstore_compaction_test.go +++ b/server/mvcc/kvstore_compaction_test.go @@ -15,6 +15,7 @@ package mvcc import ( + "context" "os" "reflect" "testing" @@ -130,7 +131,7 @@ func TestCompactAllAndRestore(t *testing.T) { if s1.Rev() != rev { t.Errorf("rev = %v, want %v", s1.Rev(), rev) } - _, err = s1.Range([]byte("foo"), nil, RangeOptions{}) + _, err = s1.Range(context.TODO(), []byte("foo"), nil, RangeOptions{}) if err != nil { t.Errorf("unexpect range error %v", err) } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 0e2355ee6..6cff76b93 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -16,6 +16,7 @@ package mvcc import ( "bytes" + "context" "crypto/rand" "encoding/binary" "fmt" @@ -213,7 +214,7 @@ func TestStoreRange(t *testing.T) { b.tx.rangeRespc <- tt.r fi.indexRangeRespc <- tt.idxr - ret, err := s.Range([]byte("foo"), []byte("goo"), ro) + ret, err := s.Range(context.TODO(), []byte("foo"), []byte("goo"), ro) if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } @@ -455,7 +456,7 @@ func TestRestoreDelete(t *testing.T) { defer s.Close() for i := 0; i < 20; i++ { ks := fmt.Sprintf("foo-%d", i) - r, err := s.Range([]byte(ks), nil, RangeOptions{}) + r, err := s.Range(context.TODO(), []byte(ks), nil, RangeOptions{}) if err != nil { t.Fatal(err) } @@ -502,7 +503,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond) - if _, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted { + if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted { t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) } // check the key in backend is deleted @@ -676,7 +677,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { // readTx2 simulates a short read request readTx2 := s.Read(traceutil.TODO()) ro := RangeOptions{Limit: 1, Rev: 0, Count: false} - ret, err := readTx2.Range([]byte("foo"), nil, ro) + ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro) if err != nil { t.Fatalf("failed to range: %v", err) } @@ -693,7 +694,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { } readTx2.End() - ret, err = readTx1.Range([]byte("foo"), nil, ro) + ret, err = readTx1.Range(context.TODO(), []byte("foo"), nil, ro) if err != nil { t.Fatalf("failed to range: %v", err) } @@ -760,7 +761,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { tx := s.Read(traceutil.TODO()) mu.Unlock() // get all keys in backend store, and compare with wKVs - ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) + ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) tx.End() if err != nil { t.Errorf("failed to range keys: %v", err) diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 8d880e9e7..aaa93d9ab 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -15,6 +15,8 @@ package mvcc import ( + "context" + "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" @@ -47,8 +49,8 @@ func (s *store) Read(trace *traceutil.Trace) TxnRead { func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } func (tr *storeTxnRead) Rev() int64 { return tr.rev } -func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - return tr.rangeKeys(key, end, tr.Rev(), ro) +func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + return tr.rangeKeys(ctx, key, end, tr.Rev(), ro) } func (tr *storeTxnRead) End() { @@ -79,12 +81,12 @@ func (s *store) Write(trace *traceutil.Trace) TxnWrite { func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev } -func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { +func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { rev := tw.beginRev if len(tw.changes) > 0 { rev++ } - return tw.rangeKeys(key, end, rev, ro) + return tw.rangeKeys(ctx, key, end, rev, ro) } func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) { @@ -114,7 +116,7 @@ func (tw *storeTxnWrite) End() { tw.s.mu.RUnlock() } -func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { +func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { rev := ro.Rev if rev > curRev { return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev @@ -144,6 +146,11 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions kvs := make([]mvccpb.KeyValue, limit) revBytes := newRevBytes() for i, revpair := range revpairs[:len(kvs)] { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } revToBytes(revpair, revBytes) _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) if len(vs) != 1 { diff --git a/server/mvcc/metrics_txn.go b/server/mvcc/metrics_txn.go index 7e40d9352..d4a90eed5 100644 --- a/server/mvcc/metrics_txn.go +++ b/server/mvcc/metrics_txn.go @@ -14,7 +14,11 @@ package mvcc -import "go.etcd.io/etcd/server/v3/lease" +import ( + "context" + + "go.etcd.io/etcd/server/v3/lease" +) type metricsTxnWrite struct { TxnWrite @@ -32,9 +36,9 @@ func newMetricsTxnWrite(tw TxnWrite) TxnWrite { return &metricsTxnWrite{tw, 0, 0, 0, 0} } -func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) { +func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) { tw.ranges++ - return tw.TxnWrite.Range(key, end, ro) + return tw.TxnWrite.Range(ctx, key, end, ro) } func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {