From 2f31cc3fbcdc209313ad74ade488d4059a1198e2 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 5 Jul 2021 16:40:17 +0200 Subject: [PATCH] etcdserver: Create AlarmBackend interface --- server/etcdserver/api/v3alarm/alarms.go | 30 ++++++++-------- server/etcdserver/server.go | 2 +- server/storage/schema/alarm.go | 48 +++++++++++++++++++------ 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/server/etcdserver/api/v3alarm/alarms.go b/server/etcdserver/api/v3alarm/alarms.go index 4dc56f106..6dfcfd117 100644 --- a/server/etcdserver/api/v3alarm/alarms.go +++ b/server/etcdserver/api/v3alarm/alarms.go @@ -21,7 +21,6 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/storage/backend" - "go.etcd.io/etcd/server/v3/storage/schema" "go.uber.org/zap" ) @@ -30,6 +29,14 @@ type BackendGetter interface { Backend() backend.Backend } +type AlarmBackend interface { + CreateAlarmBucket() + MustPutAlarm(member *pb.AlarmMember) + MustDeleteAlarm(alarm *pb.AlarmMember) + GetAllAlarms() ([]*pb.AlarmMember, error) + ForceCommit() +} + type alarmSet map[types.ID]*pb.AlarmMember // AlarmStore persists alarms to the backend. @@ -38,14 +45,14 @@ type AlarmStore struct { mu sync.Mutex types map[pb.AlarmType]alarmSet - bg BackendGetter + be AlarmBackend } -func NewAlarmStore(lg *zap.Logger, bg BackendGetter) (*AlarmStore, error) { +func NewAlarmStore(lg *zap.Logger, be AlarmBackend) (*AlarmStore, error) { if lg == nil { lg = zap.NewNop() } - ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), bg: bg} + ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), be: be} err := ret.restore() return ret, err } @@ -59,7 +66,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember { return m } - schema.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm) + a.be.MustPutAlarm(newAlarm) return newAlarm } @@ -79,7 +86,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember { delete(t, id) - schema.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m) + a.be.MustDeleteAlarm(m) return m } @@ -101,20 +108,15 @@ func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) { } func (a *AlarmStore) restore() error { - b := a.bg.Backend() - tx := b.BatchTx() - - tx.Lock() - schema.UnsafeCreateAlarmBucket(tx) - ms, err := schema.UnsafeGetAllAlarms(tx) - tx.Unlock() + a.be.CreateAlarmBucket() + ms, err := a.be.GetAllAlarms() if err != nil { return err } for _, m := range ms { a.addToMap(m) } - b.ForceCommit() + a.be.ForceCommit() return err } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2a595b451..1f7d7dc09 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2361,7 +2361,7 @@ func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore } func (s *EtcdServer) restoreAlarms() error { s.applyV3 = s.newApplierV3() - as, err := v3alarm.NewAlarmStore(s.lg, s) + as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be)) if err != nil { return err } diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 7400dc470..605bb3a0b 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -20,41 +20,65 @@ import ( "go.uber.org/zap" ) -func UnsafeCreateAlarmBucket(tx backend.BatchTx) { +type alarmBackend struct { + lg *zap.Logger + be backend.Backend +} + +func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { + return &alarmBackend{ + lg: lg, + be: be, + } +} + +func (s *alarmBackend) CreateAlarmBucket() { + tx := s.be.BatchTx() + tx.Lock() + defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } -func MustPutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { +func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) { + tx := s.be.BatchTx() tx.Lock() defer tx.Unlock() - MustUnsafePutAlarm(lg, tx, alarm) + s.mustUnsafePutAlarm(tx, alarm) } -func MustUnsafePutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { +func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { v, err := alarm.Marshal() if err != nil { - lg.Panic("failed to marshal alarm member", zap.Error(err)) + s.lg.Panic("failed to marshal alarm member", zap.Error(err)) } tx.UnsafePut(Alarm, v, nil) } -func MustDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { +func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) { + tx := s.be.BatchTx() tx.Lock() defer tx.Unlock() - MustUnsafeDeleteAlarm(lg, tx, alarm) + s.mustUnsafeDeleteAlarm(tx, alarm) } -func MustUnsafeDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { +func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { v, err := alarm.Marshal() if err != nil { - lg.Panic("failed to marshal alarm member", zap.Error(err)) + s.lg.Panic("failed to marshal alarm member", zap.Error(err)) } tx.UnsafeDelete(Alarm, v) } -func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) { +func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) { + tx := s.be.ReadTx() + tx.Lock() + defer tx.Unlock() + return s.unsafeGetAllAlarms(tx) +} + +func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) { ms := []*etcdserverpb.AlarmMember{} err := tx.UnsafeForEach(Alarm, func(k, v []byte) error { var m etcdserverpb.AlarmMember @@ -66,3 +90,7 @@ func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) }) return ms, err } + +func (s alarmBackend) ForceCommit() { + s.be.ForceCommit() +}