From 4fb4bc3ca8480c8f92c6eb5246e9a8355e78f7a3 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 21 Oct 2015 11:08:19 -0700 Subject: [PATCH] storage: add consistentWatchableStore consistentWatchableStore maintains an index that is always consistent with the latest txn. The index could be used to indicate the progress of the store so far when recovery. --- storage/consistent_watchable_store.go | 102 ++++++++++++++++++++++++++ storage/kv.go | 10 +++ storage/kv_test.go | 19 +++++ 3 files changed, 131 insertions(+) create mode 100644 storage/consistent_watchable_store.go diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go new file mode 100644 index 000000000..aedc55341 --- /dev/null +++ b/storage/consistent_watchable_store.go @@ -0,0 +1,102 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "encoding/binary" + "log" +) + +var ( + consistentIndexKeyName = []byte("consistent_index") +) + +// ConsistentIndexGetter is an interface that wraps the Get method. +// Consistent index is the offset of an entry in a consistent replicated log. +type ConsistentIndexGetter interface { + // Get gets the consistent index of current executing entry. + Get() uint64 +} + +type consistentWatchableStore struct { + *watchableStore + // The field is used to get the consistent index of current + // executing entry. + // When the store finishes executing current entry, it will + // put the index got from ConsistentIndexGetter into the + // underlying backend. This helps to recover consistent index + // when restoring. + ig ConsistentIndexGetter +} + +// newConsistentWatchableStore creates a new consistentWatchableStore +// using the file at the given path. +// If the file at the given path does not exist then it will be created automatically. +func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consistentWatchableStore { + return &consistentWatchableStore{ + watchableStore: newWatchableStore(path), + ig: ig, + } +} + +func (s *consistentWatchableStore) Put(key, value []byte) (rev int64) { + id := s.TxnBegin() + rev, err := s.TxnPut(id, key, value) + if err != nil { + log.Panicf("unexpected TxnPut error (%v)", err) + } + if err := s.TxnEnd(id); err != nil { + log.Panicf("unexpected TxnEnd error (%v)", err) + } + return rev +} + +func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) { + id := s.TxnBegin() + n, rev, err := s.TxnDeleteRange(id, key, end) + if err != nil { + log.Panicf("unexpected TxnDeleteRange error (%v)", err) + } + if err := s.TxnEnd(id); err != nil { + log.Panicf("unexpected TxnEnd error (%v)", err) + } + return n, rev +} + +func (s *consistentWatchableStore) TxnBegin() int64 { + id := s.watchableStore.TxnBegin() + + // TODO: avoid this unnecessary allocation + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, s.ig.Get()) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + + return id +} + +func (s *consistentWatchableStore) ConsistentIndex() uint64 { + tx := s.watchableStore.store.b.BatchTx() + tx.Lock() + defer tx.Unlock() + + // get the index + _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) + if len(vs) == 0 { + return 0 + } + return binary.BigEndian.Uint64(vs[0]) +} diff --git a/storage/kv.go b/storage/kv.go index ceec4f0cd..daf039438 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -103,3 +103,13 @@ type WatchableKV interface { // should always call cancel as soon as watch is done. Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) } + +// ConsistentWatchableKV is a WatchableKV that understands the consistency +// algorithm and consistent index. +type ConsistentWatchableKV interface { + WatchableKV + + // ConsistentIndex returns the index of the last executed entry + // by the KV in the consistent replicated log. + ConsistentIndex() uint64 +} diff --git a/storage/kv_test.go b/storage/kv_test.go index f0db69481..fc99923c3 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -819,6 +819,25 @@ func TestWatchableKVWatch(t *testing.T) { } } +type indexVal uint64 + +func (v *indexVal) Get() uint64 { return uint64(*v) } + +func TestConsistentWatchableKVConsistentIndex(t *testing.T) { + var idx indexVal + s := newConsistentWatchableStore(tmpPath, &idx) + defer cleanup(s, tmpPath) + + tests := []uint64{1, 2, 3, 5, 10} + for i, tt := range tests { + idx = indexVal(tt) + s.Put([]byte("foo"), []byte("bar")) + if g := s.ConsistentIndex(); g != tt { + t.Errorf("#%d: index = %d, want %d", i, g, tt) + } + } +} + func cleanup(s KV, path string) { s.Close() os.Remove(path)