From 27b03f0ed5db0f22f218accf9d4e911d44d08de0 Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Tue, 12 Jul 2016 13:16:35 -0700
Subject: [PATCH] *: deny proposals when there is a huge gap between apply/commit

Record the highest committed index seen by the server run loop (the index of
the last entry in an apply batch, or the snapshot index if that is larger) and
reject new internal raft requests with ErrTooManyRequests when the committed
index is more than maxGapBetweenApplyAndCommitIndex ahead of the applied
index. The error is surfaced to v3 clients as a gRPC ResourceExhausted status.

---
 etcdserver/api/v3rpc/rpctypes/error.go |  7 +++++--
 etcdserver/api/v3rpc/util.go           |  2 ++
 etcdserver/errors.go                   |  1 +
 etcdserver/server.go                   | 19 +++++++++++++++++++
 etcdserver/v3_server.go                | 12 ++++++++++++
 5 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go
index ec5464cbb..fee5297e6 100644
--- a/etcdserver/api/v3rpc/rpctypes/error.go
+++ b/etcdserver/api/v3rpc/rpctypes/error.go
@@ -36,7 +36,8 @@ var (
 	ErrGRPCMemberBadURLs = grpc.Errorf(codes.InvalidArgument, "etcdserver: given member URLs are invalid")
 	ErrGRPCMemberNotFound = grpc.Errorf(codes.NotFound, "etcdserver: member not found")
 
-	ErrGRPCRequestTooLarge = grpc.Errorf(codes.InvalidArgument, "etcdserver: request is too large")
+	ErrGRPCRequestTooLarge        = grpc.Errorf(codes.InvalidArgument, "etcdserver: request is too large")
+	ErrGRPCRequestTooManyRequests = grpc.Errorf(codes.ResourceExhausted, "etcdserver: too many requests")
 
 	ErrGRPCRootUserNotExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not exist")
 	ErrGRPCRootRoleNotExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not have root role")
@@ -69,7 +70,8 @@ var (
 		grpc.ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
 		grpc.ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
 
-		grpc.ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
+		grpc.ErrorDesc(ErrGRPCRequestTooLarge):        ErrGRPCRequestTooLarge,
+		grpc.ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
 
 		grpc.ErrorDesc(ErrGRPCRootUserNotExist): ErrGRPCRootUserNotExist,
 		grpc.ErrorDesc(ErrGRPCRootRoleNotExist): ErrGRPCRootRoleNotExist,
@@ -104,6 +106,7 @@ var (
 	ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
 
 	ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
+	ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
 
 	ErrRootUserNotExist = Error(ErrGRPCRootUserNotExist)
 	ErrRootRoleNotExist = Error(ErrGRPCRootRoleNotExist)
diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go
index 8b44c11e3..a419915a7 100644
--- a/etcdserver/api/v3rpc/util.go
+++ b/etcdserver/api/v3rpc/util.go
@@ -37,6 +37,8 @@ func togRPCError(err error) error {
 		return rpctypes.ErrGRPCRequestTooLarge
 	case etcdserver.ErrNoSpace:
 		return rpctypes.ErrGRPCNoSpace
+	case etcdserver.ErrTooManyRequests:
+		return rpctypes.ErrGRPCRequestTooManyRequests
 
 	case auth.ErrRootUserNotExist:
 		return rpctypes.ErrGRPCRootUserNotExist
diff --git a/etcdserver/errors.go b/etcdserver/errors.go
index 1b9789fc8..9c74b7195 100644
--- a/etcdserver/errors.go
+++ b/etcdserver/errors.go
@@ -31,6 +31,7 @@ var (
 	ErrRequestTooLarge = errors.New("etcdserver: request is too large")
 	ErrNoSpace = errors.New("etcdserver: no space")
 	ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token")
+	ErrTooManyRequests = errors.New("etcdserver: too many requests")
 )
 
 type DiscoveryError struct {
diff --git a/etcdserver/server.go b/etcdserver/server.go
index a999f660e..ec51ca218 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -156,6 +156,7 @@ type EtcdServer struct {
 	// inflightSnapshots holds count the number of snapshots currently inflight.
 	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
 	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
+	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
 	// consistIndex used to hold the offset of current executing entry
 	// It is initialized to 0 before executing any entry.
 	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
@@ -574,6 +575,16 @@ func (s *EtcdServer) run() {
 	for {
 		select {
 		case ap := <-s.r.apply():
+			var ci uint64
+			if len(ap.entries) != 0 {
+				ci = ap.entries[len(ap.entries)-1].Index
+			}
+			if ap.snapshot.Metadata.Index > ci {
+				ci = ap.snapshot.Metadata.Index
+			}
+			if ci != 0 {
+				s.setCommittedIndex(ci)
+			}
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:
@@ -1342,3 +1353,11 @@ func (s *EtcdServer) getAppliedIndex() uint64 {
 func (s *EtcdServer) setAppliedIndex(v uint64) {
 	atomic.StoreUint64(&s.appliedIndex, v)
 }
+
+func (s *EtcdServer) getCommittedIndex() uint64 {
+	return atomic.LoadUint64(&s.committedIndex)
+}
+
+func (s *EtcdServer) setCommittedIndex(v uint64) {
+	atomic.StoreUint64(&s.committedIndex, v)
+}
diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go
index dcd0f0d8a..ce0e58426 100644
--- a/etcdserver/v3_server.go
+++ b/etcdserver/v3_server.go
@@ -36,6 +36,12 @@ const (
 
 	// max timeout for waiting a v3 request to go through raft.
 	maxV3RequestTimeout = 5 * time.Second
+
+	// In the healthy case, there might be a small gap (10s of entries) between
+	// the applied index and committed index.
+	// However, if the committed entries are very heavy to apply, the gap might grow.
+	// We should stop accepting new proposals if the gap grows to a certain point.
+	maxGapBetweenApplyAndCommitIndex = 1000
 )
 
 type RaftKV interface {
@@ -506,6 +512,12 @@ func (s *EtcdServer) usernameFromCtx(ctx context.Context) (string, error) {
 }
 
 func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
+	ai := s.getAppliedIndex()
+	ci := s.getCommittedIndex()
+	if ci > ai+maxGapBetweenApplyAndCommitIndex {
+		return nil, ErrTooManyRequests
+	}
+
 	r.Header = &pb.RequestHeader{
 		ID: s.reqIDGen.Next(),
 	}