From 4b8ee2d66e719b7c134d38bcc601735fd73bb3fe Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 26 Oct 2015 21:42:49 -0700 Subject: [PATCH] storage: skip old entry in ConsistentWatchableStore This avoids to apply the same entry twice when restoring from disk. --- storage/consistent_watchable_store.go | 56 +++++++++++++++++----- storage/consistent_watchable_store_test.go | 54 +++++++++++++++++++++ storage/kv.go | 7 ++- storage/kv_test.go | 19 -------- 4 files changed, 102 insertions(+), 34 deletions(-) create mode 100644 storage/consistent_watchable_store_test.go diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go index 1125c7014..245230336 100644 --- a/storage/consistent_watchable_store.go +++ b/storage/consistent_watchable_store.go @@ -17,6 +17,8 @@ package storage import ( "encoding/binary" "log" + + "github.com/coreos/etcd/storage/storagepb" ) var ( @@ -39,6 +41,8 @@ type consistentWatchableStore struct { // underlying backend. This helps to recover consistent index // when restoring. ig ConsistentIndexGetter + + skip bool // indicate whether or not to skip an operation } func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV { @@ -82,23 +86,53 @@ func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) { func (s *consistentWatchableStore) TxnBegin() int64 { id := s.watchableStore.TxnBegin() - // TODO: avoid this unnecessary allocation - bs := make([]byte, 8) - binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex()) - // put the index into the underlying backend - // tx has been locked in TxnBegin, so there is no need to lock it again - s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + // If the consistent index of executing entry is not larger than store + // consistent index, skip all operations in this txn. + s.skip = s.ig.ConsistentIndex() <= s.consistentIndex() + + if !s.skip { + // TODO: avoid this unnecessary allocation + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex()) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + } return id } -func (s *consistentWatchableStore) ConsistentIndex() uint64 { - tx := s.watchableStore.store.b.BatchTx() - tx.Lock() - defer tx.Unlock() +func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { + if s.skip { + return nil, 0, nil + } + return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev) +} +func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { + if s.skip { + return 0, nil + } + return s.watchableStore.TxnPut(txnID, key, value) +} + +func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { + if s.skip { + return 0, 0, nil + } + return s.watchableStore.TxnDeleteRange(txnID, key, end) +} + +func (s *consistentWatchableStore) TxnEnd(txnID int64) error { + // reset skip var + s.skip = false + return s.watchableStore.TxnEnd(txnID) +} + +func (s *consistentWatchableStore) consistentIndex() uint64 { // get the index - _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) + // tx has been locked in TxnBegin, so there is no need to lock it again + _, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) if len(vs) == 0 { return 0 } diff --git a/storage/consistent_watchable_store_test.go b/storage/consistent_watchable_store_test.go new file mode 100644 index 000000000..95b6c33be --- /dev/null +++ b/storage/consistent_watchable_store_test.go @@ -0,0 +1,54 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import "testing" + +type indexVal uint64 + +func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) } + +func TestConsistentWatchableStoreConsistentIndex(t *testing.T) { + var idx indexVal + s := newConsistentWatchableStore(tmpPath, &idx) + defer cleanup(s, tmpPath) + + tests := []uint64{1, 2, 3, 5, 10} + for i, tt := range tests { + idx = indexVal(tt) + s.Put([]byte("foo"), []byte("bar")) + + id := s.TxnBegin() + g := s.consistentIndex() + s.TxnEnd(id) + if g != tt { + t.Errorf("#%d: index = %d, want %d", i, g, tt) + } + } +} + +func TestConsistentWatchableStoreSkip(t *testing.T) { + idx := indexVal(5) + s := newConsistentWatchableStore(tmpPath, &idx) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + + // put is skipped + rev := s.Put([]byte("foo"), []byte("bar")) + if rev != 0 { + t.Errorf("rev = %d, want 0", rev) + } +} diff --git a/storage/kv.go b/storage/kv.go index 2a605b042..f0a8ad354 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -109,10 +109,9 @@ type WatchableKV interface { // ConsistentWatchableKV is a WatchableKV that understands the consistency // algorithm and consistent index. +// If the consistent index of executing entry is not larger than the +// consistent index of ConsistentWatchableKV, all operations in +// this entry are skipped and return empty response. type ConsistentWatchableKV interface { WatchableKV - - // ConsistentIndex returns the index of the last executed entry - // by the KV in the consistent replicated log. - ConsistentIndex() uint64 } diff --git a/storage/kv_test.go b/storage/kv_test.go index 2c8f32d8f..dce4b7862 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -819,25 +819,6 @@ func TestWatchableKVWatch(t *testing.T) { } } -type indexVal uint64 - -func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) } - -func TestConsistentWatchableKVConsistentIndex(t *testing.T) { - var idx indexVal - s := newConsistentWatchableStore(tmpPath, &idx) - defer cleanup(s, tmpPath) - - tests := []uint64{1, 2, 3, 5, 10} - for i, tt := range tests { - idx = indexVal(tt) - s.Put([]byte("foo"), []byte("bar")) - if g := s.ConsistentIndex(); g != tt { - t.Errorf("#%d: index = %d, want %d", i, g, tt) - } - } -} - func cleanup(s KV, path string) { s.Close() os.Remove(path)