From b7ad746bfe6e2e4f7cb3beb89a8dff3c37d92b87 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Sat, 2 Apr 2022 21:25:46 +0200 Subject: [PATCH] Encapsulating applier logic: UberApplier coordinates all appliers for server This PR: - moves wrapping of appliers (due to Alarms) out of server.go into uber_applier.go - clearly devides the application logic into: chain of: a) 'WrapApply' (generic logic across all the methods) b) dispatcher (translation of Apply into specific method like 'Put') c) chain of 'wrappers' of the specific methods (like Put). - when we do recovery (restore from snapshot) we create new instance of appliers. The purpose is to make sure we control all the depencies of the apply process, i.e. we can supply e.g. special instance of 'backend' to the application logic. --- server/etcdserver/apply.go | 176 ++---------------------- server/etcdserver/apply_auth.go | 4 +- server/etcdserver/server.go | 23 ++-- server/etcdserver/uber_applier.go | 220 ++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 183 deletions(-) create mode 100644 server/etcdserver/uber_applier.go diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index afef09263..f54099d6b 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -16,9 +16,6 @@ package etcdserver import ( "context" - "fmt" - "strconv" - "time" "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -45,7 +42,7 @@ type applyResult struct { resp proto.Message err error // physc signals the physical effect of the request has completed in addition - // to being logically reflected by the node. Currently only used for + // to being logically reflected by the node. Currently, only used for // Compaction requests. physc <-chan struct{} trace *traceutil.Trace @@ -58,9 +55,12 @@ type applierV3Internal interface { DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) } +type ApplyFunc func(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult + // applierV3 is the interface for processing V3 raft messages type applierV3 interface { - Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult + WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *applyResult + //Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) @@ -100,137 +100,8 @@ type applierV3backend struct { s *EtcdServer } -func (s *EtcdServer) newApplierV3Backend() applierV3 { - return &applierV3backend{s: s} -} - -func (s *EtcdServer) newApplierV3Internal() applierV3Internal { - base := &applierV3backend{s: s} - return base -} - -func (s *EtcdServer) newApplierV3() applierV3 { - return newAuthApplierV3( - s.AuthStore(), - newQuotaApplierV3(s, s.newApplierV3Backend()), - s.lessor, - ) -} - -func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { - op := "unknown" - ar := &applyResult{} - defer func(start time.Time) { - success := ar.err == nil || ar.err == mvcc.ErrCompacted - applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) - warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) - if !success { - warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) - } - }(time.Now()) - - switch { - case r.ClusterVersionSet != nil: // Implemented in 3.5.x - op = "ClusterVersionSet" - a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) - return ar - case r.ClusterMemberAttrSet != nil: - op = "ClusterMemberAttrSet" // Implemented in 3.5.x - a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) - return ar - case r.DowngradeInfoSet != nil: - op = "DowngradeInfoSet" // Implemented in 3.5.x - a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) - return ar - } - - if !shouldApplyV3 { - return nil - } - - // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls - switch { - case r.Range != nil: - op = "Range" - ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) - case r.Put != nil: - op = "Put" - ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put) - case r.DeleteRange != nil: - op = "DeleteRange" - ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) - case r.Txn != nil: - op = "Txn" - ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn) - case r.Compaction != nil: - op = "Compaction" - ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) - case r.LeaseGrant != nil: - op = "LeaseGrant" - ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) - case r.LeaseRevoke != nil: - op = "LeaseRevoke" - ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke) - case r.LeaseCheckpoint != nil: - op = "LeaseCheckpoint" - ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) - case r.Alarm != nil: - op = "Alarm" - ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm) - case r.Authenticate != nil: - op = "Authenticate" - ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate) - case r.AuthEnable != nil: - op = "AuthEnable" - ar.resp, ar.err = a.s.applyV3.AuthEnable() - case r.AuthDisable != nil: - op = "AuthDisable" - ar.resp, ar.err = a.s.applyV3.AuthDisable() - case r.AuthStatus != nil: - ar.resp, ar.err = a.s.applyV3.AuthStatus() - case r.AuthUserAdd != nil: - op = "AuthUserAdd" - ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd) - case r.AuthUserDelete != nil: - op = "AuthUserDelete" - ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete) - case r.AuthUserChangePassword != nil: - op = "AuthUserChangePassword" - ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword) - case r.AuthUserGrantRole != nil: - op = "AuthUserGrantRole" - ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole) - case r.AuthUserGet != nil: - op = "AuthUserGet" - ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet) - case r.AuthUserRevokeRole != nil: - op = "AuthUserRevokeRole" - ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole) - case r.AuthRoleAdd != nil: - op = "AuthRoleAdd" - ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd) - case r.AuthRoleGrantPermission != nil: - op = "AuthRoleGrantPermission" - ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission) - case r.AuthRoleGet != nil: - op = "AuthRoleGet" - ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet) - case r.AuthRoleRevokePermission != nil: - op = "AuthRoleRevokePermission" - ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission) - case r.AuthRoleDelete != nil: - op = "AuthRoleDelete" - ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete) - case r.AuthUserList != nil: - op = "AuthUserList" - ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList) - case r.AuthRoleList != nil: - op = "AuthRoleList" - ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) - default: - a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) - } - return ar +func (a *applierV3backend) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *applyResult { + return applyFunc(ctx, r, shouldApplyV3) } func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { @@ -295,9 +166,7 @@ func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.L func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { resp := &pb.AlarmResponse{} - oldCount := len(a.s.alarmStore.Get(ar.Alarm)) - lg := a.s.Logger() switch ar.Action { case pb.AlarmRequest_GET: resp.Alarms = a.s.alarmStore.Get(ar.Alarm) @@ -310,39 +179,12 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) break } resp.Alarms = append(resp.Alarms, m) - activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1 - if !activated { - break - } - - lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) - switch m.Alarm { - case pb.AlarmType_CORRUPT: - a.s.applyV3 = newApplierV3Corrupt(a) - case pb.AlarmType_NOSPACE: - a.s.applyV3 = newApplierV3Capped(a) - default: - lg.Panic("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m))) - } case pb.AlarmRequest_DEACTIVATE: m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm) if m == nil { break } resp.Alarms = append(resp.Alarms, m) - deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0 - if !deactivated { - break - } - - switch m.Alarm { - case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT: - // TODO: check kv hash before deactivating CORRUPT? - lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) - a.s.applyV3 = a.s.newApplierV3() - default: - lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m))) - } default: return nil, nil } @@ -358,7 +200,7 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *applierV3Capped) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { return nil, nil, ErrNoSpace } @@ -369,7 +211,7 @@ func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnRes return a.applierV3.Txn(ctx, r) } -func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { +func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { return nil, ErrNoSpace } diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go index bf043aa73..f11625e4d 100644 --- a/server/etcdserver/apply_auth.go +++ b/server/etcdserver/apply_auth.go @@ -42,7 +42,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a return &authApplierV3{applierV3: base, as: as, lessor: lessor} } -func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { +func (aa *authApplierV3) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *applyResult { aa.mu.Lock() defer aa.mu.Unlock() if r.Header != nil { @@ -58,7 +58,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membersh return &applyResult{err: err} } } - ret := aa.applierV3.Apply(r, shouldApplyV3) + ret := aa.applierV3.WrapApply(ctx, r, shouldApplyV3, applyFunc) aa.authInfo.Username = "" aa.authInfo.Revision = 0 return ret diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 8f6c75a1c..0ac4d178d 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -250,11 +250,9 @@ type EtcdServer struct { applyV2 ApplierV2 - // applyV3 is the applier with auth and quotas - applyV3 applierV3 - // applyV3Internal is the applier for internal request - applyV3Internal applierV3Internal - applyWait wait.WaitTime + uberApply *uberApplier + + applyWait wait.WaitTime kv mvcc.WatchableKV lessor lease.Lessor @@ -390,10 +388,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.compactor.Run() } - srv.applyV3Internal = srv.newApplierV3Internal() if err = srv.restoreAlarms(); err != nil { return nil, err } + srv.uberApply = newUberApplier(srv) if srv.Cfg.EnableLeaseCheckpoint { // setting checkpointer enables lease checkpoint feature. @@ -1071,6 +1069,10 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { ep.appliedi = apply.snapshot.Metadata.Index ep.snapi = ep.appliedi ep.confState = apply.snapshot.Metadata.ConfState + + // As backends and implementations like alarmsStore changed, we need + // to re-bootstrap Appliers. + s.uberApply = newUberApplier(s) } func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) { @@ -1888,7 +1890,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { removeNeedlessRangeReqs(raftReq.Txn) } applyV3Performed = true - ar = s.applyV3.Apply(&raftReq, shouldApplyV3) + ar = s.uberApply.Apply(&raftReq, shouldApplyV3) } // do not re-apply applied entries. @@ -2292,18 +2294,11 @@ func (s *EtcdServer) Backend() backend.Backend { func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore } func (s *EtcdServer) restoreAlarms() error { - s.applyV3 = s.newApplierV3() as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be)) if err != nil { return err } s.alarmStore = as - if len(as.Get(pb.AlarmType_NOSPACE)) > 0 { - s.applyV3 = newApplierV3Capped(s.applyV3) - } - if len(as.Get(pb.AlarmType_CORRUPT)) > 0 { - s.applyV3 = newApplierV3Corrupt(s.applyV3) - } return nil } diff --git a/server/etcdserver/uber_applier.go b/server/etcdserver/uber_applier.go new file mode 100644 index 000000000..f8e41dbc0 --- /dev/null +++ b/server/etcdserver/uber_applier.go @@ -0,0 +1,220 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "context" + "strconv" + "time" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" + "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.uber.org/zap" +) + +type uberApplier struct { + lg *zap.Logger + + alarmStore *v3alarm.AlarmStore + warningApplyDuration time.Duration + + // This is the applier that is taking in consideration current alarms + applyV3 applierV3 + + // This is the applier used for wrapping when alarms change + applyV3base applierV3 + + // applyV3Internal is the applier for internal requests + // (that seems to bypass wrappings) + // TODO(ptab): Seems artificial and could be part of the regular stack. + applyV3Internal applierV3Internal +} + +func newUberApplier(s *EtcdServer) *uberApplier { + applyV3base_ := newApplierV3(s) + + ua := &uberApplier{ + lg: s.lg, + alarmStore: s.alarmStore, + warningApplyDuration: s.Cfg.WarningApplyDuration, + applyV3: applyV3base_, + applyV3base: applyV3base_, + applyV3Internal: newApplierV3Internal(s), + } + ua.RestoreAlarms() + return ua +} + +func newApplierV3Backend(s *EtcdServer) applierV3 { + return &applierV3backend{s: s} +} + +func newApplierV3Internal(s *EtcdServer) applierV3Internal { + base := &applierV3backend{s: s} + return base +} + +func newApplierV3(s *EtcdServer) applierV3 { + return newAuthApplierV3( + s.AuthStore(), + newQuotaApplierV3(s, newApplierV3Backend(s)), + s.lessor, + ) +} + +func (a *uberApplier) RestoreAlarms() { + noSpaceAlarms := len(a.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 + corruptAlarms := len(a.alarmStore.Get(pb.AlarmType_CORRUPT)) > 0 + a.applyV3 = a.applyV3base + if noSpaceAlarms { + a.applyV3 = newApplierV3Capped(a.applyV3) + } + if corruptAlarms { + a.applyV3 = newApplierV3Corrupt(a.applyV3) + } +} + +func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { + return a.applyV3.WrapApply(context.TODO(), r, shouldApplyV3, a.dispatch) +} + +// This function +func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { + op := "unknown" + ar := &applyResult{} + defer func(start time.Time) { + success := ar.err == nil || ar.err == mvcc.ErrCompacted + applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) + warnOfExpensiveRequest(a.lg, a.warningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) + if !success { + warnOfFailedRequest(a.lg, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) + } + }(time.Now()) + + switch { + case r.ClusterVersionSet != nil: // Implemented in 3.5.x + op = "ClusterVersionSet" + a.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) + return ar + case r.ClusterMemberAttrSet != nil: + op = "ClusterMemberAttrSet" // Implemented in 3.5.x + a.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) + return ar + case r.DowngradeInfoSet != nil: + op = "DowngradeInfoSet" // Implemented in 3.5.x + a.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) + return ar + } + + if !shouldApplyV3 { + return nil + } + + // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls + switch { + case r.Range != nil: + op = "Range" + ar.resp, ar.err = a.applyV3.Range(ctx, nil, r.Range) + case r.Put != nil: + op = "Put" + ar.resp, ar.trace, ar.err = a.applyV3.Put(ctx, nil, r.Put) + case r.DeleteRange != nil: + op = "DeleteRange" + ar.resp, ar.err = a.applyV3.DeleteRange(nil, r.DeleteRange) + case r.Txn != nil: + op = "Txn" + ar.resp, ar.trace, ar.err = a.applyV3.Txn(ctx, r.Txn) + case r.Compaction != nil: + op = "Compaction" + ar.resp, ar.physc, ar.trace, ar.err = a.applyV3.Compaction(r.Compaction) + case r.LeaseGrant != nil: + op = "LeaseGrant" + ar.resp, ar.err = a.applyV3.LeaseGrant(r.LeaseGrant) + case r.LeaseRevoke != nil: + op = "LeaseRevoke" + ar.resp, ar.err = a.applyV3.LeaseRevoke(r.LeaseRevoke) + case r.LeaseCheckpoint != nil: + op = "LeaseCheckpoint" + ar.resp, ar.err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) + case r.Alarm != nil: + op = "Alarm" + ar.resp, ar.err = a.Alarm(r.Alarm) + case r.Authenticate != nil: + op = "Authenticate" + ar.resp, ar.err = a.applyV3.Authenticate(r.Authenticate) + case r.AuthEnable != nil: + op = "AuthEnable" + ar.resp, ar.err = a.applyV3.AuthEnable() + case r.AuthDisable != nil: + op = "AuthDisable" + ar.resp, ar.err = a.applyV3.AuthDisable() + case r.AuthStatus != nil: + ar.resp, ar.err = a.applyV3.AuthStatus() + case r.AuthUserAdd != nil: + op = "AuthUserAdd" + ar.resp, ar.err = a.applyV3.UserAdd(r.AuthUserAdd) + case r.AuthUserDelete != nil: + op = "AuthUserDelete" + ar.resp, ar.err = a.applyV3.UserDelete(r.AuthUserDelete) + case r.AuthUserChangePassword != nil: + op = "AuthUserChangePassword" + ar.resp, ar.err = a.applyV3.UserChangePassword(r.AuthUserChangePassword) + case r.AuthUserGrantRole != nil: + op = "AuthUserGrantRole" + ar.resp, ar.err = a.applyV3.UserGrantRole(r.AuthUserGrantRole) + case r.AuthUserGet != nil: + op = "AuthUserGet" + ar.resp, ar.err = a.applyV3.UserGet(r.AuthUserGet) + case r.AuthUserRevokeRole != nil: + op = "AuthUserRevokeRole" + ar.resp, ar.err = a.applyV3.UserRevokeRole(r.AuthUserRevokeRole) + case r.AuthRoleAdd != nil: + op = "AuthRoleAdd" + ar.resp, ar.err = a.applyV3.RoleAdd(r.AuthRoleAdd) + case r.AuthRoleGrantPermission != nil: + op = "AuthRoleGrantPermission" + ar.resp, ar.err = a.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission) + case r.AuthRoleGet != nil: + op = "AuthRoleGet" + ar.resp, ar.err = a.applyV3.RoleGet(r.AuthRoleGet) + case r.AuthRoleRevokePermission != nil: + op = "AuthRoleRevokePermission" + ar.resp, ar.err = a.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission) + case r.AuthRoleDelete != nil: + op = "AuthRoleDelete" + ar.resp, ar.err = a.applyV3.RoleDelete(r.AuthRoleDelete) + case r.AuthUserList != nil: + op = "AuthUserList" + ar.resp, ar.err = a.applyV3.UserList(r.AuthUserList) + case r.AuthRoleList != nil: + op = "AuthRoleList" + ar.resp, ar.err = a.applyV3.RoleList(r.AuthRoleList) + default: + a.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) + } + return ar +} + +func (a *uberApplier) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { + resp, err := a.applyV3.Alarm(ar) + + if ar.Action == pb.AlarmRequest_ACTIVATE || + ar.Action == pb.AlarmRequest_DEACTIVATE { + a.RestoreAlarms() + } + return resp, err +}