From 6b1d9fb7ce37ccb7d8de4866f22bb70bd465549e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 12 Jan 2016 09:00:57 -0800 Subject: [PATCH] *: stop lessor when etcdserver is stopped --- etcdserver/server.go | 5 ++++- lease/lessor.go | 30 +++++++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 1aaf22672..a79929a53 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -541,8 +541,11 @@ func (s *EtcdServer) run() { defer func() { s.r.stop() - // kv and backend can be nil if running without v3 enabled + // kv, lessor and backend can be nil if running without v3 enabled // or running unit tests. + if s.lessor != nil { + s.lessor.Stop() + } if s.kv != nil { s.kv.Close() } diff --git a/lease/lessor.go b/lease/lessor.go index 2027fa479..23d97b1c7 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -83,8 +83,12 @@ type Lessor interface { // an error will be returned. Renew(id LeaseID) (int64, error) - // ExpiredLeasesC returens a chan that is used to receive expired leases. + // ExpiredLeasesC returns a chan that is used to receive expired leases. ExpiredLeasesC() <-chan []*Lease + + // Stop stops the lessor for managing leases. The behavior of calling Stop multiple + // times is undefined. + Stop() } // lessor implements Lessor interface. @@ -124,6 +128,10 @@ type lessor struct { b backend.Backend expiredC chan []*Lease + // stopC is a channel whose closure indicates that the lessor should be stopped. + stopC chan struct{} + // doneC is a channel whose closure indicates that the lessor is stopped. + doneC chan struct{} idgen *idutil.Generator } @@ -144,6 +152,8 @@ func newLessor(lessorID uint8, b backend.Backend) *lessor { b: b, // expiredC is a small buffered chan to avoid unncessary blocking. expiredC: make(chan []*Lease, 16), + stopC: make(chan struct{}), + doneC: make(chan struct{}), idgen: idutil.NewGenerator(lessorID, time.Now()), } l.initAndRecover() @@ -284,8 +294,14 @@ func (le *lessor) ExpiredLeasesC() <-chan []*Lease { return le.expiredC } +func (le *lessor) Stop() { + close(le.stopC) + <-le.doneC +} + func (le *lessor) runLoop() { - // TODO: stop runLoop + defer close(le.doneC) + for { var ls []*Lease @@ -297,6 +313,8 @@ func (le *lessor) runLoop() { if len(ls) != 0 { select { + case <-le.stopC: + return case le.expiredC <- ls: default: // the receiver of expiredC is probably busy handling @@ -305,7 +323,11 @@ func (le *lessor) runLoop() { } } - time.Sleep(500 * time.Millisecond) + select { + case <-time.After(500 * time.Millisecond): + case <-le.stopC: + return + } } } @@ -441,3 +463,5 @@ func (fl *FakeLessor) Demote() {} func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil } func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil } + +func (fl *FakeLessor) Stop() {}