Merge pull request #5927 from xiang90/pacing

*: deny proposals when there is a huge gap between apply/commit
This commit is contained in:
Xiang Li 2016-07-14 11:47:53 -07:00 committed by GitHub
commit b0f2e5e64a
5 changed files with 39 additions and 2 deletions

View File

@ -36,7 +36,8 @@ var (
ErrGRPCMemberBadURLs = grpc.Errorf(codes.InvalidArgument, "etcdserver: given member URLs are invalid")
ErrGRPCMemberNotFound = grpc.Errorf(codes.NotFound, "etcdserver: member not found")
ErrGRPCRequestTooLarge = grpc.Errorf(codes.InvalidArgument, "etcdserver: request is too large")
ErrGRPCRequestTooLarge = grpc.Errorf(codes.InvalidArgument, "etcdserver: request is too large")
ErrGRPCRequestTooManyRequests = grpc.Errorf(codes.ResourceExhausted, "etcdserver: too many requests")
ErrGRPCRootUserNotExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not exist")
ErrGRPCRootRoleNotExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not have root role")
@ -69,7 +70,8 @@ var (
grpc.ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
grpc.ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
grpc.ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
grpc.ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
grpc.ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
grpc.ErrorDesc(ErrGRPCRootUserNotExist): ErrGRPCRootUserNotExist,
grpc.ErrorDesc(ErrGRPCRootRoleNotExist): ErrGRPCRootRoleNotExist,
@ -104,6 +106,7 @@ var (
ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
ErrRootUserNotExist = Error(ErrGRPCRootUserNotExist)
ErrRootRoleNotExist = Error(ErrGRPCRootRoleNotExist)

View File

@ -37,6 +37,8 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCRequestTooLarge
case etcdserver.ErrNoSpace:
return rpctypes.ErrGRPCNoSpace
case etcdserver.ErrTooManyRequests:
return rpctypes.ErrTooManyRequests
case auth.ErrRootUserNotExist:
return rpctypes.ErrGRPCRootUserNotExist

View File

@ -31,6 +31,7 @@ var (
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
)
type DiscoveryError struct {

View File

@ -156,6 +156,7 @@ type EtcdServer struct {
// inflightSnapshots holds count the number of snapshots currently inflight.
inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
// consistIndex used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
@ -574,6 +575,16 @@ func (s *EtcdServer) run() {
for {
select {
case ap := <-s.r.apply():
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
s.setCommittedIndex(ci)
}
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
@ -1342,3 +1353,11 @@ func (s *EtcdServer) getAppliedIndex() uint64 {
func (s *EtcdServer) setAppliedIndex(v uint64) {
atomic.StoreUint64(&s.appliedIndex, v)
}
func (s *EtcdServer) getCommittedIndex() uint64 {
return atomic.LoadUint64(&s.committedIndex)
}
func (s *EtcdServer) setCommittedIndex(v uint64) {
atomic.StoreUint64(&s.committedIndex, v)
}

View File

@ -36,6 +36,12 @@ const (
// max timeout for waiting a v3 request to go through raft.
maxV3RequestTimeout = 5 * time.Second
// In the health case, there might be a small gap (10s of entries) between
// the applied index and commited index.
// However, if the committed entries are very heavy to apply, the gap might grow.
// We should stop accepting new proposals if the gap growing to a certain point.
maxGapBetweenApplyAndCommitIndex = 1000
)
type RaftKV interface {
@ -506,6 +512,12 @@ func (s *EtcdServer) usernameFromCtx(ctx context.Context) (string, error) {
}
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
return nil, ErrTooManyRequests
}
r.Header = &pb.RequestHeader{
ID: s.reqIDGen.Next(),
}