From 9572197aee70d0962878ddc99d1c87184dfc7f7b Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 22 Jan 2016 15:48:06 -0800 Subject: [PATCH] etcdserver: return error when putting a key with a bad lease id --- etcdserver/v3demo_server.go | 35 +++++++++++++++++---- integration/v3_grpc_test.go | 62 +++++++++++++++++++++++++++++++++++++ lease/lessor.go | 14 +++++++++ 3 files changed, 105 insertions(+), 6 deletions(-) diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index fb0fe1326..9502b1fe1 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -196,11 +196,11 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { case r.Range != nil: ar.resp, ar.err = applyRange(noTxn, kv, r.Range) case r.Put != nil: - ar.resp, ar.err = applyPut(noTxn, kv, r.Put) + ar.resp, ar.err = applyPut(noTxn, kv, le, r.Put) case r.DeleteRange != nil: ar.resp, ar.err = applyDeleteRange(noTxn, kv, r.DeleteRange) case r.Txn != nil: - ar.resp, ar.err = applyTxn(kv, r.Txn) + ar.resp, ar.err = applyTxn(kv, le, r.Txn) case r.Compaction != nil: ar.resp, ar.err = applyCompaction(kv, r.Compaction) case r.LeaseCreate != nil: @@ -214,7 +214,7 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { return ar } -func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, error) { +func applyPut(txnID int64, kv dstorage.KV, le lease.Lessor, p *pb.PutRequest) (*pb.PutResponse, error) { resp := &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} var ( @@ -227,7 +227,13 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e return nil, err } } else { - rev = kv.Put(p.Key, p.Value, lease.LeaseID(p.Lease)) + leaseID := lease.LeaseID(p.Lease) + if leaseID != lease.NoLease { + if l := le.Lookup(leaseID); l == nil { + return nil, lease.ErrLeaseNotFound + } + } + rev = kv.Put(p.Key, p.Value, leaseID) } resp.Header.Revision = rev return resp, nil @@ -360,7 +366,20 @@ func applyDeleteRange(txnID int64, kv dstorage.KV, dr *pb.DeleteRangeRequest) (* return resp, nil } -func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) { +func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error { + for _, requ := range reqs { + preq := requ.RequestPut + if preq == nil || lease.LeaseID(preq.Lease) == lease.NoLease { + continue + } + if l := le.Lookup(lease.LeaseID(preq.Lease)); l == nil { + return lease.ErrLeaseNotFound + } + } + return nil +} + +func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) { var revision int64 txnID := kv.TxnBegin() @@ -387,6 +406,10 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) { reqs = rt.Failure } + if err := checkRequestLeases(le, reqs); err != nil { + return nil, err + } + resps := make([]*pb.ResponseUnion, len(reqs)) for i := range reqs { resps[i] = applyUnion(txnID, kv, reqs[i]) @@ -425,7 +448,7 @@ func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.Respons } return &pb.ResponseUnion{ResponseRange: resp} case union.RequestPut != nil: - resp, err := applyPut(txnID, kv, union.RequestPut) + resp, err := applyPut(txnID, kv, nil, union.RequestPut) if err != nil { panic("unexpected error during txn") } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 2bdae66c9..2c59bee40 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -110,6 +110,68 @@ func TestV3PutOverwrite(t *testing.T) { } } +// TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails. +func TestV3PutMissingLease(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + kvc := pb.NewKVClient(clus.RandConn()) + key := []byte("foo") + preq := &pb.PutRequest{Key: key, Lease: 123456} + tests := []func(){ + // put case + func() { + if presp, err := kvc.Put(context.TODO(), preq); err == nil { + t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp) + } + }, + // txn success case + func() { + txn := &pb.TxnRequest{} + txn.Success = append(txn.Success, &pb.RequestUnion{RequestPut: preq}) + if tresp, err := kvc.Txn(context.TODO(), txn); err == nil { + t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp) + } + }, + // txn failure case + func() { + txn := &pb.TxnRequest{} + txn.Failure = append(txn.Failure, &pb.RequestUnion{RequestPut: preq}) + cmp := &pb.Compare{ + Result: pb.Compare_GREATER, + Target: pb.Compare_CREATE, + Key: []byte("bar"), + } + txn.Compare = append(txn.Compare, cmp) + if tresp, err := kvc.Txn(context.TODO(), txn); err == nil { + t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp) + } + }, + // ignore bad lease in failure on success txn + func() { + txn := &pb.TxnRequest{} + rreq := &pb.RangeRequest{Key: []byte("bar")} + txn.Success = append(txn.Success, &pb.RequestUnion{RequestRange: rreq}) + txn.Failure = append(txn.Failure, &pb.RequestUnion{RequestPut: preq}) + if tresp, err := kvc.Txn(context.TODO(), txn); err != nil { + t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp) + } + }, + } + + for i, f := range tests { + f() + // key shouldn't have been stored + rreq := &pb.RangeRequest{Key: key} + rresp, err := kvc.Range(context.TODO(), rreq) + if err != nil { + t.Errorf("#%d. could not rangereq (%v)", i, err) + } else if len(rresp.Kvs) != 0 { + t.Errorf("#%d. expected no keys, got %v", i, rresp) + } + } +} + // TestV3DeleteRange tests various edge cases in the DeleteRange API. func TestV3DeleteRange(t *testing.T) { tests := []struct { diff --git a/lease/lessor.go b/lease/lessor.go index f01fa0412..7148a6671 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -83,6 +83,9 @@ type Lessor interface { // an error will be returned. Renew(id LeaseID) (int64, error) + // Lookup gives the lease at a given lease id, if any + Lookup(id LeaseID) *Lease + // ExpiredLeasesC returns a chan that is used to receive expired leases. ExpiredLeasesC() <-chan []*Lease @@ -230,6 +233,15 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { return l.TTL, nil } +func (le *lessor) Lookup(id LeaseID) *Lease { + le.mu.Lock() + defer le.mu.Unlock() + if l, ok := le.leaseMap[id]; ok { + return l + } + return nil +} + func (le *lessor) Promote() { le.mu.Lock() defer le.mu.Unlock() @@ -456,6 +468,8 @@ func (fl *FakeLessor) Demote() {} func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil } +func (le *FakeLessor) Lookup(id LeaseID) *Lease { return nil } + func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil } func (fl *FakeLessor) Stop() {}