mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
support linearizable renew lease
When etcdserver receives a LeaseRenew request, it may be still in progress of processing the LeaseGrantRequest on exact the same leaseID. Accordingly it may return a TTL=0 to client due to the leaseID not found error. So the leader should wait for the appliedID to be available before processing client requests.
This commit is contained in:
@@ -45,6 +45,10 @@ const (
|
||||
maxGapBetweenApplyAndCommitIndex = 5000
|
||||
traceThreshold = 100 * time.Millisecond
|
||||
readIndexRetryTime = 500 * time.Millisecond
|
||||
|
||||
// The timeout for the node to catch up its applied index, and is used in
|
||||
// lease related operations, such as LeaseRenew and LeaseTimeToLive.
|
||||
applyTimeout = time.Second
|
||||
)
|
||||
|
||||
type RaftKV interface {
|
||||
@@ -271,6 +275,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
|
||||
return resp.(*pb.LeaseGrantResponse), nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) waitAppliedIndex() error {
|
||||
select {
|
||||
case <-s.ApplyWait():
|
||||
case <-s.stopping:
|
||||
return ErrStopped
|
||||
case <-time.After(applyTimeout):
|
||||
return ErrTimeoutWaitAppliedIndex
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
||||
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
|
||||
if err != nil {
|
||||
@@ -280,26 +296,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
|
||||
ttl, err := s.lessor.Renew(id)
|
||||
if err == nil { // already requested to primary lessor(leader)
|
||||
return ttl, nil
|
||||
}
|
||||
if err != lease.ErrNotPrimary {
|
||||
return -1, err
|
||||
if s.isLeader() {
|
||||
if err := s.waitAppliedIndex(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
ttl, err := s.lessor.Renew(id)
|
||||
if err == nil { // already requested to primary lessor(leader)
|
||||
return ttl, nil
|
||||
}
|
||||
if err != lease.ErrNotPrimary {
|
||||
return -1, err
|
||||
}
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
|
||||
defer cancel()
|
||||
|
||||
// renewals don't go through raft; forward to leader manually
|
||||
for cctx.Err() == nil && err != nil {
|
||||
for cctx.Err() == nil {
|
||||
leader, lerr := s.waitLeader(cctx)
|
||||
if lerr != nil {
|
||||
return -1, lerr
|
||||
}
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + leasehttp.LeasePrefix
|
||||
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
|
||||
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
|
||||
if err == nil || err == lease.ErrLeaseNotFound {
|
||||
return ttl, err
|
||||
}
|
||||
@@ -315,7 +337,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
|
||||
if s.Leader() == s.ID() {
|
||||
if s.isLeader() {
|
||||
if err := s.waitAppliedIndex(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// primary; timetolive directly from leader
|
||||
le := s.lessor.Lookup(lease.LeaseID(r.ID))
|
||||
if le == nil {
|
||||
|
||||
Reference in New Issue
Block a user