Merge pull request #2874 from xiang90/storeAPI

kv api of storage
This commit is contained in:
Xiang Li 2015-05-31 15:17:20 -07:00
commit 7735501407
6 changed files with 612 additions and 151 deletions

View File

@ -43,7 +43,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
}
t.pending++
if t.pending > t.backend.batchLimit {
t.Commit()
t.commit()
t.pending = 0
}
}
@ -85,7 +85,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
}
t.pending++
if t.pending > t.backend.batchLimit {
t.Commit()
t.commit()
t.pending = 0
}
}
@ -94,7 +94,10 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
// Commit takes the batchTx lock for the duration of the call and then runs
// commit, which does the actual work without touching the lock itself.
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit()
}
func (t *batchTx) commit() {
var err error
// commit the last tx
if t.tx != nil {

View File

@ -9,11 +9,17 @@ import (
// index maps keys to the store indexes at which they were written.
// treeIndex is the implementation visible in this file.
type index interface {
// Get returns the index recorded for key as seen at atIndex, or a non-nil
// error when no such index can be returned.
Get(key []byte, atIndex uint64) (index uint64, err error)
// Range returns (key, index) pairs as seen at atIndex. In treeIndex a nil
// end looks up key alone; otherwise the keys in [key, end) are visited.
Range(key, end []byte, atIndex uint64) []kipair
// Put records index for key.
// NOTE(review): the implementation is not visible here — confirm details.
Put(key []byte, index uint64)
// Tombstone presumably marks key as deleted at index; the error cases are
// not visible here — TODO confirm against the implementation.
Tombstone(key []byte, index uint64) error
// Compact takes an index and returns a set of indexes.
// NOTE(review): what the returned set means is not visible here — confirm.
Compact(index uint64) map[uint64]struct{}
}
// kipair pairs a key with the index found for it; it is the element type
// returned by index.Range.
type kipair struct {
index uint64
key []byte
}
type treeIndex struct {
sync.RWMutex
tree *btree.BTree
@ -54,6 +60,38 @@ func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) {
return keyi.get(atIndex)
}
// Range collects the (key, index) pairs that are visible at atIndex.
//
// With a nil end only key itself is looked up through Get; a failed lookup
// yields a nil slice. Otherwise the tree is walked in ascending order from
// key up to, but not including, end. Keys whose lookup at atIndex fails are
// skipped. The ranged form always returns a non-nil slice.
func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair {
	if end == nil {
		idx, err := ti.Get(key, atIndex)
		if err != nil {
			return nil
		}
		return []kipair{{key: key, index: idx}}
	}

	lower := &keyIndex{key: key}
	upper := &keyIndex{key: end}
	found := []kipair{}

	// visit returns false to stop the walk at the first item not below upper.
	visit := func(item btree.Item) bool {
		if !item.Less(upper) {
			return false
		}
		cur := item.(*keyIndex)
		if idx, err := cur.get(atIndex); err == nil {
			found = append(found, kipair{index: idx, key: cur.key})
		}
		return true
	}

	ti.RLock()
	defer ti.RUnlock()
	ti.tree.AscendGreaterOrEqual(lower, visit)
	return found
}
func (ti *treeIndex) Tombstone(key []byte, index uint64) error {
keyi := &keyIndex{key: key}

View File

@ -1,132 +1,35 @@
package storage
import (
"encoding/binary"
"log"
"sync"
"time"
import "github.com/coreos/etcd/storage/storagepb"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/storage/storagepb"
)
type KV interface {
// Range gets the keys in the range at rangeIndex.
// If rangeIndex <=0, range gets the keys at currentIndex.
// If `end` is nil, the request returns the key.
// If `end` is not nil, it gets the keys in range [key, range_end).
// Limit limits the number of keys returned.
Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64)
var (
batchLimit = 10000
batchInterval = 100 * time.Millisecond
keyBucketName = []byte("key")
)
// Put puts the given key,value into the store.
// A put also increases the index of the store, and generates one event in the event history.
Put(key, value []byte) (index int64)
type store struct {
// read operation MUST hold read lock
// write opeartion MUST hold write lock
sync.RWMutex
// DeleteRange deletes the given range from the store.
// A deleteRange increases the index of the store if any key in the range exists.
// The number of key deleted will be returned.
// It also generates one event for each key delete in the event history.
// if the `end` is nil, deleteRange deletes the key.
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, index int64)
b backend.Backend
kvindex index
currentIndex uint64
marshalBuf []byte // buffer for marshal protobuf
}
func newStore(path string) *store {
s := &store{
b: backend.New(path, batchInterval, batchLimit),
kvindex: newTreeIndex(),
currentIndex: 0,
marshalBuf: make([]byte, 1024*1024),
}
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(keyBucketName)
tx.Unlock()
s.b.ForceCommit()
return s
}
func (s *store) Put(key, value []byte) {
s.Lock()
defer s.Unlock()
currentIndex := s.currentIndex + 1
ibytes := make([]byte, 8)
binary.BigEndian.PutUint64(ibytes, currentIndex)
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
s.currentIndex = currentIndex
event := storagepb.Event{
Type: storagepb.PUT,
Kv: storagepb.KeyValue{
Key: key,
Value: value,
},
}
var (
d []byte
err error
n int
)
if event.Size() < len(s.marshalBuf) {
n, err = event.MarshalTo(s.marshalBuf)
d = s.marshalBuf[:n]
} else {
d, err = event.Marshal()
}
if err != nil {
log.Fatalf("storage: cannot marshal event: %v", err)
}
tx.UnsafePut(keyBucketName, ibytes, d)
s.kvindex.Put(key, currentIndex)
}
func (s *store) Get(key []byte) []byte {
s.RLock()
defer s.RUnlock()
index, err := s.kvindex.Get(key, s.currentIndex)
if err != nil {
return nil
}
ibytes := make([]byte, 8)
binary.BigEndian.PutUint64(ibytes, index)
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
// TODO: the value will be an event type.
// TODO: copy out the bytes, decode it, return the value.
return vs[0]
}
func (s *store) Delete(key []byte) error {
s.Lock()
defer s.Unlock()
_, err := s.kvindex.Get(key, s.currentIndex)
if err != nil {
return nil
}
currentIndex := s.currentIndex + 1
ibytes := make([]byte, 8)
binary.BigEndian.PutUint64(ibytes, currentIndex)
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
// TODO: the value will be an event type.
// A tombstone is simple a "Delete" type event.
tx.UnsafePut(keyBucketName, key, []byte("tombstone"))
return s.kvindex.Tombstone(key, currentIndex)
// TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked
// until tnx ends. Only one on-going tnx is allowed.
// TnxBegin returns an int64 tnx ID.
// All tnx prefixed operations with same tnx ID will be done with the same index.
TnxBegin() int64
// TnxEnd ends the on-going tnx with tnx ID. If the on-going tnx ID is not matched, error is returned.
TnxEnd(tnxID int64) error
TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error)
TnxPut(tnxID int64, key, value []byte) (index int64, err error)
TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error)
}

View File

@ -1,24 +0,0 @@
package storage
import (
"crypto/rand"
"os"
"testing"
)
// BenchmarkStorePut times Put on b.N distinct keys, each 64 bytes filled by
// rand.Read and written with the value "foo". The keys are generated before
// b.ResetTimer so only the Put loop is measured. The backing file "test" is
// removed when the benchmark returns.
func BenchmarkStorePut(b *testing.B) {
s := newStore("test")
defer os.Remove("test")
// prepare keys
keys := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
keys[i] = make([]byte, 64)
rand.Read(keys[i])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Put(keys[i], []byte("foo"))
}
}

