Merge pull request #3746 from yichengq/load-storage

etcdserver: fix recovering snapshot from disk
This commit is contained in:
Yicheng Qin 2015-10-27 14:42:41 -07:00
commit 099d8674c4
7 changed files with 115 additions and 34 deletions

View File

@ -1029,6 +1029,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
plog.Panicf("unexpected create snapshot error %v", err)
}
if s.cfg.V3demo {
// commit v3 storage because WAL file before snapshot index
// could be removed after SaveSnap.
s.kv.Commit()
}
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
if err := s.r.storage.SaveSnap(snap); err != nil {
plog.Fatalf("save snapshot error: %v", err)
}

View File

@ -186,6 +186,7 @@ func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
func (kv *nopKV) Compact(rev int64) error { return nil }
func (kv *nopKV) Hash() (uint32, error) { return 0, nil }
func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
func (kv *nopKV) Commit() {}
func (kv *nopKV) Restore() error { return nil }
func (kv *nopKV) Close() error { return nil }

View File

@ -17,6 +17,8 @@ package storage
import (
"encoding/binary"
"log"
"github.com/coreos/etcd/storage/storagepb"
)
var (
@ -39,6 +41,8 @@ type consistentWatchableStore struct {
// underlying backend. This helps to recover consistent index
// when restoring.
ig ConsistentIndexGetter
skip bool // indicate whether or not to skip an operation
}
func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
@ -82,23 +86,53 @@ func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
func (s *consistentWatchableStore) TxnBegin() int64 {
id := s.watchableStore.TxnBegin()
// TODO: avoid this unnecessary allocation
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
// If the consistent index of executing entry is not larger than store
// consistent index, skip all operations in this txn.
s.skip = s.ig.ConsistentIndex() <= s.consistentIndex()
if !s.skip {
// TODO: avoid this unnecessary allocation
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}
return id
}
func (s *consistentWatchableStore) ConsistentIndex() uint64 {
tx := s.watchableStore.store.b.BatchTx()
tx.Lock()
defer tx.Unlock()
func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
if s.skip {
return nil, 0, nil
}
return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
}
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
if s.skip {
return 0, nil
}
return s.watchableStore.TxnPut(txnID, key, value)
}
func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
if s.skip {
return 0, 0, nil
}
return s.watchableStore.TxnDeleteRange(txnID, key, end)
}
func (s *consistentWatchableStore) TxnEnd(txnID int64) error {
// reset skip var
s.skip = false
return s.watchableStore.TxnEnd(txnID)
}
func (s *consistentWatchableStore) consistentIndex() uint64 {
// get the index
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
// tx has been locked in TxnBegin, so there is no need to lock it again
_, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}

View File

@ -0,0 +1,54 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import "testing"
type indexVal uint64
func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
var idx indexVal
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
tests := []uint64{1, 2, 3, 5, 10}
for i, tt := range tests {
idx = indexVal(tt)
s.Put([]byte("foo"), []byte("bar"))
id := s.TxnBegin()
g := s.consistentIndex()
s.TxnEnd(id)
if g != tt {
t.Errorf("#%d: index = %d, want %d", i, g, tt)
}
}
}
func TestConsistentWatchableStoreSkip(t *testing.T) {
idx := indexVal(5)
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
// put is skipped
rev := s.Put([]byte("foo"), []byte("bar"))
if rev != 0 {
t.Errorf("rev = %d, want 0", rev)
}
}

View File

@ -69,6 +69,9 @@ type KV interface {
// Snapshot snapshots the full KV store.
Snapshot() Snapshot
// Commit commits txns into the underlying backend.
Commit()
Restore() error
Close() error
}
@ -106,10 +109,9 @@ type WatchableKV interface {
// ConsistentWatchableKV is a WatchableKV that understands the consistency
// algorithm and consistent index.
// If the consistent index of executing entry is not larger than the
// consistent index of ConsistentWatchableKV, all operations in
// this entry are skipped and return empty response.
type ConsistentWatchableKV interface {
WatchableKV
// ConsistentIndex returns the index of the last executed entry
// by the KV in the consistent replicated log.
ConsistentIndex() uint64
}

View File

@ -819,25 +819,6 @@ func TestWatchableKVWatch(t *testing.T) {
}
}
type indexVal uint64
func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
func TestConsistentWatchableKVConsistentIndex(t *testing.T) {
var idx indexVal
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
tests := []uint64{1, 2, 3, 5, 10}
for i, tt := range tests {
idx = indexVal(tt)
s.Put([]byte("foo"), []byte("bar"))
if g := s.ConsistentIndex(); g != tt {
t.Errorf("#%d: index = %d, want %d", i, g, tt)
}
}
}
func cleanup(s KV, path string) {
s.Close()
os.Remove(path)

View File

@ -294,6 +294,8 @@ func (s *store) Snapshot() Snapshot {
return s.b.Snapshot()
}
func (s *store) Commit() { s.b.ForceCommit() }
func (s *store) Restore() error {
s.mu.Lock()
defer s.mu.Unlock()