Added metadata table for mongodb

This commit is contained in:
kansi 2017-11-06 20:20:06 +05:30
parent 3b33cdb111
commit d9d0585228
12 changed files with 201 additions and 7 deletions

View File

@ -265,6 +265,16 @@ def write_assets(conn, assets):
return
@register_query(MongoDBConnection)
def write_metadata(conn, metadata):
    """Write a batch of metadata documents to the ``metadata`` collection.

    Args:
        metadata (list): metadata dicts to insert, each carrying the ``id``
            of the transaction it belongs to.

    Returns:
        The database response, or ``None`` if the bulk insert raised an
        ``OperationError`` (best-effort write, mirroring ``write_assets``).
    """
    try:
        # ordered=False keeps inserting the remaining documents even if
        # one of them fails (e.g. a duplicate id).
        return conn.run(
            conn.collection('metadata').insert_many(metadata, ordered=False))
    except OperationError:
        return None
@register_query(MongoDBConnection)
def get_assets(conn, asset_ids):
return conn.run(
@ -273,6 +283,14 @@ def get_assets(conn, asset_ids):
projection={'_id': False}))
@register_query(MongoDBConnection)
def get_metadata(conn, txn_ids):
    """Fetch metadata documents whose ``id`` is one of ``txn_ids``.

    Args:
        txn_ids (list): transaction ids identifying the metadata documents.

    Returns:
        The database cursor over the matching documents, with the internal
        ``_id`` field stripped from each one.
    """
    selector = {'id': {'$in': txn_ids}}
    return conn.run(conn.collection('metadata')
                        .find(selector, projection={'_id': False}))
@register_query(MongoDBConnection)
def count_blocks(conn):
return conn.run(

View File

@ -27,7 +27,7 @@ def create_database(conn, dbname):
@register_schema(MongoDBConnection)
def create_tables(conn, dbname):
for table_name in ['bigchain', 'backlog', 'votes', 'assets']:
for table_name in ['bigchain', 'backlog', 'votes', 'assets', 'metadata']:
logger.info('Create `%s` table.', table_name)
# create the table
# TODO: read and write concerns can be declared here
@ -40,6 +40,7 @@ def create_indexes(conn, dbname):
create_backlog_secondary_index(conn, dbname)
create_votes_secondary_index(conn, dbname)
create_assets_secondary_index(conn, dbname)
create_metadata_secondary_index(conn, dbname)
@register_schema(MongoDBConnection)
@ -121,3 +122,17 @@ def create_assets_secondary_index(conn, dbname):
# full text search index
conn.conn[dbname]['assets'].create_index([('$**', TEXT)], name='text')
def create_metadata_secondary_index(conn, dbname):
    """Create the secondary indexes of the ``metadata`` collection.

    Args:
        conn: the MongoDB connection holding the raw client in ``conn.conn``.
        dbname (str): name of the database to index.
    """
    logger.info('Create `metadata` secondary index.')

    collection = conn.conn[dbname]['metadata']
    # `id` is the txid of the transaction the metadata was specified for,
    # so it uniquely identifies each document.
    collection.create_index('id', name='transaction_id', unique=True)
    # wildcard index over every field, enabling full text search
    collection.create_index([('$**', TEXT)], name='text')

View File

@ -254,6 +254,19 @@ def write_assets(connection, assets):
raise NotImplementedError
@singledispatch
def write_metadata(connection, metadata):
    """Write a list of metadata documents to the metadata table.

    Generic dispatch stub; each backend registers its concrete
    implementation for its connection type.

    Args:
        metadata (list): a list of metadata dicts to write.

    Returns:
        The database response.

    Raises:
        NotImplementedError: always, when no implementation is registered
            for ``connection``'s type.
    """
    raise NotImplementedError
@singledispatch
def get_assets(connection, asset_ids):
"""Get a list of assets from the assets table.
@ -268,6 +281,20 @@ def get_assets(connection, asset_ids):
raise NotImplementedError
@singledispatch
def get_metadata(connection, txn_ids):
    """Get a list of metadata documents from the metadata table.

    Generic dispatch stub; each backend registers its concrete
    implementation for its connection type.

    Args:
        txn_ids (list): a list of transaction ids for which to retrieve
            the metadata from the database.

    Returns:
        metadata (list): the list of returned metadata.

    Raises:
        NotImplementedError: always, when no implementation is registered
            for ``connection``'s type.
    """
    raise NotImplementedError
@singledispatch
def count_blocks(connection):
"""Count the number of blocks in the bigchain table.

View File

@ -173,6 +173,13 @@ def write_assets(connection, assets):
.insert(assets, durability=WRITE_DURABILITY))
@register_query(RethinkDBConnection)
def write_metadata(connection, metadata):
    """Insert a list of metadata documents into the ``metadata`` table.

    Args:
        metadata (list): metadata dicts to insert.

    Returns:
        The database response.
    """
    # r.table(...) only builds the ReQL query; connection.run executes it.
    query = r.table('metadata').insert(metadata, durability=WRITE_DURABILITY)
    return connection.run(query)