284
storage/kvstore.go Normal file
View File

@ -0,0 +1,284 @@
package storage
import (
"bytes"
"encoding/binary"
"errors"
"log"
"math/rand"
"sync"
"time"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/storage/storagepb"
)
var (
// batchLimit and batchInterval are passed to backend.New by newStore.
batchLimit = 10000
batchInterval = 100 * time.Millisecond
// keyBucketName names the backend bucket that newStore creates and that
// put, delete and rangeKeys write to and read from.
keyBucketName = []byte("key")
// ErrTnxIDMismatch is returned by TnxEnd and the Tnx* operations when the
// supplied tnx ID differs from the store's current tnx ID.
ErrTnxIDMismatch = errors.New("storage: tnx id mismatch")
)
// store is the KV implementation returned by newStore. It writes marshaled
// events to a backend and tracks key-to-index mappings in kvindex.
type store struct {
// mu is locked by TnxBegin and released by a successful TnxEnd, so it is
// held for the whole lifetime of a tnx.
mu sync.RWMutex
b backend.Backend
kvindex index
// currentIndex is advanced by one in TnxEnd when the tnx performed at
// least one put or delete (subIndex != 0).
currentIndex uint64
subIndex uint32 // tracks next subIndex to put into backend
tmu sync.Mutex // protect the tnxID field
tnxID int64 // tracks the current tnxID to verify tnx operations
}
// newStore builds a store on top of a backend opened at path, using the
// package-level batchInterval and batchLimit, and returns it as a KV.
// The key bucket is created and ForceCommit is called before returning.
func newStore(path string) KV {
	st := new(store)
	st.b = backend.New(path, batchInterval, batchLimit)
	st.kvindex = newTreeIndex()
	st.currentIndex = 0

	// Create the bucket under the batch tx lock, then commit explicitly.
	btx := st.b.BatchTx()
	btx.Lock()
	btx.UnsafeCreateBucket(keyBucketName)
	btx.Unlock()
	st.b.ForceCommit()

	return st
}
// Put writes key/value in a tnx of its own and returns the index at which
// the put was recorded.
//
// The index is computed while the tnx still holds s.mu. Reading
// s.currentIndex after TnxEnd has released the lock would race with other
// writers and could report a later index than this put's.
func (s *store) Put(key, value []byte) int64 {
	id := s.TnxBegin()
	// put always bumps subIndex, so TnxEnd advances currentIndex to exactly
	// this value.
	index := s.currentIndex + 1
	s.put(key, value, index)
	// The ID comes from the TnxBegin above and s.mu is still held, so TnxEnd
	// cannot report a mismatch here.
	s.TnxEnd(id)
	return int64(index)
}
// Range runs rangeKeys inside a tnx of its own and returns its result: the
// key-values found and the index at which the range was evaluated.
func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
	tnx := s.TnxBegin()
	found, at := s.rangeKeys(key, end, limit, rangeIndex)
	s.TnxEnd(tnx)
	return found, at
}
// DeleteRange deletes the given key or range in a tnx of its own. It returns
// the number of keys deleted and the store index after the operation, which
// is unchanged when nothing was deleted.
//
// The index is computed while the tnx still holds s.mu. Reading
// s.currentIndex after TnxEnd has released the lock would race with other
// writers.
func (s *store) DeleteRange(key, end []byte) (n, index int64) {
	id := s.TnxBegin()
	n = s.deleteRange(key, end, s.currentIndex+1)
	// Mirror TnxEnd: the index advances only if this tnx wrote something.
	index = int64(s.currentIndex)
	if s.subIndex != 0 {
		index++
	}
	// The ID comes from the TnxBegin above and s.mu is still held, so TnxEnd
	// cannot report a mismatch here.
	s.TnxEnd(id)
	return n, index
}
// TnxBegin starts a tnx and returns its ID. It locks s.mu and leaves it
// locked for TnxEnd to release, resets subIndex, and stores a fresh random
// ID under tmu.
func (s *store) TnxBegin() int64 {
	s.mu.Lock()
	s.subIndex = 0

	s.tmu.Lock()
	id := rand.Int63()
	s.tnxID = id
	s.tmu.Unlock()

	return id
}
// TnxEnd finishes the tnx identified by tnxID. If the ID does not match the
// store's current tnx ID it returns ErrTnxIDMismatch and leaves s.mu locked.
// Otherwise it advances currentIndex by one when the tnx wrote anything,
// resets subIndex, and unlocks s.mu.
//
// NOTE(review): tnxID is not cleared here, so ending the same ID twice would
// unlock s.mu a second time — confirm callers never do that.
func (s *store) TnxEnd(tnxID int64) error {
	s.tmu.Lock()
	defer s.tmu.Unlock()

	if s.tnxID != tnxID {
		return ErrTnxIDMismatch
	}

	wrote := s.subIndex != 0
	s.subIndex = 0
	if wrote {
		s.currentIndex++
	}
	s.mu.Unlock()
	return nil
}
// TnxRange runs rangeKeys on behalf of the tnx identified by tnxID.
// It returns ErrTnxIDMismatch, with nil kvs and index 0, when tnxID is not
// the store's current tnx ID.
func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) {
	s.tmu.Lock()
	defer s.tmu.Unlock()

	if s.tnxID != tnxID {
		return nil, 0, ErrTnxIDMismatch
	}

	found, at := s.rangeKeys(key, end, limit, rangeIndex)
	return found, at, nil
}
// TnxPut writes key/value on behalf of the tnx identified by tnxID and
// returns the index the write is recorded at, which is currentIndex+1.
// It returns ErrTnxIDMismatch when tnxID is not the store's current tnx ID.
func (s *store) TnxPut(tnxID int64, key, value []byte) (index int64, err error) {
	s.tmu.Lock()
	defer s.tmu.Unlock()

	if s.tnxID != tnxID {
		return 0, ErrTnxIDMismatch
	}

	next := s.currentIndex + 1
	s.put(key, value, next)
	return int64(next), nil
}
// TnxDeleteRange deletes the given key or range on behalf of the tnx
// identified by tnxID. It returns the number of keys deleted and the index
// the tnx is operating at:
//   - currentIndex+1 when this call deleted something or the tnx has already
//     written (subIndex != 0);
//   - currentIndex otherwise, since the store has not changed.
//
// The second case used to yield index 0, which disagreed with DeleteRange
// reporting the current index when nothing is deleted.
// It returns ErrTnxIDMismatch when tnxID is not the store's current tnx ID.
func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) {
	s.tmu.Lock()
	defer s.tmu.Unlock()
	if tnxID != s.tnxID {
		return 0, 0, ErrTnxIDMismatch
	}

	n = s.deleteRange(key, end, s.currentIndex+1)
	index = int64(s.currentIndex)
	if n != 0 || s.subIndex != 0 {
		index++
	}
	return n, index, nil
}
// rangeKeys is the shared implementation behind Range and TnxRange
// ("range" is a Go keyword, hence the Keys suffix).
//
// When rangeIndex <= 0 the range is evaluated at currentIndex, or at
// currentIndex+1 if the ongoing tnx has already written something.
// Otherwise it is evaluated at rangeIndex. At most limit pairs are read when
// limit > 0. For each pair, the events stored under the pair's index are
// scanned and the last one matching the key decides the outcome: a PUT
// contributes its key-value, anything else contributes nothing. A key that
// the index reports but the backend does not hold is fatal.
func (s *store) rangeKeys(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
	index = rangeIndex
	if rangeIndex <= 0 {
		index = int64(s.currentIndex)
		if s.subIndex > 0 {
			index++
		}
	}

	pairs := s.kvindex.Range(key, end, uint64(index))
	if len(pairs) == 0 {
		return nil, index
	}
	if limit > 0 && int(limit) < len(pairs) {
		pairs = pairs[:limit]
	}

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()

	for i := range pairs {
		p := pairs[i]

		// Backend keys start with the 8-byte big-endian index, so
		// [index, index+1) covers every sub-entry written at that index.
		lo, hi := make([]byte, 8), make([]byte, 8)
		binary.BigEndian.PutUint64(lo, p.index)
		binary.BigEndian.PutUint64(hi, p.index+1)

		var (
			latest *storagepb.KeyValue
			seen   bool
		)
		for _, raw := range tx.UnsafeRange(keyBucketName, lo, hi, 0) {
			var ev storagepb.Event
			if err := ev.Unmarshal(raw); err != nil {
				log.Fatalf("storage: range cannot unmarshal event: %v", err)
			}
			if !bytes.Equal(ev.Kv.Key, p.key) {
				continue
			}
			seen = true
			latest = nil
			if ev.Type == storagepb.PUT {
				latest = &ev.Kv
			}
		}

		if !seen {
			log.Fatalf("storage: range cannot find key %s at index %d", string(p.key), p.index)
		}
		if latest != nil {
			kvs = append(kvs, *latest)
		}
	}
	return kvs, index
}
// put marshals a PUT event for key/value, stores it in the backend under
// the key built from (index, subIndex), records index for key in kvindex,
// and bumps subIndex. A marshal failure is fatal.
func (s *store) put(key, value []byte, index uint64) {
	ev := storagepb.Event{
		Type: storagepb.PUT,
		Kv:   storagepb.KeyValue{Key: key, Value: value},
	}
	data, err := ev.Marshal()
	if err != nil {
		log.Fatalf("storage: cannot marshal event: %v", err)
	}

	// 8 bytes of index, one separator byte, 4 bytes of subIndex.
	bkey := make([]byte, 8+1+4)
	indexToBytes(index, s.subIndex, bkey)

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut(keyBucketName, bkey, data)
	s.kvindex.Put(key, index)
	s.subIndex++
}
// deleteRange deletes every key that kvindex.Range reports for key/end and
// returns how many deletions succeeded. The range is looked up once, at
// index, or at index+1 if the ongoing tnx has already written something.
// Each key is then deleted at index.
func (s *store) deleteRange(key, end []byte, index uint64) int64 {
	at := index
	if s.subIndex > 0 {
		at++
	}

	var deleted int64
	for _, p := range s.kvindex.Range(key, end, at) {
		if s.delete(p.key, index) {
			deleted++
		}
	}
	return deleted
}
// delete removes a single key at index and reports whether it did anything.
// It returns false when kvindex.Get cannot find the key. Otherwise it writes
// a DELETE event to the backend under the key built from (index, subIndex),
// tombstones the key in kvindex, bumps subIndex and returns true.
// Failing to marshal the event or to tombstone the key is fatal.
func (s *store) delete(key []byte, index uint64) bool {
	lookupAt := index
	if s.subIndex > 0 {
		lookupAt++
	}
	if _, err := s.kvindex.Get(key, lookupAt); err != nil {
		// key not exist
		return false
	}

	ev := storagepb.Event{
		Type: storagepb.DELETE,
		Kv:   storagepb.KeyValue{Key: key},
	}
	data, err := ev.Marshal()
	if err != nil {
		log.Fatalf("storage: cannot marshal event: %v", err)
	}

	// 8 bytes of index, one separator byte, 4 bytes of subIndex.
	bkey := make([]byte, 8+1+4)
	indexToBytes(index, s.subIndex, bkey)

	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut(keyBucketName, bkey, data)
	if err := s.kvindex.Tombstone(key, index); err != nil {
		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
	}
	s.subIndex++
	return true
}
// indexToBytes encodes (index, subindex) into bytes as a backend key:
// index as 8 big-endian bytes, a '_' separator at offset 8, then subindex
// as 4 big-endian bytes. bytes must be at least 13 bytes long.
func indexToBytes(index uint64, subindex uint32, bytes []byte) {
	const sepOffset = 8
	order := binary.BigEndian

	order.PutUint64(bytes, index)
	bytes[sepOffset] = '_'
	order.PutUint32(bytes[sepOffset+1:], subindex)
}

