diff --git a/storage/backend/backend.go b/storage/backend/backend.go index d040a563e..c91492ed9 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -57,17 +57,13 @@ func (b *backend) BatchTx() BatchTx { // force commit the current batching tx. func (b *backend) ForceCommit() { - b.batchTx.Lock() - b.commitAndBegin() - b.batchTx.Unlock() + b.batchTx.Commit() } func (b *backend) run() { defer close(b.donec) - b.batchTx.Lock() - b.commitAndBegin() - b.batchTx.Unlock() + b.batchTx.Commit() b.startc <- struct{}{} for { @@ -76,9 +72,7 @@ func (b *backend) run() { case <-b.stopc: return } - b.batchTx.Lock() - b.commitAndBegin() - b.batchTx.Unlock() + b.batchTx.Commit() } } @@ -87,21 +81,3 @@ func (b *backend) Close() error { <-b.donec return b.db.Close() } - -// commitAndBegin commits a previous tx and begins a new writable one. -func (b *backend) commitAndBegin() { - var err error - // commit the last batchTx - if b.batchTx.tx != nil { - err = b.batchTx.tx.Commit() - if err != nil { - log.Fatalf("storage: cannot commit tx (%s)", err) - } - } - - // begin a new tx - b.batchTx.tx, err = b.db.Begin(true) - if err != nil { - log.Fatalf("storage: cannot begin tx (%s)", err) - } -} diff --git a/storage/backend/backend_test.go b/storage/backend/backend_test.go index 3d4983ec1..db5a60f8c 100644 --- a/storage/backend/backend_test.go +++ b/storage/backend/backend_test.go @@ -27,35 +27,3 @@ func TestBackendPut(t *testing.T) { batchTx.Unlock() } - -func TestBackendForceCommit(t *testing.T) { - backend := New("test", 10*time.Second, 10000) - defer backend.Close() - defer os.Remove("test") - - v := []byte("foo") - batchTx := backend.BatchTx() - - batchTx.Lock() - - batchTx.UnsafeCreateBucket([]byte("test")) - batchTx.UnsafePut([]byte("test"), []byte("foo"), v) - - batchTx.Unlock() - - // expect to see nothing that the batch tx created - tx := backend.ReadTnx() - gbucket := tx.Bucket([]byte("test")) - if gbucket != nil { - t.Errorf("readtx.bu = %p, want nil", gbucket) - } - tx.Commit() - - // commit batch tx - backend.ForceCommit() - tx = backend.ReadTnx() - gbucket = tx.Bucket([]byte("test")) - if gbucket == nil { - t.Errorf("readtx.bu = nil, want not nil") - } -} diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 2bbe16064..45bbf3a24 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -15,23 +15,16 @@ type BatchTx interface { UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte UnsafeDelete(bucketName []byte, key []byte) + Commit() } type batchTx struct { - mu sync.Mutex + sync.Mutex tx *bolt.Tx backend *backend pending int } -func (t *batchTx) Lock() { - t.mu.Lock() -} - -func (t *batchTx) Unlock() { - t.mu.Unlock() -} - func (t *batchTx) UnsafeCreateBucket(name []byte) { _, err := t.tx.CreateBucket(name) if err != nil && err != bolt.ErrBucketExists { @@ -50,7 +43,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.backend.commitAndBegin() + t.Commit() t.pending = 0 } } @@ -92,7 +85,28 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.backend.commitAndBegin() + t.Commit() t.pending = 0 } } + +// commitAndBegin commits a previous tx and begins a new writable one. +func (t *batchTx) Commit() { + t.Lock() + defer t.Unlock() + + var err error + // commit the last tx + if t.tx != nil { + err = t.tx.Commit() + if err != nil { + log.Fatalf("storage: cannot commit tx (%s)", err) + } + } + + // begin a new tx + t.tx, err = t.backend.db.Begin(true) + if err != nil { + log.Fatalf("storage: cannot begin tx (%s)", err) + } +} diff --git a/storage/key_index.go b/storage/key_index.go index 9434987d2..cce3f6db5 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -186,6 +186,11 @@ type generation struct { func (g *generation) isEmpty() bool { return len(g.cont) == 0 } +// walk walks through the (index, version) pairs in the generation in ascending order. +// It passes the (index, version) to the given function. +// walk returns until: 1. it finishs walking all pairs 2. the function returns false. +// walk returns the (index, version) pair at where it stopped. If it stopped after +// finishing walking, (0, -1) will be returned. func (g *generation) walk(f func(index, ver uint64) bool) (uint64, int) { ver := g.ver l := len(g.cont) diff --git a/storage/kv.go b/storage/kv.go index 581c28132..2cb1587cb 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -3,6 +3,7 @@ package storage import ( "encoding/binary" "log" + "sync" "time" "github.com/coreos/etcd/storage/backend" @@ -16,19 +17,23 @@ var ( ) type store struct { + // read operation MUST hold read lock + // write opeartion MUST hold write lock + sync.RWMutex + b backend.Backend kvindex index - now uint64 // current index of the store - marshalBuf []byte // buffer for marshal protobuf + currentIndex uint64 + marshalBuf []byte // buffer for marshal protobuf } func newStore(path string) *store { s := &store{ - b: backend.New(path, batchInterval, batchLimit), - kvindex: newTreeIndex(), - now: 0, - marshalBuf: make([]byte, 1024*1024), + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + currentIndex: 0, + marshalBuf: make([]byte, 1024*1024), } tx := s.b.BatchTx() @@ -41,16 +46,18 @@ func newStore(path string) *store { } func (s *store) Put(key, value []byte) { - now := s.now + 1 + s.Lock() + defer s.Unlock() + + currentIndex := s.currentIndex + 1 - s.kvindex.Put(key, now) ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, now) + binary.BigEndian.PutUint64(ibytes, currentIndex) tx := s.b.BatchTx() tx.Lock() defer tx.Unlock() - s.now = now + s.currentIndex = currentIndex event := storagepb.Event{ Type: storagepb.PUT, @@ -77,10 +84,15 @@ func (s *store) Put(key, value []byte) { } tx.UnsafePut(keyBucketName, ibytes, d) + + s.kvindex.Put(key, currentIndex) } func (s *store) Get(key []byte) []byte { - index, err := s.kvindex.Get(key, s.now) + s.RLock() + defer s.RUnlock() + + index, err := s.kvindex.Get(key, s.currentIndex) if err != nil { return nil } @@ -97,20 +109,24 @@ func (s *store) Get(key []byte) []byte { } func (s *store) Delete(key []byte) error { - now := s.now + 1 + s.Lock() + defer s.Unlock() - err := s.kvindex.Tombstone(key, now) + _, err := s.kvindex.Get(key, s.currentIndex) if err != nil { - return err + return nil } + currentIndex := s.currentIndex + 1 + ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, now) + binary.BigEndian.PutUint64(ibytes, currentIndex) tx := s.b.BatchTx() tx.Lock() defer tx.Unlock() // TODO: the value will be an event type. // A tombstone is simple a "Delete" type event. tx.UnsafePut(keyBucketName, key, []byte("tombstone")) - return nil + + return s.kvindex.Tombstone(key, currentIndex) }