diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 5ccda706f..ac2d33815 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -27,6 +27,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" "go.etcd.io/etcd/server/v3/etcdserver/cindex" + mvcc_txn "go.etcd.io/etcd/server/v3/etcdserver/txn" "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" serverstorage "go.etcd.io/etcd/server/v3/storage" @@ -145,19 +146,19 @@ func (a *applierV3backend) WrapApply(ctx context.Context, r *pb.InternalRaftRequ } func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - return Put(ctx, a.lg, a.lessor, a.kv, txn, p) + return mvcc_txn.Put(ctx, a.lg, a.lessor, a.kv, txn, p) } func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { - return DeleteRange(a.kv, txn, dr) + return mvcc_txn.DeleteRange(a.kv, txn, dr) } func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { - return Range(ctx, a.lg, a.kv, txn, r) + return mvcc_txn.Range(ctx, a.lg, a.kv, txn, r) } func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { - return Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor) + return mvcc_txn.Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor) } func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { diff --git a/server/etcdserver/errors.go b/server/etcdserver/errors.go index e28f49c17..aa74739d8 100644 --- a/server/etcdserver/errors.go +++ b/server/etcdserver/errors.go @@ -17,6 +17,8 @@ package etcdserver import ( "errors" "fmt" + + "go.etcd.io/etcd/server/v3/etcdserver/txn" ) var ( @@ -37,11 +39,11 @@ var ( ErrNoSpace = errors.New("etcdserver: no space") ErrTooManyRequests = errors.New("etcdserver: too many requests") ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") - ErrKeyNotFound = errors.New("etcdserver: key not found") ErrCorrupt = errors.New("etcdserver: corrupt cluster") ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade") ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format") + ErrKeyNotFound = txn.ErrKeyNotFound ) type DiscoveryError struct { diff --git a/server/etcdserver/txn.go b/server/etcdserver/txn/txn.go similarity index 96% rename from server/etcdserver/txn.go rename to server/etcdserver/txn/txn.go index 747017093..3a5c2debf 100644 --- a/server/etcdserver/txn.go +++ b/server/etcdserver/txn/txn.go @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package txn import ( "bytes" "context" + "errors" "sort" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -27,6 +28,10 @@ import ( "go.uber.org/zap" ) +var ( + ErrKeyNotFound = errors.New("etcdserver: key not found") +) + func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} @@ -221,7 +226,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit trace = traceutil.New("transaction", lg) ctx = context.WithValue(ctx, traceutil.TraceKey, trace) } - isWrite := !isTxnReadonly(rt) + isWrite := !IsTxnReadonly(rt) // When the transaction contains write operations, we use ReadTx instead of // ConcurrentReadTx to avoid extra overhead of copying buffer. @@ -595,3 +600,31 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool { } return true } + +func IsTxnSerializable(r *pb.TxnRequest) bool { + for _, u := range r.Success { + if r := u.GetRequestRange(); r == nil || !r.Serializable { + return false + } + } + for _, u := range r.Failure { + if r := u.GetRequestRange(); r == nil || !r.Serializable { + return false + } + } + return true +} + +func IsTxnReadonly(r *pb.TxnRequest) bool { + for _, u := range r.Success { + if r := u.GetRequestRange(); r == nil { + return false + } + } + for _, u := range r.Failure { + if r := u.GetRequestRange(); r == nil { + return false + } + } + return true +} diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index bc35c2eff..583ffa387 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -28,6 +28,7 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/txn" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/storage/mvcc" @@ -128,7 +129,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = Range(ctx, s.Logger(), s.KV(), nil, r) } + get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), nil, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { err = serr return nil, err @@ -154,13 +155,13 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) } func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { - if isTxnReadonly(r) { + if txn.IsTxnReadonly(r) { trace := traceutil.New("transaction", s.Logger(), traceutil.Field{Key: "read_only", Value: true}, ) ctx = context.WithValue(ctx, traceutil.TraceKey, trace) - if !isTxnSerializable(r) { + if !txn.IsTxnSerializable(r) { err := s.linearizableReadNotify(ctx) trace.Step("agreement among raft nodes before linearized reading") if err != nil { @@ -179,7 +180,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse }(time.Now()) get := func() { - resp, _, err = Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor) + resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor) } if serr := s.doSerialize(ctx, chk, get); serr != nil { return nil, serr @@ -195,34 +196,6 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse return resp.(*pb.TxnResponse), nil } -func isTxnSerializable(r *pb.TxnRequest) bool { - for _, u := range r.Success { - if r := u.GetRequestRange(); r == nil || !r.Serializable { - return false - } - } - for _, u := range r.Failure { - if r := u.GetRequestRange(); r == nil || !r.Serializable { - return false - } - } - return true -} - -func isTxnReadonly(r *pb.TxnRequest) bool { - for _, u := range r.Success { - if r := u.GetRequestRange(); r == nil { - return false - } - } - for _, u := range r.Failure { - if r := u.GetRequestRange(); r == nil { - return false - } - } - return true -} - func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { startTime := time.Now() result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})