etcdserver: set txn header revision to store revision following txn

This commit is contained in:
Anthony Romano 2016-04-11 15:51:21 -07:00
parent 8c2225f251
commit c5b8e8dc88
2 changed files with 46 additions and 4 deletions

View File

@ -229,11 +229,9 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
}
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
var revision int64
ok := true
for _, c := range rt.Compare {
if revision, ok = a.applyCompare(c); !ok {
if _, ok = a.applyCompare(c); !ok {
break
}
}
@ -252,6 +250,8 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, err
}
revision := a.s.KV().Rev()
// When executing the operations of txn, we need to hold the txn lock.
// So the reader will not see any intermediate results.
txnID := a.s.KV().TxnBegin()
@ -263,11 +263,15 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
}()
resps := make([]*pb.ResponseUnion, len(reqs))
changedKV := false
for i := range reqs {
if reqs[i].GetRequestRange() == nil {
changedKV = true
}
resps[i] = a.applyUnion(txnID, reqs[i])
}
if len(resps) != 0 {
if changedKV {
revision += 1
}

View File

@ -204,6 +204,44 @@ func TestV3TxnDuplicateKeys(t *testing.T) {
}
}
// Testv3TxnRevision tests that the transaction header revision is set as expected.
func TestV3TxnRevision(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
kvc := toGRPC(clus.RandClient()).KV
pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
presp, err := kvc.Put(context.TODO(), pr)
if err != nil {
t.Fatal(err)
}
txnget := &pb.RequestUnion{Request: &pb.RequestUnion_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
txn := &pb.TxnRequest{Success: []*pb.RequestUnion{txnget}}
tresp, err := kvc.Txn(context.TODO(), txn)
if err != nil {
t.Fatal(err)
}
// did not update revision
if presp.Header.Revision != tresp.Header.Revision {
t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
}
txnput := &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
txn = &pb.TxnRequest{Success: []*pb.RequestUnion{txnput}}
tresp, err = kvc.Txn(context.TODO(), txn)
if err != nil {
t.Fatal(err)
}
// updated revision
if tresp.Header.Revision != presp.Header.Revision+1 {
t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
}
}
// TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
func TestV3PutMissingLease(t *testing.T) {
defer testutil.AfterTest(t)