diff --git a/leveldb/block.go b/leveldb/block.go new file mode 100644 index 000000000..145a89a7b --- /dev/null +++ b/leveldb/block.go @@ -0,0 +1,293 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb + +import ( + "bytes" + "fmt" + "encoding/binary" + "errors" + "github.com/conformal/btcdb" + "github.com/conformal/btcwire" +) + +// InsertBlockData stores a block hash and its associated data block with a +// previous sha of `prevSha' and a version of `pver'. +func (db *LevelDb) InsertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (blockid int64, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + return db.insertBlockData(sha, prevSha, pver, buf) +} + +func (db *LevelDb) getBlkLoc(sha *btcwire.ShaHash) (int64, int, error) { + var blkHeight int64 + var blkFile int + + key := sha.Bytes() + + data, err := db.bShaDb.Get(key, db.ro) + + if err != nil { + return 0, 0, err + } + + // deserialize + dr := bytes.NewBuffer(data) + err = binary.Read(dr, binary.LittleEndian, &blkHeight) + if err != nil { + err = errors.New("Db Corrupt") + return 0, 0, err + } + err = binary.Read(dr, binary.LittleEndian, &blkFile) + if err != nil { + err = errors.New("Db Corrupt") + return 0, 0, err + } + return blkHeight, blkFile, nil +} + +func (db *LevelDb) getBlkByHeight(blkHeight int64, blkFile int) (rsha *btcwire.ShaHash, rbuf []byte, err error) { + var blkVal []byte + + key := fmt.Sprintf("%d",blkHeight) + + blkVal, err = db.bBlkDb[blkFile].Get([]byte(key), db.ro) + if err != nil { + return // exists ??? + } + + var sha btcwire.ShaHash + + sha.SetBytes(blkVal[0:31]) + + return &sha, blkVal[32:], nil +} + +func (db *LevelDb) getBlk(sha *btcwire.ShaHash) (rblkHeight int64, rblkFile int, rbuf []byte, err error) { + var blkHeight int64 + var blkFile int + + blkHeight, blkFile, err = db.getBlkLoc(sha) + if err != nil { + return + } + + var buf []byte + + _, buf, err = db.getBlkByHeight(blkHeight, blkFile) + if err != nil { + return + } + return blkHeight, blkFile, buf, nil +} + +func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, blkFile int, buf []byte) (error) { + + // serialize + var lw bytes.Buffer + err := binary.Write(&lw, binary.LittleEndian, &blkHeight) + if err != nil { + err = errors.New("Write fail") + return err + } + err = binary.Write(&lw, binary.LittleEndian, &blkFile) + if err != nil { + err = errors.New("Write fail") + return err + } + key := sha.Bytes() + + err = db.bShaDb.Put(key, lw.Bytes(), db.wo) + + if err != nil { + return err + } + + key = []byte(fmt.Sprintf("%d",blkHeight)) + + shaB := sha.Bytes() + blkVal := make([]byte, len(shaB) + len(buf)) + copy (blkVal[0:], shaB) + copy (blkVal[len(shaB):], buf) + err = db.bBlkDb[blkFile].Put(key, blkVal, db.wo) + + return nil +} + +// insertSha stores a block hash and its associated data block with a +// previous sha of `prevSha' and a version of `pver'. +// insertSha shall be called with db lock held +func (db *LevelDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (blockid int64, err error) { + tx := &db.txState + if tx.tx == nil { + err = db.startTx() + if err != nil { + return + } + } + + oBlkHeight, _, err:= db.getBlkLoc(prevSha) + + if err != nil { + // check current block count + // if count != 0 { + // err = btcdb.PrevShaMissing + // return + // } + oBlkHeight = -1 + } + + // TODO(drahn) check curfile filesize, increment curfile if this puts it over + curFile := 0 + blkHeight := oBlkHeight - 1 + + err = db.setBlk(sha, blkHeight, curFile, buf) + + if err != nil { + return + } + + // update the last block cache + db.lastBlkShaCached = true + db.lastBlkSha = *sha + db.lastBlkIdx = blkHeight + + return blkHeight, nil +} + +// fetchSha returns the datablock and pver for the given ShaHash. +func (db *LevelDb) fetchSha(sha *btcwire.ShaHash) (rbuf []byte, rpver uint32, + rblkHeight int64, err error) { + var blkHeight int64 + var buf []byte + + blkHeight, _, buf, err = db.getBlk(sha) + if err != nil { + return + } + + fakepver := uint32(1) + + return buf, fakepver, blkHeight, nil +} + +// ExistsSha looks up the given block hash +// returns true if it is present in the database. +func (db *LevelDb) ExistsSha(sha *btcwire.ShaHash) (exists bool) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + _, exists = db.fetchBlockCache(sha) + if exists { + return + } + + // not in cache, try database + exists = db.blkExistsSha(sha) + return +} + +// blkExistsSha looks up the given block hash +// returns true if it is present in the database. +// CALLED WITH LOCK HELD +func (db *LevelDb) blkExistsSha(sha *btcwire.ShaHash) bool { + var pver uint32 + + oBlkHeight, _, err:= db.getBlkLoc(sha) + + if err != nil { + /* + should this warn if the failure is something besides does not exist ? + log.Warnf("blkExistsSha: fail %v", err) + */ + return false + } + return true +} + +// FetchBlockShaByHeight returns a block hash based on its height in the +// block chain. +func (db *LevelDb) FetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + return db.fetchBlockShaByHeight(height) +} + +// fetchBlockShaByHeight returns a block hash based on its height in the +// block chain. +func (db *LevelDb) fetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) { + + // TODO(drahn) figure out which file blkHeight is located + blkFile := 0 + var buf []byte + + _, buf, err = db.getBlkByHeight(height, blkFile) + + var shaval btcwire.ShaHash + shaval.SetBytes(buf[0:31]) + return &shaval, nil +} + +// FetchHeightRange looks up a range of blocks by the start and ending +// heights. Fetch is inclusive of the start height and exclusive of the +// ending height. To fetch all hashes from the start height until no +// more are present, use the special id `AllShas'. +func (db *LevelDb) FetchHeightRange(startHeight, endHeight int64) (rshalist []btcwire.ShaHash, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + var endidx int64 + if endHeight == btcdb.AllShas { + endidx = startHeight + 500 + } else { + endidx = endHeight + } + + var shalist []btcwire.ShaHash + for height := startHeight; height < endidx; height++ { + // TODO(drahn) fix blkFile from height + blkFile := 0 + + key := fmt.Sprintf("%d", height) + blkVal, lerr := db.bBlkDb[blkFile].Get([]byte(key), db.ro) + if lerr != nil { + break + } + + var sha btcwire.ShaHash + sha.SetBytes(blkVal[0:31]) + shalist = append(shalist, sha) + } + + if err == nil { + return + } + log.Tracef("FetchIdxRange idx %v %v returned %v shas err %v", startHeight, endHeight, len(shalist), err) + + return shalist, nil +} + +// NewestSha returns the hash and block height of the most recent (end) block of +// the block chain. It will return the zero hash, -1 for the block height, and +// no error (nil) if there are not any blocks in the database yet. +func (db *LevelDb) NewestSha() (rsha *btcwire.ShaHash, rblkid int64, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + if db.lastBlkIdx == -1 { + err = errors.New("Empty Database") + return + } + sha := db.lastBlkSha + + return &sha, db.lastBlkIdx, nil +} + +func (db *LevelDb) NewIterateBlocks() (rbogus btcdb.BlockIterator, err error) { + err = errors.New("Not implemented") + return +} diff --git a/leveldb/dbcache.go b/leveldb/dbcache.go new file mode 100644 index 000000000..6c3bfafe2 --- /dev/null +++ b/leveldb/dbcache.go @@ -0,0 +1,375 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb + +import ( + "bytes" + "container/list" + "github.com/conformal/btcdb" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + "sync" +) + +type txCache struct { + maxcount int + fifo list.List + // NOTE: the key is specifically ShaHash, not *ShaHash + txMap map[btcwire.ShaHash]*txCacheObj + cacheLock sync.RWMutex +} + +type txCacheObj struct { + next *txCacheObj + sha btcwire.ShaHash + blksha btcwire.ShaHash + pver uint32 + tx *btcwire.MsgTx + height int64 + spent []byte + txbuf []byte +} + +type blockCache struct { + maxcount int + fifo list.List + blockMap map[btcwire.ShaHash]*blockCacheObj + blockHeightMap map[int64]*blockCacheObj + cacheLock sync.RWMutex +} + +type blockCacheObj struct { + next *blockCacheObj + sha btcwire.ShaHash + blk *btcutil.Block +} + +// FetchBlockBySha - return a btcutil Block, object may be a cached. +func (db *LevelDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + return db.fetchBlockBySha(sha) +} + +// fetchBlockBySha - return a btcutil Block, object may be a cached. +// Must be called with db lock held. +func (db *LevelDb) fetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) { + + blkcache, ok := db.fetchBlockCache(sha) + if ok { + return blkcache.blk, nil + } + + buf, pver, height, err := db.fetchSha(sha) + if err != nil { + return + } + + blk, err = btcutil.NewBlockFromBytes(buf, pver) + if err != nil { + return + } + blk.SetHeight(height) + db.insertBlockCache(sha, blk) + + return +} + +// fetchBlockCache check if a block is in the block cache, if so return it. +func (db *LevelDb) fetchBlockCache(sha *btcwire.ShaHash) (*blockCacheObj, bool) { + + db.blockCache.cacheLock.RLock() + defer db.blockCache.cacheLock.RUnlock() + + blkobj, ok := db.blockCache.blockMap[*sha] + if !ok { // could this just return the map deref? + return nil, false + } + return blkobj, true +} + +// fetchBlockHeightCache check if a block is in the block cache, if so return it. +func (db *LevelDb) fetchBlockHeightCache(height int64) (*blockCacheObj, bool) { + + db.blockCache.cacheLock.RLock() + defer db.blockCache.cacheLock.RUnlock() + + blkobj, ok := db.blockCache.blockHeightMap[height] + if !ok { // could this just return the map deref? + return nil, false + } + return blkobj, true +} + +// insertBlockCache insert the given sha/block into the cache map. +// If the block cache is determined to be full, it will release +// an old entry in FIFO order. +func (db *LevelDb) insertBlockCache(sha *btcwire.ShaHash, blk *btcutil.Block) { + bc := &db.blockCache + + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + + blkObj := blockCacheObj{sha: *sha, blk: blk} + bc.fifo.PushBack(&blkObj) + + if bc.fifo.Len() > bc.maxcount { + listobj := bc.fifo.Front() + bc.fifo.Remove(listobj) + tailObj, ok := listobj.Value.(*blockCacheObj) + if ok { + delete(bc.blockMap, tailObj.sha) + delete(bc.blockHeightMap, tailObj.blk.Height()) + } else { + panic("invalid type pushed on blockCache list") + } + } + + bc.blockHeightMap[blk.Height()] = &blkObj + bc.blockMap[blkObj.sha] = &blkObj +} + +// FetchTxByShaList given a array of ShaHash, look up the transactions +// and return them in a TxListReply array. +func (db *LevelDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + var replies []*btcdb.TxListReply + for _, txsha := range txShaList { + tx, _, _, _, height, txspent, err := db.fetchTxDataBySha(txsha) + btxspent := []bool{} + if err == nil { + btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut)) + for idx := range tx.TxOut { + byteidx := idx / 8 + byteoff := uint(idx % 8) + btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0 + } + } + txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, Height: height, TxSpent: btxspent, Err: err} + replies = append(replies, &txlre) + } + return replies +} + +// fetchTxDataBySha returns several pieces of data regarding the given sha. +func (db *LevelDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) { + + var pver uint32 + var blksha *btcwire.ShaHash + var height int64 + var txspent []byte + var toff int + var tlen int + var blk *btcutil.Block + var blkbuf []byte + + // Check Tx cache + if txc, ok := db.fetchTxCache(txsha); ok { + if txc.spent != nil { + return txc.tx, txc.txbuf, txc.pver, &txc.blksha, txc.height, txc.spent, nil + } + } + + // If not cached load it + height, toff, tlen, txspent, err = db.fetchLocationUsedBySha(txsha) + if err != nil { + return + } + + blksha, err = db.fetchBlockShaByHeight(height) + if err != nil { + log.Warnf("block idx lookup %v to %v", height, err) + return + } + log.Tracef("transaction %v is at block %v %v tx %v", + txsha, blksha, height, toff) + + blk, err = db.fetchBlockBySha(blksha) + if err != nil { + log.Warnf("unable to fetch block %v %v ", + height, &blksha) + return + } + + blkbuf, pver, err = blk.Bytes() + if err != nil { + log.Warnf("unable to decode block %v %v", height, &blksha) + return + } + + txbuf := make([]byte, tlen) + copy(txbuf[:], blkbuf[toff:toff+tlen]) + rbuf := bytes.NewBuffer(txbuf) + + var tx btcwire.MsgTx + err = tx.BtcDecode(rbuf, pver) + if err != nil { + log.Warnf("unable to decode tx block %v %v txoff %v txlen %v", + height, &blksha, toff, tlen) + return + } + + // Shove data into TxCache + // XXX - + var txc txCacheObj + txc.sha = *txsha + txc.tx = &tx + txc.txbuf = txbuf + txc.pver = pver + txc.height = height + txc.spent = txspent + txc.blksha = *blksha + db.insertTxCache(&txc) + + return &tx, txbuf, pver, blksha, height, txspent, nil +} + +// FetchTxAllBySha returns several pieces of data regarding the given sha. +func (db *LevelDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, err error) { + var pver uint32 + var blksha *btcwire.ShaHash + var height int64 + var toff int + var tlen int + var blk *btcutil.Block + var blkbuf []byte + + // Check Tx cache + if txc, ok := db.fetchTxCache(txsha); ok { + return txc.tx, txc.txbuf, txc.pver, &txc.blksha, nil + } + + // If not cached load it + height, toff, tlen, err = db.FetchLocationBySha(txsha) + if err != nil { + return + } + + blksha, err = db.FetchBlockShaByHeight(height) + if err != nil { + log.Warnf("block idx lookup %v to %v", height, err) + return + } + log.Tracef("transaction %v is at block %v %v tx %v", + txsha, blksha, height, toff) + + blk, err = db.FetchBlockBySha(blksha) + if err != nil { + log.Warnf("unable to fetch block %v %v ", + height, &blksha) + return + } + + blkbuf, pver, err = blk.Bytes() + if err != nil { + log.Warnf("unable to decode block %v %v", height, &blksha) + return + } + + txbuf := make([]byte, tlen) + copy(txbuf[:], blkbuf[toff:toff+tlen]) + rbuf := bytes.NewBuffer(txbuf) + + var tx btcwire.MsgTx + err = tx.BtcDecode(rbuf, pver) + if err != nil { + log.Warnf("unable to decode tx block %v %v txoff %v txlen %v", + height, &blksha, toff, tlen) + return + } + + // Shove data into TxCache + // XXX - + var txc txCacheObj + txc.sha = *txsha + txc.tx = &tx + txc.txbuf = txbuf + txc.pver = pver + txc.height = height + txc.blksha = *blksha + db.insertTxCache(&txc) + + return &tx, txbuf, pver, blksha, nil +} + +// FetchTxBySha returns some data for the given Tx Sha. +func (db *LevelDb) FetchTxBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rpver uint32, blksha *btcwire.ShaHash, err error) { + rtx, _, rpver, blksha, err = db.FetchTxAllBySha(txsha) + return +} + +// FetchTxBufBySha return the bytestream data and associated protocol version. +// for the given Tx Sha +func (db *LevelDb) FetchTxBufBySha(txsha *btcwire.ShaHash) (txbuf []byte, rpver uint32, err error) { + _, txbuf, rpver, _, err = db.FetchTxAllBySha(txsha) + return +} + +// fetchTxCache look up the given transaction in the Tx cache. +func (db *LevelDb) fetchTxCache(sha *btcwire.ShaHash) (*txCacheObj, bool) { + tc := &db.txCache + + tc.cacheLock.RLock() + defer tc.cacheLock.RUnlock() + + txObj, ok := tc.txMap[*sha] + if !ok { // could this just return the map deref? + return nil, false + } + return txObj, true +} + +// insertTxCache, insert the given txobj into the cache. +// if the tx cache is determined to be full, it will release +// an old entry in FIFO order. +func (db *LevelDb) insertTxCache(txObj *txCacheObj) { + tc := &db.txCache + + tc.cacheLock.Lock() + defer tc.cacheLock.Unlock() + + tc.fifo.PushBack(txObj) + + if tc.fifo.Len() >= tc.maxcount { + listobj := tc.fifo.Front() + tc.fifo.Remove(listobj) + tailObj, ok := listobj.Value.(*txCacheObj) + if ok { + delete(tc.txMap, tailObj.sha) + } else { + panic("invalid type pushed on tx list") + } + + } + + tc.txMap[txObj.sha] = txObj +} + +// InvalidateTxCache clear/release all cached transactions. +func (db *LevelDb) InvalidateTxCache() { + tc := &db.txCache + tc.cacheLock.Lock() + defer tc.cacheLock.Unlock() + tc.txMap = map[btcwire.ShaHash]*txCacheObj{} + tc.fifo = list.List{} +} + +// InvalidateTxCache clear/release all cached blocks. +func (db *LevelDb) InvalidateBlockCache() { + bc := &db.blockCache + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + bc.blockMap = map[btcwire.ShaHash]*blockCacheObj{} + bc.blockHeightMap = map[int64]*blockCacheObj{} + bc.fifo = list.List{} +} + +// InvalidateCache clear/release all cached blocks and transactions. +func (db *LevelDb) InvalidateCache() { + db.InvalidateTxCache() + db.InvalidateBlockCache() +} diff --git a/leveldb/doc.go b/leveldb/doc.go new file mode 100644 index 000000000..65906a937 --- /dev/null +++ b/leveldb/doc.go @@ -0,0 +1,15 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +Package sqlite3 implements a sqlite3 instance of btcdb. + +sqlite provides a zero setup, single file database. It requires cgo +and the presence of the sqlite library and headers, but nothing else. +The performance is generally high although it goes down with database +size. + +Many of the block or tx specific functions for btcdb are in this subpackage. +*/ +package ldb diff --git a/leveldb/insertremove_test.go b/leveldb/insertremove_test.go new file mode 100644 index 000000000..27fa248df --- /dev/null +++ b/leveldb/insertremove_test.go @@ -0,0 +1,209 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb_test + +import ( + "github.com/conformal/btcdb" + "github.com/conformal/btcdb/sqlite3" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + "os" + "path/filepath" + "testing" +) + +var tstBlocks []*btcutil.Block + +func loadblocks(t *testing.T) []*btcutil.Block { + if len(tstBlocks) != 0 { + return tstBlocks + } + + testdatafile := filepath.Join("testdata", "blocks1-256.bz2") + blocks, err := loadBlocks(t, testdatafile) + if err != nil { + t.Errorf("Unable to load blocks from test data: %v", err) + return nil + } + tstBlocks = blocks + return blocks +} + +func TestUnspentInsert(t *testing.T) { + testUnspentInsert(t, dbTmDefault) + testUnspentInsert(t, dbTmNormal) + testUnspentInsert(t, dbTmFast) +} + +// insert every block in the test chain +// after each insert, fetch all the tx affected by the latest +// block and verify that the the tx is spent/unspent +// new tx should be fully unspent, referenced tx should have +// the associated txout set to spent. +func testUnspentInsert(t *testing.T, mode int) { + // Ignore db remove errors since it means we didn't have an old one. + dbname := "tstdbuspnt1" + _ = os.Remove(dbname) + db, err := btcdb.CreateDB("sqlite", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer os.Remove(dbname) + defer db.Close() + + switch mode { + case dbTmDefault: // default + // no setup + case dbTmNormal: // explicit normal + db.SetDBInsertMode(btcdb.InsertNormal) + case dbTmFast: // fast mode + db.SetDBInsertMode(btcdb.InsertFast) + if sqldb, ok := db.(*sqlite3.LevelDb); ok { + sqldb.TempTblMax = 100 + } else { + t.Errorf("not right type") + } + case dbTmNoVerify: // validated block + t.Errorf("UnspentInsert test is not valid in NoVerify mode") + } + + // Since we are dealing with small dataset, reduce cache size + sqlite3.SetBlockCacheSize(db, 2) + sqlite3.SetTxCacheSize(db, 3) + + blocks := loadblocks(t) +endtest: + for height := int64(0); height < int64(len(blocks)); height++ { + + block := blocks[height] + // look up inputs to this x + mblock := block.MsgBlock() + var txneededList []*btcwire.ShaHash + var txlookupList []*btcwire.ShaHash + var txOutList []*btcwire.ShaHash + var txInList []*btcwire.OutPoint + for _, tx := range mblock.Transactions { + for _, txin := range tx.TxIn { + if txin.PreviousOutpoint.Index == uint32(4294967295) { + continue + } + origintxsha := &txin.PreviousOutpoint.Hash + + txInList = append(txInList, &txin.PreviousOutpoint) + txneededList = append(txneededList, origintxsha) + txlookupList = append(txlookupList, origintxsha) + + if !db.ExistsTxSha(origintxsha) { + t.Errorf("referenced tx not found %v ", origintxsha) + } + + } + txshaname, _ := tx.TxSha(block.ProtocolVersion()) + txlookupList = append(txlookupList, &txshaname) + txOutList = append(txOutList, &txshaname) + } + + txneededmap := map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist := db.FetchTxByShaList(txneededList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + txneededmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txneededmap[spend.Hash] + if itxe.TxSpent[spend.Index] == true { + t.Errorf("txin %v:%v is already spent", spend.Hash, spend.Index) + } + } + + newheight, err := db.InsertBlock(block) + if err != nil { + t.Errorf("failed to insert block %v err %v", height, err) + break endtest + } + if newheight != height { + t.Errorf("height mismatch expect %v returned %v", height, newheight) + break endtest + } + + txlookupmap := map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist = db.FetchTxByShaList(txlookupList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + txlookupmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txlookupmap[spend.Hash] + if itxe.TxSpent[spend.Index] == false { + t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent) + } + } + for _, txo := range txOutList { + itxe := txlookupmap[*txo] + for i, spent := range itxe.TxSpent { + if spent == true { + t.Errorf("freshly inserted tx %v already spent %v", txo, i) + } + } + + } + if len(txInList) == 0 { + continue + } + dropblock := blocks[height-1] + dropsha, _ := dropblock.Sha() + + err = db.DropAfterBlockBySha(dropsha) + if err != nil { + t.Errorf("failed to drop block %v err %v", height, err) + break endtest + } + + txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist = db.FetchTxByShaList(txlookupList) + for _, txe := range txlist { + if txe.Err != nil { + if _, ok := txneededmap[*txe.Sha]; ok { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + } + txlookupmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txlookupmap[spend.Hash] + if itxe.TxSpent[spend.Index] == true { + t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent) + } + } + newheight, err = db.InsertBlock(block) + if err != nil { + t.Errorf("failed to insert block %v err %v", height, err) + break endtest + } + txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist = db.FetchTxByShaList(txlookupList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + txlookupmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txlookupmap[spend.Hash] + if itxe.TxSpent[spend.Index] == false { + t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent) + } + } + } +} diff --git a/leveldb/internal_test.go b/leveldb/internal_test.go new file mode 100644 index 000000000..08706154c --- /dev/null +++ b/leveldb/internal_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb + +import ( + "fmt" + "github.com/conformal/btcdb" + "github.com/conformal/btcwire" +) + +// FetchSha returns the datablock and pver for the given ShaHash. +// This is a testing only interface. +func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32, + blkid int64, err error) { + sqldb, ok := db.(*LevelDb) + if !ok { + err = fmt.Errorf("Invalid data type") + return + } + buf, pver, blkid, err = sqldb.fetchSha(*sha) + return +} + +// SetBlockCacheSize configures the maximum number of blocks in the cache to +// be the given size should be made before any fetching. +// This is a testing only interface. +func SetBlockCacheSize(db btcdb.Db, newsize int) { + sqldb, ok := db.(*LevelDb) + if !ok { + return + } + bc := &sqldb.blockCache + bc.maxcount = newsize +} + +// SetTxCacheSize configures the maximum number of tx in the cache to +// be the given size should be made before any fetching. +// This is a testing only interface. +func SetTxCacheSize(db btcdb.Db, newsize int) { + sqldb, ok := db.(*LevelDb) + if !ok { + return + } + tc := &sqldb.txCache + tc.maxcount = newsize +} diff --git a/leveldb/leveldb.go b/leveldb/leveldb.go new file mode 100644 index 000000000..d44d0e6e4 --- /dev/null +++ b/leveldb/leveldb.go @@ -0,0 +1,798 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb + +import ( + "database/sql" + "fmt" + "github.com/conformal/btcdb" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + "github.com/conformal/seelog" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + _ "github.com/mattn/go-sqlite3" + "os" + "sync" +) + +const ( + dbVersion int = 2 + dbMaxTransCnt = 20000 + dbMaxTransMem = 64 * 1024 * 1024 // 64 MB +) + +var log seelog.LoggerInterface = seelog.Disabled + +type tBlockInsertData struct { + sha btcwire.ShaHash + pver uint32 + buf []byte +} +type tTxInsertData struct { + txsha *btcwire.ShaHash + blockid int64 + txoff int + txlen int + usedbuf []byte +} + +type txState struct { + tx *sql.Tx + writeCount int + txDataSz int + txInsertList []interface{} +} +type LevelDb struct { + // to be removed + sqldb *sql.DB + blkStmts []*sql.Stmt + blkBaseStmts []*sql.Stmt + txStmts []*sql.Stmt + txBaseStmts []*sql.Stmt + txState txState + + // lock preventing multiple entry + dbLock sync.Mutex + + // leveldb pieces + bShaDb *leveldb.DB + bBlkDb []*leveldb.DB + tShaDb *leveldb.DB + tLocDb []*leveldb.DB + tSpentDb []*leveldb.DB + blkOpen int + txOpen int + txSpentOpen int + ro *opt.ReadOptions + wo *opt.WriteOptions + + lastBlkShaCached bool + lastBlkSha btcwire.ShaHash + lastBlkIdx int64 + txCache txCache + blockCache blockCache + + UseTempTX bool + TempTblSz int + TempTblMax int + + dbInsertMode btcdb.InsertMode +} + +var self = btcdb.DriverDB{DbType: "sqlite", Create: CreateSqliteDB, Open: OpenSqliteDB} + +func init() { + btcdb.AddDBDriver(self) +} + +// createDB configure the database, setting up all tables to initial state. +func createDB(db *sql.DB) error { + log.Infof("Initializing new block database") + + // XXX check for old tables + buildTables := []string{ + "CREATE TABLE dbversion (version integer);", + "CREATE TABLE block ( blockid INTEGER PRIMARY KEY, key BLOB UNIQUE, " + + "pver INTEGER NOT NULL, data BLOB NOT NULL);", + "INSERT INTO dbversion (version) VALUES (" + fmt.Sprintf("%d", dbVersion) + + ");", + } + buildtxTables := []string{ + "CREATE TABLE tx (txidx INTEGER PRIMARY KEY, " + + "key TEXT, " + + "blockid INTEGER NOT NULL, " + + "txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " + + "data BLOB NOT NULL, " + + "FOREIGN KEY(blockid) REFERENCES block(blockid));", + "CREATE TABLE txtmp (key TEXT PRIMARY KEY, " + + "blockid INTEGER NOT NULL, " + + "txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " + + "data BLOB NOT NULL, " + + "FOREIGN KEY(blockid) REFERENCES block(blockid));", + "CREATE UNIQUE INDEX uniquetx ON tx (key);", + } + for _, sql := range buildTables { + _, err := db.Exec(sql) + if err != nil { + log.Warnf("sql table op failed %v [%v]", err, sql) + return err + } + } + for _, sql := range buildtxTables { + _, err := db.Exec(sql) + if err != nil { + log.Warnf("sql table op failed %v [%v]", err, sql) + return err + } + } + + return nil +} + +// OpenSqliteDB opens an existing database for use. +func OpenSqliteDB(filepath string) (pbdb btcdb.Db, err error) { + log = btcdb.GetLog() + return newOrCreateSqliteDB(filepath, false) +} + +// CreateSqliteDB creates, initializes and opens a database for use. +func CreateSqliteDB(filepath string) (pbdb btcdb.Db, err error) { + log = btcdb.GetLog() + return newOrCreateSqliteDB(filepath, true) +} + +// newOrCreateSqliteDB opens a database, either creating it or opens +// existing database based on flag. +func newOrCreateSqliteDB(filepath string, create bool) (pbdb btcdb.Db, err error) { + var bdb LevelDb + if create == false { + _, err = os.Stat(filepath) + if err != nil { + return nil, btcdb.DbDoesNotExist + } + } + + db, err := sql.Open("sqlite3", filepath) + if err != nil { + log.Warnf("db open failed %v\n", err) + return nil, err + } + + dbverstmt, err := db.Prepare("SELECT version FROM dbversion;") + if err != nil { + // about the only reason this would fail is that the database + // is not initialized + if create == false { + return nil, btcdb.DbDoesNotExist + } + err = createDB(db) + if err != nil { + // already warned in the called function + return nil, err + } + dbverstmt, err = db.Prepare("SELECT version FROM dbversion;") + if err != nil { + // if it failed this a second time, fail. + return nil, err + } + + } + row := dbverstmt.QueryRow() + var version int + err = row.Scan(&version) + if err != nil { + log.Warnf("unable to find db version: no row\n", err) + } + switch version { + case dbVersion: + // all good + default: + log.Warnf("mismatch db version: %v expected %v\n", version, dbVersion) + return nil, fmt.Errorf("Invalid version in database") + } + db.Exec("PRAGMA foreign_keys = ON;") + db.Exec("PRAGMA journal_mode=WAL;") + bdb.sqldb = db + + bdb.blkStmts = make([]*sql.Stmt, len(blkqueries)) + bdb.blkBaseStmts = make([]*sql.Stmt, len(blkqueries)) + for i := range blkqueries { + stmt, err := db.Prepare(blkqueries[i]) + if err != nil { + // XXX log/ + return nil, err + } + bdb.blkBaseStmts[i] = stmt + } + for i := range bdb.blkBaseStmts { + bdb.blkStmts[i] = bdb.blkBaseStmts[i] + } + + bdb.txBaseStmts = make([]*sql.Stmt, len(txqueries)) + for i := range txqueries { + stmt, err := db.Prepare(txqueries[i]) + if err != nil { + // XXX log/ + return nil, err + } + bdb.txBaseStmts[i] = stmt + } + // NOTE: all array entries in txStmts remain nil'ed + // tx statements are lazy bound + bdb.txStmts = make([]*sql.Stmt, len(txqueries)) + + bdb.blockCache.maxcount = 150 + bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{} + bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{} + bdb.blockCache.blockHeightMap = map[int64]*blockCacheObj{} + bdb.txCache.maxcount = 2000 + bdb.txCache.txMap = map[btcwire.ShaHash]*txCacheObj{} + + bdb.UseTempTX = true + bdb.TempTblMax = 1000000 + + return &bdb, nil +} + +// Sync verifies that the database is coherent on disk, +// and no outstanding transactions are in flight. +func (db *LevelDb) Sync() { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + db.endTx(true) +} + +// syncPoint notifies the db that this is a safe time to sync the database, +// if there are many outstanding transactions. +// Must be called with db lock held. +func (db *LevelDb) syncPoint() { + + tx := &db.txState + + if db.TempTblSz > db.TempTblMax { + err := db.migrateTmpTable() + if err != nil { + return + } + } else { + if len(tx.txInsertList) > dbMaxTransCnt || tx.txDataSz > dbMaxTransMem { + db.endTx(true) + } + } +} + +// Close cleanly shuts down database, syncing all data. +func (db *LevelDb) Close() { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + db.close() +} + +// RollbackClose discards the recent database changes to the previously +// saved data at last Sync. +func (db *LevelDb) RollbackClose() { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + tx := &db.txState + if tx.tx != nil { + err := tx.tx.Rollback() + if err != nil { + log.Debugf("Rollback failed: %v", err) + } else { + tx.tx = nil + } + } + db.close() +} + +// close performs the internal shutdown/close operation. +func (db *LevelDb) close() { + db.endTx(true) + + db.InvalidateCache() + + for i := range db.blkBaseStmts { + db.blkBaseStmts[i].Close() + } + for i := range db.txBaseStmts { + if db.txBaseStmts[i] != nil { + db.txBaseStmts[i].Close() + db.txBaseStmts[i] = nil + } + } + db.sqldb.Close() +} + +// txop returns the appropriately prepared statement, based on +// transaction state of the database. +func (db *LevelDb) txop(op int) *sql.Stmt { + if db.txStmts[op] != nil { + return db.txStmts[op] + } + if db.txState.tx == nil { + // we are not in a transaction, return the base statement + return db.txBaseStmts[op] + } + + if db.txStmts[op] == nil { + db.txStmts[op] = db.txState.tx.Stmt(db.txBaseStmts[op]) + } + + return db.txStmts[op] +} + +// startTx starts a transaction, preparing or scrubbing statements +// for proper operation inside a transaction. +func (db *LevelDb) startTx() (err error) { + tx := &db.txState + if tx.tx != nil { + // this shouldn't happen... + log.Warnf("Db startTx called while in a transaction") + return + } + tx.tx, err = db.sqldb.Begin() + if err != nil { + log.Warnf("Db startTx: begin failed %v", err) + tx.tx = nil + return + } + for i := range db.blkBaseStmts { + db.blkStmts[i] = tx.tx.Stmt(db.blkBaseStmts[i]) + } + for i := range db.txBaseStmts { + db.txStmts[i] = nil // these are lazily prepared + } + return +} + +// endTx commits the current active transaction, it zaps all of the prepared +// statements associated with the transaction. +func (db *LevelDb) endTx(recover bool) (err error) { + tx := &db.txState + + if tx.tx == nil { + return + } + + err = tx.tx.Commit() + if err != nil && recover { + // XXX - double check that the tx is dead after + // commit failure (rollback?) + + log.Warnf("Db endTx: commit failed %v", err) + err = db.rePlayTransaction() + if err != nil { + // We tried, return failure (after zeroing state) + // so the upper level can notice and restart + } + } + for i := range db.blkBaseStmts { + db.blkStmts[i].Close() + db.blkStmts[i] = db.blkBaseStmts[i] + } + for i := range db.txStmts { + if db.txStmts[i] != nil { + db.txStmts[i].Close() + db.txStmts[i] = nil + } + } + tx.tx = nil + var emptyTxList []interface{} + tx.txInsertList = emptyTxList + tx.txDataSz = 0 + return +} + +// rePlayTransaction will attempt to re-execute inserts performed +// sync the beginning of a transaction. This is to be used after +// a sql Commit operation fails to keep the database from losing data. +func (db *LevelDb) rePlayTransaction() (err error) { + err = db.startTx() + if err != nil { + return + } + tx := &db.txState + for _, ins := range tx.txInsertList { + switch v := ins.(type) { + case tBlockInsertData: + block := v + _, err = db.blkStmts[blkInsertSha].Exec(block.sha.Bytes(), + block.pver, block.buf) + if err != nil { + break + } + case tTxInsertData: + txd := v + txnamebytes := txd.txsha.Bytes() + txop := db.txop(txInsertStmt) + _, err = txop.Exec(txd.blockid, txnamebytes, txd.txoff, + txd.txlen, txd.usedbuf) + if err != nil { + break + } + } + } + // This function is called even if we have failed. + // We need to clean up so the database can be used again. + // However we want the original error not any new error, + // unless there was no original error but the commit fails. + err2 := db.endTx(false) + if err == nil && err2 != nil { + err = err2 + } + + return +} + +// DropAfterBlockBySha will remove any blocks from the database after the given block. +// It terminates any existing transaction and performs its operations in an +// atomic transaction, it is terminated (committed) before exit. +func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) { + var row *sql.Row + db.dbLock.Lock() + defer db.dbLock.Unlock() + + // This is a destructive operation and involves multiple requests + // so requires a transaction, terminate any transaction to date + // and start a new transaction + err = db.endTx(true) + if err != nil { + return err + } + err = db.startTx() + if err != nil { + return err + } + + var startheight int64 + + if db.lastBlkShaCached { + startheight = db.lastBlkIdx + } else { + querystr := "SELECT blockid FROM block ORDER BY blockid DESC;" + + tx := &db.txState + if tx.tx != nil { + row = tx.tx.QueryRow(querystr) + } else { + row = db.sqldb.QueryRow(querystr) + } + var startblkidx int64 + err = row.Scan(&startblkidx) + if err != nil { + log.Warnf("DropAfterBlockBySha:unable to fetch blockheight %v", err) + return err + } + startheight = startblkidx + } + // also drop any cached sha data + db.lastBlkShaCached = false + + querystr := "SELECT blockid FROM block WHERE key = ?;" + + tx := &db.txState + row = tx.tx.QueryRow(querystr, sha.Bytes()) + + var keepidx int64 + err = row.Scan(&keepidx) + if err != nil { + // XXX + db.endTx(false) + return err + } + + for height := startheight; height > keepidx; height = height - 1 { + var blk *btcutil.Block + blkc, ok := db.fetchBlockHeightCache(height) + + if ok { + blk = blkc.blk + } else { + // must load the block from the db + sha, err = db.fetchBlockShaByHeight(height - 1) + if err != nil { + return + } + + var buf []byte + var pver uint32 + + buf, pver, _, err = db.fetchSha(*sha) + if err != nil { + return + } + + blk, err = btcutil.NewBlockFromBytes(buf, pver) + if err != nil { + return + } + } + + for _, tx := range blk.MsgBlock().Transactions { + err = db.unSpend(tx) + if err != nil { + return + } + } + } + + // invalidate the cache after possibly using cached entries for block + // lookup to unspend coins in them + db.InvalidateCache() + + _, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", keepidx) + if err != nil { + // XXX + db.endTx(false) + return err + } + + _, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", keepidx) + if err != nil { + // XXX + db.endTx(false) + return err + } + + // delete from block last in case of foreign keys + _, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", keepidx) + if err != nil { + // XXX + db.endTx(false) + return err + } + + err = db.endTx(true) + if err != nil { + return err + } + return +} + +// InsertBlock inserts raw block and transaction data from a block into the +// database. The first block inserted into the database will be treated as the +// genesis block. Every subsequent block insert requires the referenced parent +// block to already exist. +func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + blocksha, err := block.Sha() + if err != nil { + log.Warnf("Failed to compute block sha %v", blocksha) + return + } + mblock := block.MsgBlock() + rawMsg, pver, err := block.Bytes() + if err != nil { + log.Warnf("Failed to obtain raw block sha %v", blocksha) + return + } + txloc, err := block.TxLoc() + if err != nil { + log.Warnf("Failed to obtain raw block sha %v", blocksha) + return + } + + // Insert block into database + newheight, err := db.insertBlockData(blocksha, &mblock.Header.PrevBlock, + pver, rawMsg) + if err != nil { + log.Warnf("Failed to insert block %v %v %v", blocksha, + &mblock.Header.PrevBlock, err) + return + } + + // At least two blocks in the long past were generated by faulty + // miners, the sha of the transaction exists in a previous block, + // detect this condition and 'accept' the block. + for txidx, tx := range mblock.Transactions { + var txsha btcwire.ShaHash + txsha, err = tx.TxSha(pver) + if err != nil { + log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err) + return + } + // Some old blocks contain duplicate transactions + // Attempt to cleanly bypass this problem + // http://blockexplorer.com/b/91842 + // http://blockexplorer.com/b/91880 + if newheight == 91842 { + dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599") + if err != nil { + panic("invalid sha string in source") + } + if txsha == *dupsha { + log.Tracef("skipping sha %v %v", dupsha, newheight) + continue + } + } + if newheight == 91880 { + dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468") + if err != nil { + panic("invalid sha string in source") + } + if txsha == *dupsha { + log.Tracef("skipping sha %v %v", dupsha, newheight) + continue + } + } + spentbuflen := (len(tx.TxOut) + 7) / 8 + spentbuf := make([]byte, spentbuflen, spentbuflen) + if len(tx.TxOut)%8 != 0 { + for i := uint(len(tx.TxOut) % 8); i < 8; i++ { + spentbuf[spentbuflen-1] |= (byte(1) << i) + } + } + + err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf) + if err != nil { + log.Warnf("block %v idx %v failed to insert tx %v %v err %v", blocksha, newheight, &txsha, txidx, err) + var oBlkIdx int64 + oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha) + log.Warnf("oblkidx %v err %v", oBlkIdx, err) + + return + } + err = db.doSpend(tx) + if err != nil { + log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err) + + return + } + } + db.syncPoint() + return newheight, nil +} + +// SetDBInsertMode provides hints to the database to how the application +// is running this allows the database to work in optimized modes when the +// database may be very busy. +func (db *LevelDb) SetDBInsertMode(newmode btcdb.InsertMode) { + + oldMode := db.dbInsertMode + switch newmode { + case btcdb.InsertNormal: + // Normal mode inserts tx directly into the tx table + db.UseTempTX = false + db.dbInsertMode = newmode + switch oldMode { + case btcdb.InsertFast: + if db.TempTblSz != 0 { + err := db.migrateTmpTable() + if err != nil { + return + } + } + case btcdb.InsertValidatedInput: + // generate tx indexes + txop := db.txop(txMigrateFinish) + _, err := txop.Exec() + if err != nil { + log.Warnf("Failed to create tx table index - %v", err) + } + } + case btcdb.InsertFast: + // Fast mode inserts tx into txtmp with validation, + // then dumps to tx then rebuilds indexes at thresholds + db.UseTempTX = true + if oldMode != btcdb.InsertNormal { + log.Warnf("switching between invalid DB modes") + break + } + db.dbInsertMode = newmode + case btcdb.InsertValidatedInput: + // ValidatedInput mode inserts into tx table with + // no duplicate checks, then builds index on exit from + // ValidatedInput mode + if oldMode != btcdb.InsertNormal { + log.Warnf("switching between invalid DB modes") + break + } + // remove tx table index + txop := db.txop(txMigratePrep) + _, err := txop.Exec() + if err != nil { + log.Warnf("Failed to clear tx table index - %v", err) + } + db.dbInsertMode = newmode + + // XXX + db.UseTempTX = false + } +} +func (db *LevelDb) doSpend(tx *btcwire.MsgTx) error { + for txinidx := range tx.TxIn { + txin := tx.TxIn[txinidx] + + inTxSha := txin.PreviousOutpoint.Hash + inTxidx := txin.PreviousOutpoint.Index + + if inTxidx == ^uint32(0) { + continue + } + + //log.Infof("spending %v %v", &inTxSha, inTxidx) + + err := db.setSpentData(&inTxSha, inTxidx) + if err != nil { + return err + } + } + return nil +} + +func (db *LevelDb) unSpend(tx *btcwire.MsgTx) error { + for txinidx := range tx.TxIn { + txin := tx.TxIn[txinidx] + + inTxSha := txin.PreviousOutpoint.Hash + inTxidx := txin.PreviousOutpoint.Index + + if inTxidx == ^uint32(0) { + continue + } + + err := db.clearSpentData(&inTxSha, inTxidx) + if err != nil { + return err + } + } + return nil +} + +func (db *LevelDb) setSpentData(sha *btcwire.ShaHash, idx uint32) error { + return db.setclearSpentData(sha, idx, true) +} + +func (db *LevelDb) clearSpentData(sha *btcwire.ShaHash, idx uint32) error { + return db.setclearSpentData(sha, idx, false) +} + +func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set bool) error { + var spentdata []byte + usingtmp := false + txop := db.txop(txFetchUsedByShaStmt) + row := txop.QueryRow(txsha.String()) + err := row.Scan(&spentdata) + if err != nil { + // if the error is simply didn't fine continue otherwise + // retun failure + + usingtmp = true + txop = db.txop(txtmpFetchUsedByShaStmt) + row := txop.QueryRow(txsha.String()) + err := row.Scan(&spentdata) + if err != nil { + log.Warnf("Failed to locate spent data - %v %v", txsha, err) + return err + } + } + byteidx := idx / 8 + byteoff := idx % 8 + + if set { + spentdata[byteidx] |= (byte(1) << byteoff) + } else { + spentdata[byteidx] &= ^(byte(1) << byteoff) + } + txc, cached := db.fetchTxCache(txsha) + if cached { + txc.spent = spentdata + } + + if usingtmp { + txop = db.txop(txtmpUpdateUsedByShaStmt) + } else { + txop = db.txop(txUpdateUsedByShaStmt) + } + _, err = txop.Exec(spentdata, txsha.String()) + + return err +} diff --git a/leveldb/operational_test.go b/leveldb/operational_test.go new file mode 100644 index 000000000..71c924303 --- /dev/null +++ b/leveldb/operational_test.go @@ -0,0 +1,420 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb_test + +import ( + "compress/bzip2" + "encoding/binary" + "github.com/conformal/btcdb" + "github.com/conformal/btcdb/sqlite3" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + "io" + "os" + "path/filepath" + "strings" + "testing" +) + +var network = btcwire.MainNet + +const ( + dbTmDefault = iota + dbTmNormal + dbTmFast + dbTmNoVerify +) + +func TestOperational(t *testing.T) { + testOperationalMode(t, dbTmDefault) + testOperationalMode(t, dbTmNormal) + testOperationalMode(t, dbTmFast) + testOperationalMode(t, dbTmNoVerify) +} + +func testOperationalMode(t *testing.T, mode int) { + // simplified basic operation is: + // 1) fetch block from remote server + // 2) look up all txin (except coinbase in db) + // 3) insert block + + // Ignore db remove errors since it means we didn't have an old one. + dbname := "tstdbop1" + _ = os.Remove(dbname) + db, err := btcdb.CreateDB("sqlite", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer os.Remove(dbname) + defer db.Close() + + switch mode { + case dbTmDefault: // default + // no setup + case dbTmNormal: // explicit normal + db.SetDBInsertMode(btcdb.InsertNormal) + case dbTmFast: // fast mode + db.SetDBInsertMode(btcdb.InsertFast) + if sqldb, ok := db.(*sqlite3.LevelDb); ok { + sqldb.TempTblMax = 100 + } else { + t.Errorf("not right type") + } + case dbTmNoVerify: // validated block + db.SetDBInsertMode(btcdb.InsertValidatedInput) + } + + // Since we are dealing with small dataset, reduce cache size + sqlite3.SetBlockCacheSize(db, 2) + sqlite3.SetTxCacheSize(db, 3) + + testdatafile := filepath.Join("testdata", "blocks1-256.bz2") + blocks, err := loadBlocks(t, testdatafile) + if err != nil { + t.Errorf("Unable to load blocks from test data for mode %v: %v", + mode, err) + return + } + + err = nil +out: + for height := int64(0); height < int64(len(blocks)); height++ { + block := blocks[height] + if mode != dbTmNoVerify { + // except for NoVerify which does not allow lookups check inputs + mblock := block.MsgBlock() + var txneededList []*btcwire.ShaHash + for _, tx := range mblock.Transactions { + for _, txin := range tx.TxIn { + if txin.PreviousOutpoint.Index == uint32(4294967295) { + continue + } + origintxsha := &txin.PreviousOutpoint.Hash + txneededList = append(txneededList, origintxsha) + + if !db.ExistsTxSha(origintxsha) { + t.Errorf("referenced tx not found %v ", origintxsha) + } + + _, _, _, _, err := db.FetchTxAllBySha(origintxsha) + if err != nil { + t.Errorf("referenced tx not found %v err %v ", origintxsha, err) + } + _, _, _, _, err = db.FetchTxAllBySha(origintxsha) + if err != nil { + t.Errorf("referenced tx not found %v err %v ", origintxsha, err) + } + _, _, _, err = db.FetchTxBySha(origintxsha) + if err != nil { + t.Errorf("referenced tx not found %v err %v ", origintxsha, err) + } + _, _, err = db.FetchTxBufBySha(origintxsha) + if err != nil { + t.Errorf("referenced tx not found %v err %v ", origintxsha, err) + } + _, err = db.FetchTxUsedBySha(origintxsha) + if err != nil { + t.Errorf("tx used fetch fail %v err %v ", origintxsha, err) + } + } + } + txlist := db.FetchTxByShaList(txneededList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break out + } + } + + } + + newheight, err := db.InsertBlock(block) + if err != nil { + t.Errorf("failed to insert block %v err %v", height, err) + break out + } + if newheight != height { + t.Errorf("height mismatch expect %v returned %v", height, newheight) + break out + } + + newSha, blkid, err := db.NewestSha() + if err != nil { + t.Errorf("failed to obtain latest sha %v %v", height, err) + } + + if blkid != height { + t.Errorf("height doe not match latest block height %v %v", blkid, height, err) + } + + blkSha, _ := block.Sha() + if *newSha != *blkSha { + t.Errorf("Newest block sha does not match freshly inserted one %v %v ", newSha, blkSha, err) + } + } + + // now that db is populated, do some additional test + testFetchRangeHeight(t, db, blocks) + + switch mode { + case dbTmDefault: // default + // no cleanup + case dbTmNormal: // explicit normal + // no cleanup + case dbTmFast: // fast mode + db.SetDBInsertMode(btcdb.InsertNormal) + case dbTmNoVerify: // validated block + db.SetDBInsertMode(btcdb.InsertNormal) + } +} + +func TestBackout(t *testing.T) { + testBackout(t, dbTmDefault) + testBackout(t, dbTmNormal) + testBackout(t, dbTmFast) +} + +func testBackout(t *testing.T, mode int) { + // simplified basic operation is: + // 1) fetch block from remote server + // 2) look up all txin (except coinbase in db) + // 3) insert block + + // Ignore db remove errors since it means we didn't have an old one. + dbname := "tstdbop2" + _ = os.Remove(dbname) + db, err := btcdb.CreateDB("sqlite", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer os.Remove(dbname) + defer db.Close() + + switch mode { + case dbTmDefault: // default + // no setup + case dbTmNormal: // explicit normal + db.SetDBInsertMode(btcdb.InsertNormal) + case dbTmFast: // fast mode + db.SetDBInsertMode(btcdb.InsertFast) + if sqldb, ok := db.(*sqlite3.LevelDb); ok { + sqldb.TempTblMax = 100 + } else { + t.Errorf("not right type") + } + } + + // Since we are dealing with small dataset, reduce cache size + sqlite3.SetBlockCacheSize(db, 2) + sqlite3.SetTxCacheSize(db, 3) + + testdatafile := filepath.Join("testdata", "blocks1-256.bz2") + blocks, err := loadBlocks(t, testdatafile) + if len(blocks) < 120 { + t.Errorf("test data too small") + return + } + + err = nil + for height := int64(0); height < int64(len(blocks)); height++ { + if height == 100 { + t.Logf("Syncing at block height 100") + db.Sync() + } + if height == 120 { + t.Logf("Simulating unexpected application quit") + // Simulate unexpected application quit + db.RollbackClose() + break + } + + block := blocks[height] + + newheight, err := db.InsertBlock(block) + if err != nil { + t.Errorf("failed to insert block %v err %v", height, err) + break + } + if newheight != height { + t.Errorf("height mismatch expect %v returned %v", height, newheight) + break + } + } + + // db was closed at height 120, so no cleanup is possible. + + // reopen db + db, err = btcdb.OpenDB("sqlite", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer db.Close() + + sha, err := blocks[99].Sha() + if err != nil { + t.Errorf("failed to get block 99 sha err %v", err) + return + } + _ = db.ExistsSha(sha) + _, err = db.FetchBlockBySha(sha) + if err != nil { + t.Errorf("failed to load block 99 from db %v", err) + } + + sha, err = blocks[110].Sha() + if err != nil { + t.Errorf("failed to get block 110 sha err %v", err) + return + } + _ = db.ExistsSha(sha) + _, err = db.FetchBlockBySha(sha) + if err == nil { + t.Errorf("loaded block 110 from db, failure expected") + return + } + + block := blocks[110] + mblock := block.MsgBlock() + txsha, err := mblock.Transactions[0].TxSha(block.ProtocolVersion()) + exists := db.ExistsTxSha(&txsha) + if exists { + t.Errorf("tx %v exists in db, failure expected", txsha) + } + + _, _, _, err = db.FetchTxBySha(&txsha) + _, err = db.FetchTxUsedBySha(&txsha) + + block = blocks[99] + mblock = block.MsgBlock() + txsha, err = mblock.Transactions[0].TxSha(block.ProtocolVersion()) + oldused, err := db.FetchTxUsedBySha(&txsha) + err = db.InsertTx(&txsha, 99, 1024, 1048, oldused) + if err == nil { + t.Errorf("dup insert of tx succeeded") + return + } +} + +func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) { + testdatafile := filepath.Join("testdata", "blocks1-256.bz2") + var dr io.Reader + var fi io.ReadCloser + fi, err = os.Open(testdatafile) + if err != nil { + t.Errorf("failed to open file %v, err %v", testdatafile, err) + return + } + if strings.HasSuffix(testdatafile, ".bz2") { + z := bzip2.NewReader(fi) + dr = z + } else { + dr = fi + } + + defer func() { + if err := fi.Close(); err != nil { + t.Errorf("failed to close file %v %v", testdatafile, err) + } + }() + + // Set the first block as the genesis block. + genesis := btcutil.NewBlock(&btcwire.GenesisBlock, btcwire.ProtocolVersion) + blocks = append(blocks, genesis) + + var block *btcutil.Block + err = nil + for height := int64(1); err == nil; height++ { + var rintbuf uint32 + err = binary.Read(dr, binary.LittleEndian, &rintbuf) + if err == io.EOF { + // hit end of file at expected offset: no warning + height-- + err = nil + break + } + if err != nil { + t.Errorf("failed to load network type, err %v", err) + break + } + if rintbuf != uint32(network) { + t.Errorf("Block doesn't match network: %v expects %v", + rintbuf, network) + break + } + err = binary.Read(dr, binary.LittleEndian, &rintbuf) + blocklen := rintbuf + + rbytes := make([]byte, blocklen) + + // read block + dr.Read(rbytes) + + var pver uint32 + switch { + case height < 200000: + pver = 1 + case height >= 200000: + pver = 2 + } + block, err = btcutil.NewBlockFromBytes(rbytes, pver) + if err != nil { + t.Errorf("failed to parse block %v", height) + return + } + blocks = append(blocks, block) + } + return +} + +func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { + + var testincrement int64 = 50 + var testcnt int64 = 100 + + shanames := make([]*btcwire.ShaHash, len(blocks)) + + nBlocks := int64(len(blocks)) + + for i := range blocks { + blockSha, err := blocks[i].Sha() + if err != nil { + t.Errorf("FetchRangeHeight: unexpected failure computing block sah %v", err) + } + shanames[i] = blockSha + } + + for startheight := int64(0); startheight < nBlocks; startheight += testincrement { + endheight := startheight + testcnt + + if endheight > nBlocks { + endheight = btcdb.AllShas + } + + shalist, err := db.FetchHeightRange(startheight, endheight) + if err != nil { + t.Errorf("FetchRangeHeight: unexpected failure looking up shas %v", err) + } + + if endheight == btcdb.AllShas { + if int64(len(shalist)) != nBlocks-startheight { + t.Errorf("FetchRangeHeight: expected A %v shas, got %v", nBlocks-startheight, len(shalist)) + } + } else { + if int64(len(shalist)) != testcnt { + t.Errorf("FetchRangeHeight: expected %v shas, got %v", testcnt, len(shalist)) + } + } + + for i := range shalist { + if *shanames[int64(i)+startheight] != shalist[i] { + t.Errorf("FetchRangeHeight: mismatch sha at %v requested range %v %v ", int64(i)+startheight, startheight, endheight) + } + } + } + +} diff --git a/leveldb/testdata/blocks1-256.bz2 b/leveldb/testdata/blocks1-256.bz2 new file mode 100644 index 000000000..6b8bda442 Binary files /dev/null and b/leveldb/testdata/blocks1-256.bz2 differ diff --git a/leveldb/tx.go b/leveldb/tx.go new file mode 100644 index 000000000..c233777b9 --- /dev/null +++ b/leveldb/tx.go @@ -0,0 +1,335 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb + +import ( + "database/sql" + "github.com/conformal/btcdb" + "github.com/conformal/btcwire" + _ "github.com/mattn/go-sqlite3" +) + +// InsertTx inserts a tx hash and its associated data into the database. +func (db *LevelDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + return db.insertTx(txsha, height, txoff, txlen, usedbuf) +} + +// insertTx inserts a tx hash and its associated data into the database. +// Must be called with db lock held. +func (db *LevelDb) insertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) { + + tx := &db.txState + if tx.tx == nil { + err = db.startTx() + if err != nil { + return + } + } + blockid := height + 1 + txd := tTxInsertData{txsha: txsha, blockid: blockid, txoff: txoff, txlen: txlen, usedbuf: usedbuf} + + log.Tracef("inserting tx %v for block %v off %v len %v", + txsha, blockid, txoff, txlen) + + rowBytes := txsha.String() + + var op int // which table to insert data into. + if db.UseTempTX { + var tblockid int64 + var ttxoff int + var ttxlen int + txop := db.txop(txFetchLocationByShaStmt) + row := txop.QueryRow(rowBytes) + err = row.Scan(&tblockid, &ttxoff, &ttxlen) + if err != sql.ErrNoRows { + // sha already present + err = btcdb.DuplicateSha + return + } + op = txtmpInsertStmt + } else { + op = txInsertStmt + } + + txop := db.txop(op) + _, err = txop.Exec(rowBytes, blockid, txoff, txlen, usedbuf) + if err != nil { + log.Warnf("failed to insert %v %v %v", txsha, blockid, err) + return + } + if db.UseTempTX { + db.TempTblSz++ + } + + // put in insert list for replay + tx.txInsertList = append(tx.txInsertList, txd) + + return +} + +// ExistsTxSha returns if the given tx sha exists in the database +func (db *LevelDb) ExistsTxSha(txsha *btcwire.ShaHash) (exists bool) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + if _, ok := db.fetchTxCache(txsha); ok { + return true + } + + return db.existsTxSha(txsha) +} + +// existsTxSha returns if the given tx sha exists in the database.o +// Must be called with the db lock held. +func (db *LevelDb) existsTxSha(txsha *btcwire.ShaHash) (exists bool) { + var blockid uint32 + + txop := db.txop(txExistsShaStmt) + row := txop.QueryRow(txsha.String()) + err := row.Scan(&blockid) + + if err == sql.ErrNoRows { + txop = db.txop(txtmpExistsShaStmt) + row = txop.QueryRow(txsha.String()) + err := row.Scan(&blockid) + + if err == sql.ErrNoRows { + return false + } + if err != nil { + log.Warnf("txTmpExistsTxSha: fail %v", err) + return false + } + log.Warnf("txtmpExistsTxSha: success") + return true + } + + if err != nil { + // ignore real errors? + log.Warnf("existsTxSha: fail %v", err) + return false + } + + return true +} + +// FetchLocationBySha looks up the Tx sha information by name. +func (db *LevelDb) FetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, txoff int, txlen int, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + return db.fetchLocationBySha(txsha) +} + +// fetchLocationBySha look up the Tx sha information by name. +// Must be called with db lock held. +func (db *LevelDb) fetchLocationBySha(txsha *btcwire.ShaHash) (height int64, txoff int, txlen int, err error) { + var row *sql.Row + var blockid int64 + var ttxoff int + var ttxlen int + + rowBytes := txsha.String() + txop := db.txop(txFetchLocationByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&blockid, &ttxoff, &ttxlen) + if err == sql.ErrNoRows { + txop = db.txop(txtmpFetchLocationByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&blockid, &ttxoff, &ttxlen) + if err == sql.ErrNoRows { + err = btcdb.TxShaMissing + return + } + if err != nil { + log.Warnf("txtmp FetchLocationBySha: fail %v", + err) + return + } + } + if err != nil { + log.Warnf("FetchLocationBySha: fail %v", err) + return + } + height = blockid - 1 + txoff = ttxoff + txlen = ttxlen + return +} + +// fetchLocationUsedBySha look up the Tx sha information by name. +// Must be called with db lock held. +func (db *LevelDb) fetchLocationUsedBySha(txsha *btcwire.ShaHash) (rheight int64, rtxoff int, rtxlen int, rspentbuf []byte, err error) { + var row *sql.Row + var blockid int64 + var txoff int + var txlen int + var txspent []byte + + rowBytes := txsha.String() + txop := db.txop(txFetchLocUsedByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&blockid, &txoff, &txlen, &txspent) + if err == sql.ErrNoRows { + txop = db.txop(txtmpFetchLocUsedByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&blockid, &txoff, &txlen, &txspent) + if err == sql.ErrNoRows { + err = btcdb.TxShaMissing + return + } + if err != nil { + log.Warnf("txtmp FetchLocationBySha: fail %v", + err) + return + } + } + if err != nil { + log.Warnf("FetchLocationBySha: fail %v", err) + return + } + height := blockid - 1 + return height, txoff, txlen, txspent, nil +} + +// FetchTxUsedBySha returns the used/spent buffer for a given transaction. +func (db *LevelDb) FetchTxUsedBySha(txsha *btcwire.ShaHash) (spentbuf []byte, err error) { + var row *sql.Row + db.dbLock.Lock() + defer db.dbLock.Unlock() + + rowBytes := txsha.String() + txop := db.txop(txFetchUsedByShaStmt) + row = txop.QueryRow(rowBytes) + + var databytes []byte + err = row.Scan(&databytes) + if err == sql.ErrNoRows { + txop := db.txop(txtmpFetchUsedByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&databytes) + if err == sql.ErrNoRows { + err = btcdb.TxShaMissing + return + } + if err != nil { + log.Warnf("txtmp FetchLocationBySha: fail %v", + err) + return + } + } + + if err != nil { + log.Warnf("FetchUsedBySha: fail %v", err) + return + } + spentbuf = databytes + return +} + +var vaccumDbNextMigrate bool + +// migrateTmpTable functions to perform internal db optimization when +// performing large numbers of database inserts. When in Fast operation +// mode, it inserts into txtmp, then when that table reaches a certain +// size limit it moves all tx in the txtmp table into the primary tx +// table and recomputes the index on the primary tx table. +func (db *LevelDb) migrateTmpTable() error { + db.endTx(true) + db.startTx() // ??? + + db.UseTempTX = false + db.TempTblSz = 0 + + var doVacuum bool + var nsteps int + if vaccumDbNextMigrate { + nsteps = 6 + vaccumDbNextMigrate = false + doVacuum = true + } else { + nsteps = 5 + vaccumDbNextMigrate = true + } + + log.Infof("db compaction Stage 1/%v: Preparing", nsteps) + txop := db.txop(txMigratePrep) + _, err := txop.Exec() + if err != nil { + log.Warnf("Failed to prepare migrate - %v", err) + return err + } + + log.Infof("db compaction Stage 2/%v: Copying", nsteps) + txop = db.txop(txMigrateCopy) + _, err = txop.Exec() + if err != nil { + log.Warnf("Migrate read failed - %v", err) + return err + } + + log.Tracef("db compaction Stage 2a/%v: Enable db vacuum", nsteps) + txop = db.txop(txPragmaVacuumOn) + _, err = txop.Exec() + if err != nil { + log.Warnf("Migrate error trying to enable vacuum on "+ + "temporary transaction table - %v", err) + return err + } + + log.Infof("db compaction Stage 3/%v: Clearing old data", nsteps) + txop = db.txop(txMigrateClear) + _, err = txop.Exec() + if err != nil { + log.Warnf("Migrate error trying to clear temporary "+ + "transaction table - %v", err) + return err + } + + log.Tracef("db compaction Stage 3a/%v: Disable db vacuum", nsteps) + txop = db.txop(txPragmaVacuumOff) + _, err = txop.Exec() + if err != nil { + log.Warnf("Migrate error trying to disable vacuum on "+ + "temporary transaction table - %v", err) + return err + } + + log.Infof("db compaction Stage 4/%v: Rebuilding index", nsteps) + txop = db.txop(txMigrateFinish) + _, err = txop.Exec() + if err != nil { + log.Warnf("Migrate error trying to clear temporary "+ + "transaction table - %v", err) + return err + } + + log.Infof("db compaction Stage 5/%v: Finalizing transaction", nsteps) + db.endTx(true) // ??? + + if doVacuum { + log.Infof("db compaction Stage 6/%v: Optimizing database", nsteps) + txop = db.txop(txVacuum) + _, err = txop.Exec() + if err != nil { + log.Warnf("migrate error trying to clear txtmp tbl %v", err) + return err + } + } + + log.Infof("db compaction: Complete") + + // TODO(drahn) - determine if this should be turned back on or not + db.UseTempTX = true + + return nil +}