From ee479731992f9861aa567cf48912f04351ff2778 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 14 May 2015 17:53:41 -0700 Subject: [PATCH] storage: initial index --- storage/index.go | 97 ++++++++++++++++++++++++++++++ storage/index_test.go | 137 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 storage/index.go create mode 100644 storage/index_test.go diff --git a/storage/index.go b/storage/index.go new file mode 100644 index 000000000..41f263ca7 --- /dev/null +++ b/storage/index.go @@ -0,0 +1,97 @@ +package storage + +import ( + "log" + "sync" + + "github.com/google/btree" +) + +type index interface { + Get(key []byte, atIndex uint64) (index uint64, err error) + Put(key []byte, index uint64) + Tombstone(key []byte, index uint64) error + Compact(index uint64) map[uint64]struct{} +} + +type treeIndex struct { + sync.RWMutex + tree *btree.BTree +} + +func newTreeIndex() index { + return &treeIndex{ + tree: btree.New(32), + } +} + +func (ti *treeIndex) Put(key []byte, index uint64) { + keyi := &keyIndex{key: key} + + ti.Lock() + defer ti.Unlock() + item := ti.tree.Get(keyi) + if item == nil { + keyi.put(index) + ti.tree.ReplaceOrInsert(keyi) + return + } + okeyi := item.(*keyIndex) + okeyi.put(index) +} + +func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) { + keyi := &keyIndex{key: key} + + ti.RLock() + defer ti.RUnlock() + item := ti.tree.Get(keyi) + if item == nil { + return 0, ErrIndexNotFound + } + + keyi = item.(*keyIndex) + return keyi.get(atIndex) +} + +func (ti *treeIndex) Tombstone(key []byte, index uint64) error { + keyi := &keyIndex{key: key} + + ti.Lock() + defer ti.Unlock() + item := ti.tree.Get(keyi) + if item == nil { + return ErrIndexNotFound + } + + ki := item.(*keyIndex) + ki.tombstone(index) + return nil +} + +func (ti *treeIndex) Compact(index uint64) map[uint64]struct{} { + available := make(map[uint64]struct{}) + emptyki := make([]*keyIndex, 0) + log.Printf("store.index: compact %d", index) + ti.Lock() + defer ti.Unlock() + ti.tree.Ascend(compactIndex(index, available, &emptyki)) + for _, ki := range emptyki { + item := ti.tree.Delete(ki) + if item == nil { + log.Panic("store.index: unexpected delete failure during compaction") + } + } + return available +} + +func compactIndex(index uint64, available map[uint64]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool { + return func(i btree.Item) bool { + keyi := i.(*keyIndex) + keyi.compact(index, available) + if keyi.isEmpty() { + *emptyki = append(*emptyki, keyi) + } + return true + } +} diff --git a/storage/index_test.go b/storage/index_test.go new file mode 100644 index 000000000..ce89d3d70 --- /dev/null +++ b/storage/index_test.go @@ -0,0 +1,137 @@ +package storage + +import ( + "reflect" + "testing" +) + +func TestIndexPutAndGet(t *testing.T) { + index := newTestTreeIndex() + + tests := []T{ + {[]byte("foo"), 0, ErrIndexNotFound, 0}, + {[]byte("foo"), 1, nil, 1}, + {[]byte("foo"), 3, nil, 1}, + {[]byte("foo"), 5, nil, 5}, + {[]byte("foo"), 6, nil, 5}, + + {[]byte("foo1"), 0, ErrIndexNotFound, 0}, + {[]byte("foo1"), 1, ErrIndexNotFound, 0}, + {[]byte("foo1"), 2, nil, 2}, + {[]byte("foo1"), 5, nil, 2}, + {[]byte("foo1"), 6, nil, 6}, + + {[]byte("foo2"), 0, ErrIndexNotFound, 0}, + {[]byte("foo2"), 1, ErrIndexNotFound, 0}, + {[]byte("foo2"), 3, nil, 3}, + {[]byte("foo2"), 4, nil, 4}, + {[]byte("foo2"), 6, nil, 4}, + } + verify(t, index, tests) +} + +func TestContinuousCompact(t *testing.T) { + index := newTestTreeIndex() + + tests := []T{ + {[]byte("foo"), 0, ErrIndexNotFound, 0}, + {[]byte("foo"), 1, nil, 1}, + {[]byte("foo"), 3, nil, 1}, + {[]byte("foo"), 5, nil, 5}, + {[]byte("foo"), 6, nil, 5}, + + {[]byte("foo1"), 0, ErrIndexNotFound, 0}, + {[]byte("foo1"), 1, ErrIndexNotFound, 0}, + {[]byte("foo1"), 2, nil, 2}, + {[]byte("foo1"), 5, nil, 2}, + {[]byte("foo1"), 6, nil, 6}, + + {[]byte("foo2"), 0, ErrIndexNotFound, 0}, + {[]byte("foo2"), 1, ErrIndexNotFound, 0}, + {[]byte("foo2"), 3, nil, 3}, + {[]byte("foo2"), 4, nil, 4}, + {[]byte("foo2"), 6, nil, 4}, + } + wa := map[uint64]struct{}{ + 1: struct{}{}, + 2: struct{}{}, + 3: struct{}{}, + 4: struct{}{}, + 5: struct{}{}, + 6: struct{}{}, + } + ga := index.Compact(1) + if !reflect.DeepEqual(ga, wa) { + t.Errorf("a = %v, want %v", ga, wa) + } + verify(t, index, tests) + + ga = index.Compact(2) + if !reflect.DeepEqual(ga, wa) { + t.Errorf("a = %v, want %v", ga, wa) + } + verify(t, index, tests) + + ga = index.Compact(3) + if !reflect.DeepEqual(ga, wa) { + t.Errorf("a = %v, want %v", ga, wa) + } + verify(t, index, tests) + + ga = index.Compact(4) + delete(wa, 3) + tests[12] = T{[]byte("foo2"), 3, ErrIndexNotFound, 0} + if !reflect.DeepEqual(wa, ga) { + t.Errorf("a = %v, want %v", ga, wa) + } + verify(t, index, tests) + + ga = index.Compact(5) + delete(wa, 1) + if !reflect.DeepEqual(ga, wa) { + t.Errorf("a = %v, want %v", ga, wa) + } + tests[1] = T{[]byte("foo"), 1, ErrIndexNotFound, 0} + tests[2] = T{[]byte("foo"), 3, ErrIndexNotFound, 0} + verify(t, index, tests) + + ga = index.Compact(6) + delete(wa, 2) + if !reflect.DeepEqual(ga, wa) { + t.Errorf("a = %v, want %v", ga, wa) + } + tests[7] = T{[]byte("foo1"), 2, ErrIndexNotFound, 0} + tests[8] = T{[]byte("foo1"), 5, ErrIndexNotFound, 0} + verify(t, index, tests) +} + +func verify(t *testing.T, index index, tests []T) { + for i, tt := range tests { + h, err := index.Get(tt.key, tt.index) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if h != tt.windex { + t.Errorf("#%d: index = %d, want %d", i, h, tt.windex) + } + } +} + +type T struct { + key []byte + index uint64 + + werr error + windex uint64 +} + +func newTestTreeIndex() index { + index := newTreeIndex() + index.Put([]byte("foo"), 1) + index.Put([]byte("foo1"), 2) + index.Put([]byte("foo2"), 3) + index.Put([]byte("foo2"), 4) + index.Put([]byte("foo"), 5) + index.Put([]byte("foo1"), 6) + return index +}