diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 3dbb8276b..b747a9b8c 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -380,26 +380,33 @@ func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error { return nil } +func checkRequestRange(kv dstorage.KV, reqs []*pb.RequestUnion) error { + for _, requ := range reqs { + greq := requ.RequestRange + if greq == nil || greq.Revision == 0 { + continue + } + + if greq.Revision > kv.Rev() { + return dstorage.ErrFutureRev + } + if greq.Revision < kv.FirstRev() { + return dstorage.ErrCompacted + } + } + return nil +} + func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) { var revision int64 - txnID := kv.TxnBegin() - defer func() { - err := kv.TxnEnd(txnID) - if err != nil { - panic(fmt.Sprint("unexpected error when closing txn", txnID)) - } - }() - ok := true for _, c := range rt.Compare { - if revision, ok = applyCompare(txnID, kv, c); !ok { + if revision, ok = applyCompare(kv, c); !ok { break } } - // TODO: check potential errors before actually applying anything - var reqs []*pb.RequestUnion if ok { reqs = rt.Success @@ -410,6 +417,19 @@ func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnRespon if err := checkRequestLeases(le, reqs); err != nil { return nil, err } + if err := checkRequestRange(kv, reqs); err != nil { + return nil, err + } + + // When executing the operations of txn, we need to hold the txn lock. + // So the reader will not see any intermediate results. + txnID := kv.TxnBegin() + defer func() { + err := kv.TxnEnd(txnID) + if err != nil { + panic(fmt.Sprint("unexpected error when closing txn", txnID)) + } + }() resps := make([]*pb.ResponseUnion, len(reqs)) for i := range reqs { @@ -467,15 +487,10 @@ func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.Respons } // applyCompare applies the compare request. -// applyCompare should only be called within a txn request and an valid txn ID must -// be presented. Or applyCompare panics. // It returns the revision at which the comparison happens. If the comparison // succeeds, the it returns true. Otherwise it returns false. -func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) { - if txnID == noTxn { - panic("applyCompare called with noTxn") - } - ckvs, rev, err := kv.TxnRange(txnID, c.Key, nil, 1, 0) +func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) { + ckvs, rev, err := kv.Range(c.Key, nil, 1, 0) if err != nil { if err == dstorage.ErrTxnIDMismatch { panic("unexpected txn ID mismatch error") diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 0fffa1426..e8a2cf149 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -25,6 +25,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/storagepb" @@ -263,6 +264,44 @@ func TestV3DeleteRange(t *testing.T) { } } +// TestV3TxnInvaildRange tests txn +func TestV3TxnInvaildRange(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + kvc := pb.NewKVClient(clus.RandConn()) + preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + + for i := 0; i < 3; i++ { + _, err := kvc.Put(context.Background(), preq) + if err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + } + + _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2}) + if err != nil { + t.Fatalf("couldn't compact kv space (%v)", err) + } + + // future rev + txn := &pb.TxnRequest{} + txn.Success = append(txn.Success, &pb.RequestUnion{RequestPut: preq}) + + rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100} + txn.Success = append(txn.Success, &pb.RequestUnion{RequestRange: rreq}) + + if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrFutureRev { + t.Errorf("err = %v, want %v", err, v3rpc.ErrFutureRev) + } + + // compacted rev + txn.Success[1].RequestRange.Revision = 1 + if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrCompacted { + t.Errorf("err = %v, want %v", err, v3rpc.ErrCompacted) + } +} + // TestV3WatchFromCurrentRevision tests Watch APIs from current revision. func TestV3WatchFromCurrentRevision(t *testing.T) { tests := []struct { diff --git a/storage/kv.go b/storage/kv.go index 7fc4e246f..ffdb43c69 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -24,6 +24,11 @@ type KV interface { // Rev returns the current revision of the KV. Rev() int64 + // FirstRev returns the first revision of the KV. + // After a compaction, the first revision increases to the compaction + // revision. + FirstRev() int64 + // Range gets the keys in the range at rangeRev. // If rangeRev <=0, range gets the keys at currentRev. // If `end` is nil, the request returns the key. diff --git a/storage/kvstore.go b/storage/kvstore.go index f1a11f356..27325ea12 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -106,6 +106,13 @@ func (s *store) Rev() int64 { return s.currentRev.main } +func (s *store) FirstRev() int64 { + s.mu.Lock() + defer s.mu.Unlock() + + return s.compactMainRev +} + func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 { id := s.TxnBegin() s.put(key, value, lease)