diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 9d5207677..949f36ca7 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -319,33 +319,36 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { - ok := true - for _, c := range rt.Compare { - if _, ok = a.applyCompare(c); !ok { - break + isWrite := !isTxnReadonly(rt) + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read()) + + reqs, ok := a.compareToOps(txn, rt) + if isWrite { + if err := a.checkRequestPut(txn, reqs); err != nil { + txn.End() + return nil, err } } - - var reqs []*pb.RequestOp - if ok { - reqs = rt.Success - } else { - reqs = rt.Failure - } - - if err := a.checkRequestPut(reqs); err != nil { - return nil, err - } - if err := a.checkRequestRange(reqs); err != nil { + if err := checkRequestRange(txn, reqs); err != nil { + txn.End() return nil, err } resps := make([]*pb.ResponseOp, len(reqs)) + txnResp := &pb.TxnResponse{ + Responses: resps, + Succeeded: ok, + Header: &pb.ResponseHeader{}, + } - // When executing the operations of txn, etcd must hold the txn lock so - // readers do not see any intermediate results. - // TODO: use Read txn if only Ranges - txn := a.s.KV().Write() + // When executing mutable txn ops, etcd must hold the txn lock so + // readers do not see any intermediate results. Since writes are + // serialized on the raft loop, the revision in the read view will + // be the revision of the write txn. + if isWrite { + txn.End() + txn = a.s.KV().Write() + } for i := range reqs { resps[i] = a.applyUnion(txn, reqs[i]) } @@ -355,23 +358,25 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { } txn.End() - txnResp := &pb.TxnResponse{} - txnResp.Header = &pb.ResponseHeader{} txnResp.Header.Revision = rev - txnResp.Responses = resps - txnResp.Succeeded = ok return txnResp, nil } -// applyCompare applies the compare request. -// It returns the revision at which the comparison happens. If the comparison -// succeeds, the it returns true. Otherwise it returns false. -func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { - rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{}) - rev := rr.Rev +func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) { + for _, c := range rt.Compare { + if !applyCompare(rv, c) { + return rt.Failure, false + } + } + return rt.Success, true +} +// applyCompare applies the compare request. +// If the comparison succeeds, it returns true. Otherwise, returns false. +func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool { + rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{}) if err != nil { - return rev, false + return false } var ckv mvccpb.KeyValue if len(rr.KVs) != 0 { @@ -383,7 +388,7 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { // We can treat non-existence as the empty set explicitly, such that // even a key with a value of length 0 bytes is still a real key // that was written that way - return rev, false + return false } } @@ -415,23 +420,15 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { switch c.Result { case pb.Compare_EQUAL: - if result != 0 { - return rev, false - } + return result == 0 case pb.Compare_NOT_EQUAL: - if result == 0 { - return rev, false - } + return result != 0 case pb.Compare_GREATER: - if result != 1 { - return rev, false - } + return result > 0 case pb.Compare_LESS: - if result != -1 { - return rev, false - } + return result < 0 } - return rev, true + return true } func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp { @@ -771,7 +768,7 @@ func (s *kvSortByValue) Less(i, j int) bool { return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0 } -func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error { +func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error { for _, requ := range reqs { tv, ok := requ.Request.(*pb.RequestOp_RequestPut) if !ok { @@ -783,7 +780,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error { } if preq.IgnoreValue || preq.IgnoreLease { // expects previous key-value, error if not exist - rr, err := a.s.KV().Range(preq.Key, nil, mvcc.RangeOptions{}) + rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{}) if err != nil { return err } @@ -801,7 +798,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error { return nil } -func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error { +func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error { for _, requ := range reqs { tv, ok := requ.Request.(*pb.RequestOp_RequestRange) if !ok { @@ -812,10 +809,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error { continue } - if greq.Revision > a.s.KV().Rev() { + if greq.Revision > rv.Rev() { return mvcc.ErrFutureRev } - if greq.Revision < a.s.KV().FirstRev() { + if greq.Revision < rv.FirstRev() { return mvcc.ErrCompacted } } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 8eaf4ec37..da04d76e6 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -328,6 +328,58 @@ func TestV3TxnRevision(t *testing.T) { } } +// Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected +// when compared to the Succeeded field in the txn response. +func TestV3TxnCmpHeaderRev(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kvc := toGRPC(clus.RandClient()).KV + + for i := 0; i < 10; i++ { + // Concurrently put a key with a txn comparing on it. + revc := make(chan int64, 1) + go func() { + defer close(revc) + pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")} + presp, err := kvc.Put(context.TODO(), pr) + if err != nil { + t.Fatal(err) + } + revc <- presp.Header.Revision + }() + + // The read-only txn uses the optimized readindex server path. + txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{Key: []byte("k")}}} + txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}} + // i = 0 /\ Succeeded => put followed txn + cmp := &pb.Compare{ + Result: pb.Compare_EQUAL, + Target: pb.Compare_VERSION, + Key: []byte("k"), + TargetUnion: &pb.Compare_Version{Version: int64(i)}, + } + txn.Compare = append(txn.Compare, cmp) + + tresp, err := kvc.Txn(context.TODO(), txn) + if err != nil { + t.Fatal(err) + } + + prev := <-revc + // put followed txn; should eval to false + if prev > tresp.Header.Revision && !tresp.Succeeded { + t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp) + } + // txn follows put; should eval to true + if tresp.Header.Revision >= prev && tresp.Succeeded { + t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp) + } + } +} + // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair. func TestV3PutIgnoreValue(t *testing.T) { defer testutil.AfterTest(t) diff --git a/mvcc/kv.go b/mvcc/kv.go index e13cd6479..6636347aa 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -93,7 +93,9 @@ func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("un func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) { panic("unexpected Put") } -func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { panic("unexpected Changes") } +func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil } + +func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} } type KV interface { ReadView