etcdserver: Create AlarmBackend interface

This commit is contained in:
Marek Siarkowicz 2021-07-05 16:40:17 +02:00
parent a97e48e08d
commit 2f31cc3fbc
3 changed files with 55 additions and 25 deletions

View File

@ -21,7 +21,6 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -30,6 +29,14 @@ type BackendGetter interface {
Backend() backend.Backend Backend() backend.Backend
} }
type AlarmBackend interface {
CreateAlarmBucket()
MustPutAlarm(member *pb.AlarmMember)
MustDeleteAlarm(alarm *pb.AlarmMember)
GetAllAlarms() ([]*pb.AlarmMember, error)
ForceCommit()
}
type alarmSet map[types.ID]*pb.AlarmMember type alarmSet map[types.ID]*pb.AlarmMember
// AlarmStore persists alarms to the backend. // AlarmStore persists alarms to the backend.
@ -38,14 +45,14 @@ type AlarmStore struct {
mu sync.Mutex mu sync.Mutex
types map[pb.AlarmType]alarmSet types map[pb.AlarmType]alarmSet
bg BackendGetter be AlarmBackend
} }
func NewAlarmStore(lg *zap.Logger, bg BackendGetter) (*AlarmStore, error) { func NewAlarmStore(lg *zap.Logger, be AlarmBackend) (*AlarmStore, error) {
if lg == nil { if lg == nil {
lg = zap.NewNop() lg = zap.NewNop()
} }
ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), bg: bg} ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), be: be}
err := ret.restore() err := ret.restore()
return ret, err return ret, err
} }
@ -59,7 +66,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
return m return m
} }
schema.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm) a.be.MustPutAlarm(newAlarm)
return newAlarm return newAlarm
} }
@ -79,7 +86,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
delete(t, id) delete(t, id)
schema.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m) a.be.MustDeleteAlarm(m)
return m return m
} }
@ -101,20 +108,15 @@ func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
} }
func (a *AlarmStore) restore() error { func (a *AlarmStore) restore() error {
b := a.bg.Backend() a.be.CreateAlarmBucket()
tx := b.BatchTx() ms, err := a.be.GetAllAlarms()
tx.Lock()
schema.UnsafeCreateAlarmBucket(tx)
ms, err := schema.UnsafeGetAllAlarms(tx)
tx.Unlock()
if err != nil { if err != nil {
return err return err
} }
for _, m := range ms { for _, m := range ms {
a.addToMap(m) a.addToMap(m)
} }
b.ForceCommit() a.be.ForceCommit()
return err return err
} }

View File

@ -2361,7 +2361,7 @@ func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
func (s *EtcdServer) restoreAlarms() error { func (s *EtcdServer) restoreAlarms() error {
s.applyV3 = s.newApplierV3() s.applyV3 = s.newApplierV3()
as, err := v3alarm.NewAlarmStore(s.lg, s) as, err := v3alarm.NewAlarmStore(s.lg, schema.NewAlarmBackend(s.lg, s.be))
if err != nil { if err != nil {
return err return err
} }

View File

@ -20,41 +20,65 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func UnsafeCreateAlarmBucket(tx backend.BatchTx) { type alarmBackend struct {
lg *zap.Logger
be backend.Backend
}
func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend {
return &alarmBackend{
lg: lg,
be: be,
}
}
func (s *alarmBackend) CreateAlarmBucket() {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(Alarm) tx.UnsafeCreateBucket(Alarm)
} }
func MustPutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) {
tx := s.be.BatchTx()
tx.Lock() tx.Lock()
defer tx.Unlock() defer tx.Unlock()
MustUnsafePutAlarm(lg, tx, alarm) s.mustUnsafePutAlarm(tx, alarm)
} }
func MustUnsafePutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
v, err := alarm.Marshal() v, err := alarm.Marshal()
if err != nil { if err != nil {
lg.Panic("failed to marshal alarm member", zap.Error(err)) s.lg.Panic("failed to marshal alarm member", zap.Error(err))
} }
tx.UnsafePut(Alarm, v, nil) tx.UnsafePut(Alarm, v, nil)
} }
func MustDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) {
tx := s.be.BatchTx()
tx.Lock() tx.Lock()
defer tx.Unlock() defer tx.Unlock()
MustUnsafeDeleteAlarm(lg, tx, alarm) s.mustUnsafeDeleteAlarm(tx, alarm)
} }
func MustUnsafeDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
v, err := alarm.Marshal() v, err := alarm.Marshal()
if err != nil { if err != nil {
lg.Panic("failed to marshal alarm member", zap.Error(err)) s.lg.Panic("failed to marshal alarm member", zap.Error(err))
} }
tx.UnsafeDelete(Alarm, v) tx.UnsafeDelete(Alarm, v)
} }
func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) { func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) {
tx := s.be.ReadTx()
tx.Lock()
defer tx.Unlock()
return s.unsafeGetAllAlarms(tx)
}
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) {
ms := []*etcdserverpb.AlarmMember{} ms := []*etcdserverpb.AlarmMember{}
err := tx.UnsafeForEach(Alarm, func(k, v []byte) error { err := tx.UnsafeForEach(Alarm, func(k, v []byte) error {
var m etcdserverpb.AlarmMember var m etcdserverpb.AlarmMember
@ -66,3 +90,7 @@ func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error)
}) })
return ms, err return ms, err
} }
func (s alarmBackend) ForceCommit() {
s.be.ForceCommit()
}