From 1714290f4e4dfb04adefd0ee9f21e3d55bc691e0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 6 Jan 2016 16:25:25 -0800 Subject: [PATCH] storage: support recovering from backend We want the KV to support recovering from a backend to avoid an additional pointer swap. Otherwise we would have to coordinate between etcdserver and the API layer, since the API layer might hold the kv pointer and use a closed kv. --- etcdserver/server.go | 52 +++++++++++++++++---------------------- etcdserver/server_test.go | 2 +- storage/kv.go | 3 ++- storage/kv_test.go | 3 +-- storage/kvstore.go | 25 ++++++++++++++++++- storage/kvstore_test.go | 3 +-- 6 files changed, 51 insertions(+), 37 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index aec1f22e7..1c9514b60 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -24,7 +24,6 @@ import ( "os" "path" "regexp" - "sync" "sync/atomic" "time" @@ -167,8 +166,8 @@ type EtcdServer struct { store store.Store - kvMu sync.RWMutex - kv dstorage.ConsistentWatchableKV + kv dstorage.ConsistentWatchableKV + be backend.Backend stats *stats.ServerStats lstats *stats.LeaderStats @@ -359,11 +358,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } if cfg.V3demo { - be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) - srv.kv = dstorage.New(be, &srv.consistIndex) - if err := srv.kv.Restore(); err != nil { - plog.Fatalf("v3 storage restore error: %v", err) - } + srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) + srv.kv = dstorage.New(srv.be, &srv.consistIndex) } // TODO: move transport initialization near the definition of remote @@ -542,6 +538,14 @@ func (s *EtcdServer) run() { defer func() { s.r.stop() + // kv and backend can be nil if running without v3 enabled + // or running unit tests. 
+ if s.kv != nil { + s.kv.Close() + } + if s.be != nil { + s.be.Close() + } close(s.done) <-appdonec }() @@ -586,21 +590,21 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } newbe := backend.NewDefaultBackend(fn) - newKV := dstorage.New(newbe, &s.consistIndex) - if err := newKV.Restore(); err != nil { + if err := s.kv.Restore(newbe); err != nil { plog.Panicf("restore KV error: %v", err) } - oldKV := s.swapKV(newKV) - - // Closing oldKV might block until all the txns - // on the kv are finished. - // We do not want to wait on closing the old kv. + // Closing old backend might block until all the txns + // on the backend are finished. + // We do not want to wait on closing the old backend. + oldbe := s.be go func() { - if err := oldKV.Close(); err != nil { - plog.Panicf("close KV error: %v", err) + if err := oldbe.Close(); err != nil { + plog.Panicf("close backend error: %v", err) } }() + + s.be = newbe } if err := s.store.Recovery(apply.snapshot.Data); err != nil { plog.Panicf("recovery store error: %v", err) @@ -1277,16 +1281,4 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } -func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { - s.kvMu.RLock() - defer s.kvMu.RUnlock() - return s.kv -} - -func (s *EtcdServer) swapKV(kv dstorage.ConsistentWatchableKV) dstorage.ConsistentWatchableKV { - s.kvMu.Lock() - defer s.kvMu.Unlock() - old := s.kv - s.kv = kv - return old -} +func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index c89a6395d..d85d22a4d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -867,10 +867,10 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() defer func() { - be.Close() os.RemoveAll(tmpPath) }() s.kv = dstorage.New(be, &s.consistIndex) + s.be = be s.start() defer s.Stop() diff --git a/storage/kv.go b/storage/kv.go index 
c190f5671..134703122 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -71,7 +71,8 @@ type KV interface { // Commit commits txns into the underlying backend. Commit() - Restore() error + // Restore restores the KV store from a backend. + Restore(b backend.Backend) error Close() error } diff --git a/storage/kv_test.go b/storage/kv_test.go index 1f16f1a46..130db3d2d 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -676,8 +676,8 @@ func TestKVRestore(t *testing.T) { } s.Close() + // ns should recover the previous state from backend. ns := NewStore(b) - ns.Restore() // wait for possible compaction to finish testutil.WaitSchedule() var nkvss [][]storagepb.KeyValue @@ -724,7 +724,6 @@ func TestKVSnapshot(t *testing.T) { ns := NewStore(b) defer ns.Close() - ns.Restore() kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0) if err != nil { t.Errorf("unexpect range error (%v)", err) diff --git a/storage/kvstore.go b/storage/kvstore.go index 87b9aacbc..d83533472 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -84,6 +84,11 @@ func NewStore(b backend.Backend) *store { tx.Unlock() s.b.ForceCommit() + if err := s.restore(); err != nil { + // TODO: return the error instead of panic here? + panic("failed to recover store from backend") + } + return s } @@ -237,10 +242,28 @@ func (s *store) Snapshot() Snapshot { func (s *store) Commit() { s.b.ForceCommit() } -func (s *store) Restore() error { +func (s *store) Restore(b backend.Backend) error { s.mu.Lock() defer s.mu.Unlock() + close(s.stopc) + // TODO: restore without waiting for compaction routine to finish. + // We need a way to notify that the store is finished using the old + // backend though. 
+ s.wg.Wait() + + s.b = b + s.kvindex = newTreeIndex() + s.currentRev = revision{} + s.compactMainRev = -1 + s.tx = b.BatchTx() + s.txnID = -1 + s.stopc = make(chan struct{}) + + return s.restore() +} + +func (s *store) restore() error { min, max := newRevBytes(), newRevBytes() revToBytes(revision{}, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index bf906c429..fb837915f 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -332,7 +332,7 @@ func TestStoreRestore(t *testing.T) { b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}} - s.Restore() + s.restore() if s.compactMainRev != 2 { t.Errorf("compact rev = %d, want 4", s.compactMainRev) @@ -378,7 +378,6 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Close() s1 := NewStore(b) - s1.Restore() // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond)