mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Encapsulating applier logic: UberApplier coordinates all appliers for server
This PR: - moves wrapping of appliers (due to Alarms) out of server.go into uber_applier.go - clearly devides the application logic into: chain of: a) 'WrapApply' (generic logic across all the methods) b) dispatcher (translation of Apply into specific method like 'Put') c) chain of 'wrappers' of the specific methods (like Put). - when we do recovery (restore from snapshot) we create new instance of appliers. The purpose is to make sure we control all the depencies of the apply process, i.e. we can supply e.g. special instance of 'backend' to the application logic.
This commit is contained in:
parent
cdf9869d70
commit
b7ad746bfe
@ -16,9 +16,6 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
@ -45,7 +42,7 @@ type applyResult struct {
|
||||
resp proto.Message
|
||||
err error
|
||||
// physc signals the physical effect of the request has completed in addition
|
||||
// to being logically reflected by the node. Currently only used for
|
||||
// to being logically reflected by the node. Currently, only used for
|
||||
// Compaction requests.
|
||||
physc <-chan struct{}
|
||||
trace *traceutil.Trace
|
||||
@ -58,9 +55,12 @@ type applierV3Internal interface {
|
||||
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
|
||||
}
|
||||
|
||||
type ApplyFunc func(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult
|
||||
|
||||
// applierV3 is the interface for processing V3 raft messages
|
||||
type applierV3 interface {
|
||||
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult
|
||||
WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *applyResult
|
||||
//Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult
|
||||
|
||||
Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
||||
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||
@ -100,137 +100,8 @@ type applierV3backend struct {
|
||||
s *EtcdServer
|
||||
}
|
||||
|
||||
func (s *EtcdServer) newApplierV3Backend() applierV3 {
|
||||
return &applierV3backend{s: s}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) newApplierV3Internal() applierV3Internal {
|
||||
base := &applierV3backend{s: s}
|
||||
return base
|
||||
}
|
||||
|
||||
func (s *EtcdServer) newApplierV3() applierV3 {
|
||||
return newAuthApplierV3(
|
||||
s.AuthStore(),
|
||||
newQuotaApplierV3(s, s.newApplierV3Backend()),
|
||||
s.lessor,
|
||||
)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
|
||||
op := "unknown"
|
||||
ar := &applyResult{}
|
||||
defer func(start time.Time) {
|
||||
success := ar.err == nil || ar.err == mvcc.ErrCompacted
|
||||
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
||||
warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||
if !success {
|
||||
warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||
}
|
||||
}(time.Now())
|
||||
|
||||
switch {
|
||||
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
|
||||
op = "ClusterVersionSet"
|
||||
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
|
||||
return ar
|
||||
case r.ClusterMemberAttrSet != nil:
|
||||
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
|
||||
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
|
||||
return ar
|
||||
case r.DowngradeInfoSet != nil:
|
||||
op = "DowngradeInfoSet" // Implemented in 3.5.x
|
||||
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
|
||||
return ar
|
||||
}
|
||||
|
||||
if !shouldApplyV3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
||||
switch {
|
||||
case r.Range != nil:
|
||||
op = "Range"
|
||||
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
|
||||
case r.Put != nil:
|
||||
op = "Put"
|
||||
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
|
||||
case r.DeleteRange != nil:
|
||||
op = "DeleteRange"
|
||||
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
|
||||
case r.Txn != nil:
|
||||
op = "Txn"
|
||||
ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
|
||||
case r.Compaction != nil:
|
||||
op = "Compaction"
|
||||
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
|
||||
case r.LeaseGrant != nil:
|
||||
op = "LeaseGrant"
|
||||
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
|
||||
case r.LeaseRevoke != nil:
|
||||
op = "LeaseRevoke"
|
||||
ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
|
||||
case r.LeaseCheckpoint != nil:
|
||||
op = "LeaseCheckpoint"
|
||||
ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
|
||||
case r.Alarm != nil:
|
||||
op = "Alarm"
|
||||
ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
|
||||
case r.Authenticate != nil:
|
||||
op = "Authenticate"
|
||||
ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
|
||||
case r.AuthEnable != nil:
|
||||
op = "AuthEnable"
|
||||
ar.resp, ar.err = a.s.applyV3.AuthEnable()
|
||||
case r.AuthDisable != nil:
|
||||
op = "AuthDisable"
|
||||
ar.resp, ar.err = a.s.applyV3.AuthDisable()
|
||||
case r.AuthStatus != nil:
|
||||
ar.resp, ar.err = a.s.applyV3.AuthStatus()
|
||||
case r.AuthUserAdd != nil:
|
||||
op = "AuthUserAdd"
|
||||
ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
|
||||
case r.AuthUserDelete != nil:
|
||||
op = "AuthUserDelete"
|
||||
ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
|
||||
case r.AuthUserChangePassword != nil:
|
||||
op = "AuthUserChangePassword"
|
||||
ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
|
||||
case r.AuthUserGrantRole != nil:
|
||||
op = "AuthUserGrantRole"
|
||||
ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
|
||||
case r.AuthUserGet != nil:
|
||||
op = "AuthUserGet"
|
||||
ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
|
||||
case r.AuthUserRevokeRole != nil:
|
||||
op = "AuthUserRevokeRole"
|
||||
ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
|
||||
case r.AuthRoleAdd != nil:
|
||||
op = "AuthRoleAdd"
|
||||
ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
|
||||
case r.AuthRoleGrantPermission != nil:
|
||||
op = "AuthRoleGrantPermission"
|
||||
ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
|
||||
case r.AuthRoleGet != nil:
|
||||
op = "AuthRoleGet"
|
||||
ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
|
||||
case r.AuthRoleRevokePermission != nil:
|
||||
op = "AuthRoleRevokePermission"
|
||||
ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
|
||||
case r.AuthRoleDelete != nil:
|
||||
op = "AuthRoleDelete"
|
||||
ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
|
||||
case r.AuthUserList != nil:
|
||||
op = "AuthUserList"
|
||||
ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
|
||||
case r.AuthRoleList != nil:
|
||||
op = "AuthRoleList"
|
||||
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
|
||||
default:
|
||||
a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
|
||||
}
|
||||
return ar
|
||||
func (a *applierV3backend) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *applyResult {
|
||||
return applyFunc(ctx, r, shouldApplyV3)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
||||
@ -295,9 +166,7 @@ func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.L
|
||||
|
||||
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
|
||||
resp := &pb.AlarmResponse{}
|
||||
oldCount := len(a.s.alarmStore.Get(ar.Alarm))
|
||||
|
||||
lg := a.s.Logger()
|
||||
switch ar.Action {
|
||||
case pb.AlarmRequest_GET:
|
||||
resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
|
||||
@ -310,39 +179,12 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
|
||||
break
|
||||
}
|
||||
resp.Alarms = append(resp.Alarms, m)
|
||||
activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
|
||||
if !activated {
|
||||
break
|
||||
}
|
||||
|
||||
lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
|
||||
switch m.Alarm {
|
||||
case pb.AlarmType_CORRUPT:
|
||||
a.s.applyV3 = newApplierV3Corrupt(a)
|
||||
case pb.AlarmType_NOSPACE:
|
||||
a.s.applyV3 = newApplierV3Capped(a)
|
||||
default:
|
||||
lg.Panic("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
|
||||
}
|
||||
case pb.AlarmRequest_DEACTIVATE:
|
||||
m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
|
||||
if m == nil {
|
||||
break
|
||||
}
|
||||
resp.Alarms = append(resp.Alarms, m)
|
||||
deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
|
||||
if !deactivated {
|
||||
break
|
||||
}
|
||||
|
||||
switch m.Alarm {
|
||||
case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
|
||||
// TODO: check kv hash before deactivating CORRUPT?
|
||||
lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
|
||||
a.s.applyV3 = a.s.newApplierV3()
|
||||
default:
|
||||
lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
|
||||
}
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
@ -358,7 +200,7 @@ type applierV3Capped struct {
|
||||
// with Puts so that the number of keys in the store is capped.
|
||||
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
|
||||
|
||||
func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
func (a *applierV3Capped) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
return nil, nil, ErrNoSpace
|
||||
}
|
||||
|
||||
@ -369,7 +211,7 @@ func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnRes
|
||||
return a.applierV3.Txn(ctx, r)
|
||||
}
|
||||
|
||||
func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
||||
func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
||||
return nil, ErrNoSpace
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
|
||||
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
|
||||
func (aa *authApplierV3) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *applyResult {
|
||||
aa.mu.Lock()
|
||||
defer aa.mu.Unlock()
|
||||
if r.Header != nil {
|
||||
@ -58,7 +58,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membersh
|
||||
return &applyResult{err: err}
|
||||
}
|
||||
}
|
||||
ret := aa.applierV3.Apply(r, shouldApplyV3)
|
||||
ret := aa.applierV3.WrapApply(ctx, r, shouldApplyV3, applyFunc)
|
||||
aa.authInfo.Username = ""
|
||||
aa.authInfo.Revision = 0
|
||||
return ret
|
||||
|
@ -250,11 +250,9 @@ type EtcdServer struct {
|
||||
|
||||
applyV2 ApplierV2
|
||||
|
||||
// applyV3 is the applier with auth and quotas
|
||||
applyV3 applierV3
|
||||
// applyV3Internal is the applier for internal request
|
||||
applyV3Internal applierV3Internal
|
||||
applyWait wait.WaitTime
|
||||
uberApply *uberApplier
|
||||
|
||||
applyWait wait.WaitTime
|
||||
|
||||
kv mvcc.WatchableKV
|
||||
lessor lease.Lessor
|
||||
@ -390,10 +388,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
srv.compactor.Run()
|
||||
}
|
||||
|
||||
srv.applyV3Internal = srv.newApplierV3Internal()
|
||||
if err = srv.restoreAlarms(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
srv.uberApply = newUberApplier(srv)
|
||||
|
||||
if srv.Cfg.EnableLeaseCheckpoint {
|
||||
// setting checkpointer enables lease checkpoint feature.
|
||||
@ -1071,6 +1069,10 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
ep.appliedi = apply.snapshot.Metadata.Index
|
||||
ep.snapi = ep.appliedi
|
||||
ep.confState = apply.snapshot.Metadata.ConfState
|
||||
|
||||
// As backends and implementations like alarmsStore changed, we need
|
||||
// to re-bootstrap Appliers.
|
||||
s.uberApply = newUberApplier(s)
|
||||
}
|
||||
|
||||
func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) {
|
||||
@ -1888,7 +1890,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
removeNeedlessRangeReqs(raftReq.Txn)
|
||||
}
|
||||
applyV3Performed = true
|
||||
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
|
||||
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
|
||||
}
|
||||
|
||||
// do not re-apply applied entries.
|
||||
@ -2292,18 +2294,11 @@ func (s *EtcdServer) Backend() backend.Backend {
|
||||
func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
|
||||
|
||||
func (s *EtcdServer) restoreAlarms() error {
|
||||
s.applyV3 = s.newApplierV3()
|
||||
as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.alarmStore = as
|
||||
if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
|
||||
s.applyV3 = newApplierV3Capped(s.applyV3)
|
||||
}
|
||||
if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
|
||||
s.applyV3 = newApplierV3Corrupt(s.applyV3)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
220
server/etcdserver/uber_applier.go
Normal file
220
server/etcdserver/uber_applier.go
Normal file
@ -0,0 +1,220 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type uberApplier struct {
|
||||
lg *zap.Logger
|
||||
|
||||
alarmStore *v3alarm.AlarmStore
|
||||
warningApplyDuration time.Duration
|
||||
|
||||
// This is the applier that is taking in consideration current alarms
|
||||
applyV3 applierV3
|
||||
|
||||
// This is the applier used for wrapping when alarms change
|
||||
applyV3base applierV3
|
||||
|
||||
// applyV3Internal is the applier for internal requests
|
||||
// (that seems to bypass wrappings)
|
||||
// TODO(ptab): Seems artificial and could be part of the regular stack.
|
||||
applyV3Internal applierV3Internal
|
||||
}
|
||||
|
||||
func newUberApplier(s *EtcdServer) *uberApplier {
|
||||
applyV3base_ := newApplierV3(s)
|
||||
|
||||
ua := &uberApplier{
|
||||
lg: s.lg,
|
||||
alarmStore: s.alarmStore,
|
||||
warningApplyDuration: s.Cfg.WarningApplyDuration,
|
||||
applyV3: applyV3base_,
|
||||
applyV3base: applyV3base_,
|
||||
applyV3Internal: newApplierV3Internal(s),
|
||||
}
|
||||
ua.RestoreAlarms()
|
||||
return ua
|
||||
}
|
||||
|
||||
func newApplierV3Backend(s *EtcdServer) applierV3 {
|
||||
return &applierV3backend{s: s}
|
||||
}
|
||||
|
||||
func newApplierV3Internal(s *EtcdServer) applierV3Internal {
|
||||
base := &applierV3backend{s: s}
|
||||
return base
|
||||
}
|
||||
|
||||
func newApplierV3(s *EtcdServer) applierV3 {
|
||||
return newAuthApplierV3(
|
||||
s.AuthStore(),
|
||||
newQuotaApplierV3(s, newApplierV3Backend(s)),
|
||||
s.lessor,
|
||||
)
|
||||
}
|
||||
|
||||
func (a *uberApplier) RestoreAlarms() {
|
||||
noSpaceAlarms := len(a.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0
|
||||
corruptAlarms := len(a.alarmStore.Get(pb.AlarmType_CORRUPT)) > 0
|
||||
a.applyV3 = a.applyV3base
|
||||
if noSpaceAlarms {
|
||||
a.applyV3 = newApplierV3Capped(a.applyV3)
|
||||
}
|
||||
if corruptAlarms {
|
||||
a.applyV3 = newApplierV3Corrupt(a.applyV3)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
|
||||
return a.applyV3.WrapApply(context.TODO(), r, shouldApplyV3, a.dispatch)
|
||||
}
|
||||
|
||||
// This function
|
||||
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
|
||||
op := "unknown"
|
||||
ar := &applyResult{}
|
||||
defer func(start time.Time) {
|
||||
success := ar.err == nil || ar.err == mvcc.ErrCompacted
|
||||
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
|
||||
warnOfExpensiveRequest(a.lg, a.warningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||
if !success {
|
||||
warnOfFailedRequest(a.lg, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||
}
|
||||
}(time.Now())
|
||||
|
||||
switch {
|
||||
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
|
||||
op = "ClusterVersionSet"
|
||||
a.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
|
||||
return ar
|
||||
case r.ClusterMemberAttrSet != nil:
|
||||
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
|
||||
a.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
|
||||
return ar
|
||||
case r.DowngradeInfoSet != nil:
|
||||
op = "DowngradeInfoSet" // Implemented in 3.5.x
|
||||
a.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
|
||||
return ar
|
||||
}
|
||||
|
||||
if !shouldApplyV3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
||||
switch {
|
||||
case r.Range != nil:
|
||||
op = "Range"
|
||||
ar.resp, ar.err = a.applyV3.Range(ctx, nil, r.Range)
|
||||
case r.Put != nil:
|
||||
op = "Put"
|
||||
ar.resp, ar.trace, ar.err = a.applyV3.Put(ctx, nil, r.Put)
|
||||
case r.DeleteRange != nil:
|
||||
op = "DeleteRange"
|
||||
ar.resp, ar.err = a.applyV3.DeleteRange(nil, r.DeleteRange)
|
||||
case r.Txn != nil:
|
||||
op = "Txn"
|
||||
ar.resp, ar.trace, ar.err = a.applyV3.Txn(ctx, r.Txn)
|
||||
case r.Compaction != nil:
|
||||
op = "Compaction"
|
||||
ar.resp, ar.physc, ar.trace, ar.err = a.applyV3.Compaction(r.Compaction)
|
||||
case r.LeaseGrant != nil:
|
||||
op = "LeaseGrant"
|
||||
ar.resp, ar.err = a.applyV3.LeaseGrant(r.LeaseGrant)
|
||||
case r.LeaseRevoke != nil:
|
||||
op = "LeaseRevoke"
|
||||
ar.resp, ar.err = a.applyV3.LeaseRevoke(r.LeaseRevoke)
|
||||
case r.LeaseCheckpoint != nil:
|
||||
op = "LeaseCheckpoint"
|
||||
ar.resp, ar.err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
|
||||
case r.Alarm != nil:
|
||||
op = "Alarm"
|
||||
ar.resp, ar.err = a.Alarm(r.Alarm)
|
||||
case r.Authenticate != nil:
|
||||
op = "Authenticate"
|
||||
ar.resp, ar.err = a.applyV3.Authenticate(r.Authenticate)
|
||||
case r.AuthEnable != nil:
|
||||
op = "AuthEnable"
|
||||
ar.resp, ar.err = a.applyV3.AuthEnable()
|
||||
case r.AuthDisable != nil:
|
||||
op = "AuthDisable"
|
||||
ar.resp, ar.err = a.applyV3.AuthDisable()
|
||||
case r.AuthStatus != nil:
|
||||
ar.resp, ar.err = a.applyV3.AuthStatus()
|
||||
case r.AuthUserAdd != nil:
|
||||
op = "AuthUserAdd"
|
||||
ar.resp, ar.err = a.applyV3.UserAdd(r.AuthUserAdd)
|
||||
case r.AuthUserDelete != nil:
|
||||
op = "AuthUserDelete"
|
||||
ar.resp, ar.err = a.applyV3.UserDelete(r.AuthUserDelete)
|
||||
case r.AuthUserChangePassword != nil:
|
||||
op = "AuthUserChangePassword"
|
||||
ar.resp, ar.err = a.applyV3.UserChangePassword(r.AuthUserChangePassword)
|
||||
case r.AuthUserGrantRole != nil:
|
||||
op = "AuthUserGrantRole"
|
||||
ar.resp, ar.err = a.applyV3.UserGrantRole(r.AuthUserGrantRole)
|
||||
case r.AuthUserGet != nil:
|
||||
op = "AuthUserGet"
|
||||
ar.resp, ar.err = a.applyV3.UserGet(r.AuthUserGet)
|
||||
case r.AuthUserRevokeRole != nil:
|
||||
op = "AuthUserRevokeRole"
|
||||
ar.resp, ar.err = a.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
|
||||
case r.AuthRoleAdd != nil:
|
||||
op = "AuthRoleAdd"
|
||||
ar.resp, ar.err = a.applyV3.RoleAdd(r.AuthRoleAdd)
|
||||
case r.AuthRoleGrantPermission != nil:
|
||||
op = "AuthRoleGrantPermission"
|
||||
ar.resp, ar.err = a.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
|
||||
case r.AuthRoleGet != nil:
|
||||
op = "AuthRoleGet"
|
||||
ar.resp, ar.err = a.applyV3.RoleGet(r.AuthRoleGet)
|
||||
case r.AuthRoleRevokePermission != nil:
|
||||
op = "AuthRoleRevokePermission"
|
||||
ar.resp, ar.err = a.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
|
||||
case r.AuthRoleDelete != nil:
|
||||
op = "AuthRoleDelete"
|
||||
ar.resp, ar.err = a.applyV3.RoleDelete(r.AuthRoleDelete)
|
||||
case r.AuthUserList != nil:
|
||||
op = "AuthUserList"
|
||||
ar.resp, ar.err = a.applyV3.UserList(r.AuthUserList)
|
||||
case r.AuthRoleList != nil:
|
||||
op = "AuthRoleList"
|
||||
ar.resp, ar.err = a.applyV3.RoleList(r.AuthRoleList)
|
||||
default:
|
||||
a.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
|
||||
}
|
||||
return ar
|
||||
}
|
||||
|
||||
func (a *uberApplier) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
|
||||
resp, err := a.applyV3.Alarm(ar)
|
||||
|
||||
if ar.Action == pb.AlarmRequest_ACTIVATE ||
|
||||
ar.Action == pb.AlarmRequest_DEACTIVATE {
|
||||
a.RestoreAlarms()
|
||||
}
|
||||
return resp, err
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user