257
storage/kvstore_test.go Normal file
View File

@ -0,0 +1,257 @@
package storage
import (
"crypto/rand"
"os"
"testing"
)
// TestRange puts three keys (one per index) and checks, for several ranges
// and range indexes, both the number of key-values returned and the index
// reported by Range.
func TestRange(t *testing.T) {
	s := newStore("test")
	defer os.Remove("test")

	s.Put([]byte("foo"), []byte("bar"))
	s.Put([]byte("foo1"), []byte("bar1"))
	s.Put([]byte("foo2"), []byte("bar2"))

	tests := []struct {
		key, end []byte
		index    int64

		windex int64
		// TODO: change this to the actual kv
		wN int64
	}{
		{
			[]byte("foo"), []byte("foo3"), 0,
			3, 3,
		},
		{
			[]byte("foo"), []byte("foo1"), 0,
			3, 1,
		},
		{
			[]byte("foo"), []byte("foo3"), 1,
			1, 1,
		},
		{
			[]byte("foo"), []byte("foo3"), 2,
			2, 2,
		},
	}

	for i, tt := range tests {
		kvs, index := s.Range(tt.key, tt.end, 0, tt.index)
		if len(kvs) != int(tt.wN) {
			t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN)
		}
		if index != tt.windex {
			// Report the index Range actually returned, not the requested one.
			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
		}
	}
}
// TestSimpleDeleteRange creates a fresh store with three keys for every
// case, deletes one range, and checks the number of keys deleted and the
// index reported by DeleteRange.
func TestSimpleDeleteRange(t *testing.T) {
	tests := []struct {
		key, end []byte

		windex int64
		wN     int64
	}{
		{
			[]byte("foo"), []byte("foo1"),
			4, 1,
		},
		{
			[]byte("foo"), []byte("foo2"),
			4, 2,
		},
		{
			[]byte("foo"), []byte("foo3"),
			4, 3,
		},
		{
			// Nothing in range: the index stays at 3.
			[]byte("foo3"), []byte("foo8"),
			3, 0,
		},
	}

	for i, tt := range tests {
		s := newStore("test")

		s.Put([]byte("foo"), []byte("bar"))
		s.Put([]byte("foo1"), []byte("bar1"))
		s.Put([]byte("foo2"), []byte("bar2"))

		n, index := s.DeleteRange(tt.key, tt.end)
		if n != tt.wN {
			t.Errorf("#%d: n = %d, want %d", i, n, tt.wN)
		}
		if index != tt.windex {
			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
		}

		os.Remove("test")
	}
}
// TestRangeInSequence puts three keys (indexes 1-3), then alternates
// DeleteRange and Range calls on the same store. Each step relies on the
// state left by the previous one: it checks the count and index returned by
// every delete and the number of keys visible at specific range indexes.
func TestRangeInSequence(t *testing.T) {
s := newStore("test")
defer os.Remove("test")
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
// remove foo; this is the fourth write, so the index becomes 4
n, index := s.DeleteRange([]byte("foo"), nil)
if n != 1 || index != 4 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 4)
}
// ranging at index 3 (before foo was removed) still sees all three keys
kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 3)
if len(kvs) != 3 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3)
}
// ranging at index 4 (after foo was removed) sees the two remaining keys
kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 4)
if len(kvs) != 2 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
}
// remove again -> expect nothing deleted and the index unchanged
n, index = s.DeleteRange([]byte("foo"), nil)
if n != 0 || index != 4 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 4)
}
// remove foo1 via the range [foo, foo2); foo itself is already gone
n, index = s.DeleteRange([]byte("foo"), []byte("foo2"))
if n != 1 || index != 5 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 5)
}
// after removal of foo1 only foo2 is left
kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 5)
if len(kvs) != 1 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
}
// remove foo2
n, index = s.DeleteRange([]byte("foo2"), []byte("foo3"))
if n != 1 || index != 6 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 6)
}
// after removal of foo2 nothing is left
kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 6)
if len(kvs) != 0 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
}
}
// TestOneTnx runs the same put/delete/range sequence three times inside a
// single tnx. Every operation in the tnx is expected to report index 1,
// because the store index only advances when the tnx ends. After TnxEnd it
// checks that a plain Range at index 1 sees no keys and reports index 1.
func TestOneTnx(t *testing.T) {
s := newStore("test")
defer os.Remove("test")
id := s.TnxBegin()
for i := 0; i < 3; i++ {
s.TnxPut(id, []byte("foo"), []byte("bar"))
s.TnxPut(id, []byte("foo1"), []byte("bar1"))
s.TnxPut(id, []byte("foo2"), []byte("bar2"))
// remove foo
n, index, err := s.TnxDeleteRange(id, []byte("foo"), nil)
if err != nil {
t.Fatal(err)
}
if n != 1 || index != 1 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
}
// rangeIndex 0 ranges at the tnx's current view: foo1 and foo2 remain
kvs, index, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
if err != nil {
t.Fatal(err)
}
if len(kvs) != 2 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
}
// remove again -> expect nothing
n, index, err = s.TnxDeleteRange(id, []byte("foo"), nil)
if err != nil {
t.Fatal(err)
}
if n != 0 || index != 1 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1)
}
// remove foo1
n, index, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2"))
if err != nil {
t.Fatal(err)
}
if n != 1 || index != 1 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
}
// after removal foo1
kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
if err != nil {
t.Fatal(err)
}
if len(kvs) != 1 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
}
// remove foo2
n, index, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3"))
if err != nil {
t.Fatal(err)
}
if n != 1 || index != 1 {
t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
}
// after removal foo2
kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
if err != nil {
t.Fatal(err)
}
if len(kvs) != 0 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
}
}
err := s.TnxEnd(id)
if err != nil {
t.Fatal(err)
}
// After tnx: the whole tnx counts as one index step
kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1)
if len(kvs) != 0 {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
}
if index != 1 {
t.Fatalf("index = %d, want %d", index, 1)
}
}
// BenchmarkStorePut times Put on b.N distinct random 64-byte keys, each
// written with the value "foo". Keys are generated before b.ResetTimer so
// only the Put loop is measured. The backing file "test" is removed when
// the benchmark returns.
func BenchmarkStorePut(b *testing.B) {
	s := newStore("test")
	defer os.Remove("test")

	// prepare keys
	keys := make([][]byte, b.N)
	for i := 0; i < b.N; i++ {
		keys[i] = make([]byte, 64)
		// Fail loudly rather than benchmark with partially filled keys.
		if _, err := rand.Read(keys[i]); err != nil {
			b.Fatalf("cannot generate random key: %v", err)
		}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.Put(keys[i], []byte("foo"))
	}
}