diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 52a09ee0d..7515363ab 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -30,9 +30,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { } grpcServer := grpc.NewServer(opts...) - pb.RegisterKVServer(grpcServer, NewKVServer(s)) + pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)) pb.RegisterWatchServer(grpcServer, NewWatchServer(s)) - pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s)) + pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s)) pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index e802744d9..5c6c07bf2 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -297,6 +297,8 @@ func togRPCError(err error) error { // TODO: handle error from raft and timeout case etcdserver.ErrRequestTooLarge: return rpctypes.ErrRequestTooLarge + case etcdserver.ErrNoSpace: + return rpctypes.ErrNoSpace default: return grpc.Errorf(codes.Internal, err.Error()) } diff --git a/etcdserver/api/v3rpc/quota.go b/etcdserver/api/v3rpc/quota.go new file mode 100644 index 000000000..1892d1bde --- /dev/null +++ b/etcdserver/api/v3rpc/quota.go @@ -0,0 +1,61 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" +) + +type quotaKVServer struct { + pb.KVServer + q etcdserver.Quota +} + +func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer { + return "aKVServer{NewKVServer(s), etcdserver.NewBackendQuota(s)} +} + +func (s *quotaKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + if !s.q.Available(r) { + return nil, rpctypes.ErrNoSpace + } + return s.KVServer.Put(ctx, r) +} + +func (s *quotaKVServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + if !s.q.Available(r) { + return nil, rpctypes.ErrNoSpace + } + return s.KVServer.Txn(ctx, r) +} + +type quotaLeaseServer struct { + pb.LeaseServer + q etcdserver.Quota +} + +func (s *quotaLeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { + if !s.q.Available(cr) { + return nil, rpctypes.ErrNoSpace + } + return s.LeaseServer.LeaseCreate(ctx, cr) +} + +func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer { + return "aLeaseServer{NewLeaseServer(s), etcdserver.NewBackendQuota(s)} +} diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 00ab1b07d..fb5bb22a9 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -25,6 +25,7 @@ var ( ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "etcdserver: duplicate key given in txn request") ErrCompacted = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision has been compacted") ErrFutureRev = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision is a future revision") + ErrNoSpace = grpc.Errorf(codes.ResourceExhausted, "etcdserver: storage: database space exceeded") ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found") ErrLeaseExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists") diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 4bdf13218..cb2172a8e 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -394,6 +394,42 @@ func (a *applierV3backend) UserAdd(r *pb.UserAddRequest) (*pb.UserAddResponse, e return a.s.AuthStore().UserAdd(r) } +type quotaApplierV3 struct { + applierV3 + q Quota +} + +func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { + return "aApplierV3{app, NewBackendQuota(s)} +} + +func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) { + ok := a.q.Available(p) + resp, err := a.applierV3.Put(txnID, p) + if err == nil && !ok { + err = ErrNoSpace + } + return resp, err +} + +func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { + ok := a.q.Available(rt) + resp, err := a.applierV3.Txn(rt) + if err == nil && !ok { + err = ErrNoSpace + } + return resp, err +} + +func (a *quotaApplierV3) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { + ok := a.q.Available(lc) + resp, err := a.applierV3.LeaseCreate(lc) + if err == nil && !ok { + err = ErrNoSpace + } + return resp, err +} + type kvSort struct{ kvs []storagepb.KeyValue } func (s *kvSort) Swap(i, j int) { diff --git a/etcdserver/errors.go b/etcdserver/errors.go index eef57ccf3..07657f011 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -35,6 +35,7 @@ var ( ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrNoLeader = errors.New("etcdserver: no leader") ErrRequestTooLarge = errors.New("etcdserver: request is too large") + ErrNoSpace = errors.New("etcdserver: no space") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/quota.go b/etcdserver/quota.go new file mode 100644 index 000000000..4e20e851b --- /dev/null +++ b/etcdserver/quota.go @@ -0,0 +1,95 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage/backend" +) + +// Quota represents an arbitrary quota against arbitrary requests. Each request +// costs some charge; if there is not enough remaining charge, then there are +// too few resources available within the quota to apply the request. +type Quota interface { + // Available judges whether the given request fits within the quota. + Available(req interface{}) bool + // Cost computes the charge against the quota for a given request. + Cost(req interface{}) int + // Remaining is the amount of charge left for the quota. + Remaining() int64 +} + +type backendQuota struct { + s *EtcdServer + maxBackendBytes int64 +} + +const ( + // leaseOverhead is an estimate for the cost of storing a lease + leaseOverhead = 64 + // kvOverhead is an estimate for the cost of storing a key's metadata + kvOverhead = 256 +) + +func NewBackendQuota(s *EtcdServer) Quota { + return &backendQuota{s, backend.InitialMmapSize} +} + +func (b *backendQuota) Available(v interface{}) bool { + // TODO: maybe optimize backend.Size() + return b.s.Backend().Size()+int64(b.Cost(v)) < b.maxBackendBytes +} + +func (b *backendQuota) Cost(v interface{}) int { + switch r := v.(type) { + case *pb.PutRequest: + return costPut(r) + case *pb.TxnRequest: + return costTxn(r) + case *pb.LeaseCreateRequest: + return leaseOverhead + default: + panic("unexpected cost") + } +} + +func costPut(r *pb.PutRequest) int { return kvOverhead + len(r.Key) + len(r.Value) } + +func costTxnReq(u *pb.RequestUnion) int { + r := u.GetRequestPut() + if r == nil { + return 0 + } + return costPut(r) +} + +func costTxn(r *pb.TxnRequest) int { + sizeSuccess := 0 + for _, u := range r.Success { + sizeSuccess += costTxnReq(u) + } + sizeFailure := 0 + for _, u := range r.Failure { + sizeFailure += costTxnReq(u) + } + if sizeFailure > sizeSuccess { + return sizeFailure + } + return sizeSuccess +} + +func (b *backendQuota) Remaining() int64 { + return b.maxBackendBytes - b.s.Backend().Size() +} diff --git a/etcdserver/server.go b/etcdserver/server.go index d342b0f34..b18d48a1c 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -374,7 +374,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) srv.compactor.Run() } - srv.applyV3 = &applierV3backend{srv} + srv.applyV3 = newQuotaApplierV3(srv, &applierV3backend{srv}) // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ @@ -1007,13 +1007,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint var r pb.Request pbutil.MustUnmarshal(&r, e.Data) s.w.Trigger(r.ID, s.applyRequest(r)) + } else if raftReq.V2 != nil { + req := raftReq.V2 + s.w.Trigger(req.ID, s.applyRequest(*req)) } else { - switch { - case raftReq.V2 != nil: - req := raftReq.V2 - s.w.Trigger(req.ID, s.applyRequest(*req)) - default: - s.w.Trigger(raftReq.ID, s.applyV3Request(&raftReq)) + ar := s.applyV3Request(&raftReq) + s.w.Trigger(raftReq.ID, ar) + if ar.err == ErrNoSpace { + plog.Errorf("applying raft message exceeded backend quota") + // TODO: send alarm + s.errorc <- ar.err + return applied, true } } case raftpb.EntryConfChange: diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 3cc2cbf5c..7dce2edce 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/storage/backend" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -455,6 +456,94 @@ func TestV3Hash(t *testing.T) { } } +// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer +func TestV3StorageQuotaAPI(t *testing.T) { + oldSize := backend.InitialMmapSize + defer func() { + backend.InitialMmapSize = oldSize + testutil.AfterTest(t) + }() + + backend.InitialMmapSize = 64 * 1024 + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + kvc := toGRPC(clus.RandClient()).KV + + key := []byte("abc") + + // test small put that fits in quota + smallbuf := make([]byte, 512) + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { + t.Fatal(err) + } + + // test big put + bigbuf := make([]byte, 64*1024) + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) + if err == nil || err != rpctypes.ErrNoSpace { + t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrNoSpace) + } + + // test big txn + puttxn := &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: key, + Value: bigbuf, + }, + }, + } + txnreq := &pb.TxnRequest{} + txnreq.Success = append(txnreq.Success, puttxn) + _, txnerr := kvc.Txn(context.TODO(), txnreq) + if txnerr == nil || err != rpctypes.ErrNoSpace { + t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrNoSpace) + } +} + +// TestV3StorageQuotaApply tests the V3 server respects quotas during apply +func TestV3StorageQuotaApply(t *testing.T) { + oldSize := backend.InitialMmapSize + defer func() { + backend.InitialMmapSize = oldSize + testutil.AfterTest(t) + }() + + clus := NewClusterV3(t, &ClusterConfig{Size: 2}) + defer clus.Terminate(t) + kvc0 := toGRPC(clus.Client(0)).KV + kvc1 := toGRPC(clus.Client(1)).KV + + // force a node to have a different quota + backend.InitialMmapSize = 64 * 1024 + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + + key := []byte("abc") + + // test small put still works + smallbuf := make([]byte, 1024) + _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) + if serr != nil { + t.Fatal(serr) + } + + // test big put + bigbuf := make([]byte, 64*1024) + _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) + if err != nil { + t.Fatal(err) + } + + // small quota machine should reject put + // first, synchronize with the cluster via quorum get + kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("past-quota instance should reject put") + } +} + func TestV3RangeRequest(t *testing.T) { defer testutil.AfterTest(t) tests := []struct {