Extract membership applier

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-11-24 15:55:45 +01:00
parent 6db5e00103
commit d22c00ccee
3 changed files with 47 additions and 25 deletions

View File

@ -102,12 +102,6 @@ type applierV3 interface {
RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
// processing internal V3 raft request
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3)
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
}
type SnapshotServer interface {
@ -401,7 +395,21 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
return resp, err
}
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
type applierMembership struct {
lg *zap.Logger
cluster *membership.RaftCluster
snapshotServer SnapshotServer
}
func NewApplierMembership(lg *zap.Logger, cluster *membership.RaftCluster, snapshotServer SnapshotServer) *applierMembership {
return &applierMembership{
lg: lg,
cluster: cluster,
snapshotServer: snapshotServer,
}
}
func (a *applierMembership) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
prevVersion := a.cluster.Version()
newVersion := semver.Must(semver.NewVersion(r.Ver))
a.cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3)
@ -418,7 +426,7 @@ func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRe
}
}
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
func (a *applierMembership) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
a.cluster.UpdateAttributes(
types.ID(r.Member_ID),
membership.Attributes{
@ -429,7 +437,7 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
)
}
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
func (a *applierMembership) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
d := version.DowngradeInfo{Enabled: false}
if r.Enabled {
d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}

View File

@ -131,21 +131,6 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
}
}(time.Now())
switch {
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
op = "ClusterVersionSet"
a.applyV3.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return ar
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
a.applyV3.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
return ar
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
a.applyV3.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
return ar
}
if !shouldApplyV3 {
return nil
}

View File

@ -60,6 +60,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/apply"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
@ -1874,7 +1875,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
}
// do not re-toApply applied entries.
@ -1910,6 +1911,34 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
})
}
func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result {
if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil {
return s.uberApply.Apply(r, shouldApplyV3)
}
membershipApplier := apply.NewApplierMembership(s.lg, s.cluster, s)
op := "unknown"
defer func(start time.Time) {
txn.ApplySecObserve("v3", op, true, time.Since(start))
txn.WarnOfExpensiveRequest(s.lg, s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, nil, nil)
}(time.Now())
switch {
case r.ClusterVersionSet != nil:
op = "ClusterVersionSet" // Implemented in 3.5.x
membershipApplier.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return &apply.Result{}
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
membershipApplier.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
default:
s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
return nil
}
return &apply.Result{}
}
func noSideEffect(r *pb.InternalRaftRequest) bool {
return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
}