diff --git a/etcdserver/server.go b/etcdserver/server.go index aec1f22e7..1c9514b60 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -24,7 +24,6 @@ import ( "os" "path" "regexp" - "sync" "sync/atomic" "time" @@ -167,8 +166,8 @@ type EtcdServer struct { store store.Store - kvMu sync.RWMutex - kv dstorage.ConsistentWatchableKV + kv dstorage.ConsistentWatchableKV + be backend.Backend stats *stats.ServerStats lstats *stats.LeaderStats @@ -359,11 +358,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } if cfg.V3demo { - be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) - srv.kv = dstorage.New(be, &srv.consistIndex) - if err := srv.kv.Restore(); err != nil { - plog.Fatalf("v3 storage restore error: %v", err) - } + srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) + srv.kv = dstorage.New(srv.be, &srv.consistIndex) } // TODO: move transport initialization near the definition of remote @@ -542,6 +538,14 @@ func (s *EtcdServer) run() { defer func() { s.r.stop() + // kv and backend can be nil if runing without v3 enabled + // or running unit tests. + if s.kv != nil { + s.kv.Close() + } + if s.be != nil { + s.be.Close() + } close(s.done) <-appdonec }() @@ -586,21 +590,21 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } newbe := backend.NewDefaultBackend(fn) - newKV := dstorage.New(newbe, &s.consistIndex) - if err := newKV.Restore(); err != nil { + if err := s.kv.Restore(newbe); err != nil { plog.Panicf("restore KV error: %v", err) } - oldKV := s.swapKV(newKV) - - // Closing oldKV might block until all the txns - // on the kv are finished. - // We do not want to wait on closing the old kv. + // Closing old backend might block until all the txns + // on the backend are finished. + // We do not want to wait on closing the old backend. + oldbe := s.be go func() { - if err := oldKV.Close(); err != nil { - plog.Panicf("close KV error: %v", err) + if err := oldbe.Close(); err != nil { + plog.Panicf("close backend error: %v", err) } }() + + s.be = newbe } if err := s.store.Recovery(apply.snapshot.Data); err != nil { plog.Panicf("recovery store error: %v", err) @@ -1277,16 +1281,4 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } -func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { - s.kvMu.RLock() - defer s.kvMu.RUnlock() - return s.kv -} - -func (s *EtcdServer) swapKV(kv dstorage.ConsistentWatchableKV) dstorage.ConsistentWatchableKV { - s.kvMu.Lock() - defer s.kvMu.Unlock() - old := s.kv - s.kv = kv - return old -} +func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index c89a6395d..d85d22a4d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -867,10 +867,10 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() defer func() { - be.Close() os.RemoveAll(tmpPath) }() s.kv = dstorage.New(be, &s.consistIndex) + s.be = be s.start() defer s.Stop() diff --git a/storage/kv.go b/storage/kv.go index c190f5671..134703122 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -71,7 +71,8 @@ type KV interface { // Commit commits txns into the underlying backend. Commit() - Restore() error + // Restore restores the KV store from a backend. + Restore(b backend.Backend) error Close() error } diff --git a/storage/kv_test.go b/storage/kv_test.go index 1f16f1a46..130db3d2d 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -676,8 +676,8 @@ func TestKVRestore(t *testing.T) { } s.Close() + // ns should recover the the previous state from backend. ns := NewStore(b) - ns.Restore() // wait for possible compaction to finish testutil.WaitSchedule() var nkvss [][]storagepb.KeyValue @@ -724,7 +724,6 @@ func TestKVSnapshot(t *testing.T) { ns := NewStore(b) defer ns.Close() - ns.Restore() kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0) if err != nil { t.Errorf("unexpect range error (%v)", err) diff --git a/storage/kvstore.go b/storage/kvstore.go index 87b9aacbc..d83533472 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -84,6 +84,11 @@ func NewStore(b backend.Backend) *store { tx.Unlock() s.b.ForceCommit() + if err := s.restore(); err != nil { + // TODO: return the error instead of panic here? + panic("failed to recover store from backend") + } + return s } @@ -237,10 +242,28 @@ func (s *store) Snapshot() Snapshot { func (s *store) Commit() { s.b.ForceCommit() } -func (s *store) Restore() error { +func (s *store) Restore(b backend.Backend) error { s.mu.Lock() defer s.mu.Unlock() + close(s.stopc) + // TODO: restore without waiting for compaction routine to finish. + // We need a way to notify that the store is finished using the old + // backend though. + s.wg.Wait() + + s.b = b + s.kvindex = newTreeIndex() + s.currentRev = revision{} + s.compactMainRev = -1 + s.tx = b.BatchTx() + s.txnID = -1 + s.stopc = make(chan struct{}) + + return s.restore() +} + +func (s *store) restore() error { min, max := newRevBytes(), newRevBytes() revToBytes(revision{}, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index bf906c429..fb837915f 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -332,7 +332,7 @@ func TestStoreRestore(t *testing.T) { b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}} - s.Restore() + s.restore() if s.compactMainRev != 2 { t.Errorf("compact rev = %d, want 4", s.compactMainRev) @@ -378,7 +378,6 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Close() s1 := NewStore(b) - s1.Restore() // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond)