diff --git a/etcdserver/server.go b/etcdserver/server.go index 6874405c6..0b446e4f4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -208,6 +208,10 @@ type EtcdServer struct { forceVersionC chan struct{} msgSnapC chan raftpb.Message + + // wg is used to wait for the goroutines that depend on the server state + // to exit when stopping the server. + wg sync.WaitGroup } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -536,6 +540,8 @@ func (s *EtcdServer) run() { s.r.stop() sched.Stop() + s.wg.Wait() + // kv, lessor and backend can be nil if running without v3 enabled // or running unit tests. if s.lessor != nil { @@ -1089,7 +1095,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { clone := s.store.Clone() + s.wg.Add(1) go func() { + defer s.wg.Done() + d, err := clone.SaveNoCopy() // TODO: current store will never fail to do a snapshot // what should we do if the store might fail? diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 24fef1ad5..2625e4d7d 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -201,6 +201,18 @@ func (b *backend) Commits() int64 { } func (b *backend) Defrag() error { + err := b.defrag() + if err != nil { + return err + } + + // commit to update metadata like db.size + b.batchTx.Commit() + + return nil +} + +func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. 
@@ -251,8 +263,6 @@ func (b *backend) Defrag() error { if err != nil { log.Fatalf("backend: cannot begin tx (%s)", err) } - // commit to update metadata like db.size - b.batchTx.commit(false) return nil } diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index cea903c3f..24817b3c4 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -160,6 +160,8 @@ func (t *batchTx) commit(stop bool) { // commit the last tx if t.tx != nil { if t.pending == 0 && !stop { + t.backend.mu.RLock() + defer t.backend.mu.RUnlock() atomic.StoreInt64(&t.backend.size, t.tx.Size()) return }