mirror of https://github.com/etcd-io/etcd.git
etcdserver: cap new keys on space alarm
commit a403a94d7b
parent 9e7f47c490
@@ -394,22 +394,57 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
 	switch ar.Action {
 	case pb.AlarmRequest_GET:
 		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
 		return resp, nil
 	case pb.AlarmRequest_ACTIVATE:
 		m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
-		if m != nil {
-			resp.Alarms = append(resp.Alarms, m)
+		if m == nil {
+			break
 		}
+		resp.Alarms = append(resp.Alarms, m)
+		switch m.Alarm {
+		case pb.AlarmType_NOSPACE:
+			if len(a.s.alarmStore.Get(m.Alarm)) == 1 {
+				a.s.applyV3 = newApplierV3Capped(a)
+			}
+		default:
+			plog.Warningf("unimplemented alarm activation (%+v)", m)
+		}
 		return resp, nil
 	case pb.AlarmRequest_DEACTIVATE:
 		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
-		if m != nil {
-			resp.Alarms = append(resp.Alarms, m)
+		if m == nil {
+			break
 		}
+		resp.Alarms = append(resp.Alarms, m)
+		if m.Alarm == pb.AlarmType_NOSPACE && len(a.s.alarmStore.Get(ar.Alarm)) == 0 {
+			a.s.applyV3 = newQuotaApplierV3(a.s, &applierV3backend{a.s})
+		}
 		return resp, nil
 	default:
 		return nil, nil
 	}
 	return resp, nil
 }
 
+type applierV3Capped struct {
+	applierV3
+	q backendQuota
+}
+
+// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
+// with Puts so that the number of keys in the store is capped.
+func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
+
+func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+	return nil, ErrNoSpace
+}
+
+func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	if a.q.Cost(r) > 0 {
+		return nil, ErrNoSpace
+	}
+	return a.applierV3.Txn(r)
+}
+
+func (a *applierV3Capped) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	return nil, ErrNoSpace
+}
+
 func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
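The capped applier above leans on Go's interface embedding: the wrapper satisfies the whole applierV3 interface through the embedded value, and only the write paths (Put, Txn containing puts, LeaseCreate) are overridden to fail with ErrNoSpace, while reads keep flowing through the wrapped applier. The sketch below shows that pattern in isolation; it is self-contained and every name in it (applier, backendApplier, cappedApplier, errNoSpace) is illustrative rather than an etcd identifier.

package main

import (
	"errors"
	"fmt"
)

var errNoSpace = errors.New("backend quota exceeded")

type applier interface {
	Put(key, val string) error
	Range(key string) (string, error)
}

// backendApplier stands in for the normal applier that actually writes to the store.
type backendApplier struct {
	kv map[string]string
}

func (b *backendApplier) Put(key, val string) error        { b.kv[key] = val; return nil }
func (b *backendApplier) Range(key string) (string, error) { return b.kv[key], nil }

// cappedApplier embeds the base applier, so every method it does not redefine
// (Range here) is served by the wrapped value; only writes are rejected.
type cappedApplier struct {
	applier
}

func (c *cappedApplier) Put(key, val string) error { return errNoSpace }

func main() {
	var a applier = &backendApplier{kv: map[string]string{}}
	_ = a.Put("foo", "bar") // quota still fine, write goes through

	a = &cappedApplier{applier: a} // NOSPACE alarm raised: swap in the capped wrapper
	if err := a.Put("foo", "baz"); err != nil {
		fmt.Println("put rejected:", err)
	}
	v, _ := a.Range("foo") // reads are still served by the embedded applier
	fmt.Println("range:", v)
}

This is the same shape as the commit's runtime swap of a.s.applyV3 between newQuotaApplierV3 and newApplierV3Capped.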
@@ -377,12 +377,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		srv.compactor.Run()
 	}
 
-	as, aserr := alarm.NewAlarmStore(srv)
-	if aserr != nil {
-		return nil, aserr
+	if err := srv.restoreAlarms(); err != nil {
+		return nil, err
 	}
-	srv.alarmStore = as
-	srv.applyV3 = newQuotaApplierV3(srv, &applierV3backend{srv})
 
 	// TODO: move transport initialization near the definition of remote
 	tr := &rafthttp.Transport{
@@ -623,6 +620,10 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		s.lessor.Recover(newbe, s.kv)
 	}
 
+	if err := s.restoreAlarms(); err != nil {
+		plog.Panicf("restore alarms error: %v", err)
+	}
+
 	if s.authStore != nil {
 		s.authStore.Recover(newbe)
 	}
@@ -1020,13 +1021,21 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 			s.w.Trigger(req.ID, s.applyRequest(*req))
 		} else {
 			ar := s.applyV3Request(&raftReq)
-			s.w.Trigger(raftReq.ID, ar)
-			if ar.err == ErrNoSpace {
-				plog.Errorf("applying raft message exceeded backend quota")
-				// TODO: send alarm
-				s.errorc <- ar.err
-				return applied, true
+			if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
+				s.w.Trigger(raftReq.ID, ar)
+				break
 			}
+			plog.Errorf("applying raft message exceeded backend quota")
+			go func() {
+				a := &pb.AlarmRequest{
+					MemberID: int64(s.ID()),
+					Action:   pb.AlarmRequest_ACTIVATE,
+					Alarm:    pb.AlarmType_NOSPACE,
+				}
+				r := pb.InternalRaftRequest{Alarm: a}
+				s.processInternalRaftRequest(context.TODO(), r)
+				s.w.Trigger(raftReq.ID, ar)
+			}()
 		}
 	case raftpb.EntryConfChange:
 		var cc raftpb.ConfChange
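The reworked apply() path above changes when a quota-violating proposal is answered: if no NOSPACE alarm is active yet, the member first commits an ACTIVATE alarm through consensus (processInternalRaftRequest) and only then triggers the waiting request, so the capped applier is already installed by the time the client sees the error and retries. Below is a rough, self-contained sketch of that ordering; fakeServer, propose, and applyPut are stand-ins of my own and not etcd code.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNoSpace = errors.New("backend quota exceeded")

type fakeServer struct {
	mu      sync.Mutex
	alarmed bool
}

// propose stands in for processInternalRaftRequest: it "commits" the alarm and
// returns only once the alarm has been applied.
func (s *fakeServer) propose(alarm string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.alarmed = true
	fmt.Println("alarm committed:", alarm)
}

// applyPut mirrors the shape of the new code: if the alarm is already active, answer
// immediately; otherwise raise the alarm first and unblock the caller afterwards.
func (s *fakeServer) applyPut(reply chan<- error) {
	err := errNoSpace // pretend the quota applier rejected this put
	s.mu.Lock()
	alarmed := s.alarmed
	s.mu.Unlock()
	if alarmed {
		reply <- err
		return
	}
	go func() {
		s.propose("NOSPACE")
		reply <- err // the caller hears about the failure only after the alarm is in place
	}()
}

func main() {
	s := &fakeServer{}
	reply := make(chan error, 1)
	s.applyPut(reply)
	fmt.Println("client saw:", <-reply)
}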
@@ -1333,3 +1342,17 @@ func (s *EtcdServer) Backend() backend.Backend {
 }
 
 func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
+
+func (s *EtcdServer) restoreAlarms() error {
+	s.applyV3 = newQuotaApplierV3(s, &applierV3backend{s})
+
+	as, err := alarm.NewAlarmStore(s)
+	if err != nil {
+		return err
+	}
+	s.alarmStore = as
+	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
+		s.applyV3 = newApplierV3Capped(s.applyV3)
+	}
+	return nil
+}
@@ -757,6 +757,8 @@ type grpcAPI struct {
 	Lease pb.LeaseClient
 	// Watch is the watch API for the client's connection.
 	Watch pb.WatchClient
+	// Maintenance is the maintenance API for the client's connection.
+	Maintenance pb.MaintenanceClient
 }

@@ -765,5 +767,6 @@ func toGRPC(c *clientv3.Client) grpcAPI {
 		pb.NewKVClient(c.ActiveConnection()),
 		pb.NewLeaseClient(c.ActiveConnection()),
 		pb.NewWatchClient(c.ActiveConnection()),
+		pb.NewMaintenanceClient(c.ActiveConnection()),
 	}
 }
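With the Maintenance client now exposed through grpcAPI, callers can manage alarms directly over gRPC. The helper below is hypothetical and not part of this commit; it assumes the same pb (etcdserverpb) and context imports the integration tests already use, and shows how a NOSPACE alarm would be cleared on a member once space has been reclaimed, which through the DEACTIVATE path above reinstalls the quota-checked applier.

// disarmNoSpace is a hypothetical helper: given a Maintenance client such as the one
// toGRPC now returns, deactivate the NOSPACE alarm raised by the given member.
func disarmNoSpace(ctx context.Context, mt pb.MaintenanceClient, memberID int64) error {
	req := &pb.AlarmRequest{
		MemberID: memberID, // the member that raised the alarm
		Action:   pb.AlarmRequest_DEACTIVATE,
		Alarm:    pb.AlarmType_NOSPACE,
	}
	_, err := mt.Alarm(ctx, req)
	return err
}

In the style of TestV3AlarmDeactivate below, it would be invoked as disarmNoSpace(context.TODO(), toGRPC(clus.RandClient()).Maintenance, 123).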
@@ -542,6 +542,54 @@ func TestV3StorageQuotaApply(t *testing.T) {
 	if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
 		t.Fatalf("past-quota instance should reject put")
 	}
+
+	// large quota machine should reject put
+	if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
+		t.Fatalf("past-quota instance should reject put")
+	}
+
+	// reset large quota node to ensure alarm persisted
+	backend.InitialMmapSize = oldSize
+	clus.Members[1].Stop(t)
+	clus.Members[1].Restart(t)
+	clus.waitLeader(t, clus.Members)
+
+	if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
+		t.Fatalf("alarmed instance should reject put after reset")
+	}
+}
+
+// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
+func TestV3AlarmDeactivate(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+	kvc := toGRPC(clus.RandClient()).KV
+	mt := toGRPC(clus.RandClient()).Maintenance
+
+	alarmReq := &pb.AlarmRequest{
+		MemberID: 123,
+		Action:   pb.AlarmRequest_ACTIVATE,
+		Alarm:    pb.AlarmType_NOSPACE,
+	}
+	if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
+		t.Fatal(err)
+	}
+
+	key := []byte("abc")
+	smallbuf := make([]byte, 512)
+	_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
+	if err == nil && err != rpctypes.ErrNoSpace {
+		t.Fatalf("put got %v, expected %v", err, rpctypes.ErrNoSpace)
+	}
+
+	alarmReq.Action = pb.AlarmRequest_DEACTIVATE
+	if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
+		t.Fatal(err)
+	}
 }
 
 func TestV3RangeRequest(t *testing.T) {