mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: use same ReadView for read-only txns
A read-only txn isn't serialized by raft, but it uses a fresh read txn for every mvcc access prior to executing its request ops. If a write txn modifies the keys matching the read txn's comparisons, the read txn may return inconsistent results. To fix, use the same read-only mvcc txn for the duration of the etcd txn. Probably gets a modest txn speedup as well since there are fewer read txn allocations.
This commit is contained in:
parent
e72ad5dd2a
commit
c87594f27c
@ -318,33 +318,36 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||
ok := true
|
||||
for _, c := range rt.Compare {
|
||||
if _, ok = a.applyCompare(c); !ok {
|
||||
break
|
||||
isWrite := !isTxnReadonly(rt)
|
||||
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
|
||||
|
||||
reqs, ok := a.compareToOps(txn, rt)
|
||||
if isWrite {
|
||||
if err := a.checkRequestPut(txn, reqs); err != nil {
|
||||
txn.End()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var reqs []*pb.RequestOp
|
||||
if ok {
|
||||
reqs = rt.Success
|
||||
} else {
|
||||
reqs = rt.Failure
|
||||
}
|
||||
|
||||
if err := a.checkRequestPut(reqs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := a.checkRequestRange(reqs); err != nil {
|
||||
if err := checkRequestRange(txn, reqs); err != nil {
|
||||
txn.End()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resps := make([]*pb.ResponseOp, len(reqs))
|
||||
txnResp := &pb.TxnResponse{
|
||||
Responses: resps,
|
||||
Succeeded: ok,
|
||||
Header: &pb.ResponseHeader{},
|
||||
}
|
||||
|
||||
// When executing the operations of txn, etcd must hold the txn lock so
|
||||
// readers do not see any intermediate results.
|
||||
// TODO: use Read txn if only Ranges
|
||||
txn := a.s.KV().Write()
|
||||
// When executing mutable txn ops, etcd must hold the txn lock so
|
||||
// readers do not see any intermediate results. Since writes are
|
||||
// serialized on the raft loop, the revision in the read view will
|
||||
// be the revision of the write txn.
|
||||
if isWrite {
|
||||
txn.End()
|
||||
txn = a.s.KV().Write()
|
||||
}
|
||||
for i := range reqs {
|
||||
resps[i] = a.applyUnion(txn, reqs[i])
|
||||
}
|
||||
@ -354,23 +357,25 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||
}
|
||||
txn.End()
|
||||
|
||||
txnResp := &pb.TxnResponse{}
|
||||
txnResp.Header = &pb.ResponseHeader{}
|
||||
txnResp.Header.Revision = rev
|
||||
txnResp.Responses = resps
|
||||
txnResp.Succeeded = ok
|
||||
return txnResp, nil
|
||||
}
|
||||
|
||||
// applyCompare applies the compare request.
|
||||
// It returns the revision at which the comparison happens. If the comparison
|
||||
// succeeds, the it returns true. Otherwise it returns false.
|
||||
func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
|
||||
rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{})
|
||||
rev := rr.Rev
|
||||
func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) {
|
||||
for _, c := range rt.Compare {
|
||||
if !applyCompare(rv, c) {
|
||||
return rt.Failure, false
|
||||
}
|
||||
}
|
||||
return rt.Success, true
|
||||
}
|
||||
|
||||
// applyCompare applies the compare request.
|
||||
// If the comparison succeeds, it returns true. Otherwise, returns false.
|
||||
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
|
||||
rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{})
|
||||
if err != nil {
|
||||
return rev, false
|
||||
return false
|
||||
}
|
||||
var ckv mvccpb.KeyValue
|
||||
if len(rr.KVs) != 0 {
|
||||
@ -382,7 +387,7 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
|
||||
// We can treat non-existence as the empty set explicitly, such that
|
||||
// even a key with a value of length 0 bytes is still a real key
|
||||
// that was written that way
|
||||
return rev, false
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@ -414,23 +419,15 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
|
||||
|
||||
switch c.Result {
|
||||
case pb.Compare_EQUAL:
|
||||
if result != 0 {
|
||||
return rev, false
|
||||
}
|
||||
return result == 0
|
||||
case pb.Compare_NOT_EQUAL:
|
||||
if result == 0 {
|
||||
return rev, false
|
||||
}
|
||||
return result != 0
|
||||
case pb.Compare_GREATER:
|
||||
if result != 1 {
|
||||
return rev, false
|
||||
}
|
||||
return result > 0
|
||||
case pb.Compare_LESS:
|
||||
if result != -1 {
|
||||
return rev, false
|
||||
}
|
||||
return result < 0
|
||||
}
|
||||
return rev, true
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
|
||||
@ -770,7 +767,7 @@ func (s *kvSortByValue) Less(i, j int) bool {
|
||||
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
|
||||
}
|
||||
|
||||
func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
|
||||
func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
|
||||
for _, requ := range reqs {
|
||||
tv, ok := requ.Request.(*pb.RequestOp_RequestPut)
|
||||
if !ok {
|
||||
@ -782,7 +779,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
|
||||
}
|
||||
if preq.IgnoreValue || preq.IgnoreLease {
|
||||
// expects previous key-value, error if not exist
|
||||
rr, err := a.s.KV().Range(preq.Key, nil, mvcc.RangeOptions{})
|
||||
rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -800,7 +797,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
|
||||
func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
|
||||
for _, requ := range reqs {
|
||||
tv, ok := requ.Request.(*pb.RequestOp_RequestRange)
|
||||
if !ok {
|
||||
@ -811,10 +808,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if greq.Revision > a.s.KV().Rev() {
|
||||
if greq.Revision > rv.Rev() {
|
||||
return mvcc.ErrFutureRev
|
||||
}
|
||||
if greq.Revision < a.s.KV().FirstRev() {
|
||||
if greq.Revision < rv.FirstRev() {
|
||||
return mvcc.ErrCompacted
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user