mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: expose Lessor Promote and Demote interface
This commit is contained in:
parent
99bee2fd29
commit
f5fa9b5384
@ -148,13 +148,21 @@ func (r *raftNode) start(s *EtcdServer) {
|
||||
}
|
||||
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
|
||||
if rd.RaftState == raft.StateLeader {
|
||||
// TODO: raft should send server a notification through chan when
|
||||
// it promotes or demotes instead of modifying server directly.
|
||||
syncC = r.s.SyncTicker
|
||||
if r.s.lessor != nil {
|
||||
r.s.lessor.Promote()
|
||||
}
|
||||
// TODO: remove the nil checking
|
||||
// current test utility does not provide the stats
|
||||
if r.s.stats != nil {
|
||||
r.s.stats.BecomeLeader()
|
||||
}
|
||||
} else {
|
||||
if r.s.lessor != nil {
|
||||
r.s.lessor.Demote()
|
||||
}
|
||||
syncC = nil
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package lease
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
@ -35,6 +36,7 @@ var (
|
||||
minLeaseTerm = 5 * time.Second
|
||||
|
||||
leaseBucketName = []byte("lease")
|
||||
ErrNotPrimary = errors.New("not a primary lessor")
|
||||
)
|
||||
|
||||
type LeaseID int64
|
||||
@ -56,12 +58,39 @@ type Lessor interface {
|
||||
// given lease will be removed. If the ID does not exist, an error
|
||||
// will be returned.
|
||||
Revoke(id LeaseID) error
|
||||
|
||||
// Promote promotes the lessor to be the primary lessor. Primary lessor manages
|
||||
// the expiration and renew of leases.
|
||||
Promote()
|
||||
|
||||
// Demote demotes the lessor from being the primary lessor.
|
||||
Demote()
|
||||
|
||||
// Renew renews a lease with given ID. If the ID does not exist, an error
|
||||
// will be returned.
|
||||
Renew(id LeaseID) error
|
||||
}
|
||||
|
||||
// lessor implements Lessor interface.
|
||||
// TODO: use clockwork for testability.
|
||||
type lessor struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// primary indicates if this lessor is the primary lessor. The primary
|
||||
// lessor manages lease expiration and renew.
|
||||
//
|
||||
// in etcd, raft leader is the primary. Thus there might be two primary
|
||||
// leaders at the same time (raft allows concurrent leader but with different term)
|
||||
// for at most a leader election timeout.
|
||||
// The old primary leader cannot affect the correctness since its proposal has a
|
||||
// smaller term and will not be committed.
|
||||
//
|
||||
// TODO: raft follower do not forward lease management proposals. There might be a
|
||||
// very small window (within second normally which depends on go scheduling) that
|
||||
// a raft follow is the primary between the raft leader demotion and lessor demotion.
|
||||
// Usually this should not be a problem. Lease should not be that sensitive to timing.
|
||||
primary bool
|
||||
|
||||
// TODO: probably this should be a heap with a secondary
|
||||
// id index.
|
||||
// Now it is O(N) to loop over the leases to find expired ones.
|
||||
@ -101,6 +130,8 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
|
||||
}
|
||||
l.initAndRecover()
|
||||
|
||||
go l.runLoop()
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
@ -153,6 +184,10 @@ func (le *lessor) Renew(id LeaseID) error {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
if !le.primary {
|
||||
return ErrNotPrimary
|
||||
}
|
||||
|
||||
l := le.leaseMap[id]
|
||||
if l == nil {
|
||||
return fmt.Errorf("lease: cannot find lease %x", id)
|
||||
@ -163,6 +198,20 @@ func (le *lessor) Renew(id LeaseID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (le *lessor) Promote() {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
le.primary = true
|
||||
}
|
||||
|
||||
func (le *lessor) Demote() {
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
|
||||
le.primary = false
|
||||
}
|
||||
|
||||
// Attach attaches items to the lease with given ID. When the lease
|
||||
// expires, the attached items will be automatically removed.
|
||||
// If the given lease does not exist, an error will be returned.
|
||||
@ -192,6 +241,22 @@ func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) {
|
||||
le.initAndRecover()
|
||||
}
|
||||
|
||||
func (le *lessor) runLoop() {
|
||||
// TODO: stop runLoop
|
||||
for {
|
||||
le.mu.Lock()
|
||||
if le.primary {
|
||||
le.revokeExpiredLeases(le.findExpiredLeases())
|
||||
}
|
||||
le.mu.Unlock()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func (le *lessor) revokeExpiredLeases(expired []*Lease) {
|
||||
// TODO: send revoke request to these expired lease through raft.
|
||||
}
|
||||
|
||||
// findExpiredLeases loops all the leases in the leaseMap and returns the expired
|
||||
// leases that needed to be revoked.
|
||||
func (le *lessor) findExpiredLeases() []*Lease {
|
||||
@ -202,6 +267,8 @@ func (le *lessor) findExpiredLeases() []*Lease {
|
||||
now := time.Now()
|
||||
|
||||
for _, l := range le.leaseMap {
|
||||
// TODO: probably should change to <= 100-500 millisecond to
|
||||
// make up committing latency.
|
||||
if l.expiry.Sub(now) <= 0 {
|
||||
leases = append(leases, l)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user