From 4991cda20297860a58b8fda34062e2d16e767c62 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 21 Apr 2016 08:48:11 -0700 Subject: [PATCH 1/2] etcdsever: fix the leaky snashot routine issue --- etcdserver/server.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/etcdserver/server.go b/etcdserver/server.go index 6874405c6..0b446e4f4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -208,6 +208,10 @@ type EtcdServer struct { forceVersionC chan struct{} msgSnapC chan raftpb.Message + + // wg is used to wait for the go routines that depends on the server state + // to exit when stopping the server. + wg sync.WaitGroup } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -536,6 +540,8 @@ func (s *EtcdServer) run() { s.r.stop() sched.Stop() + s.wg.Wait() + // kv, lessor and backend can be nil if running without v3 enabled // or running unit tests. if s.lessor != nil { @@ -1089,7 +1095,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { clone := s.store.Clone() + s.wg.Add(1) go func() { + defer s.wg.Done() + d, err := clone.SaveNoCopy() // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? From c0cf44f1348631ed3c6e6a7885caa1a60ebe9db0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 21 Apr 2016 08:48:33 -0700 Subject: [PATCH 2/2] backedn: protect backend access with lock --- storage/backend/backend.go | 14 ++++++++++++-- storage/backend/batch_tx.go | 2 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 24fef1ad5..2625e4d7d 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -201,6 +201,18 @@ func (b *backend) Commits() int64 { } func (b *backend) Defrag() error { + err := b.defrag() + if err != nil { + return err + } + + // commit to update metadata like db.size + b.batchTx.Commit() + + return nil +} + +func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. @@ -251,8 +263,6 @@ func (b *backend) Defrag() error { if err != nil { log.Fatalf("backend: cannot begin tx (%s)", err) } - // commit to update metadata like db.size - b.batchTx.commit(false) return nil } diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index cea903c3f..24817b3c4 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -160,6 +160,8 @@ func (t *batchTx) commit(stop bool) { // commit the last tx if t.tx != nil { if t.pending == 0 && !stop { + t.backend.mu.RLock() + defer t.backend.mu.RUnlock() atomic.StoreInt64(&t.backend.size, t.tx.Size()) return }