mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6543 from xiang90/improve_txn
etcdserver: use linearizableReadNotify for txn
This commit is contained in:
commit
d80c13555a
@ -165,6 +165,41 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
|
// TODO: remove this checking when we release etcd 3.2
|
||||||
|
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
||||||
|
return s.legacyTxn(ctx, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
if isTxnReadonly(r) {
|
||||||
|
if !isTxnSerializable(r) {
|
||||||
|
err := s.linearizableReadNotify(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var resp *pb.TxnResponse
|
||||||
|
var err error
|
||||||
|
chk := func(ai *auth.AuthInfo) error {
|
||||||
|
return checkTxnAuth(s.authStore, ai, r)
|
||||||
|
}
|
||||||
|
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
||||||
|
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||||
|
return nil, serr
|
||||||
|
}
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if result.err != nil {
|
||||||
|
return nil, result.err
|
||||||
|
}
|
||||||
|
return result.resp.(*pb.TxnResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: remove this func when we release etcd 3.2
|
||||||
|
func (s *EtcdServer) legacyTxn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
if isTxnSerializable(r) {
|
if isTxnSerializable(r) {
|
||||||
var resp *pb.TxnResponse
|
var resp *pb.TxnResponse
|
||||||
var err error
|
var err error
|
||||||
@ -177,7 +212,6 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
|||||||
}
|
}
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
// TODO: readonly Txn do not need to go through raft
|
|
||||||
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -202,6 +236,20 @@ func isTxnSerializable(r *pb.TxnRequest) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isTxnReadonly(r *pb.TxnRequest) bool {
|
||||||
|
for _, u := range r.Success {
|
||||||
|
if r := u.GetRequestRange(); r == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, u := range r.Failure {
|
||||||
|
if r := u.GetRequestRange(); r == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
|
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
|
||||||
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
|
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
|
||||||
if r.Physical && result != nil && result.physc != nil {
|
if r.Physical && result != nil && result.physc != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user