diff --git a/etcdserver/server.go b/etcdserver/server.go
index 146ba4022..33e116cbb 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -1029,6 +1029,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		}
 		plog.Panicf("unexpected create snapshot error %v", err)
 	}
+	if s.cfg.V3demo {
+		// commit v3 storage because WAL file before snapshot index
+		// could be removed after SaveSnap.
+		s.kv.Commit()
+	}
+	// SaveSnap saves the snapshot and releases the locked wal files
+	// to the snapshot index.
 	if err := s.r.storage.SaveSnap(snap); err != nil {
 		plog.Fatalf("save snapshot error: %v", err)
 	}
diff --git a/etcdserver/snapshot_store_test.go b/etcdserver/snapshot_store_test.go
index 119108a34..49ae6f413 100644
--- a/etcdserver/snapshot_store_test.go
+++ b/etcdserver/snapshot_store_test.go
@@ -186,6 +186,7 @@ func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 func (kv *nopKV) Compact(rev int64) error     { return nil }
 func (kv *nopKV) Hash() (uint32, error)       { return 0, nil }
 func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
+func (kv *nopKV) Commit()                     {}
 func (kv *nopKV) Restore() error              { return nil }
 func (kv *nopKV) Close() error                { return nil }
 
diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go
index 1125c7014..245230336 100644
--- a/storage/consistent_watchable_store.go
+++ b/storage/consistent_watchable_store.go
@@ -17,6 +17,8 @@ package storage
 import (
 	"encoding/binary"
 	"log"
+
+	"github.com/coreos/etcd/storage/storagepb"
 )
 
 var (
@@ -39,6 +41,8 @@ type consistentWatchableStore struct {
 	// underlying backend. This helps to recover consistent index
 	// when restoring.
 	ig ConsistentIndexGetter
+
+	skip bool // indicate whether or not to skip an operation
 }
 
 func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
@@ -82,23 +86,53 @@ func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 func (s *consistentWatchableStore) TxnBegin() int64 {
 	id := s.watchableStore.TxnBegin()
 
-	// TODO: avoid this unnecessary allocation
-	bs := make([]byte, 8)
-	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
-	// put the index into the underlying backend
-	// tx has been locked in TxnBegin, so there is no need to lock it again
-	s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+	// If the consistent index of executing entry is not larger than store
+	// consistent index, skip all operations in this txn.
+	s.skip = s.ig.ConsistentIndex() <= s.consistentIndex()
+
+	if !s.skip {
+		// TODO: avoid this unnecessary allocation
+		bs := make([]byte, 8)
+		binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
+		// put the index into the underlying backend
+		// tx has been locked in TxnBegin, so there is no need to lock it again
+		s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+	}
 
 	return id
 }
 
-func (s *consistentWatchableStore) ConsistentIndex() uint64 {
-	tx := s.watchableStore.store.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
+func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
+	if s.skip {
+		return nil, 0, nil
+	}
+	return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
+}
 
+func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
+	if s.skip {
+		return 0, nil
+	}
+	return s.watchableStore.TxnPut(txnID, key, value)
+}
+
+func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
+	if s.skip {
+		return 0, 0, nil
+	}
+	return s.watchableStore.TxnDeleteRange(txnID, key, end)
+}
+
+func (s *consistentWatchableStore) TxnEnd(txnID int64) error {
+	// reset skip var
+	s.skip = false
+	return s.watchableStore.TxnEnd(txnID)
+}
+
+func (s *consistentWatchableStore) consistentIndex() uint64 {
 	// get the index
-	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+	// tx has been locked in TxnBegin, so there is no need to lock it again
+	_, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
 	if len(vs) == 0 {
 		return 0
 	}
diff --git a/storage/consistent_watchable_store_test.go b/storage/consistent_watchable_store_test.go
new file mode 100644
index 000000000..95b6c33be
--- /dev/null
+++ b/storage/consistent_watchable_store_test.go
@@ -0,0 +1,54 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import "testing"
+
+type indexVal uint64
+
+func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
+
+func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
+	var idx indexVal
+	s := newConsistentWatchableStore(tmpPath, &idx)
+	defer cleanup(s, tmpPath)
+
+	tests := []uint64{1, 2, 3, 5, 10}
+	for i, tt := range tests {
+		idx = indexVal(tt)
+		s.Put([]byte("foo"), []byte("bar"))
+
+		id := s.TxnBegin()
+		g := s.consistentIndex()
+		s.TxnEnd(id)
+		if g != tt {
+			t.Errorf("#%d: index = %d, want %d", i, g, tt)
+		}
+	}
+}
+
+func TestConsistentWatchableStoreSkip(t *testing.T) {
+	idx := indexVal(5)
+	s := newConsistentWatchableStore(tmpPath, &idx)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+
+	// put is skipped
+	rev := s.Put([]byte("foo"), []byte("bar"))
+	if rev != 0 {
+		t.Errorf("rev = %d, want 0", rev)
+	}
+}
diff --git a/storage/kv.go b/storage/kv.go
index daf039438..f0a8ad354 100644
--- a/storage/kv.go
+++ b/storage/kv.go
@@ -69,6 +69,9 @@ type KV interface {
 	// Snapshot snapshots the full KV store.
 	Snapshot() Snapshot
 
+	// Commit commits txns into the underlying backend.
+	Commit()
+
 	Restore() error
 	Close() error
 }
@@ -106,10 +109,9 @@ type WatchableKV interface {
 
 // ConsistentWatchableKV is a WatchableKV that understands the consistency
 // algorithm and consistent index.
+// If the consistent index of executing entry is not larger than the
+// consistent index of ConsistentWatchableKV, all operations in
+// this entry are skipped and return empty response.
 type ConsistentWatchableKV interface {
 	WatchableKV
-
-	// ConsistentIndex returns the index of the last executed entry
-	// by the KV in the consistent replicated log.
-	ConsistentIndex() uint64
 }
diff --git a/storage/kv_test.go b/storage/kv_test.go
index 2c8f32d8f..dce4b7862 100644
--- a/storage/kv_test.go
+++ b/storage/kv_test.go
@@ -819,25 +819,6 @@ func TestWatchableKVWatch(t *testing.T) {
 	}
 }
 
-type indexVal uint64
-
-func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
-
-func TestConsistentWatchableKVConsistentIndex(t *testing.T) {
-	var idx indexVal
-	s := newConsistentWatchableStore(tmpPath, &idx)
-	defer cleanup(s, tmpPath)
-
-	tests := []uint64{1, 2, 3, 5, 10}
-	for i, tt := range tests {
-		idx = indexVal(tt)
-		s.Put([]byte("foo"), []byte("bar"))
-		if g := s.ConsistentIndex(); g != tt {
-			t.Errorf("#%d: index = %d, want %d", i, g, tt)
-		}
-	}
-}
-
 func cleanup(s KV, path string) {
 	s.Close()
 	os.Remove(path)
diff --git a/storage/kvstore.go b/storage/kvstore.go
index 4a1385e8e..f5a9f6e28 100644
--- a/storage/kvstore.go
+++ b/storage/kvstore.go
@@ -294,6 +294,8 @@ func (s *store) Snapshot() Snapshot {
 	return s.b.Snapshot()
 }
 
+func (s *store) Commit() { s.b.ForceCommit() }
+
 func (s *store) Restore() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
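
For readers following the storage changes above, here is a small, self-contained Go sketch of the replay-skip pattern that consistentWatchableStore now uses: the backend remembers the consistent index it has already persisted, and any transaction whose entry index is not larger than that index becomes a no-op. The replayGuard type, its field names, and the main function below are illustrative assumptions for this sketch only; they are not etcd code or API.

// Illustrative sketch of the replay-skip pattern; replayGuard is a
// hypothetical name for this example and is not part of etcd.
package main

import "fmt"

// consistentIndexGetter mirrors the role of storage.ConsistentIndexGetter:
// it reports the raft index of the entry that is currently being applied.
type consistentIndexGetter interface {
	ConsistentIndex() uint64
}

// replayGuard models the skip logic of consistentWatchableStore: it tracks
// the highest index whose effects are already persisted and turns operations
// from older (replayed) entries into no-ops.
type replayGuard struct {
	ig        consistentIndexGetter
	persisted uint64 // index already committed to the backend
	skip      bool   // set per txn, reset in TxnEnd
}

func (g *replayGuard) TxnBegin() {
	// Same comparison as in consistentWatchableStore.TxnBegin: skip unless
	// the executing entry's index is larger than what is already persisted.
	g.skip = g.ig.ConsistentIndex() <= g.persisted
	if !g.skip {
		g.persisted = g.ig.ConsistentIndex()
	}
}

// TxnPut applies the given mutation only when the txn is not being skipped;
// a skipped txn simply returns an empty result.
func (g *replayGuard) TxnPut(apply func()) {
	if g.skip {
		return
	}
	apply()
}

// TxnEnd resets the skip flag, as consistentWatchableStore.TxnEnd does.
func (g *replayGuard) TxnEnd() { g.skip = false }

// index implements consistentIndexGetter, like indexVal in the tests above.
type index uint64

func (i *index) ConsistentIndex() uint64 { return uint64(*i) }

func main() {
	idx := index(5)
	g := &replayGuard{ig: &idx}

	applied := 0
	do := func() {
		g.TxnBegin()
		g.TxnPut(func() { applied++ })
		g.TxnEnd()
	}

	do() // entry 5 applied for the first time
	do() // replay of entry 5: skipped
	fmt.Println("applied =", applied) // applied = 1
}

The real store persists the index inside the same backend transaction (the UnsafePut in TxnBegin) rather than in memory, so the guard also survives restarts; the in-memory persisted field above merely stands in for that.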