etcdserver: support mvcc txn

This commit is contained in:
Anthony Romano 2017-01-05 01:13:47 -08:00
parent f0c184b3a2
commit 58da8b17ee
4 changed files with 87 additions and 136 deletions

View File

@ -16,7 +16,6 @@ package etcdserver
import (
"bytes"
"fmt"
"sort"
"time"
@ -30,11 +29,6 @@ import (
)
const (
// noTxn is an invalid txn ID.
// To apply with independent Range, Put, Delete, you can pass noTxn
// to apply functions instead of a valid txn ID.
noTxn = -1
warnApplyDuration = 100 * time.Millisecond
)
@ -51,9 +45,9 @@ type applyResult struct {
type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult
Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
@ -99,11 +93,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
ar.resp, ar.err = a.s.applyV3.Range(noTxn, r.Range)
ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
case r.Put != nil:
ar.resp, ar.err = a.s.applyV3.Put(noTxn, r.Put)
ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
case r.DeleteRange != nil:
ar.resp, ar.err = a.s.applyV3.DeleteRange(noTxn, r.DeleteRange)
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
case r.Compaction != nil:
@ -152,122 +146,87 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
return ar
}
func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
resp := &pb.PutResponse{}
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
var (
rev int64
err error
)
var rr *mvcc.RangeResult
if p.PrevKv || p.IgnoreValue || p.IgnoreLease {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if p.IgnoreValue {
if rr == nil || len(rr.KVs) == 0 {
// ignore_value flag expects previous key-value pair
return nil, ErrKeyNotFound
}
p.Value = rr.KVs[0].Value
}
if p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_lease flag expects previous key-value pair
return nil, ErrKeyNotFound
}
p.Lease = rr.KVs[0].Lease
}
if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil {
return nil, err
}
} else {
leaseID := lease.LeaseID(p.Lease)
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, lease.ErrLeaseNotFound
}
}
rev = a.s.KV().Put(p.Key, p.Value, leaseID)
txn = a.s.KV().Write()
defer txn.End()
}
resp.Header.Revision = rev
if p.PrevKv && rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
val = rr.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
return resp, nil
}
func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
var (
n int64
rev int64
err error
)
if txn == nil {
txn = a.s.kv.Write()
defer txn.End()
}
if isGteRange(dr.RangeEnd) {
dr.RangeEnd = []byte{}
}
var rr *mvcc.RangeResult
if dr.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd)
}
resp.Deleted = n
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
}
}
}
resp.Header.Revision = rev
resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd)
return resp, nil
}
func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
var (
rr *mvcc.RangeResult
err error
)
if txn == nil {
txn = a.s.kv.Read()
defer txn.End()
}
if isGteRange(r.RangeEnd) {
r.RangeEnd = []byte{}
@ -291,16 +250,9 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
Count: r.CountOnly,
}
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
rr, err := txn.Range(r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
if r.MaxModRevision != 0 {
@ -387,23 +339,24 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, err
}
// When executing the operations of txn, we need to hold the txn lock.
// So the reader will not see any intermediate results.
txnID := a.s.KV().TxnBegin()
resps := make([]*pb.ResponseOp, len(reqs))
for i := range reqs {
resps[i] = a.applyUnion(txnID, reqs[i])
}
err := a.s.KV().TxnEnd(txnID)
if err != nil {
panic(fmt.Sprint("unexpected error when closing txn", txnID))
// When executing the operations of txn, etcd must hold the txn lock so
// readers do not see any intermediate results.
// TODO: use Read txn if only Ranges
txn := a.s.KV().Write()
for i := range reqs {
resps[i] = a.applyUnion(txn, reqs[i])
}
rev := txn.Rev()
if len(txn.Changes()) != 0 {
rev++
}
txn.End()
txnResp := &pb.TxnResponse{}
txnResp.Header = &pb.ResponseHeader{}
txnResp.Header.Revision = a.s.KV().Rev()
txnResp.Header.Revision = rev
txnResp.Responses = resps
txnResp.Succeeded = ok
return txnResp, nil
@ -417,9 +370,6 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
rev := rr.Rev
if err != nil {
if err == mvcc.ErrTxnIDMismatch {
panic("unexpected txn ID mismatch error")
}
return rev, false
}
var ckv mvccpb.KeyValue
@ -483,11 +433,11 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
return rev, true
}
func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.ResponseOp {
func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
switch tv := union.Request.(type) {
case *pb.RequestOp_RequestRange:
if tv.RequestRange != nil {
resp, err := a.Range(txnID, tv.RequestRange)
resp, err := a.Range(txn, tv.RequestRange)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
}
@ -495,7 +445,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp
}
case *pb.RequestOp_RequestPut:
if tv.RequestPut != nil {
resp, err := a.Put(txnID, tv.RequestPut)
resp, err := a.Put(txn, tv.RequestPut)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
}
@ -503,7 +453,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp
}
case *pb.RequestOp_RequestDeleteRange:
if tv.RequestDeleteRange != nil {
resp, err := a.DeleteRange(txnID, tv.RequestDeleteRange)
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
}
@ -605,7 +555,7 @@ type applierV3Capped struct {
// with Puts so that the number of keys in the store is capped.
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
return nil, ErrNoSpace
}
@ -699,9 +649,9 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s)}
}
func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
ok := a.q.Available(p)
resp, err := a.applierV3.Put(txnID, p)
resp, err := a.applierV3.Put(txn, p)
if err == nil && !ok {
err = ErrNoSpace
}

View File

@ -19,6 +19,7 @@ import (
"github.com/coreos/etcd/auth"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
)
type authApplierV3 struct {
@ -58,7 +59,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
return ret
}
func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, error) {
func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
return nil, err
}
@ -68,17 +69,17 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
return nil, err
}
}
return aa.applierV3.Put(txnID, r)
return aa.applierV3.Put(txn, r)
}
func (aa *authApplierV3) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, err
}
return aa.applierV3.Range(txnID, r)
return aa.applierV3.Range(txn, r)
}
func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, err
}
@ -89,7 +90,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
}
}
return aa.applierV3.DeleteRange(txnID, r)
return aa.applierV3.DeleteRange(txn, r)
}
func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error {

View File

@ -807,7 +807,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
if s.lessor != nil {
plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
plog.Info("finished recovering lessor")
}

View File

@ -107,7 +107,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
get := func() { resp, err = s.applyV3Base.Range(nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
@ -122,7 +122,7 @@ func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.R
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
get := func() { resp, err = s.applyV3Base.Range(nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}