Merge pull request #9924 from jpbetz/persist-lease-deadline

lease: Persist remainingTTL to prevent indefinite auto-renewal of long lived leases
This commit is contained in:
Joe Betz 2018-07-24 09:39:57 -07:00 committed by GitHub
commit 750b87d622
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1435 additions and 526 deletions

View File

@ -476,6 +476,31 @@ Empty field.
##### message `LeaseCheckpoint` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| ID | ID is the lease ID to checkpoint. | int64 |
| remaining_TTL | Remaining_TTL is the remaining time until expiry of the lease. | int64 |
##### message `LeaseCheckpointRequest` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| checkpoints | | (slice of) LeaseCheckpoint |
##### message `LeaseCheckpointResponse` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |
##### message `LeaseGrantRequest` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
@ -903,6 +928,7 @@ Empty field.
| ----- | ----------- | ---- |
| ID | | int64 |
| TTL | | int64 |
| RemainingTTL | | int64 |

View File

@ -373,7 +373,7 @@ func (s *v3Manager) saveDB() error {
be := backend.NewDefaultBackend(dbpath)
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
txn := mvs.Write()

View File

@ -58,6 +58,8 @@ type applierV3 interface {
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)
Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)
@ -130,6 +132,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:
ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.Alarm != nil:
ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
case r.Authenticate != nil:
@ -582,6 +586,16 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
}
func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
if err != nil {
return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err
}
}
return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil
}
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
resp := &pb.AlarmResponse{}
oldCount := len(a.s.alarmStore.Get(ar.Alarm))

View File

@ -140,6 +140,9 @@ type ServerConfig struct {
Debug bool
ForceNewCluster bool
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration
}
// VerifyBootstrap sanity-checks the initial config for bootstrap case

View File

@ -45,6 +45,9 @@
LeaseGrantResponse
LeaseRevokeRequest
LeaseRevokeResponse
LeaseCheckpoint
LeaseCheckpointRequest
LeaseCheckpointResponse
LeaseKeepAliveRequest
LeaseKeepAliveResponse
LeaseTimeToLiveRequest

View File

