diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 5df76f22e..ca8cc4532 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -43,7 +43,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.Commit() + t.commit() t.pending = 0 } } @@ -85,7 +85,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.Commit() + t.commit() t.pending = 0 } } @@ -94,7 +94,10 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { func (t *batchTx) Commit() { t.Lock() defer t.Unlock() + t.commit() +} +func (t *batchTx) commit() { var err error // commit the last tx if t.tx != nil { diff --git a/storage/index.go b/storage/index.go index 37b17e7ff..8af0d17db 100644 --- a/storage/index.go +++ b/storage/index.go @@ -9,11 +9,17 @@ import ( type index interface { Get(key []byte, atIndex uint64) (index uint64, err error) + Range(key, end []byte, atIndex uint64) []kipair Put(key []byte, index uint64) Tombstone(key []byte, index uint64) error Compact(index uint64) map[uint64]struct{} } +type kipair struct { + index uint64 + key []byte +} + type treeIndex struct { sync.RWMutex tree *btree.BTree @@ -54,6 +60,38 @@ func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) { return keyi.get(atIndex) } +func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair { + if end == nil { + index, err := ti.Get(key, atIndex) + if err != nil { + return nil + } + return []kipair{{key: key, index: index}} + } + + keyi := &keyIndex{key: key} + endi := &keyIndex{key: end} + pairs := make([]kipair, 0) + + ti.RLock() + defer ti.RUnlock() + + ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool { + if !item.Less(endi) { + return false + } + curKeyi := item.(*keyIndex) + index, err := curKeyi.get(atIndex) + if err != nil { + return true + } + pairs = append(pairs, kipair{index, curKeyi.key}) + return true + }) + + return pairs +} + func (ti *treeIndex) Tombstone(key []byte, index uint64) error { keyi := &keyIndex{key: key} diff --git a/storage/kv.go b/storage/kv.go index 2cb1587cb..d99934be3 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -1,132 +1,35 @@ package storage -import ( - "encoding/binary" - "log" - "sync" - "time" +import "github.com/coreos/etcd/storage/storagepb" - "github.com/coreos/etcd/storage/backend" - "github.com/coreos/etcd/storage/storagepb" -) +type KV interface { + // Range gets the keys in the range at rangeIndex. + // If rangeIndex <=0, range gets the keys at currentIndex. + // If `end` is nil, the request returns the key. + // If `end` is not nil, it gets the keys in range [key, range_end). + // Limit limits the number of keys returned. + Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) -var ( - batchLimit = 10000 - batchInterval = 100 * time.Millisecond - keyBucketName = []byte("key") -) + // Put puts the given key,value into the store. + // A put also increases the index of the store, and generates one event in the event history. + Put(key, value []byte) (index int64) -type store struct { - // read operation MUST hold read lock - // write opeartion MUST hold write lock - sync.RWMutex + // DeleteRange deletes the given range from the store. + // A deleteRange increases the index of the store if any key in the range exists. + // The number of key deleted will be returned. + // It also generates one event for each key delete in the event history. + // if the `end` is nil, deleteRange deletes the key. + // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). + DeleteRange(key, end []byte) (n, index int64) - b backend.Backend - kvindex index - - currentIndex uint64 - marshalBuf []byte // buffer for marshal protobuf -} - -func newStore(path string) *store { - s := &store{ - b: backend.New(path, batchInterval, batchLimit), - kvindex: newTreeIndex(), - currentIndex: 0, - marshalBuf: make([]byte, 1024*1024), - } - - tx := s.b.BatchTx() - tx.Lock() - tx.UnsafeCreateBucket(keyBucketName) - tx.Unlock() - s.b.ForceCommit() - - return s -} - -func (s *store) Put(key, value []byte) { - s.Lock() - defer s.Unlock() - - currentIndex := s.currentIndex + 1 - - ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, currentIndex) - - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - s.currentIndex = currentIndex - - event := storagepb.Event{ - Type: storagepb.PUT, - Kv: storagepb.KeyValue{ - Key: key, - Value: value, - }, - } - - var ( - d []byte - err error - n int - ) - - if event.Size() < len(s.marshalBuf) { - n, err = event.MarshalTo(s.marshalBuf) - d = s.marshalBuf[:n] - } else { - d, err = event.Marshal() - } - if err != nil { - log.Fatalf("storage: cannot marshal event: %v", err) - } - - tx.UnsafePut(keyBucketName, ibytes, d) - - s.kvindex.Put(key, currentIndex) -} - -func (s *store) Get(key []byte) []byte { - s.RLock() - defer s.RUnlock() - - index, err := s.kvindex.Get(key, s.currentIndex) - if err != nil { - return nil - } - - ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, index) - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0) - // TODO: the value will be an event type. - // TODO: copy out the bytes, decode it, return the value. - return vs[0] -} - -func (s *store) Delete(key []byte) error { - s.Lock() - defer s.Unlock() - - _, err := s.kvindex.Get(key, s.currentIndex) - if err != nil { - return nil - } - - currentIndex := s.currentIndex + 1 - - ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, currentIndex) - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - // TODO: the value will be an event type. - // A tombstone is simple a "Delete" type event. - tx.UnsafePut(keyBucketName, key, []byte("tombstone")) - - return s.kvindex.Tombstone(key, currentIndex) + // TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked + // until tnx ends. Only one on-going tnx is allowed. + // TnxBegin returns an int64 tnx ID. + // All tnx prefixed operations with same tnx ID will be done with the same index. + TnxBegin() int64 + // TnxEnd ends the on-going tnx with tnx ID. If the on-going tnx ID is not matched, error is returned. + TnxEnd(tnxID int64) error + TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) + TnxPut(tnxID int64, key, value []byte) (index int64, err error) + TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) } diff --git a/storage/kv_test.go b/storage/kv_test.go deleted file mode 100644 index a9e2cb8d4..000000000 --- a/storage/kv_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package storage - -import ( - "crypto/rand" - "os" - "testing" -) - -func BenchmarkStorePut(b *testing.B) { - s := newStore("test") - defer os.Remove("test") - - // prepare keys - keys := make([][]byte, b.N) - for i := 0; i < b.N; i++ { - keys[i] = make([]byte, 64) - rand.Read(keys[i]) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - s.Put(keys[i], []byte("foo")) - } -} diff --git a/storage/kvstore.go b/storage/kvstore.go new file mode 100644 index 000000000..d4237b293 --- /dev/null +++ b/storage/kvstore.go @@ -0,0 +1,284 @@ +package storage + +import ( + "bytes" + "encoding/binary" + "errors" + "log" + "math/rand" + "sync" + "time" + + "github.com/coreos/etcd/storage/backend" + "github.com/coreos/etcd/storage/storagepb" +) + +var ( + batchLimit = 10000 + batchInterval = 100 * time.Millisecond + keyBucketName = []byte("key") + + ErrTnxIDMismatch = errors.New("storage: tnx id mismatch") +) + +type store struct { + mu sync.RWMutex + + b backend.Backend + kvindex index + + currentIndex uint64 + subIndex uint32 // tracks next subIndex to put into backend + + tmu sync.Mutex // protect the tnxID field + tnxID int64 // tracks the current tnxID to verify tnx operations +} + +func newStore(path string) KV { + s := &store{ + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + currentIndex: 0, + } + + tx := s.b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(keyBucketName) + tx.Unlock() + s.b.ForceCommit() + + return s +} + +func (s *store) Put(key, value []byte) int64 { + id := s.TnxBegin() + s.put(key, value, s.currentIndex+1) + s.TnxEnd(id) + + return int64(s.currentIndex) +} + +func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { + id := s.TnxBegin() + kvs, index = s.rangeKeys(key, end, limit, rangeIndex) + s.TnxEnd(id) + + return kvs, index +} + +func (s *store) DeleteRange(key, end []byte) (n, index int64) { + id := s.TnxBegin() + n = s.deleteRange(key, end, s.currentIndex+1) + s.TnxEnd(id) + + return n, int64(s.currentIndex) +} + +func (s *store) TnxBegin() int64 { + s.mu.Lock() + s.subIndex = 0 + + s.tmu.Lock() + defer s.tmu.Unlock() + s.tnxID = rand.Int63() + return s.tnxID +} + +func (s *store) TnxEnd(tnxID int64) error { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return ErrTnxIDMismatch + } + + if s.subIndex != 0 { + s.currentIndex += 1 + } + s.subIndex = 0 + s.mu.Unlock() + return nil +} + +func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return nil, 0, ErrTnxIDMismatch + } + kvs, index = s.rangeKeys(key, end, limit, rangeIndex) + return kvs, index, nil +} + +func (s *store) TnxPut(tnxID int64, key, value []byte) (index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return 0, ErrTnxIDMismatch + } + + s.put(key, value, s.currentIndex+1) + return int64(s.currentIndex + 1), nil +} + +func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return 0, 0, ErrTnxIDMismatch + } + + n = s.deleteRange(key, end, s.currentIndex+1) + if n != 0 || s.subIndex != 0 { + index = int64(s.currentIndex + 1) + } + return n, index, nil +} + +// range is a keyword in Go, add Keys suffix. +func (s *store) rangeKeys(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { + if rangeIndex <= 0 { + index = int64(s.currentIndex) + if s.subIndex > 0 { + index += 1 + } + } else { + index = rangeIndex + } + + pairs := s.kvindex.Range(key, end, uint64(index)) + if len(pairs) == 0 { + return nil, index + } + if limit > 0 && len(pairs) > int(limit) { + pairs = pairs[:limit] + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + + for _, pair := range pairs { + ibytes := make([]byte, 8) + endbytes := make([]byte, 8) + binary.BigEndian.PutUint64(ibytes, pair.index) + binary.BigEndian.PutUint64(endbytes, pair.index+1) + + found := false + var kv *storagepb.KeyValue + + vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0) + for _, v := range vs { + var e storagepb.Event + err := e.Unmarshal(v) + if err != nil { + log.Fatalf("storage: range cannot unmarshal event: %v", err) + } + if bytes.Equal(e.Kv.Key, pair.key) { + if e.Type == storagepb.PUT { + kv = &e.Kv + } else { + kv = nil + } + found = true + } + } + + if !found { + log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index) + } + if kv != nil { + kvs = append(kvs, *kv) + } + } + return kvs, index +} + +func (s *store) put(key, value []byte, index uint64) { + ibytes := make([]byte, 8+1+4) + indexToBytes(index, s.subIndex, ibytes) + + event := storagepb.Event{ + Type: storagepb.PUT, + Kv: storagepb.KeyValue{ + Key: key, + Value: value, + }, + } + + d, err := event.Marshal() + if err != nil { + log.Fatalf("storage: cannot marshal event: %v", err) + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafePut(keyBucketName, ibytes, d) + s.kvindex.Put(key, index) + s.subIndex += 1 +} + +func (s *store) deleteRange(key, end []byte, index uint64) int64 { + var n int64 + rindex := index + if s.subIndex > 0 { + rindex += 1 + } + pairs := s.kvindex.Range(key, end, rindex) + + if len(pairs) == 0 { + return 0 + } + + for _, pair := range pairs { + ok := s.delete(pair.key, index) + if ok { + n++ + } + } + return n +} + +func (s *store) delete(key []byte, index uint64) bool { + gindex := index + if s.subIndex > 0 { + gindex += 1 + } + _, err := s.kvindex.Get(key, gindex) + if err != nil { + // key not exist + return false + } + + ibytes := make([]byte, 8+1+4) + indexToBytes(index, s.subIndex, ibytes) + + event := storagepb.Event{ + Type: storagepb.DELETE, + Kv: storagepb.KeyValue{ + Key: key, + }, + } + + d, err := event.Marshal() + if err != nil { + log.Fatalf("storage: cannot marshal event: %v", err) + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafePut(keyBucketName, ibytes, d) + err = s.kvindex.Tombstone(key, index) + if err != nil { + log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) + } + s.subIndex += 1 + return true +} + +func indexToBytes(index uint64, subindex uint32, bytes []byte) { + binary.BigEndian.PutUint64(bytes, index) + bytes[8] = '_' + binary.BigEndian.PutUint32(bytes[9:], subindex) +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go new file mode 100644 index 000000000..5aacd165f --- /dev/null +++ b/storage/kvstore_test.go @@ -0,0 +1,257 @@ +package storage + +import ( + "crypto/rand" + "os" + "testing" +) + +func TestRange(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + + tests := []struct { + key, end []byte + index int64 + + windex int64 + // TODO: change this to the actual kv + wN int64 + }{ + { + []byte("foo"), []byte("foo3"), 0, + 3, 3, + }, + { + []byte("foo"), []byte("foo1"), 0, + 3, 1, + }, + { + []byte("foo"), []byte("foo3"), 1, + 1, 1, + }, + { + []byte("foo"), []byte("foo3"), 2, + 2, 2, + }, + } + + for i, tt := range tests { + kvs, index := s.Range(tt.key, tt.end, 0, tt.index) + if len(kvs) != int(tt.wN) { + t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN) + } + if index != tt.windex { + t.Errorf("#%d: index = %d, wang %d", i, tt.index, tt.windex) + } + } +} + +func TestSimpleDeleteRange(t *testing.T) { + tests := []struct { + key, end []byte + + windex int64 + wN int64 + }{ + { + []byte("foo"), []byte("foo1"), + 4, 1, + }, + { + []byte("foo"), []byte("foo2"), + 4, 2, + }, + { + []byte("foo"), []byte("foo3"), + 4, 3, + }, + { + []byte("foo3"), []byte("foo8"), + 3, 0, + }, + } + + for i, tt := range tests { + s := newStore("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + + n, index := s.DeleteRange(tt.key, tt.end) + if n != tt.wN { + t.Errorf("#%d: n = %d, want %d", i, n, tt.wN) + } + if index != tt.windex { + t.Errorf("#%d: index = %d, wang %d", i, index, tt.windex) + } + + os.Remove("test") + } +} + +func TestRangeInSequence(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + + // remove foo + n, index := s.DeleteRange([]byte("foo"), nil) + if n != 1 || index != 4 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 4) + } + + // before removal foo + kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 3) + if len(kvs) != 3 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3) + } + + // after removal foo + kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 4) + if len(kvs) != 2 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) + } + + // remove again -> expect nothing + n, index = s.DeleteRange([]byte("foo"), nil) + if n != 0 || index != 4 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 4) + } + + // remove foo1 + n, index = s.DeleteRange([]byte("foo"), []byte("foo2")) + if n != 1 || index != 5 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 5) + } + + // after removal foo1 + kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 5) + if len(kvs) != 1 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) + } + + // remove foo2 + n, index = s.DeleteRange([]byte("foo2"), []byte("foo3")) + if n != 1 || index != 6 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 6) + } + + // after removal foo2 + kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 6) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } +} + +func TestOneTnx(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + id := s.TnxBegin() + for i := 0; i < 3; i++ { + s.TnxPut(id, []byte("foo"), []byte("bar")) + s.TnxPut(id, []byte("foo1"), []byte("bar1")) + s.TnxPut(id, []byte("foo2"), []byte("bar2")) + + // remove foo + n, index, err := s.TnxDeleteRange(id, []byte("foo"), nil) + if err != nil { + t.Fatal(err) + } + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + kvs, index, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } + if len(kvs) != 2 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) + } + + // remove again -> expect nothing + n, index, err = s.TnxDeleteRange(id, []byte("foo"), nil) + if err != nil { + t.Fatal(err) + } + if n != 0 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1) + } + + // remove foo1 + n, index, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2")) + if err != nil { + t.Fatal(err) + } + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + // after removal foo1 + kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } + if len(kvs) != 1 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) + } + + // remove foo2 + n, index, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3")) + if err != nil { + t.Fatal(err) + } + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + // after removal foo2 + kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } + } + err := s.TnxEnd(id) + if err != nil { + t.Fatal(err) + } + + // After tnx + kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } + if index != 1 { + t.Fatalf("index = %d, want %d", index, 1) + } +} + +func BenchmarkStorePut(b *testing.B) { + s := newStore("test") + defer os.Remove("test") + + // prepare keys + keys := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + keys[i] = make([]byte, 64) + rand.Read(keys[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Put(keys[i], []byte("foo")) + } +}