@register_query(RethinkDBConnection)
def get_assets(connection, asset_ids):
return connection.run(
@ -180,6 +187,13 @@ def get_assets(connection, asset_ids):
.get_all(*asset_ids))
@register_query(RethinkDBConnection)
def get_metadata(connection, txn_ids):
    """Fetch the metadata documents identified by ``txn_ids``.

    Args:
        txn_ids (list): transaction ids identifying the metadata documents.

    Returns:
        The database response with the matching documents.
    """
    query = r.table('metadata', read_mode=READ_MODE).get_all(*txn_ids)
    return connection.run(query)
@register_query(RethinkDBConnection)
def count_blocks(connection):
return connection.run(

View File

@ -23,7 +23,7 @@ def create_database(connection, dbname):
@register_schema(RethinkDBConnection)
def create_tables(connection, dbname):
for table_name in ['bigchain', 'backlog', 'votes', 'assets']:
for table_name in ['bigchain', 'backlog', 'votes', 'assets', 'metadata']:
logger.info('Create `%s` table.', table_name)
connection.run(r.db(dbname).table_create(table_name))

View File

@ -19,7 +19,7 @@ from bigchaindb.backend.connection import connect
logger = logging.getLogger(__name__)
TABLES = ('bigchain', 'backlog', 'votes', 'assets')
TABLES = ('bigchain', 'backlog', 'votes', 'assets', 'metadata')
@singledispatch

View File

@ -190,10 +190,15 @@ class Bigchain(object):
# get the asset ids from the block
if block_dict:
asset_ids = Block.get_asset_ids(block_dict)
txn_ids = Block.get_txn_ids(block_dict)
# get the assets from the database
assets = self.get_assets(asset_ids)
# get the metadata from the database
metadata = self.get_metadata(txn_ids)
# add the assets to the block transactions
block_dict = Block.couple_assets(block_dict, assets)
# add the metadata to the block transactions
block_dict = Block.couple_metadata(block_dict, metadata)
status = None
if include_status:
@ -508,10 +513,15 @@ class Bigchain(object):
# Decouple assets from block
assets, block_dict = block.decouple_assets()
metadatas, block_dict = block.decouple_metadata(block_dict)
# write the assets
if assets:
self.write_assets(assets)
if metadatas:
self.write_metadata(metadatas)
# write the block
return backend.query.write_block(self.connection, block_dict)
@ -622,6 +632,19 @@ class Bigchain(object):
"""
return backend.query.get_assets(self.connection, asset_ids)
def get_metadata(self, txn_ids):
    """Return the metadata documents matching the given transaction ids.

    Thin delegator to the configured backend's query module.

    Args:
        txn_ids (:obj:`list` of :obj:`str`): transaction ids whose
            metadata should be retrieved from the database.

    Returns:
        list: the metadata documents returned from the database.
    """
    connection = self.connection
    return backend.query.get_metadata(connection, txn_ids)
def write_assets(self, assets):
"""
Writes a list of assets into the database.
@ -632,6 +655,16 @@ class Bigchain(object):
"""
return backend.query.write_assets(self.connection, assets)
def write_metadata(self, metadata):
    """Store a list of metadata documents in the database.

    Thin delegator to the configured backend's query module.

    Args:
        metadata (:obj:`list` of :obj:`dict`): metadata documents to
            write to the database.

    Returns:
        The database response.
    """
    connection = self.connection
    return backend.query.write_metadata(connection, metadata)
def text_search(self, search, *, limit=0):
"""
Return an iterator of assets that match the text search

View File

@ -47,7 +47,9 @@ class Transaction(Transaction):
'input `{}` does not exist in a valid block'.format(
input_txid))
print(input_txid, self.id)
spent = bigchain.get_spent(input_txid, input_.fulfills.output)
print(spent)
if spent and spent.id != self.id:
raise DoubleSpend('input `{}` was already spent'
.format(input_txid))
@ -112,6 +114,13 @@ class Transaction(Transaction):
del asset['id']
tx_dict.update({'asset': asset})
# get metadata of the transaction
metadata = list(bigchain.get_metadata([tx_dict['id']]))
if metadata:
metadata = metadata[0]
del metadata['id']
tx_dict.update({'metadata': metadata})
return cls.from_dict(tx_dict)
@ -350,11 +359,15 @@ class Block(object):
"""
asset_ids = cls.get_asset_ids(block_dict)
assets = bigchain.get_assets(asset_ids)
txn_ids = cls.get_txn_ids(block_dict)
metadata = bigchain.get_metadata(txn_ids)
# reconstruct block
block_dict = cls.couple_assets(block_dict, assets)
block_dict = cls.couple_metadata(block_dict, metadata)
kwargs = from_dict_kwargs or {}
return cls.from_dict(block_dict, **kwargs)
def decouple_assets(self):
def decouple_assets(self, block_dict=None):
"""
Extracts the assets from the ``CREATE`` transactions in the block.
@ -363,6 +376,9 @@ class Block(object):
the block being the dict of the block with no assets in the CREATE
transactions.
"""
if block_dict is None:
block_dict = deepcopy(self.to_dict())
block_dict = deepcopy(self.to_dict())
assets = []
for transaction in block_dict['block']['transactions']:
@ -374,6 +390,28 @@ class Block(object):
return (assets, block_dict)
def decouple_metadata(self, block_dict=None):
    """Extract the metadata from the transactions in the block.

    Args:
        block_dict (:obj:`dict`, optional): block dict to strip; when
            omitted, a deep copy of this block's own dict is used. The
            given dict is modified in place.

    Returns:
        tuple: (metadatas, block) with the metadatas being a list of
        dict/null and the block being the dict of the block with no
        metadata in any transaction.
    """
    if block_dict is None:
        block_dict = deepcopy(self.to_dict())

    extracted = []
    for tx in block_dict['block']['transactions']:
        meta = tx.pop('metadata')
        if isinstance(meta, dict):
            # tag the document with its transaction id so it can be
            # re-attached later by couple_metadata
            meta.update({'id': tx['id']})
            extracted.append(meta)
        else:
            # null metadata stays on the transaction itself
            tx.update({'metadata': meta})
    return (extracted, block_dict)
@staticmethod
def couple_assets(block_dict, assets):
"""
@ -399,6 +437,31 @@ class Block(object):
transaction.update({'asset': assets.get(transaction['id'])})
return block_dict
@staticmethod
def couple_metadata(block_dict, metadatal):
"""
Given a block_dict with no metadata (as returned from a database call)
and a list of metadata, reconstruct the original block by putting the
metadata of each transaction back into its original transaction.
Args:
block_dict (:obj:`dict`): The block dict as returned from a
database call.
metadata (:obj:`list` of :obj:`dict`): A list of metadata returned from
a database call.
Returns:
dict: The dict of the reconstructed block.
"""
# create a dict with {'<txid>': metadata}
metadatal = {m.pop('id'): m for m in metadatal}
# add the metadata to their corresponding transactions
for transaction in block_dict['block']['transactions']:
metadata = metadatal.get(transaction['id'])
if metadata:
transaction.update({'metadata': metadata})
return block_dict
@staticmethod
def get_asset_ids(block_dict):
"""
@ -422,6 +485,25 @@ class Block(object):
return asset_ids
@staticmethod
def get_txn_ids(block_dict):
"""
Given a block_dict return all the transaction ids.
Args:
block_dict (:obj:`dict`): The block dict as returned from a
database call.
Returns:
list: The list of txn_ids in the block.
"""
txn_ids = []
for transaction in block_dict['block']['transactions']:
txn_ids.append(transaction['id'])
return txn_ids
def to_str(self):
return serialize(self.to_dict())

View File

@ -353,6 +353,7 @@ def test_get_genesis_block(genesis_block):
conn = connect()
assets, genesis_block_dict = genesis_block.decouple_assets()
metadata, genesis_block_dict = genesis_block.decouple_metadata(genesis_block_dict)
assert query.get_genesis_block(conn) == genesis_block_dict

View File

@ -19,7 +19,7 @@ def test_init_creates_db_tables_and_indexes():
collection_names = conn.conn[dbname].collection_names()
assert sorted(collection_names) == ['assets', 'backlog', 'bigchain',
'votes']
'metadata', 'votes']
indexes = conn.conn[dbname]['bigchain'].index_information().keys()
assert sorted(indexes) == ['_id_', 'asset_id', 'block_id', 'block_timestamp',
@ -67,7 +67,7 @@ def test_create_tables():
collection_names = conn.conn[dbname].collection_names()
assert sorted(collection_names) == ['assets', 'backlog', 'bigchain',
'votes']
'metadata', 'votes']
def test_create_secondary_indexes():

View File

@ -40,6 +40,8 @@ def test_schema(schema_func_name, args_qty):
('get_spending_transactions', 1),
('write_assets', 1),
('get_assets', 1),
('write_metadata', 1),
('get_metadata', 1),
))
def test_query(query_func_name, args_qty):
from bigchaindb.backend import query

View File

@ -25,6 +25,7 @@ def test_double_create(b, user_pk):
assert count_blocks(b.connection) == 2
@pytest.mark.dspend
@pytest.mark.usefixtures('inputs')
def test_get_owned_ids_works_after_double_spend(b, user_pk, user_sk):
""" Test for #633 https://github.com/bigchaindb/bigchaindb/issues/633 """
@ -37,9 +38,10 @@ def test_get_owned_ids_works_after_double_spend(b, user_pk, user_sk):
input_valid.id,
{'1': 1}).sign([user_sk])
print(tx_valid)
# write the valid tx and wait for voting/block to catch up
b.write_transaction(tx_valid)
time.sleep(2)
time.sleep(5)
# doesn't throw an exception
b.get_owned_ids(user_pk)