Encapsulation of applier logic: Move Txn-related code out of applier.go.

This PR removes the calls from server.go into applierV3base logic that are NOT part of 'application'.
The original idea was that read-only Txn and Range calls share logic with Apply,
so they could invoke the appliers directly (while bypassing all the 'corrupt', 'quota' and 'auth' wrappers).

This PR moves all that logic to a separate file (which can later become a package of its own).
Piotr Tabor 2022-04-02 22:21:53 +02:00
parent cc5d6f7dc3
commit cdf9869d70
4 changed files with 608 additions and 577 deletions
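
The refactoring turns applier methods that hung off *EtcdServer into stateless free functions in the new server/etcdserver/txn.go, with their dependencies (logger, lessor, KV store) passed explicitly. A minimal sketch of the resulting call shape, with the signatures copied from the diff below:

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
func DeleteRange(kv mvcc.KV, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error)

Serializable reads in v3_server.go call these functions directly instead of going through s.applyV3Base, for example:

resp, err = Range(ctx, s.Logger(), s.KV(), nil, r)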


@@ -15,17 +15,14 @@
package etcdserver
import (
"bytes"
"context"
"fmt"
"sort"
"strconv"
"time"
"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/auth"
@@ -99,24 +96,12 @@ type applierV3 interface {
RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
type applierV3backend struct {
s *EtcdServer
checkPut checkReqFunc
checkRange checkReqFunc
}
func (s *EtcdServer) newApplierV3Backend() applierV3 {
base := &applierV3backend{s: s}
base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
return base.checkRequestPut(rv, req)
}
base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
return base.checkRequestRange(rv, req)
}
return base
return &applierV3backend{s: s}
}
func (s *EtcdServer) newApplierV3Internal() applierV3Internal {
@@ -249,434 +234,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 member
}
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
trace = traceutil.New("put",
a.s.Logger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
}
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
txn = a.s.KV().Write(trace)
defer txn.End()
}
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepWithFunction(func() {
rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair")
if err != nil {
return nil, nil, err
}
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
val = rr.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, trace, nil
return Put(ctx, a.s.Logger(), a.s.lessor, a.s.KV(), txn, p)
}
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd)
if txn == nil {
txn = a.s.kv.Write(traceutil.TODO())
defer txn.End()
}
if dr.PrevKv {
rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
if rr != nil {
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
resp.PrevKvs[i] = &rr.KVs[i]
}
}
}
resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
return resp, nil
return DeleteRange(a.s.KV(), txn, dr)
}
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
lg := a.s.Logger()
if txn == nil {
txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txn.End()
}
limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
// fetch everything; sort and truncate afterwards
limit = 0
}
if limit > 0 {
// fetch one extra for 'more' flag
limit = limit + 1
}
ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
}
rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
}
if r.MinModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
pruneKVs(rr, f)
}
if r.MaxCreateRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
pruneKVs(rr, f)
}
if r.MinCreateRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
pruneKVs(rr, f)
}
sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
// Since current mvcc.Range implementation returns results
// sorted by keys in lexicographically ascending order,
// sort ASCEND by default only when target is not 'KEY'
sortOrder = pb.RangeRequest_ASCEND
} else if r.SortTarget == pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_ASCEND {
// Since current mvcc.Range implementation returns results
// sorted by keys in lexicographically ascending order,
// don't re-sort when target is 'KEY' and order is ASCEND
sortOrder = pb.RangeRequest_NONE
}
if sortOrder != pb.RangeRequest_NONE {
var sorter sort.Interface
switch {
case r.SortTarget == pb.RangeRequest_KEY:
sorter = &kvSortByKey{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VERSION:
sorter = &kvSortByVersion{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_CREATE:
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_MOD:
sorter = &kvSortByMod{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VALUE:
sorter = &kvSortByValue{&kvSort{rr.KVs}}
default:
lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget)))
}
switch {
case sortOrder == pb.RangeRequest_ASCEND:
sort.Sort(sorter)
case sortOrder == pb.RangeRequest_DESCEND:
sort.Sort(sort.Reverse(sorter))
}
}
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
return Range(ctx, a.s.Logger(), a.s.KV(), txn, r)
}
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", a.s.Logger())
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid the extra overhead of copying the buffer.
var txn mvcc.TxnWrite
if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
} else {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
}
var txnPath []bool
trace.StepWithFunction(
func() {
txnPath = compareToPath(txn, rt)
},
"compare",
)
if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
txn.End()
return nil, nil, err
}
}
if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
txn.End()
return nil, nil, err
}
trace.Step("check requests")
txnResp, _ := newTxnResp(rt, txnPath)
// When executing mutable txn ops, etcd must hold the txn lock so
// readers do not see any intermediate results. Since writes are
// serialized on the raft loop, the revision in the read view will
// be the revision of the write txn.
if isWrite {
txn.End()
txn = a.s.KV().Write(trace)
}
a.applyTxn(ctx, txn, rt, txnPath, txnResp)
rev := txn.Rev()
if len(txn.Changes()) != 0 {
rev++
}
txn.End()
txnResp.Header.Revision = rev
trace.AddField(
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
)
return txnResp, trace, nil
}
// newTxnResp allocates a txn response for a txn request given a path.
func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
resps := make([]*pb.ResponseOp, len(reqs))
txnResp = &pb.TxnResponse{
Responses: resps,
Succeeded: txnPath[0],
Header: &pb.ResponseHeader{},
}
for i, req := range reqs {
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
case *pb.RequestOp_RequestPut:
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
case *pb.RequestOp_RequestDeleteRange:
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
case *pb.RequestOp_RequestTxn:
resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
txnPath = txnPath[1+txns:]
txnCount += txns + 1
default:
}
}
return txnResp, txnCount
}
func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
txnPath := make([]bool, 1)
ops := rt.Success
if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
ops = rt.Failure
}
for _, op := range ops {
tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
if !ok || tv.RequestTxn == nil {
continue
}
txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
}
return txnPath
}
func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
for _, c := range cmps {
if !applyCompare(rv, c) {
return false
}
}
return true
}
// applyCompare applies the compare request.
// If the comparison succeeds, it returns true. Otherwise, returns false.
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
// TODO: possible optimizations
// * chunk reads for large ranges to conserve memory
// * rewrite rules for common patterns:
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
// * caching
rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
if err != nil {
return false
}
if len(rr.KVs) == 0 {
if c.Target == pb.Compare_VALUE {
// Always fail if comparing a value on a key/keys that doesn't exist;
// nil == empty string in grpc; no way to represent missing value
return false
}
return compareKV(c, mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
return false
}
}
return true
}
func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
case pb.Compare_VALUE:
v := []byte{}
if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
v = tv.Value
}
result = bytes.Compare(ckv.Value, v)
case pb.Compare_CREATE:
if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
rev = tv.CreateRevision
}
result = compareInt64(ckv.CreateRevision, rev)
case pb.Compare_MOD:
if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
rev = tv.ModRevision
}
result = compareInt64(ckv.ModRevision, rev)
case pb.Compare_VERSION:
if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
rev = tv.Version
}
result = compareInt64(ckv.Version, rev)
case pb.Compare_LEASE:
if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
rev = tv.Lease
}
result = compareInt64(ckv.Lease, rev)
}
switch c.Result {
case pb.Compare_EQUAL:
return result == 0
case pb.Compare_NOT_EQUAL:
return result != 0
case pb.Compare_GREATER:
return result > 0
case pb.Compare_LESS:
return result < 0
}
return true
}
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
lg := a.s.Logger()
for i, req := range reqs {
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
trace.StartSubTrace(
traceutil.Field{Key: "req_type", Value: "range"},
traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := a.Range(ctx, txn, tv.RequestRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestPut:
trace.StartSubTrace(
traceutil.Field{Key: "req_type", Value: "put"},
traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
resp, _, err := a.Put(ctx, txn, tv.RequestPut)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestDeleteRange:
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
txns += applyTxns + 1
txnPath = txnPath[applyTxns+1:]
default:
// empty union
}
}
return txns
return Txn(ctx, a.s.Logger(), rt, a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, a.s.KV(), a.s.lessor)
}
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
@@ -1007,131 +577,6 @@ func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantRes
return resp, err
}
type kvSort struct{ kvs []mvccpb.KeyValue }
func (s *kvSort) Swap(i, j int) {
t := s.kvs[i]
s.kvs[i] = s.kvs[j]
s.kvs[j] = t
}
func (s *kvSort) Len() int { return len(s.kvs) }
type kvSortByKey struct{ *kvSort }
func (s *kvSortByKey) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
}
type kvSortByVersion struct{ *kvSort }
func (s *kvSortByVersion) Less(i, j int) bool {
return (s.kvs[i].Version - s.kvs[j].Version) < 0
}
type kvSortByCreate struct{ *kvSort }
func (s *kvSortByCreate) Less(i, j int) bool {
return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
}
type kvSortByMod struct{ *kvSort }
func (s *kvSortByMod) Less(i, j int) bool {
return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
}
type kvSortByValue struct{ *kvSort }
func (s *kvSortByValue) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
}
func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
txnCount := 0
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
for _, req := range reqs {
if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
if err != nil {
return 0, err
}
txnCount += txns + 1
txnPath = txnPath[txns+1:]
continue
}
if err := f(rv, req); err != nil {
return 0, err
}
}
return txnCount, nil
}
func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
if !ok || tv.RequestPut == nil {
return nil
}
req := tv.RequestPut
if req.IgnoreValue || req.IgnoreLease {
// expects previous key-value, error if not exist
rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
if err != nil {
return err
}
if rr == nil || len(rr.KVs) == 0 {
return ErrKeyNotFound
}
}
if lease.LeaseID(req.Lease) != lease.NoLease {
if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
return lease.ErrLeaseNotFound
}
}
return nil
}
func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
if !ok || tv.RequestRange == nil {
return nil
}
req := tv.RequestRange
switch {
case req.Revision == 0:
return nil
case req.Revision > rv.Rev():
return mvcc.ErrFutureRev
case req.Revision < rv.FirstRev():
return mvcc.ErrCompacted
}
return nil
}
func compareInt64(a, b int64) int {
switch {
case a < b:
return -1
case a > b:
return 1
default:
return 0
}
}
// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}
func noSideEffect(r *pb.InternalRaftRequest) bool {
return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
}
@@ -1154,17 +599,6 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
txn.Failure = f(txn.Failure)
}
func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
j := 0
for i := range rr.KVs {
rr.KVs[j] = rr.KVs[i]
if !isPrunable(&rr.KVs[i]) {
j++
}
}
rr.KVs = rr.KVs[:j]
}
func newHeader(s *EtcdServer) *pb.ResponseHeader {
return &pb.ResponseHeader{
ClusterId: uint64(s.Cluster().ID()),


@@ -252,8 +252,6 @@ type EtcdServer struct {
// applyV3 is the applier with auth and quotas
applyV3 applierV3
// applyV3Base is the core applier without auth or quotas
applyV3Base applierV3
// applyV3Internal is the applier for internal request
applyV3Internal applierV3Internal
applyWait wait.WaitTime
@@ -392,7 +390,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.compactor.Run()
}
srv.applyV3Base = srv.newApplierV3Backend()
srv.applyV3Internal = srv.newApplierV3Internal()
if err = srv.restoreAlarms(); err != nil {
return nil, err

server/etcdserver/txn.go (new file, 597 additions)

@@ -0,0 +1,597 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"bytes"
"context"
"sort"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.uber.org/zap"
)
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
trace = traceutil.New("put",
lg,
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
}
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
txn = kv.Write(trace)
defer txn.End()
}
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepWithFunction(func() {
rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair")
if err != nil {
return nil, nil, err
}
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
val = rr.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, trace, nil
}
func DeleteRange(kv mvcc.KV, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd)
if txn == nil {
txn = kv.Write(traceutil.TODO())
defer txn.End()
}
if dr.PrevKv {
rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
if rr != nil {
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
resp.PrevKvs[i] = &rr.KVs[i]
}
}
}
resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
return resp, nil
}
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
if txn == nil {
txn = kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txn.End()
}
limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
// fetch everything; sort and truncate afterwards
limit = 0
}
if limit > 0 {
// fetch one extra for 'more' flag
limit = limit + 1
}
ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
}
rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
}
if r.MinModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
pruneKVs(rr, f)
}
if r.MaxCreateRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
pruneKVs(rr, f)
}
if r.MinCreateRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
pruneKVs(rr, f)
}
sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
// Since current mvcc.Range implementation returns results
// sorted by keys in lexicographically ascending order,
// sort ASCEND by default only when target is not 'KEY'
sortOrder = pb.RangeRequest_ASCEND
} else if r.SortTarget == pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_ASCEND {
// Since current mvcc.Range implementation returns results
// sorted by keys in lexicographically ascending order,
// don't re-sort when target is 'KEY' and order is ASCEND
sortOrder = pb.RangeRequest_NONE
}
if sortOrder != pb.RangeRequest_NONE {
var sorter sort.Interface
switch {
case r.SortTarget == pb.RangeRequest_KEY:
sorter = &kvSortByKey{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VERSION:
sorter = &kvSortByVersion{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_CREATE:
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_MOD:
sorter = &kvSortByMod{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VALUE:
sorter = &kvSortByValue{&kvSort{rr.KVs}}
default:
lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget)))
}
switch {
case sortOrder == pb.RangeRequest_ASCEND:
sort.Sort(sorter)
case sortOrder == pb.RangeRequest_DESCEND:
sort.Sort(sort.Reverse(sorter))
}
}
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
}
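// Illustration (editor's sketch, not part of this commit): the limit
// handling above fetches one extra KV so the response can signal
// truncation. With r.Limit == 2 and three matching keys, the txn ranges
// over limit+1 == 3 KVs, truncates rr.KVs to 2, and sets resp.More = true.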
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", lg)
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid the extra overhead of copying the buffer.
var txn mvcc.TxnWrite
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ {
txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace))
} else {
txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace))
}
var txnPath []bool
trace.StepWithFunction(
func() {
txnPath = compareToPath(txn, rt)
},
"compare",
)
if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
if _, err := checkRequests(txn, rt, txnPath,
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
txn.End()
return nil, nil, err
}
}
if _, err := checkRequests(txn, rt, txnPath, checkRequestRange); err != nil {
txn.End()
return nil, nil, err
}
trace.Step("check requests")
txnResp, _ := newTxnResp(rt, txnPath)
// When executing mutable txn ops, etcd must hold the txn lock so
// readers do not see any intermediate results. Since writes are
// serialized on the raft loop, the revision in the read view will
// be the revision of the write txn.
if isWrite {
txn.End()
txn = kv.Write(trace)
}
applyTxn(ctx, lg, kv, lessor, txn, rt, txnPath, txnResp)
rev := txn.Rev()
if len(txn.Changes()) != 0 {
rev++
}
txn.End()
txnResp.Header.Revision = rev
trace.AddField(
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
)
return txnResp, trace, nil
}
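// Illustration (not part of this commit; mirrors the v3_server.go hunk
// below): a serializable read-only Txn now calls this free function
// directly, skipping the applier chain:
//
//	resp, _, err := Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)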
// newTxnResp allocates a txn response for a txn request given a path.
func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
resps := make([]*pb.ResponseOp, len(reqs))
txnResp = &pb.TxnResponse{
Responses: resps,
Succeeded: txnPath[0],
Header: &pb.ResponseHeader{},
}
for i, req := range reqs {
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
case *pb.RequestOp_RequestPut:
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
case *pb.RequestOp_RequestDeleteRange:
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
case *pb.RequestOp_RequestTxn:
resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
txnPath = txnPath[1+txns:]
txnCount += txns + 1
default:
}
}
return txnResp, txnCount
}
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
for i, req := range reqs {
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
trace.StartSubTrace(
traceutil.Field{Key: "req_type", Value: "range"},
traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := Range(ctx, lg, kv, txn, tv.RequestRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestPut:
trace.StartSubTrace(
traceutil.Field{Key: "req_type", Value: "put"},
traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
resp, _, err := Put(ctx, lg, lessor, kv, txn, tv.RequestPut)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestDeleteRange:
resp, err := DeleteRange(kv, txn, tv.RequestDeleteRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns := applyTxn(ctx, lg, kv, lessor, txn, tv.RequestTxn, txnPath[1:], resp)
txns += applyTxns + 1
txnPath = txnPath[applyTxns+1:]
default:
// empty union
}
}
return txns
}
//---------------------------------------------------------
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
func checkRequestPut(rv mvcc.ReadView, lessor lease.Lessor, reqOp *pb.RequestOp) error {
tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
if !ok || tv.RequestPut == nil {
return nil
}
req := tv.RequestPut
if req.IgnoreValue || req.IgnoreLease {
// expects previous key-value, error if not exist
rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
if err != nil {
return err
}
if rr == nil || len(rr.KVs) == 0 {
return ErrKeyNotFound
}
}
if lease.LeaseID(req.Lease) != lease.NoLease {
if l := lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
return lease.ErrLeaseNotFound
}
}
return nil
}
func checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
if !ok || tv.RequestRange == nil {
return nil
}
req := tv.RequestRange
switch {
case req.Revision == 0:
return nil
case req.Revision > rv.Rev():
return mvcc.ErrFutureRev
case req.Revision < rv.FirstRev():
return mvcc.ErrCompacted
}
return nil
}
func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
txnCount := 0
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
for _, req := range reqs {
if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
if err != nil {
return 0, err
}
txnCount += txns + 1
txnPath = txnPath[txns+1:]
continue
}
if err := f(rv, req); err != nil {
return 0, err
}
}
return txnCount, nil
}
// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}
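// Illustration (not part of this commit) of the encoding mkGteRange
// handles:
//
//	mkGteRange(nil)           // -> nil: a single-key lookup
//	mkGteRange([]byte{0})     // -> []byte{}: the ">= key" (GTE) range
//	mkGteRange([]byte("end")) // -> []byte("end"): passed through unchanged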
func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
j := 0
for i := range rr.KVs {
rr.KVs[j] = rr.KVs[i]
if !isPrunable(&rr.KVs[i]) {
j++
}
}
rr.KVs = rr.KVs[:j]
}
type kvSort struct{ kvs []mvccpb.KeyValue }
func (s *kvSort) Swap(i, j int) {
t := s.kvs[i]
s.kvs[i] = s.kvs[j]
s.kvs[j] = t
}
func (s *kvSort) Len() int { return len(s.kvs) }
type kvSortByKey struct{ *kvSort }
func (s *kvSortByKey) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
}
type kvSortByVersion struct{ *kvSort }
func (s *kvSortByVersion) Less(i, j int) bool {
return (s.kvs[i].Version - s.kvs[j].Version) < 0
}
type kvSortByCreate struct{ *kvSort }
func (s *kvSortByCreate) Less(i, j int) bool {
return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
}
type kvSortByMod struct{ *kvSort }
func (s *kvSortByMod) Less(i, j int) bool {
return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
}
type kvSortByValue struct{ *kvSort }
func (s *kvSortByValue) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
}
func compareInt64(a, b int64) int {
switch {
case a < b:
return -1
case a > b:
return 1
default:
return 0
}
}
func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
txnPath := make([]bool, 1)
ops := rt.Success
if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
ops = rt.Failure
}
for _, op := range ops {
tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
if !ok || tv.RequestTxn == nil {
continue
}
txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
}
return txnPath
}
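// Illustration (not part of this commit): compareToPath flattens nested
// txn decisions depth-first into a []bool. For a txn whose compares
// succeed and whose success branch contains one nested txn with failing
// compares, it returns []bool{true, false}.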
func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
for _, c := range cmps {
if !applyCompare(rv, c) {
return false
}
}
return true
}
// applyCompare applies the compare request.
// If the comparison succeeds, it returns true. Otherwise, returns false.
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
// TODO: possible optimizations
// * chunk reads for large ranges to conserve memory
// * rewrite rules for common patterns:
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
// * caching
rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
if err != nil {
return false
}
if len(rr.KVs) == 0 {
if c.Target == pb.Compare_VALUE {
// Always fail if comparing a value on a key/keys that doesn't exist;
// nil == empty string in grpc; no way to represent missing value
return false
}
return compareKV(c, mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
return false
}
}
return true
}
func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
case pb.Compare_VALUE:
v := []byte{}
if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
v = tv.Value
}
result = bytes.Compare(ckv.Value, v)
case pb.Compare_CREATE:
if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
rev = tv.CreateRevision
}
result = compareInt64(ckv.CreateRevision, rev)
case pb.Compare_MOD:
if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
rev = tv.ModRevision
}
result = compareInt64(ckv.ModRevision, rev)
case pb.Compare_VERSION:
if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
rev = tv.Version
}
result = compareInt64(ckv.Version, rev)
case pb.Compare_LEASE:
if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
rev = tv.Lease
}
result = compareInt64(ckv.Lease, rev)
}
switch c.Result {
case pb.Compare_EQUAL:
return result == 0
case pb.Compare_NOT_EQUAL:
return result != 0
case pb.Compare_GREATER:
return result > 0
case pb.Compare_LESS:
return result < 0
}
return true
}
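// Illustration (not part of this commit): the classic "key is absent"
// guard
//
//	c := &pb.Compare{
//		Key:         []byte("k"),
//		Target:      pb.Compare_CREATE,
//		Result:      pb.Compare_EQUAL,
//		TargetUnion: &pb.Compare_CreateRevision{CreateRevision: 0},
//	}
//
// succeeds only when the key does not exist: applyCompare then evaluates
// compareKV(c, mvccpb.KeyValue{}), where CreateRevision is 0 and the EQUAL
// comparison holds; an existing key has CreateRevision > 0 and fails it.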


@@ -128,7 +128,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
get := func() { resp, err = Range(ctx, s.Logger(), s.KV(), nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
@@ -178,7 +178,9 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
trace.LogIfLong(traceThreshold)
}(time.Now())
get := func() { resp, _, err = s.applyV3Base.Txn(ctx, r) }
get := func() {
resp, _, err = Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
@@ -390,7 +392,8 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
return nil, ErrCanceled
}
func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
// LeaseLeases is really ListLeases !???
func (s *EtcdServer) LeaseLeases(_ context.Context, _ *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
ls := s.lessor.Leases()
lss := make([]*pb.LeaseStatus, len(ls))
for i := range ls {