storage: add consistentWatchableStore

consistentWatchableStore maintains an index that is always consistent
with the latest txn. The index could be used to indicate the progress
of the store so far when recovery.
This commit is contained in:
Yicheng Qin 2015-10-21 11:08:19 -07:00
parent ae62a77de6
commit 4fb4bc3ca8
3 changed files with 131 additions and 0 deletions

View File

@ -0,0 +1,102 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"encoding/binary"
"log"
)
var (
consistentIndexKeyName = []byte("consistent_index")
)
// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
// Get gets the consistent index of current executing entry.
Get() uint64
}
type consistentWatchableStore struct {
*watchableStore
// The field is used to get the consistent index of current
// executing entry.
// When the store finishes executing current entry, it will
// put the index got from ConsistentIndexGetter into the
// underlying backend. This helps to recover consistent index
// when restoring.
ig ConsistentIndexGetter
}
// newConsistentWatchableStore creates a new consistentWatchableStore
// using the file at the given path.
// If the file at the given path does not exist then it will be created automatically.
func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consistentWatchableStore {
return &consistentWatchableStore{
watchableStore: newWatchableStore(path),
ig: ig,
}
}
func (s *consistentWatchableStore) Put(key, value []byte) (rev int64) {
id := s.TxnBegin()
rev, err := s.TxnPut(id, key, value)
if err != nil {
log.Panicf("unexpected TxnPut error (%v)", err)
}
if err := s.TxnEnd(id); err != nil {
log.Panicf("unexpected TxnEnd error (%v)", err)
}
return rev
}
func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
id := s.TxnBegin()
n, rev, err := s.TxnDeleteRange(id, key, end)
if err != nil {
log.Panicf("unexpected TxnDeleteRange error (%v)", err)
}
if err := s.TxnEnd(id); err != nil {
log.Panicf("unexpected TxnEnd error (%v)", err)
}
return n, rev
}
func (s *consistentWatchableStore) TxnBegin() int64 {
id := s.watchableStore.TxnBegin()
// TODO: avoid this unnecessary allocation
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, s.ig.Get())
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
return id
}
func (s *consistentWatchableStore) ConsistentIndex() uint64 {
tx := s.watchableStore.store.b.BatchTx()
tx.Lock()
defer tx.Unlock()
// get the index
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
return binary.BigEndian.Uint64(vs[0])
}

View File

@ -103,3 +103,13 @@ type WatchableKV interface {
// should always call cancel as soon as watch is done.
Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc)
}
// ConsistentWatchableKV is a WatchableKV that understands the consistency
// algorithm and consistent index.
type ConsistentWatchableKV interface {
WatchableKV
// ConsistentIndex returns the index of the last executed entry
// by the KV in the consistent replicated log.
ConsistentIndex() uint64
}

View File

@ -819,6 +819,25 @@ func TestWatchableKVWatch(t *testing.T) {
}
}
type indexVal uint64
func (v *indexVal) Get() uint64 { return uint64(*v) }
func TestConsistentWatchableKVConsistentIndex(t *testing.T) {
var idx indexVal
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
tests := []uint64{1, 2, 3, 5, 10}
for i, tt := range tests {
idx = indexVal(tt)
s.Put([]byte("foo"), []byte("bar"))
if g := s.ConsistentIndex(); g != tt {
t.Errorf("#%d: index = %d, want %d", i, g, tt)
}
}
}
func cleanup(s KV, path string) {
s.Close()
os.Remove(path)