@ -47,6 +47,7 @@ type InternalRaftRequest struct {
LeaseGrant *LeaseGrantRequest `protobuf:"bytes,8,opt,name=lease_grant,json=leaseGrant" json:"lease_grant,omitempty"`
LeaseRevoke *LeaseRevokeRequest `protobuf:"bytes,9,opt,name=lease_revoke,json=leaseRevoke" json:"lease_revoke,omitempty"`
Alarm *AlarmRequest `protobuf:"bytes,10,opt,name=alarm" json:"alarm,omitempty"`
LeaseCheckpoint *LeaseCheckpointRequest `protobuf:"bytes,11,opt,name=lease_checkpoint,json=leaseCheckpoint" json:"lease_checkpoint,omitempty"`
AuthEnable *AuthEnableRequest `protobuf:"bytes,1000,opt,name=auth_enable,json=authEnable" json:"auth_enable,omitempty"`
AuthDisable *AuthDisableRequest `protobuf:"bytes,1011,opt,name=auth_disable,json=authDisable" json:"auth_disable,omitempty"`
Authenticate *InternalAuthenticateRequest `protobuf:"bytes,1012,opt,name=authenticate" json:"authenticate,omitempty"`
@ -245,17 +246,27 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
}
i += n9
}
if m.LeaseCheckpoint != nil {
dAtA[i] = 0x5a
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.LeaseCheckpoint.Size()))
n10, err := m.LeaseCheckpoint.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n10
}
if m.Header != nil {
dAtA[i] = 0xa2
i++
dAtA[i] = 0x6
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.Header.Size()))
n10, err := m.Header.MarshalTo(dAtA[i:])
n11, err := m.Header.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n10
i += n11
}
if m.AuthEnable != nil {
dAtA[i] = 0xc2
@ -263,11 +274,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x3e
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthEnable.Size()))
n11, err := m.AuthEnable.MarshalTo(dAtA[i:])
n12, err := m.AuthEnable.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n11
i += n12
}
if m.AuthDisable != nil {
dAtA[i] = 0x9a
@ -275,11 +286,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x3f
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthDisable.Size()))
n12, err := m.AuthDisable.MarshalTo(dAtA[i:])
n13, err := m.AuthDisable.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n12
i += n13
}
if m.Authenticate != nil {
dAtA[i] = 0xa2
@ -287,11 +298,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x3f
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.Authenticate.Size()))
n13, err := m.Authenticate.MarshalTo(dAtA[i:])
n14, err := m.Authenticate.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n13
i += n14
}
if m.AuthUserAdd != nil {
dAtA[i] = 0xe2
@ -299,11 +310,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x44
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserAdd.Size()))
n14, err := m.AuthUserAdd.MarshalTo(dAtA[i:])
n15, err := m.AuthUserAdd.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n14
i += n15
}
if m.AuthUserDelete != nil {
dAtA[i] = 0xea
@ -311,11 +322,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x44
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserDelete.Size()))
n15, err := m.AuthUserDelete.MarshalTo(dAtA[i:])
n16, err := m.AuthUserDelete.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n15
i += n16
}
if m.AuthUserGet != nil {
dAtA[i] = 0xf2
@ -323,11 +334,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x44
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserGet.Size()))
n16, err := m.AuthUserGet.MarshalTo(dAtA[i:])
n17, err := m.AuthUserGet.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n16
i += n17
}
if m.AuthUserChangePassword != nil {
dAtA[i] = 0xfa
@ -335,11 +346,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x44
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserChangePassword.Size()))
n17, err := m.AuthUserChangePassword.MarshalTo(dAtA[i:])
n18, err := m.AuthUserChangePassword.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n17
i += n18
}
if m.AuthUserGrantRole != nil {
dAtA[i] = 0x82
@ -347,11 +358,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x45
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserGrantRole.Size()))
n18, err := m.AuthUserGrantRole.MarshalTo(dAtA[i:])
n19, err := m.AuthUserGrantRole.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n18
i += n19
}
if m.AuthUserRevokeRole != nil {
dAtA[i] = 0x8a
@ -359,11 +370,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x45
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserRevokeRole.Size()))
n19, err := m.AuthUserRevokeRole.MarshalTo(dAtA[i:])
n20, err := m.AuthUserRevokeRole.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n19
i += n20
}
if m.AuthUserList != nil {
dAtA[i] = 0x92
@ -371,11 +382,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x45
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthUserList.Size()))
n20, err := m.AuthUserList.MarshalTo(dAtA[i:])
n21, err := m.AuthUserList.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n20
i += n21
}
if m.AuthRoleList != nil {
dAtA[i] = 0x9a
@ -383,11 +394,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x45
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthRoleList.Size()))
n21, err := m.AuthRoleList.MarshalTo(dAtA[i:])
n22, err := m.AuthRoleList.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n21
i += n22
}
if m.AuthRoleAdd != nil {
dAtA[i] = 0x82
@ -395,11 +406,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x4b
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthRoleAdd.Size()))
n22, err := m.AuthRoleAdd.MarshalTo(dAtA[i:])
n23, err := m.AuthRoleAdd.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n22
i += n23
}
if m.AuthRoleDelete != nil {
dAtA[i] = 0x8a
@ -407,11 +418,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x4b
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthRoleDelete.Size()))
n23, err := m.AuthRoleDelete.MarshalTo(dAtA[i:])
n24, err := m.AuthRoleDelete.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n23
i += n24
}
if m.AuthRoleGet != nil {
dAtA[i] = 0x92
@ -419,11 +430,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x4b
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthRoleGet.Size()))
n24, err := m.AuthRoleGet.MarshalTo(dAtA[i:])
n25, err := m.AuthRoleGet.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n24
i += n25
}
if m.AuthRoleGrantPermission != nil {
dAtA[i] = 0x9a
@ -431,11 +442,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x4b
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthRoleGrantPermission.Size()))
n25, err := m.AuthRoleGrantPermission.MarshalTo(dAtA[i:])
n26, err := m.AuthRoleGrantPermission.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n25
i += n26
}
if m.AuthRoleRevokePermission != nil {
dAtA[i] = 0xa2
@ -443,11 +454,11 @@ func (m *InternalRaftRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x4b
i++
i = encodeVarintRaftInternal(dAtA, i, uint64(m.AuthRoleRevokePermission.Size()))
n26, err := m.AuthRoleRevokePermission.MarshalTo(dAtA[i:])
n27, err := m.AuthRoleRevokePermission.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n26
i += n27
}
return i, nil
}
@ -573,6 +584,10 @@ func (m *InternalRaftRequest) Size() (n int) {
l = m.Alarm.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
if m.LeaseCheckpoint != nil {
l = m.LeaseCheckpoint.Size()
n += 1 + l + sovRaftInternal(uint64(l))
}
if m.Header != nil {
l = m.Header.Size()
n += 2 + l + sovRaftInternal(uint64(l))
@ -1143,6 +1158,39 @@ func (m *InternalRaftRequest) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 11:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseCheckpoint", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRaftInternal
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseCheckpoint == nil {
m.LeaseCheckpoint = &LeaseCheckpointRequest{}
}
if err := m.LeaseCheckpoint.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 100:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType)
@ -2020,58 +2068,60 @@ var (
func init() { proto.RegisterFile("raft_internal.proto", fileDescriptorRaftInternal) }
var fileDescriptorRaftInternal = []byte{
// 837 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x96, 0xdd, 0x52, 0xd3, 0x40,
0x14, 0xc7, 0x69, 0xf9, 0xec, 0xb6, 0x54, 0x5c, 0x40, 0xd7, 0x32, 0x53, 0xa1, 0x8c, 0x8a, 0x5f,
0xe8, 0x94, 0x07, 0xd0, 0x4a, 0x19, 0x60, 0x86, 0x61, 0x98, 0x0c, 0xce, 0x38, 0xe3, 0x45, 0x5c,
0x9a, 0x43, 0x1b, 0x49, 0x93, 0xb8, 0xd9, 0x56, 0x7c, 0x13, 0x1f, 0xc3, 0xaf, 0x87, 0xe0, 0xc2,
0x0f, 0xd4, 0x17, 0x50, 0xbc, 0xf1, 0xca, 0x1b, 0x7d, 0x00, 0x67, 0x3f, 0x92, 0x34, 0x6d, 0xca,
0x5d, 0x72, 0xce, 0xff, 0xfc, 0xce, 0xd9, 0xec, 0x7f, 0xbb, 0x45, 0xb3, 0x8c, 0x1e, 0x72, 0xd3,
0x76, 0x39, 0x30, 0x97, 0x3a, 0xab, 0x3e, 0xf3, 0xb8, 0x87, 0x0b, 0xc0, 0x1b, 0x56, 0x00, 0xac,
0x0b, 0xcc, 0x3f, 0x28, 0xcd, 0x35, 0xbd, 0xa6, 0x27, 0x13, 0xf7, 0xc4, 0x93, 0xd2, 0x94, 0x66,
0x62, 0x8d, 0x8e, 0xe4, 0x98, 0xdf, 0x50, 0x8f, 0x95, 0x67, 0x68, 0xda, 0x80, 0x17, 0x1d, 0x08,
0xf8, 0x16, 0x50, 0x0b, 0x18, 0x2e, 0xa2, 0xec, 0x76, 0x9d, 0x64, 0x16, 0x33, 0x2b, 0x63, 0x46,
0x76, 0xbb, 0x8e, 0x4b, 0x68, 0xaa, 0x13, 0x88, 0x96, 0x6d, 0x20, 0xd9, 0xc5, 0xcc, 0x4a, 0xce,
0x88, 0xde, 0xf1, 0x32, 0x9a, 0xa6, 0x1d, 0xde, 0x32, 0x19, 0x74, 0xed, 0xc0, 0xf6, 0x5c, 0x32,
0x2a, 0xcb, 0x0a, 0x22, 0x68, 0xe8, 0x58, 0xe5, 0x4f, 0x11, 0xcd, 0x6e, 0xeb, 0xa9, 0x0d, 0x7a,
0xc8, 0x75, 0xbb, 0x81, 0x46, 0xd7, 0x50, 0xb6, 0x5b, 0x95, 0x2d, 0xf2, 0xd5, 0xf9, 0xd5, 0xde,
0x75, 0xad, 0xea, 0x12, 0x23, 0xdb, 0xad, 0xe2, 0xfb, 0x68, 0x9c, 0x51, 0xb7, 0x09, 0xb2, 0x57,
0xbe, 0x5a, 0xea, 0x53, 0x8a, 0x54, 0x28, 0x57, 0x42, 0x7c, 0x0b, 0x8d, 0xfa, 0x1d, 0x4e, 0xc6,
0xa4, 0x9e, 0x24, 0xf5, 0x7b, 0x9d, 0x70, 0x1e, 0x43, 0x88, 0xf0, 0x3a, 0x2a, 0x58, 0xe0, 0x00,
0x07, 0x53, 0x35, 0x19, 0x97, 0x45, 0x8b, 0xc9, 0xa2, 0xba, 0x54, 0x24, 0x5a, 0xe5, 0xad, 0x38,
0x26, 0x1a, 0xf2, 0x63, 0x97, 0x4c, 0xa4, 0x35, 0xdc, 0x3f, 0x76, 0xa3, 0x86, 0xfc, 0xd8, 0xc5,
0x0f, 0x10, 0x6a, 0x78, 0x6d, 0x9f, 0x36, 0xb8, 0xf8, 0x7e, 0x93, 0xb2, 0xe4, 0x6a, 0xb2, 0x64,
0x3d, 0xca, 0x87, 0x95, 0x3d, 0x25, 0xf8, 0x21, 0xca, 0x3b, 0x40, 0x03, 0x30, 0x9b, 0x8c, 0xba,
0x9c, 0x4c, 0xa5, 0x11, 0x76, 0x84, 0x60, 0x53, 0xe4, 0x23, 0x82, 0x13, 0x85, 0xc4, 0x9a, 0x15,
0x81, 0x41, 0xd7, 0x3b, 0x02, 0x92, 0x4b, 0x5b, 0xb3, 0x44, 0x18, 0x52, 0x10, 0xad, 0xd9, 0x89,
0x63, 0x62, 0x5b, 0xa8, 0x43, 0x59, 0x9b, 0xa0, 0xb4, 0x6d, 0xa9, 0x89, 0x54, 0xb4, 0x2d, 0x52,
0x88, 0xd7, 0xd0, 0x44, 0x4b, 0x5a, 0x8e, 0x58, 0xb2, 0x64, 0x21, 0x75, 0xcf, 0x95, 0x2b, 0x0d,
0x2d, 0xc5, 0x35, 0x94, 0x97, 0x8e, 0x03, 0x97, 0x1e, 0x38, 0x40, 0x7e, 0xa7, 0x7e, 0xb0, 0x5a,
0x87, 0xb7, 0x36, 0xa4, 0x20, 0x5a, 0x2e, 0x8d, 0x42, 0xb8, 0x8e, 0xa4, 0x3f, 0x4d, 0xcb, 0x0e,
0x24, 0xe3, 0xef, 0x64, 0xda, 0x7a, 0x05, 0xa3, 0xae, 0x14, 0xd1, 0x7a, 0x69, 0x1c, 0xc3, 0xbb,
0x8a, 0x02, 0x2e, 0xb7, 0x1b, 0x94, 0x03, 0xf9, 0xa7, 0x28, 0x37, 0x93, 0x94, 0xd0, 0xf7, 0xb5,
0x1e, 0x69, 0x88, 0x4b, 0xd4, 0xe3, 0x0d, 0x7d, 0x94, 0xc4, 0xd9, 0x32, 0xa9, 0x65, 0x91, 0x8f,
0x53, 0xc3, 0xc6, 0x7a, 0x1c, 0x00, 0xab, 0x59, 0x56, 0x62, 0x2c, 0x1d, 0xc3, 0xbb, 0x68, 0x26,
0xc6, 0x28, 0x4f, 0x92, 0x4f, 0x8a, 0xb4, 0x9c, 0x4e, 0xd2, 0x66, 0xd6, 0xb0, 0x22, 0x4d, 0x84,
0x93, 0x63, 0x35, 0x81, 0x93, 0xcf, 0xe7, 0x8e, 0xb5, 0x09, 0x7c, 0x60, 0xac, 0x4d, 0xe0, 0xb8,
0x89, 0xae, 0xc4, 0x98, 0x46, 0x4b, 0x9c, 0x12, 0xd3, 0xa7, 0x41, 0xf0, 0xd2, 0x63, 0x16, 0xf9,
0xa2, 0x90, 0xb7, 0xd3, 0x91, 0xeb, 0x52, 0xbd, 0xa7, 0xc5, 0x21, 0xfd, 0x12, 0x4d, 0x4d, 0xe3,
0x27, 0x68, 0xae, 0x67, 0x5e, 0x61, 0x6f, 0x93, 0x79, 0x0e, 0x90, 0x53, 0xd5, 0xe3, 0xfa, 0x90,
0xb1, 0xe5, 0xd1, 0xf0, 0xe2, 0xad, 0xbe, 0x48, 0xfb, 0x33, 0xf8, 0x29, 0x9a, 0x8f, 0xc9, 0xea,
0xa4, 0x28, 0xf4, 0x57, 0x85, 0xbe, 0x91, 0x8e, 0xd6, 0x47, 0xa6, 0x87, 0x8d, 0xe9, 0x40, 0x0a,
0x6f, 0xa1, 0x62, 0x0c, 0x77, 0xec, 0x80, 0x93, 0x6f, 0x8a, 0xba, 0x94, 0x4e, 0xdd, 0xb1, 0x03,
0x9e, 0xf0, 0x51, 0x18, 0x8c, 0x48, 0x62, 0x34, 0x45, 0xfa, 0x3e, 0x94, 0x24, 0x5a, 0x0f, 0x90,
0xc2, 0x60, 0xb4, 0xf5, 0x92, 0x24, 0x1c, 0xf9, 0x26, 0x37, 0x6c, 0xeb, 0x45, 0x4d, 0xbf, 0x23,
0x75, 0x2c, 0x72, 0xa4, 0xc4, 0x68, 0x47, 0xbe, 0xcd, 0x0d, 0x73, 0xa4, 0xa8, 0x4a, 0x71, 0x64,
0x1c, 0x4e, 0x8e, 0x25, 0x1c, 0xf9, 0xee, 0xdc, 0xb1, 0xfa, 0x1d, 0xa9, 0x63, 0xf8, 0x39, 0x2a,
0xf5, 0x60, 0xa4, 0x51, 0x7c, 0x60, 0x6d, 0x3b, 0x90, 0xf7, 0xd8, 0x7b, 0xc5, 0xbc, 0x33, 0x84,
0x29, 0xe4, 0x7b, 0x91, 0x3a, 0xe4, 0x5f, 0xa6, 0xe9, 0x79, 0xdc, 0x46, 0x0b, 0x71, 0x2f, 0x6d,
0x9d, 0x9e, 0x66, 0x1f, 0x54, 0xb3, 0xbb, 0xe9, 0xcd, 0x94, 0x4b, 0x06, 0xbb, 0x11, 0x3a, 0x44,
0x50, 0xb9, 0x80, 0xa6, 0x37, 0xda, 0x3e, 0x7f, 0x65, 0x40, 0xe0, 0x7b, 0x6e, 0x00, 0x15, 0x1f,
0x2d, 0x9c, 0xf3, 0x43, 0x84, 0x31, 0x1a, 0x93, 0xb7, 0x7b, 0x46, 0xde, 0xee, 0xf2, 0x59, 0xdc,
0xfa, 0xd1, 0xf9, 0xd4, 0xb7, 0x7e, 0xf8, 0x8e, 0x97, 0x50, 0x21, 0xb0, 0xdb, 0xbe, 0x03, 0x26,
0xf7, 0x8e, 0x40, 0x5d, 0xfa, 0x39, 0x23, 0xaf, 0x62, 0xfb, 0x22, 0xf4, 0x68, 0xee, 0xe4, 0x67,
0x79, 0xe4, 0xe4, 0xac, 0x9c, 0x39, 0x3d, 0x2b, 0x67, 0x7e, 0x9c, 0x95, 0x33, 0xaf, 0x7f, 0x95,
0x47, 0x0e, 0x26, 0xe4, 0x5f, 0x8e, 0xb5, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff, 0xff, 0xc9, 0xfc,
0x0e, 0xca, 0x08, 0x00, 0x00,
// 867 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x96, 0xcb, 0x52, 0x33, 0x45,
0x14, 0xc7, 0x49, 0xb8, 0xa6, 0x13, 0x2e, 0x36, 0xa0, 0x6d, 0xa8, 0x8a, 0x10, 0xbc, 0xe0, 0x0d,
0xad, 0xf0, 0x00, 0x1a, 0x13, 0x0a, 0xa8, 0xa2, 0x90, 0x9a, 0xc2, 0x2a, 0xab, 0x5c, 0x8c, 0xcd,
0xcc, 0x21, 0x19, 0x99, 0xcc, 0x8c, 0x3d, 0x9d, 0x88, 0x6f, 0xe2, 0x63, 0x78, 0xdb, 0xbb, 0x65,
0xe1, 0x05, 0xf5, 0x05, 0x14, 0x37, 0xee, 0xbf, 0xef, 0x01, 0xbe, 0xea, 0xcb, 0xf4, 0x64, 0x92,
0x0e, 0xbb, 0xc9, 0x39, 0xff, 0xf3, 0xfb, 0x9f, 0x99, 0x3e, 0x07, 0x1a, 0x6d, 0x32, 0x7a, 0xc3,
0xdd, 0x20, 0xe2, 0xc0, 0x22, 0x1a, 0x1e, 0x26, 0x2c, 0xe6, 0x31, 0xae, 0x01, 0xf7, 0xfc, 0x14,
0xd8, 0x08, 0x58, 0x72, 0x5d, 0xdf, 0xea, 0xc5, 0xbd, 0x58, 0x26, 0x3e, 0x10, 0x4f, 0x4a, 0x53,
0xdf, 0xc8, 0x35, 0x3a, 0x52, 0x61, 0x89, 0xa7, 0x1e, 0x9b, 0x5f, 0xa2, 0x55, 0x07, 0xbe, 0x1e,
0x42, 0xca, 0x4f, 0x81, 0xfa, 0xc0, 0xf0, 0x1a, 0x2a, 0x9f, 0x75, 0x49, 0x69, 0xb7, 0x74, 0xb0,
0xe0, 0x94, 0xcf, 0xba, 0xb8, 0x8e, 0x56, 0x86, 0xa9, 0xb0, 0x1c, 0x00, 0x29, 0xef, 0x96, 0x0e,
0x2a, 0x8e, 0xf9, 0x8d, 0xf7, 0xd1, 0x2a, 0x1d, 0xf2, 0xbe, 0xcb, 0x60, 0x14, 0xa4, 0x41, 0x1c,
0x91, 0x79, 0x59, 0x56, 0x13, 0x41, 0x47, 0xc7, 0x9a, 0xbf, 0xac, 0xa3, 0xcd, 0x33, 0xdd, 0xb5,
0x43, 0x6f, 0xb8, 0xb6, 0x9b, 0x32, 0x7a, 0x03, 0x95, 0x47, 0x2d, 0x69, 0x51, 0x6d, 0x6d, 0x1f,
0x8e, 0xbf, 0xd7, 0xa1, 0x2e, 0x71, 0xca, 0xa3, 0x16, 0xfe, 0x10, 0x2d, 0x32, 0x1a, 0xf5, 0x40,
0x7a, 0x55, 0x5b, 0xf5, 0x09, 0xa5, 0x48, 0x65, 0x72, 0x25, 0xc4, 0xef, 0xa0, 0xf9, 0x64, 0xc8,
0xc9, 0x82, 0xd4, 0x93, 0xa2, 0xfe, 0x72, 0x98, 0xf5, 0xe3, 0x08, 0x11, 0xee, 0xa0, 0x9a, 0x0f,
0x21, 0x70, 0x70, 0x95, 0xc9, 0xa2, 0x2c, 0xda, 0x2d, 0x16, 0x75, 0xa5, 0xa2, 0x60, 0x55, 0xf5,
0xf3, 0x98, 0x30, 0xe4, 0x77, 0x11, 0x59, 0xb2, 0x19, 0x5e, 0xdd, 0x45, 0xc6, 0x90, 0xdf, 0x45,
0xf8, 0x23, 0x84, 0xbc, 0x78, 0x90, 0x50, 0x8f, 0x8b, 0xef, 0xb7, 0x2c, 0x4b, 0x5e, 0x2b, 0x96,
0x74, 0x4c, 0x3e, 0xab, 0x1c, 0x2b, 0xc1, 0x1f, 0xa3, 0x6a, 0x08, 0x34, 0x05, 0xb7, 0xc7, 0x68,
0xc4, 0xc9, 0x8a, 0x8d, 0x70, 0x2e, 0x04, 0x27, 0x22, 0x6f, 0x08, 0xa1, 0x09, 0x89, 0x77, 0x56,
0x04, 0x06, 0xa3, 0xf8, 0x16, 0x48, 0xc5, 0xf6, 0xce, 0x12, 0xe1, 0x48, 0x81, 0x79, 0xe7, 0x30,
0x8f, 0x89, 0x63, 0xa1, 0x21, 0x65, 0x03, 0x82, 0x6c, 0xc7, 0xd2, 0x16, 0x29, 0x73, 0x2c, 0x52,
0x88, 0x3f, 0x45, 0x1b, 0xca, 0xd6, 0xeb, 0x83, 0x77, 0x9b, 0xc4, 0x41, 0xc4, 0x49, 0x55, 0x16,
0xbf, 0x6e, 0xb1, 0xee, 0x18, 0x51, 0x86, 0x59, 0x0f, 0x8b, 0x71, 0x7c, 0x84, 0x96, 0xfa, 0x72,
0x86, 0x89, 0x2f, 0x31, 0x3b, 0xd6, 0x21, 0x52, 0x63, 0xee, 0x68, 0x29, 0x6e, 0xa3, 0xaa, 0x1c,
0x61, 0x88, 0xe8, 0x75, 0x08, 0xe4, 0x7f, 0xeb, 0x09, 0xb4, 0x87, 0xbc, 0x7f, 0x2c, 0x05, 0xe6,
0xfb, 0x51, 0x13, 0xc2, 0x5d, 0x24, 0x07, 0xde, 0xf5, 0x83, 0x54, 0x32, 0x9e, 0x2d, 0xdb, 0x3e,
0xa0, 0x60, 0x74, 0x95, 0xc2, 0x7c, 0x40, 0x9a, 0xc7, 0xf0, 0x85, 0xa2, 0x40, 0xc4, 0x03, 0x8f,
0x72, 0x20, 0xcf, 0x15, 0xe5, 0xed, 0x22, 0x25, 0x5b, 0xa4, 0xf6, 0x98, 0x34, 0xc3, 0x15, 0xea,
0xf1, 0xb1, 0xde, 0x4d, 0xb1, 0xac, 0x2e, 0xf5, 0x7d, 0xf2, 0xeb, 0xca, 0xac, 0xb6, 0x3e, 0x4b,
0x81, 0xb5, 0x7d, 0xbf, 0xd0, 0x96, 0x8e, 0xe1, 0x0b, 0xb4, 0x91, 0x63, 0xd4, 0x90, 0x93, 0xdf,
0x14, 0x69, 0xdf, 0x4e, 0xd2, 0xdb, 0xa1, 0x61, 0x6b, 0xb4, 0x10, 0x2e, 0xb6, 0xd5, 0x03, 0x4e,
0x7e, 0x7f, 0xb2, 0xad, 0x13, 0xe0, 0x53, 0x6d, 0x9d, 0x00, 0xc7, 0x3d, 0xf4, 0x6a, 0x8e, 0xf1,
0xfa, 0x62, 0xed, 0xdc, 0x84, 0xa6, 0xe9, 0x37, 0x31, 0xf3, 0xc9, 0x1f, 0x0a, 0xf9, 0xae, 0x1d,
0xd9, 0x91, 0xea, 0x4b, 0x2d, 0xce, 0xe8, 0x2f, 0x53, 0x6b, 0x1a, 0x7f, 0x8e, 0xb6, 0xc6, 0xfa,
0x15, 0xfb, 0xe2, 0xb2, 0x38, 0x04, 0xf2, 0xa0, 0x3c, 0xde, 0x9c, 0xd1, 0xb6, 0xdc, 0xb5, 0x38,
0x3f, 0xea, 0x97, 0xe8, 0x64, 0x06, 0x7f, 0x81, 0xb6, 0x73, 0xb2, 0x5a, 0x3d, 0x85, 0xfe, 0x53,
0xa1, 0xdf, 0xb2, 0xa3, 0xf5, 0x0e, 0x8e, 0xb1, 0x31, 0x9d, 0x4a, 0xe1, 0x53, 0xb4, 0x96, 0xc3,
0xc3, 0x20, 0xe5, 0xe4, 0x2f, 0x45, 0xdd, 0xb3, 0x53, 0xcf, 0x83, 0x94, 0x17, 0xe6, 0x28, 0x0b,
0x1a, 0x92, 0x68, 0x4d, 0x91, 0xfe, 0x9e, 0x49, 0x12, 0xd6, 0x53, 0xa4, 0x2c, 0x68, 0x8e, 0x5e,
0x92, 0xc4, 0x44, 0x7e, 0x5f, 0x99, 0x75, 0xf4, 0xa2, 0x66, 0x72, 0x22, 0x75, 0xcc, 0x4c, 0xa4,
0xc4, 0xe8, 0x89, 0xfc, 0xa1, 0x32, 0x6b, 0x22, 0x45, 0x95, 0x65, 0x22, 0xf3, 0x70, 0xb1, 0x2d,
0x31, 0x91, 0x3f, 0x3e, 0xd9, 0xd6, 0xe4, 0x44, 0xea, 0x18, 0xfe, 0x0a, 0xd5, 0xc7, 0x30, 0x72,
0x50, 0x12, 0x60, 0x83, 0x20, 0x95, 0xff, 0x18, 0x7f, 0x52, 0xcc, 0xf7, 0x66, 0x30, 0x85, 0xfc,
0xd2, 0xa8, 0x33, 0xfe, 0x2b, 0xd4, 0x9e, 0xc7, 0x03, 0xb4, 0x93, 0x7b, 0xe9, 0xd1, 0x19, 0x33,
0xfb, 0x59, 0x99, 0xbd, 0x6f, 0x37, 0x53, 0x53, 0x32, 0xed, 0x46, 0xe8, 0x0c, 0x41, 0x73, 0x1d,
0xad, 0x1e, 0x0f, 0x12, 0xfe, 0xad, 0x03, 0x69, 0x12, 0x47, 0x29, 0x34, 0x13, 0xb4, 0xf3, 0xc4,
0x1f, 0x22, 0x8c, 0xd1, 0x82, 0xbc, 0x2e, 0x94, 0xe4, 0x75, 0x41, 0x3e, 0x8b, 0x6b, 0x84, 0xd9,
0x4f, 0x7d, 0x8d, 0xc8, 0x7e, 0xe3, 0x3d, 0x54, 0x4b, 0x83, 0x41, 0x12, 0x82, 0xcb, 0xe3, 0x5b,
0x50, 0xb7, 0x88, 0x8a, 0x53, 0x55, 0xb1, 0x2b, 0x11, 0xfa, 0x64, 0xeb, 0xfe, 0xdf, 0xc6, 0xdc,
0xfd, 0x63, 0xa3, 0xf4, 0xf0, 0xd8, 0x28, 0xfd, 0xf3, 0xd8, 0x28, 0x7d, 0xf7, 0x5f, 0x63, 0xee,
0x7a, 0x49, 0xde, 0x61, 0x8e, 0x5e, 0x04, 0x00, 0x00, 0xff, 0xff, 0xed, 0x36, 0xf0, 0x6f, 0x1b,
0x09, 0x00, 0x00,
}

View File

@ -37,6 +37,8 @@ message InternalRaftRequest {
AlarmRequest alarm = 10;
LeaseCheckpointRequest lease_checkpoint = 11;
AuthEnableRequest auth_enable = 1000;
AuthDisableRequest auth_disable = 1011;
@ -71,4 +73,3 @@ message InternalAuthenticateRequest {
// simple_token is generated in API layer (etcdserver/v3_server.go)
string simple_token = 3;
}

File diff suppressed because it is too large Load Diff

View File

@ -776,6 +776,22 @@ message LeaseRevokeResponse {
ResponseHeader header = 1;
}
message LeaseCheckpoint {
// ID is the lease ID to checkpoint.
int64 ID = 1;
// Remaining_TTL is the remaining time until expiry of the lease.
int64 remaining_TTL = 2;
}
message LeaseCheckpointRequest {
repeated LeaseCheckpoint checkpoints = 1;
}
message LeaseCheckpointResponse {
ResponseHeader header = 1;
}
message LeaseKeepAliveRequest {
// ID is the lease ID for the lease to keep alive.
int64 ID = 1;

View File

@ -519,7 +519,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
@ -576,6 +576,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
return nil, err
}
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
})
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,

View File

@ -148,6 +148,8 @@ type ClusterConfig struct {
// UseIP is true to use only IP for gRPC requests.
UseIP bool
LeaseCheckpointInterval time.Duration
}
type cluster struct {
@ -290,6 +292,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
@ -575,6 +578,7 @@ type memberConfig struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
leaseCheckpointInterval time.Duration
}
// mustNewMember return an inited member with the given name. If peerTLS is
@ -665,6 +669,7 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
m.useIP = mcfg.useIP
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
m.InitialCorruptCheck = true

View File

@ -25,7 +25,9 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/testutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// TestV3LeasePrmote ensures the newly elected leader can promote itself
@ -222,6 +224,56 @@ func TestV3LeaseKeepAlive(t *testing.T) {
})
}
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
// across leader elections.
func TestV3LeaseCheckpoint(t *testing.T) {
var ttl int64 = 300
leaseInterval := 2 * time.Second
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3, LeaseCheckpointInterval: leaseInterval})
defer clus.Terminate(t)
// create lease
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := toGRPC(clus.RandClient())
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
if err != nil {
t.Fatal(err)
}
// wait for a checkpoint to occur
time.Sleep(leaseInterval + 1*time.Second)
// Force a leader election
leaderId := clus.WaitLeader(t)
leader := clus.Members[leaderId]
leader.Stop(t)
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
leader.Restart(t)
newLeaderId := clus.WaitLeader(t)
c2 := toGRPC(clus.Client(newLeaderId))
time.Sleep(250 * time.Millisecond)
// Check the TTL of the new leader
var ttlresp *pb.LeaseTimeToLiveResponse
for i := 0; i < 10; i++ {
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
time.Sleep(time.Millisecond * 250)
} else {
t.Fatal(err)
}
}
}
expectedTTL := ttl - int64(leaseInterval.Seconds())
if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
}
}
// TestV3LeaseExists creates a lease on a random client and confirms it exists in the cluster.
func TestV3LeaseExists(t *testing.T) {
defer testutil.AfterTest(t)

View File

@ -14,10 +14,13 @@
package lease
// LeaseWithTime contains lease object with expire information.
// LeaseWithTime contains lease object with a time.
// For the lessor's lease heap, time identifies the lease expiration time.
// For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
type LeaseWithTime struct {
id LeaseID
expiration int64
// Unix nanos timestamp.
time int64
index int
}
@ -26,7 +29,7 @@ type LeaseQueue []*LeaseWithTime
func (pq LeaseQueue) Len() int { return len(pq) }
func (pq LeaseQueue) Less(i, j int) bool {
return pq[i].expiration < pq[j].expiration
return pq[i].time < pq[j].time
}
func (pq LeaseQueue) Swap(i, j int) {

View File

@ -34,7 +34,7 @@ func TestLeaseQueue(t *testing.T) {
exp = time.Now().UnixNano()
}
le.leaseMap[LeaseID(i)] = &Lease{ID: LeaseID(i)}
heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), expiration: exp})
heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), time: exp})
}
// first element must be front

