mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: apply serialized requests outside auth apply lock
Fixes #6010
This commit is contained in:
parent
79d25a6884
commit
06da46c4ee
@ -92,7 +92,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
|
||||
return aa.applierV3.DeleteRange(txnID, r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
|
||||
func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error {
|
||||
for _, requ := range reqs {
|
||||
switch tv := requ.Request.(type) {
|
||||
case *pb.RequestOp_RequestRange:
|
||||
@ -100,7 +100,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := aa.as.IsRangePermitted(&aa.authInfo, tv.RequestRange.Key, tv.RequestRange.RangeEnd); err != nil {
|
||||
if err := as.IsRangePermitted(ai, tv.RequestRange.Key, tv.RequestRange.RangeEnd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -109,7 +109,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := aa.as.IsPutPermitted(&aa.authInfo, tv.RequestPut.Key); err != nil {
|
||||
if err := as.IsPutPermitted(ai, tv.RequestPut.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -119,13 +119,13 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
|
||||
}
|
||||
|
||||
if tv.RequestDeleteRange.PrevKv {
|
||||
err := aa.as.IsRangePermitted(&aa.authInfo, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
|
||||
err := as.IsRangePermitted(ai, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := aa.as.IsDeleteRangePermitted(&aa.authInfo, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
|
||||
err := as.IsDeleteRangePermitted(ai, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -135,20 +135,25 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||
func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error {
|
||||
for _, c := range rt.Compare {
|
||||
if err := aa.as.IsRangePermitted(&aa.authInfo, c.Key, nil); err != nil {
|
||||
return nil, err
|
||||
if err := as.IsRangePermitted(ai, c.Key, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := checkTxnReqsPermission(as, ai, rt.Success); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkTxnReqsPermission(as, ai, rt.Failure); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := aa.checkTxnReqsPermission(rt.Success); err != nil {
|
||||
func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||
if err := checkTxnAuth(aa.as, &aa.authInfo, rt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := aa.checkTxnReqsPermission(rt.Failure); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return aa.applierV3.Txn(rt)
|
||||
}
|
||||
|
||||
|
@ -181,13 +181,16 @@ type EtcdServer struct {
|
||||
|
||||
applyV2 ApplierV2
|
||||
|
||||
applyV3 applierV3
|
||||
kv mvcc.ConsistentWatchableKV
|
||||
lessor lease.Lessor
|
||||
bemu sync.Mutex
|
||||
be backend.Backend
|
||||
authStore auth.AuthStore
|
||||
alarmStore *alarm.AlarmStore
|
||||
// applyV3 is the applier with auth and quotas
|
||||
applyV3 applierV3
|
||||
// applyV3Base is the core applier without auth or quotas
|
||||
applyV3Base applierV3
|
||||
kv mvcc.ConsistentWatchableKV
|
||||
lessor lease.Lessor
|
||||
bemu sync.Mutex
|
||||
be backend.Backend
|
||||
authStore auth.AuthStore
|
||||
alarmStore *alarm.AlarmStore
|
||||
|
||||
stats *stats.ServerStats
|
||||
lstats *stats.LeaderStats
|
||||
@ -405,6 +408,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
srv.compactor.Run()
|
||||
}
|
||||
|
||||
srv.applyV3Base = &applierV3backend{srv}
|
||||
if err = srv.restoreAlarms(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -84,41 +84,19 @@ type Authenticator interface {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
var result *applyResult
|
||||
var err error
|
||||
|
||||
if r.Serializable {
|
||||
for {
|
||||
authInfo, err := s.authInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hdr := &pb.RequestHeader{}
|
||||
if authInfo != nil {
|
||||
hdr.Username = authInfo.Username
|
||||
hdr.AuthRevision = authInfo.Revision
|
||||
}
|
||||
|
||||
result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Range: r})
|
||||
|
||||
if result.err != nil {
|
||||
if result.err == auth.ErrAuthOldRevision {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if authInfo == nil || authInfo.Revision == s.authStore.Revision() {
|
||||
break
|
||||
}
|
||||
|
||||
// The revision that authorized this request is obsolete.
|
||||
// For avoiding TOCTOU problem, retry of the request is required.
|
||||
var resp *pb.RangeResponse
|
||||
var err error
|
||||
chk := func(ai *auth.AuthInfo) error {
|
||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||
}
|
||||
} else {
|
||||
result, err = s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
|
||||
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
return nil, serr
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -151,41 +129,19 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||
var result *applyResult
|
||||
var err error
|
||||
|
||||
if isTxnSerializable(r) {
|
||||
for {
|
||||
authInfo, err := s.authInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hdr := &pb.RequestHeader{}
|
||||
if authInfo != nil {
|
||||
hdr.Username = authInfo.Username
|
||||
hdr.AuthRevision = authInfo.Revision
|
||||
}
|
||||
|
||||
result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Txn: r})
|
||||
|
||||
if result.err != nil {
|
||||
if result.err == auth.ErrAuthOldRevision {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if authInfo == nil || authInfo.Revision == s.authStore.Revision() {
|
||||
break
|
||||
}
|
||||
|
||||
// The revision that authorized this request is obsolete.
|
||||
// For avoiding TOCTOU problem, retry of this request is required.
|
||||
var resp *pb.TxnResponse
|
||||
var err error
|
||||
chk := func(ai *auth.AuthInfo) error {
|
||||
return checkTxnAuth(s.authStore, ai, r)
|
||||
}
|
||||
} else {
|
||||
result, err = s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
||||
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
return nil, serr
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -551,6 +507,33 @@ func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
|
||||
return authInfo, nil
|
||||
}
|
||||
|
||||
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
|
||||
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
|
||||
for {
|
||||
ai, err := s.authInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ai == nil {
|
||||
// chk expects non-nil AuthInfo; use empty credentials
|
||||
ai = &auth.AuthInfo{}
|
||||
}
|
||||
if err = chk(ai); err != nil {
|
||||
if err == auth.ErrAuthOldRevision {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
// fetch response for serialized request
|
||||
get()
|
||||
// empty credentials or current auth info means no need to retry
|
||||
if ai.Revision == 0 || ai.Revision == s.authStore.Revision() {
|
||||
return nil
|
||||
}
|
||||
// avoid TOCTOU error, retry of the request is required.
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
|
||||
ai := s.getAppliedIndex()
|
||||
ci := s.getCommittedIndex()
|
||||
|
Loading…
x
Reference in New Issue
Block a user