From d22c00ccee72532d27f0bfe99f8c29793cbfed68 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 24 Nov 2023 15:55:45 +0100 Subject: [PATCH] Extract membership applier Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply/apply.go | 26 ++++++++++++++------- server/etcdserver/apply/uber_applier.go | 15 ------------ server/etcdserver/server.go | 31 ++++++++++++++++++++++++- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/server/etcdserver/apply/apply.go b/server/etcdserver/apply/apply.go index 1d465bf2b..aeea81365 100644 --- a/server/etcdserver/apply/apply.go +++ b/server/etcdserver/apply/apply.go @@ -102,12 +102,6 @@ type applierV3 interface { RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) - - // processing internal V3 raft request - - ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) - ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) - DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) } type SnapshotServer interface { @@ -401,7 +395,21 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList return resp, err } -func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) { +type applierMembership struct { + lg *zap.Logger + cluster *membership.RaftCluster + snapshotServer SnapshotServer +} + +func NewApplierMembership(lg *zap.Logger, cluster *membership.RaftCluster, snapshotServer SnapshotServer) *applierMembership { + return &applierMembership{ + lg: lg, + cluster: cluster, + snapshotServer: snapshotServer, + } +} + +func (a *applierMembership) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) { prevVersion := a.cluster.Version() newVersion := semver.Must(semver.NewVersion(r.Ver)) a.cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3) @@ -418,7 +426,7 @@ func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRe } } -func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) { +func (a *applierMembership) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) { a.cluster.UpdateAttributes( types.ID(r.Member_ID), membership.Attributes{ @@ -429,7 +437,7 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt ) } -func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) { +func (a *applierMembership) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) { d := version.DowngradeInfo{Enabled: false} if r.Enabled { d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} diff --git a/server/etcdserver/apply/uber_applier.go b/server/etcdserver/apply/uber_applier.go index d3184e696..83a7dd61a 100644 --- a/server/etcdserver/apply/uber_applier.go +++ b/server/etcdserver/apply/uber_applier.go @@ -131,21 +131,6 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s } }(time.Now()) - switch { - case r.ClusterVersionSet != nil: // Implemented in 3.5.x - op = "ClusterVersionSet" - a.applyV3.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) - return ar - case r.ClusterMemberAttrSet != nil: - op = "ClusterMemberAttrSet" // Implemented in 3.5.x - a.applyV3.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) - return ar - case r.DowngradeInfoSet != nil: - op = "DowngradeInfoSet" // Implemented in 3.5.x - a.applyV3.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) - return ar - } - if !shouldApplyV3 { return nil } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4ad4539c6..a24481f0a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -60,6 +60,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/apply" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/errors" + "go.etcd.io/etcd/server/v3/etcdserver/txn" serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" @@ -1874,7 +1875,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership. if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } - ar = s.uberApply.Apply(&raftReq, shouldApplyV3) + ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3) } // do not re-toApply applied entries. @@ -1910,6 +1911,34 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership. }) } +func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result { + if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil { + return s.uberApply.Apply(r, shouldApplyV3) + } + membershipApplier := apply.NewApplierMembership(s.lg, s.cluster, s) + op := "unknown" + defer func(start time.Time) { + txn.ApplySecObserve("v3", op, true, time.Since(start)) + txn.WarnOfExpensiveRequest(s.lg, s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, nil, nil) + }(time.Now()) + switch { + case r.ClusterVersionSet != nil: + op = "ClusterVersionSet" // Implemented in 3.5.x + membershipApplier.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) + return &apply.Result{} + case r.ClusterMemberAttrSet != nil: + op = "ClusterMemberAttrSet" // Implemented in 3.5.x + membershipApplier.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) + case r.DowngradeInfoSet != nil: + op = "DowngradeInfoSet" // Implemented in 3.5.x + membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) + default: + s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) + return nil + } + return &apply.Result{} +} + func noSideEffect(r *pb.InternalRaftRequest) bool { return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil }