diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 0b27b3f5b..b90e0a099 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -16,7 +16,6 @@ package etcdserver import ( "bytes" - "fmt" "sort" "time" @@ -30,11 +29,6 @@ import ( ) const ( - // noTxn is an invalid txn ID. - // To apply with independent Range, Put, Delete, you can pass noTxn - // to apply functions instead of a valid txn ID. - noTxn = -1 - warnApplyDuration = 100 * time.Millisecond ) @@ -51,9 +45,9 @@ type applyResult struct { type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult - Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) - Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) - DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) + Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) + Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) + DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) @@ -99,11 +93,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls switch { case r.Range != nil: - ar.resp, ar.err = a.s.applyV3.Range(noTxn, r.Range) + ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range) case r.Put != nil: - ar.resp, ar.err = a.s.applyV3.Put(noTxn, r.Put) + ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put) case r.DeleteRange != nil: - ar.resp, ar.err = a.s.applyV3.DeleteRange(noTxn, r.DeleteRange) + ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) case r.Txn != nil: ar.resp, ar.err = a.s.applyV3.Txn(r.Txn) case r.Compaction != nil: @@ -152,122 +146,87 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { return ar } -func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) { - resp := &pb.PutResponse{} +func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { + resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - var ( - rev int64 - err error - ) - var rr *mvcc.RangeResult - if p.PrevKv || p.IgnoreValue || p.IgnoreLease { - if txnID != noTxn { - rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{}) - if err != nil { - return nil, err - } - } else { - rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{}) - if err != nil { - return nil, err - } - } - } - - if p.IgnoreValue { - if rr == nil || len(rr.KVs) == 0 { - // ignore_value flag expects previous key-value pair - return nil, ErrKeyNotFound - } - p.Value = rr.KVs[0].Value - } - - if p.IgnoreLease { - if rr == nil || len(rr.KVs) == 0 { - // ignore_lease flag expects previous key-value pair - return nil, ErrKeyNotFound - } - p.Lease = rr.KVs[0].Lease - } - - if txnID != noTxn { - rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) - if err != nil { - return nil, err - } - } else { - leaseID := lease.LeaseID(p.Lease) + val, leaseID := p.Value, lease.LeaseID(p.Lease) + if txn == nil { if leaseID != lease.NoLease { if l := a.s.lessor.Lookup(leaseID); l == nil { return nil, lease.ErrLeaseNotFound } } - rev = a.s.KV().Put(p.Key, p.Value, leaseID) + txn = a.s.KV().Write() + defer txn.End() } - resp.Header.Revision = rev - if p.PrevKv && rr != nil && len(rr.KVs) != 0 { - resp.PrevKv = &rr.KVs[0] + + var rr *mvcc.RangeResult + if p.IgnoreValue || p.IgnoreLease || p.PrevKv { + rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) + if err != nil { + return nil, err + } } + if p.IgnoreValue || p.IgnoreLease { + if rr == nil || len(rr.KVs) == 0 { + // ignore_{lease,value} flag expects previous key-value pair + return nil, ErrKeyNotFound + } + } + if p.IgnoreValue { + val = rr.KVs[0].Value + } + if p.IgnoreLease { + leaseID = lease.LeaseID(rr.KVs[0].Lease) + } + if p.PrevKv { + if rr != nil && len(rr.KVs) != 0 { + resp.PrevKv = &rr.KVs[0] + } + } + + resp.Header.Revision = txn.Put(p.Key, val, leaseID) return resp, nil } -func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} - var ( - n int64 - rev int64 - err error - ) + if txn == nil { + txn = a.s.kv.Write() + defer txn.End() + } if isGteRange(dr.RangeEnd) { dr.RangeEnd = []byte{} } - var rr *mvcc.RangeResult if dr.PrevKv { - if txnID != noTxn { - rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{}) - if err != nil { - return nil, err - } - } else { - rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{}) - if err != nil { - return nil, err - } - } - } - - if txnID != noTxn { - n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) + rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{}) if err != nil { return nil, err } - } else { - n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd) - } - - resp.Deleted = n - if rr != nil { - for i := range rr.KVs { - resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i]) + if rr != nil { + for i := range rr.KVs { + resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i]) + } } } - resp.Header.Revision = rev + + resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd) return resp, nil } -func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} - var ( - rr *mvcc.RangeResult - err error - ) + if txn == nil { + txn = a.s.kv.Read() + defer txn.End() + } if isGteRange(r.RangeEnd) { r.RangeEnd = []byte{} @@ -291,16 +250,9 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp Count: r.CountOnly, } - if txnID != noTxn { - rr, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, ro) - if err != nil { - return nil, err - } - } else { - rr, err = a.s.KV().Range(r.Key, r.RangeEnd, ro) - if err != nil { - return nil, err - } + rr, err := txn.Range(r.Key, r.RangeEnd, ro) + if err != nil { + return nil, err } if r.MaxModRevision != 0 { @@ -387,23 +339,24 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { return nil, err } - // When executing the operations of txn, we need to hold the txn lock. - // So the reader will not see any intermediate results. - txnID := a.s.KV().TxnBegin() - resps := make([]*pb.ResponseOp, len(reqs)) - for i := range reqs { - resps[i] = a.applyUnion(txnID, reqs[i]) - } - err := a.s.KV().TxnEnd(txnID) - if err != nil { - panic(fmt.Sprint("unexpected error when closing txn", txnID)) + // When executing the operations of txn, etcd must hold the txn lock so + // readers do not see any intermediate results. + // TODO: use Read txn if only Ranges + txn := a.s.KV().Write() + for i := range reqs { + resps[i] = a.applyUnion(txn, reqs[i]) } + rev := txn.Rev() + if len(txn.Changes()) != 0 { + rev++ + } + txn.End() txnResp := &pb.TxnResponse{} txnResp.Header = &pb.ResponseHeader{} - txnResp.Header.Revision = a.s.KV().Rev() + txnResp.Header.Revision = rev txnResp.Responses = resps txnResp.Succeeded = ok return txnResp, nil @@ -417,9 +370,6 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { rev := rr.Rev if err != nil { - if err == mvcc.ErrTxnIDMismatch { - panic("unexpected txn ID mismatch error") - } return rev, false } var ckv mvccpb.KeyValue @@ -483,11 +433,11 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { return rev, true } -func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.ResponseOp { +func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp { switch tv := union.Request.(type) { case *pb.RequestOp_RequestRange: if tv.RequestRange != nil { - resp, err := a.Range(txnID, tv.RequestRange) + resp, err := a.Range(txn, tv.RequestRange) if err != nil { plog.Panicf("unexpected error during txn: %v", err) } @@ -495,7 +445,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp } case *pb.RequestOp_RequestPut: if tv.RequestPut != nil { - resp, err := a.Put(txnID, tv.RequestPut) + resp, err := a.Put(txn, tv.RequestPut) if err != nil { plog.Panicf("unexpected error during txn: %v", err) } @@ -503,7 +453,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp } case *pb.RequestOp_RequestDeleteRange: if tv.RequestDeleteRange != nil { - resp, err := a.DeleteRange(txnID, tv.RequestDeleteRange) + resp, err := a.DeleteRange(txn, tv.RequestDeleteRange) if err != nil { plog.Panicf("unexpected error during txn: %v", err) } @@ -605,7 +555,7 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) { +func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { return nil, ErrNoSpace } @@ -699,9 +649,9 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { return "aApplierV3{app, NewBackendQuota(s)} } -func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) { +func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { ok := a.q.Available(p) - resp, err := a.applierV3.Put(txnID, p) + resp, err := a.applierV3.Put(txn, p) if err == nil && !ok { err = ErrNoSpace } diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go index 4868e855c..7da4ae45d 100644 --- a/etcdserver/apply_auth.go +++ b/etcdserver/apply_auth.go @@ -19,6 +19,7 @@ import ( "github.com/coreos/etcd/auth" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" ) type authApplierV3 struct { @@ -58,7 +59,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { return ret } -func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, error) { +func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) { if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { return nil, err } @@ -68,17 +69,17 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er return nil, err } } - return aa.applierV3.Put(txnID, r) + return aa.applierV3.Put(txn, r) } -func (aa *authApplierV3) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } - return aa.applierV3.Range(txnID, r) + return aa.applierV3.Range(txn, r) } -func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } @@ -89,7 +90,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb } } - return aa.applierV3.DeleteRange(txnID, r) + return aa.applierV3.DeleteRange(txn, r) } func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error { diff --git a/etcdserver/server.go b/etcdserver/server.go index 759200a48..dc6629852 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -807,7 +807,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. if s.lessor != nil { plog.Info("recovering lessor...") - s.lessor.Recover(newbe, s.kv) + s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() }) plog.Info("finished recovering lessor") } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 1472ac34b..c53c17539 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -107,7 +107,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe chk := func(ai *auth.AuthInfo) error { return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } + get := func() { resp, err = s.applyV3Base.Range(nil, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { return nil, serr } @@ -122,7 +122,7 @@ func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.R chk := func(ai *auth.AuthInfo) error { return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } + get := func() { resp, err = s.applyV3Base.Range(nil, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { return nil, serr }