Move server/etcdserver/txn.go to new package: server/etcdserver/txn

This commit is contained in:
Piotr Tabor
2022-04-03 23:29:26 +02:00
parent b073129d03
commit e2ae9b1d13
4 changed files with 48 additions and 39 deletions

View File

@@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
mvcc_txn "go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
serverstorage "go.etcd.io/etcd/server/v3/storage"
@@ -145,19 +146,19 @@ func (a *applierV3backend) WrapApply(ctx context.Context, r *pb.InternalRaftRequ
}
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return Put(ctx, a.lg, a.lessor, a.kv, txn, p)
return mvcc_txn.Put(ctx, a.lg, a.lessor, a.kv, txn, p)
}
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
return DeleteRange(a.kv, txn, dr)
return mvcc_txn.DeleteRange(a.kv, txn, dr)
}
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
return Range(ctx, a.lg, a.kv, txn, r)
return mvcc_txn.Range(ctx, a.lg, a.kv, txn, r)
}
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
return mvcc_txn.Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {

View File

@@ -17,6 +17,8 @@ package etcdserver
import (
"errors"
"fmt"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
)
var (
@@ -37,11 +39,11 @@ var (
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
ErrKeyNotFound = txn.ErrKeyNotFound
)
type DiscoveryError struct {

View File

@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package txn
import (
"bytes"
"context"
"errors"
"sort"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
@@ -27,6 +28,10 @@ import (
"go.uber.org/zap"
)
var (
ErrKeyNotFound = errors.New("etcdserver: key not found")
)
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
@@ -221,7 +226,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
trace = traceutil.New("transaction", lg)
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
isWrite := !IsTxnReadonly(rt)
// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
@@ -595,3 +600,31 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
}
return true
}
func IsTxnSerializable(r *pb.TxnRequest) bool {
for _, u := range r.Success {
if r := u.GetRequestRange(); r == nil || !r.Serializable {
return false
}
}
for _, u := range r.Failure {
if r := u.GetRequestRange(); r == nil || !r.Serializable {
return false
}
}
return true
}
func IsTxnReadonly(r *pb.TxnRequest) bool {
for _, u := range r.Success {
if r := u.GetRequestRange(); r == nil {
return false
}
}
for _, u := range r.Failure {
if r := u.GetRequestRange(); r == nil {
return false
}
}
return true
}

View File

@@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/storage/mvcc"
@@ -128,7 +129,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = Range(ctx, s.Logger(), s.KV(), nil, r) }
get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
@@ -154,13 +155,13 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
}
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
if isTxnReadonly(r) {
if txn.IsTxnReadonly(r) {
trace := traceutil.New("transaction",
s.Logger(),
traceutil.Field{Key: "read_only", Value: true},
)
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
if !isTxnSerializable(r) {
if !txn.IsTxnSerializable(r) {
err := s.linearizableReadNotify(ctx)
trace.Step("agreement among raft nodes before linearized reading")
if err != nil {
@@ -179,7 +180,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())
get := func() {
resp, _, err = Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
@@ -195,34 +196,6 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
return resp.(*pb.TxnResponse), nil
}
func isTxnSerializable(r *pb.TxnRequest) bool {
for _, u := range r.Success {
if r := u.GetRequestRange(); r == nil || !r.Serializable {
return false
}
}
for _, u := range r.Failure {
if r := u.GetRequestRange(); r == nil || !r.Serializable {
return false
}
}
return true
}
func isTxnReadonly(r *pb.TxnRequest) bool {
for _, u := range r.Success {
if r := u.GetRequestRange(); r == nil {
return false
}
}
for _, u := range r.Failure {
if r := u.GetRequestRange(); r == nil {
return false
}
}
return true
}
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
startTime := time.Now()
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})