*: stop lessor when etcdserver is stopped

This commit is contained in:
Xiang Li 2016-01-12 09:00:57 -08:00
parent 374b14e471
commit 6b1d9fb7ce
2 changed files with 31 additions and 4 deletions

View File

@ -541,8 +541,11 @@ func (s *EtcdServer) run() {
defer func() {
s.r.stop()
// kv and backend can be nil if running without v3 enabled
// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
s.lessor.Stop()
}
if s.kv != nil {
s.kv.Close()
}

View File

@ -83,8 +83,12 @@ type Lessor interface {
// an error will be returned.
Renew(id LeaseID) (int64, error)
// ExpiredLeasesC returens a chan that is used to receive expired leases.
// ExpiredLeasesC returns a chan that is used to receive expired leases.
ExpiredLeasesC() <-chan []*Lease
// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
// times is undefined.
Stop()
}
// lessor implements Lessor interface.
@ -124,6 +128,10 @@ type lessor struct {
b backend.Backend
expiredC chan []*Lease
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}
idgen *idutil.Generator
}
@ -144,6 +152,8 @@ func newLessor(lessorID uint8, b backend.Backend) *lessor {
b: b,
// expiredC is a small buffered chan to avoid unncessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
idgen: idutil.NewGenerator(lessorID, time.Now()),
}
l.initAndRecover()
@ -284,8 +294,14 @@ func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
return le.expiredC
}
func (le *lessor) Stop() {
close(le.stopC)
<-le.doneC
}
func (le *lessor) runLoop() {
// TODO: stop runLoop
defer close(le.doneC)
for {
var ls []*Lease
@ -297,6 +313,8 @@ func (le *lessor) runLoop() {
if len(ls) != 0 {
select {
case <-le.stopC:
return
case le.expiredC <- ls:
default:
// the receiver of expiredC is probably busy handling
@ -305,7 +323,11 @@ func (le *lessor) runLoop() {
}
}
time.Sleep(500 * time.Millisecond)
select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
}
@ -441,3 +463,5 @@ func (fl *FakeLessor) Demote() {}
func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }
func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }
func (fl *FakeLessor) Stop() {}