View File

@ -24,14 +24,16 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)
func TestRenewHTTP(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()
le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@ -51,11 +53,12 @@ func TestRenewHTTP(t *testing.T) {
}
func TestTimeToLiveHTTP(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()
le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@ -92,11 +95,12 @@ func TestTimeToLiveHTTPTimeout(t *testing.T) {
}
func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
lg := zap.NewNop()
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()
le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {

View File

@ -42,6 +42,7 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Lease struct {
ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
TTL int64 `protobuf:"varint,2,opt,name=TTL,proto3" json:"TTL,omitempty"`
RemainingTTL int64 `protobuf:"varint,3,opt,name=RemainingTTL,proto3" json:"RemainingTTL,omitempty"`
}
func (m *Lease) Reset() { *m = Lease{} }
@ -97,6 +98,11 @@ func (m *Lease) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintLease(dAtA, i, uint64(m.TTL))
}
if m.RemainingTTL != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintLease(dAtA, i, uint64(m.RemainingTTL))
}
return i, nil
}
@ -174,6 +180,9 @@ func (m *Lease) Size() (n int) {
if m.TTL != 0 {
n += 1 + sovLease(uint64(m.TTL))
}
if m.RemainingTTL != 0 {
n += 1 + sovLease(uint64(m.RemainingTTL))
}
return n
}
@ -277,6 +286,25 @@ func (m *Lease) Unmarshal(dAtA []byte) error {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RemainingTTL", wireType)
}
m.RemainingTTL = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLease
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.RemainingTTL |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipLease(dAtA[iNdEx:])
@ -572,20 +600,21 @@ var (
func init() { proto.RegisterFile("lease.proto", fileDescriptorLease) }
var fileDescriptorLease = []byte{
// 233 bytes of a gzipped FileDescriptorProto
// 253 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x49, 0x4d, 0x2c,
0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2,
0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x4a, 0x2d, 0xb5, 0x24, 0x39, 0x45,
0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a, 0x92,
0x21, 0xea, 0x94, 0x34, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48,
0x21, 0xea, 0x94, 0x7c, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48,
0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x79, 0xba, 0x08, 0x09, 0x70, 0x31, 0x87, 0x84, 0xf8,
0x48, 0x30, 0x81, 0x05, 0x40, 0x4c, 0xa5, 0x12, 0x2e, 0x11, 0xb0, 0x52, 0xcf, 0xbc, 0x92, 0xd4,
0xa2, 0xbc, 0xc4, 0x9c, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0xa1, 0x18, 0x2e, 0x31, 0xb0,
0x78, 0x48, 0x66, 0x6e, 0x6a, 0x48, 0xbe, 0x4f, 0x66, 0x59, 0x2a, 0x54, 0x06, 0x6c, 0x1a, 0xb7,
0x91, 0x8a, 0x1e, 0xb2, 0xdd, 0x7a, 0xd8, 0xd5, 0x06, 0xe1, 0x30, 0x43, 0xa9, 0x82, 0x4b, 0x14,
0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71, 0x0c, 0x2d, 0x10, 0x29,
0xa8, 0xbd, 0xaa, 0x04, 0xec, 0x85, 0x28, 0x0e, 0xc2, 0x65, 0x8a, 0x93, 0xc4, 0x89, 0x87, 0x72,
0x0c, 0x17, 0x1e, 0xca, 0x31, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47,
0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xc3, 0xce, 0x18, 0x10, 0x00, 0x00, 0xff,
0xff, 0x9f, 0xf2, 0x42, 0xe0, 0x91, 0x01, 0x00, 0x00,
0x48, 0x30, 0x81, 0x05, 0x40, 0x4c, 0x21, 0x25, 0x2e, 0x9e, 0xa0, 0xd4, 0xdc, 0xc4, 0xcc, 0xbc,
0xcc, 0xbc, 0x74, 0x90, 0x14, 0x33, 0x58, 0x0a, 0x45, 0x4c, 0xa9, 0x84, 0x4b, 0x04, 0x6c, 0x9c,
0x67, 0x5e, 0x49, 0x6a, 0x51, 0x5e, 0x62, 0x4e, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x50,
0x0c, 0x97, 0x18, 0x58, 0x3c, 0x24, 0x33, 0x37, 0x35, 0x24, 0xdf, 0x27, 0xb3, 0x2c, 0x15, 0x2a,
0x03, 0xb6, 0x91, 0xdb, 0x48, 0x45, 0x0f, 0xd9, 0x7d, 0x7a, 0xd8, 0xd5, 0x06, 0xe1, 0x30, 0x43,
0xa9, 0x82, 0x4b, 0x14, 0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71,
0x0c, 0x2d, 0x10, 0x29, 0xa8, 0xbd, 0xaa, 0x04, 0xec, 0x85, 0x28, 0x0e, 0xc2, 0x65, 0x8a, 0x93,
0xc4, 0x89, 0x87, 0x72, 0x0c, 0x17, 0x1e, 0xca, 0x31, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91,
0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xc3, 0xd7, 0x18,
0x10, 0x00, 0x00, 0xff, 0xff, 0xa9, 0x9f, 0x8b, 0x6c, 0xb5, 0x01, 0x00, 0x00,
}

View File

@ -13,6 +13,7 @@ option (gogoproto.goproto_enum_prefix_all) = false;
message Lease {
int64 ID = 1;
int64 TTL = 2;
int64 RemainingTTL = 3;
}
message LeaseInternalRequest {

View File

@ -16,6 +16,7 @@ package lease
import (
"container/heap"
"context"
"encoding/binary"
"errors"
"math"
@ -23,8 +24,10 @@ import (
"sync"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)
// NoLease is a special LeaseID representing the absence of a lease.
@ -41,6 +44,12 @@ var (
// maximum number of leases to revoke per second; configurable for tests
leaseRevokeRate = 1000
// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
leaseCheckpointRate = 1000
// maximum number of lease checkpoints to batch into a single consensus log entry
maxLeaseCheckpointBatchSize = 1000
ErrNotPrimary = errors.New("not a primary lessor")
ErrLeaseNotFound = errors.New("lease not found")
ErrLeaseExists = errors.New("lease already exists")
@ -57,6 +66,10 @@ type TxnDelete interface {
// RangeDeleter is a TxnDelete constructor.
type RangeDeleter func() TxnDelete
// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
type LeaseID int64
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
@ -66,6 +79,8 @@ type Lessor interface {
// new TxnDeletes.
SetRangeDeleter(rd RangeDeleter)
SetCheckpointer(cp Checkpointer)
// Grant grants a lease that expires at least after TTL seconds.
Grant(id LeaseID, ttl int64) (*Lease, error)
// Revoke revokes a lease with given ID. The item attached to the
@ -73,6 +88,10 @@ type Lessor interface {
// will be returned.
Revoke(id LeaseID) error
// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
// the expiry of leases to less than the full TTL when possible.
Checkpoint(id LeaseID, remainingTTL int64) error
// Attach attaches given leaseItem to the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Attach(id LeaseID, items []LeaseItem) error
@ -125,12 +144,17 @@ type lessor struct {
leaseMap map[LeaseID]*Lease
leaseHeap LeaseQueue
leaseCheckpointHeap LeaseQueue
itemMap map[LeaseItem]LeaseID
// When a lease expires, the lessor will delete the
// leased range (or key) by the RangeDeleter.
rd RangeDeleter
// When a lease's deadline should be persisted to preserve the remaining TTL across leader
// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
cp Checkpointer
// backend to persist leases. We only persist lease ID and expiry for now.
// The leased items can be recovered by iterating all the keys in kv.
b backend.Backend
@ -144,23 +168,40 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}
lg *zap.Logger
// Wait duration between lease checkpoints.
checkpointInterval time.Duration
}
func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
return newLessor(b, minLeaseTTL)
type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
}
func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
}
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
if checkpointInterval == 0 {
checkpointInterval = 5 * time.Minute
}
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0),
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: minLeaseTTL,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
}
l.initAndRecover()
@ -193,6 +234,13 @@ func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
le.rd = rd
}
func (le *lessor) SetCheckpointer(cp Checkpointer) {
le.mu.Lock()
defer le.mu.Unlock()
le.cp = cp
}
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
if id == NoLease {
return nil, ErrLeaseNotFound
@ -229,12 +277,17 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
}
le.leaseMap[id] = l
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
l.persistTo(le.b)
leaseTotalTTLs.Observe(float64(l.ttl))
leaseGranted.Inc()
if le.isPrimary() {
le.scheduleCheckpointIfNeeded(l)
}
return l, nil
}
@ -278,6 +331,21 @@ func (le *lessor) Revoke(id LeaseID) error {
return nil
}
func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
le.mu.Lock()
defer le.mu.Unlock()
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
if le.isPrimary() {
// schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l)
}
}
return nil
}
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
@ -316,8 +384,15 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
}
}
// Clear remaining TTL when we renew if it is set
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
if le.cp != nil && l.remainingTTL > 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
}
l.refresh(0)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
leaseRenewed.Inc()
@ -355,7 +430,7 @@ func (le *lessor) Promote(extend time.Duration) {
// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
@ -393,8 +468,9 @@ func (le *lessor) Promote(extend time.Duration) {
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
le.scheduleCheckpointIfNeeded(l)
}
}
@ -413,6 +489,8 @@ func (le *lessor) Demote() {
l.forever()
}
le.clearScheduledLeasesCheckpoints()
if le.demotec != nil {
close(le.demotec)
le.demotec = nil
@ -491,6 +569,20 @@ func (le *lessor) runLoop() {
defer close(le.doneC)
for {
le.revokeExpiredLeases()
le.checkpointScheduledLeases()
select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
}
// revokeExpiredLeases finds all leases past their expiry and sends them to epxired channel for
// to be revoked.
func (le *lessor) revokeExpiredLeases() {
var ls []*Lease
// rate limit
@ -513,15 +605,34 @@ func (le *lessor) runLoop() {
// let's try this next time after 500ms
}
}
}
select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
// submits them to the checkpointer to persist them to the consensus log.
func (le *lessor) checkpointScheduledLeases() {
var cps []*pb.LeaseCheckpoint
// rate limit
for i := 0; i < leaseCheckpointRate/2; i++ {
le.mu.Lock()
if le.isPrimary() {
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
}
le.mu.Unlock()
if len(cps) != 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
}
if len(cps) < maxLeaseCheckpointBatchSize {
return
}
}
}
func (le *lessor) clearScheduledLeasesCheckpoints() {
le.leaseCheckpointHeap = make(LeaseQueue, 0)
}
// expireExists returns true if expiry items exist.
// It pops only when expiry item exists.
// "next" is true, to indicate that it may exist in next attempt.
@ -539,7 +650,7 @@ func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
return nil, false, true
}
if time.Now().UnixNano() < item.expiration {
if time.Now().UnixNano() < item.time /* expiration time */ {
// Candidate expirations are caught up, reinsert this item
// and no need to revoke (nothing is expiry)
return l, false, false
@ -580,6 +691,61 @@ func (le *lessor) findExpiredLeases(limit int) []*Lease {
return leases
}
func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
if le.cp == nil {
return
}
if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) {
if le.lg != nil {
le.lg.Debug("Scheduling lease checkpoint",
zap.Int64("leaseID", int64(lease.ID)),
zap.Duration("intervalSeconds", le.checkpointInterval),
)
}
heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
id: lease.ID,
time: time.Now().Add(le.checkpointInterval).UnixNano(),
})
}
}
func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint {
if le.cp == nil {
return nil
}
now := time.Now()
cps := []*pb.LeaseCheckpoint{}
for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit {
lt := le.leaseCheckpointHeap[0]
if lt.time /* next checkpoint time */ > now.UnixNano() {
return cps
}
heap.Pop(&le.leaseCheckpointHeap)
var l *Lease
var ok bool
if l, ok = le.leaseMap[lt.id]; !ok {
continue
}
if !now.Before(l.expiry) {
continue
}
remainingTTL := int64(math.Ceil(l.expiry.Sub(now).Seconds()))
if remainingTTL >= l.ttl {
continue
}
if le.lg != nil {
le.lg.Debug("Checkpointing lease",
zap.Int64("leaseID", int64(lt.id)),
zap.Int64("remainingTTL", remainingTTL),
)
}
cps = append(cps, &pb.LeaseCheckpoint{ID: int64(lt.id), Remaining_TTL: remainingTTL})
}
return cps
}
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
@ -609,6 +775,7 @@ func (le *lessor) initAndRecover() {
}
}
heap.Init(&le.leaseHeap)
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()
le.b.ForceCommit()
@ -616,7 +783,8 @@ func (le *lessor) initAndRecover() {
type Lease struct {
ID LeaseID
ttl int64 // time to live in seconds
ttl int64 // time to live of the lease in seconds
remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
// expiryMu protects concurrent accesses to expiry
expiryMu sync.RWMutex
// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
@ -635,7 +803,7 @@ func (l *Lease) expired() bool {
func (l *Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl}
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
@ -651,9 +819,18 @@ func (l *Lease) TTL() int64 {
return l.ttl
}
// RemainingTTL returns the last checkpointed remaining TTL of the lease.
// TODO(jpbetz): do not expose this utility method
func (l *Lease) RemainingTTL() int64 {
if l.remainingTTL > 0 {
return l.remainingTTL
}
return l.ttl
}
// refresh refreshes the expiry of the lease.
func (l *Lease) refresh(extend time.Duration) {
newExpiry := time.Now().Add(extend + time.Duration(l.ttl)*time.Second)
newExpiry := time.Now().Add(extend + time.Duration(l.RemainingTTL())*time.Second)
l.expiryMu.Lock()
defer l.expiryMu.Unlock()
l.expiry = newExpiry
@ -703,10 +880,14 @@ type FakeLessor struct{}
func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {}
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }
func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil }
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 }

View File

@ -19,6 +19,7 @@ import (
"testing"
"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)
func BenchmarkLessorFindExpired1(b *testing.B) { benchmarkLessorFindExpired(1, b) }
@ -54,8 +55,9 @@ func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000,
func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }
func benchmarkLessorFindExpired(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
le.Promote(0)
@ -71,8 +73,9 @@ func benchmarkLessorFindExpired(size int, b *testing.B) {
}
func benchmarkLessorGrant(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
@ -85,8 +88,9 @@ func benchmarkLessorGrant(size int, b *testing.B) {
}
func benchmarkLessorRevoke(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
@ -102,8 +106,9 @@ func benchmarkLessorRevoke(size int, b *testing.B) {
}
func benchmarkLessorRenew(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {

View File

@ -15,6 +15,7 @@
package lease
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -25,7 +26,9 @@ import (
"testing"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)
const (
@ -37,11 +40,12 @@ const (
// The granted lease should have a unique ID with a term
// that is greater than minLeaseTTL.
func TestLessorGrant(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@ -98,11 +102,12 @@ func TestLessorGrant(t *testing.T) {
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
// from concurrent map writes on 'itemSet'.
func TestLeaseConcurrentKeys(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -146,11 +151,12 @@ func TestLeaseConcurrentKeys(t *testing.T) {
// the backend.
// The revoked lease cannot be got from Lessor again.
func TestLessorRevoke(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
@ -198,11 +204,12 @@ func TestLessorRevoke(t *testing.T) {
// TestLessorRenew ensures Lessor can renew an existing lease.
func TestLessorRenew(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@ -234,12 +241,13 @@ func TestLessorRenew(t *testing.T) {
func TestLessorRenewExtendPileup(t *testing.T) {
oldRevokeRate := leaseRevokeRate
defer func() { leaseRevokeRate = oldRevokeRate }()
lg := zap.NewNop()
leaseRevokeRate = 10
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@ -258,7 +266,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(be, minLeaseTTL)
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
// extend after recovery should extend expiration on lease pile-up
@ -283,11 +291,12 @@ func TestLessorRenewExtendPileup(t *testing.T) {
}
func TestLessorDetach(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -323,11 +332,12 @@ func TestLessorDetach(t *testing.T) {
// TestLessorRecover ensures Lessor recovers leases from
// persist backend.
func TestLessorRecover(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
@ -336,7 +346,7 @@ func TestLessorRecover(t *testing.T) {
}
// Create a new lessor with the same backend
nle := newLessor(be, minLeaseTTL)
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
@ -350,13 +360,14 @@ func TestLessorRecover(t *testing.T) {
}
func TestLessorExpire(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
testMinTTL := int64(1)
le := newLessor(be, testMinTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@ -402,13 +413,14 @@ func TestLessorExpire(t *testing.T) {
}
func TestLessorExpireAndDemote(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
testMinTTL := int64(1)
le := newLessor(be, testMinTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@ -452,11 +464,12 @@ func TestLessorExpireAndDemote(t *testing.T) {
}
func TestLessorMaxTTL(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
_, err := le.Grant(1, MaxLeaseTTL+1)
@ -465,6 +478,62 @@ func TestLessorMaxTTL(t *testing.T) {
}
}
func TestLessorCheckpointScheduling(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
close(checkpointedC)
if len(lc.Checkpoints) != 1 {
t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
}
c := lc.Checkpoints[0]
if c.Remaining_TTL != 1 {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
})
defer le.Stop()
le.Promote(0)
_, err := le.Grant(1, 2)
if err != nil {
t.Fatal(err)
}
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
select {
case <-checkpointedC:
case <-time.After(2 * time.Second):
t.Fatal("expected checkpointer to be called, but it was not")
}
}
func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l, err := le.Grant(1, 10)
if err != nil {
t.Fatal(err)
}
le.Checkpoint(l.ID, 5)
le.Promote(0)
remaining := l.Remaining().Seconds()
if !(remaining > 4 && remaining < 5) {
t.Fatalf("expected expiry to be less than 1s in the future, but got %f seconds", remaining)
}
}
type fakeDeleter struct {
deleted []string
tx backend.BatchTx

View File

@ -45,7 +45,7 @@ func main() {
entrytype := flag.String("entry-type", "", `If set, filters output by entry type. Must be one or more than one of:
ConfigChange, Normal, Request, InternalRaftRequest,
IRRRange, IRRPut, IRRDeleteRange, IRRTxn,
IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke`)
IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`)
streamdecoder := flag.String("stream-decoder", "", `The name of an executable decoding tool, the executable must process
hex encoded lines of binary input (from etcd-dump-logs)
and output a hex encoded line of binary for each input line`)
@ -203,6 +203,11 @@ func passIRRLeaseRevoke(entry raftpb.Entry) (bool, string) {
return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseRevoke != nil, "InternalRaftRequest"
}
func passIRRLeaseCheckpoint(entry raftpb.Entry) (bool, string) {
var rr etcdserverpb.InternalRaftRequest
return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseCheckpoint != nil, "InternalRaftRequest"
}
func passRequest(entry raftpb.Entry) (bool, string) {
var rr1 etcdserverpb.Request
var rr2 etcdserverpb.InternalRaftRequest
@ -272,6 +277,7 @@ func evaluateEntrytypeFlag(entrytype string) []EntryFilter {
"IRRCompaction": {passIRRCompaction},
"IRRLeaseGrant": {passIRRLeaseGrant},
"IRRLeaseRevoke": {passIRRLeaseRevoke},
"IRRLeaseCheckpoint": {passIRRLeaseCheckpoint},
}
filters := make([]EntryFilter, 0)
if len(entrytypelist) == 0 {
@ -288,7 +294,7 @@ func evaluateEntrytypeFlag(entrytype string) []EntryFilter {
Please set entry-type to one or more of the following:
ConfigChange, Normal, Request, InternalRaftRequest,
IRRRange, IRRPut, IRRDeleteRange, IRRTxn,
IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke`, et)
IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`, et)
}
}