diff --git a/.ci/travis-before-script.sh b/.ci/travis-before-script.sh index c6c5fe2e..06319d47 100755 --- a/.ci/travis-before-script.sh +++ b/.ci/travis-before-script.sh @@ -4,6 +4,9 @@ set -e -x if [[ "${TOXENV}" == *-rdb ]]; then rethinkdb --daemon -elif [[ "${TOXENV}" == *-mdb ]]; then - sudo service mongod start +elif [[ "${BIGCHAINDB_DATABASE_BACKEND}" == mongodb ]]; then + wget http://downloads.mongodb.org/linux/mongodb-linux-x86_64-3.4.1.tgz -O /tmp/mongodb.tgz + tar -xvf /tmp/mongodb.tgz + mkdir /tmp/mongodb-data + ${PWD}/mongodb-linux-x86_64-3.4.1/bin/mongod --dbpath=/tmp/mongodb-data --replSet=rs0 &> /dev/null & fi diff --git a/.ci/travis_script.sh b/.ci/travis_script.sh index ad5a2ddc..83d1731e 100755 --- a/.ci/travis_script.sh +++ b/.ci/travis_script.sh @@ -4,6 +4,8 @@ set -e -x if [[ -n ${TOXENV} ]]; then tox -e ${TOXENV} +elif [[ "${BIGCHAINDB_DATABASE_BACKEND}" == mongodb ]]; then + pytest -v --database-backend=mongodb --cov=bigchaindb else pytest -v -n auto --cov=bigchaindb fi diff --git a/.travis.yml b/.travis.yml index 5704b3a6..da7ae05f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,8 +12,6 @@ env: matrix: fast_finish: true - allow_failures: - - env: BIGCHAINDB_DATABASE_BACKEND=mongodb exclude: - python: 3.4 env: TOXENV=flake8 @@ -26,18 +24,19 @@ matrix: addons: rethinkdb: '2.3.5' env: BIGCHAINDB_DATABASE_BACKEND=rethinkdb - - python: 3.5 - services: mongodb - env: BIGCHAINDB_DATABASE_BACKEND=mongodb - python: 3.5 addons: rethinkdb: '2.3.5' env: BIGCHAINDB_DATABASE_BACKEND=rethinkdb + - python: 3.5 + env: BIGCHAINDB_DATABASE_BACKEND=mongodb before_install: sudo .ci/travis-before-install.sh install: .ci/travis-install.sh +before_script: .ci/travis-before-script.sh + script: .ci/travis_script.sh after_success: .ci/travis-after-success.sh diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index d52927b9..05ae7150 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -67,10 +67,10 @@ class MongoDBChangeFeed(ChangeFeed): # See https://github.com/bigchaindb/bigchaindb/issues/992 if is_insert and (self.operation & ChangeFeed.INSERT): record['o'].pop('_id', None) - doc = record['o'] + self.outqueue.put(record['o']) elif is_delete and (self.operation & ChangeFeed.DELETE): # on delete it only returns the id of the document - doc = record['o'] + self.outqueue.put(record['o']) elif is_update and (self.operation & ChangeFeed.UPDATE): # the oplog entry for updates only returns the update # operations to apply to the document and not the @@ -78,7 +78,7 @@ class MongoDBChangeFeed(ChangeFeed): # and then return it. doc = self.connection.conn[dbname][table]\ .find_one(record['o2'], projection={'_id': False}) - self.outqueue.put(doc) + self.outqueue.put(doc) @register_changefeed(MongoDBConnection) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 8765d110..4cb82ef6 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -1,8 +1,10 @@ """Query implementation for MongoDB""" from time import time +from itertools import chain from pymongo import ReturnDocument +from pymongo import errors from bigchaindb import backend from bigchaindb.common.exceptions import CyclicBlockchainError @@ -15,7 +17,10 @@ register_query = module_dispatch_registrar(backend.query) @register_query(MongoDBConnection) def write_transaction(conn, signed_transaction): - return conn.db['backlog'].insert_one(signed_transaction) + try: + return conn.db['backlog'].insert_one(signed_transaction) + except errors.DuplicateKeyError: + return @register_query(MongoDBConnection) @@ -36,7 +41,8 @@ def delete_transaction(conn, *transaction_id): @register_query(MongoDBConnection) def get_stale_transactions(conn, reassign_delay): return conn.db['backlog']\ - .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}) + .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}, + projection={'_id': False}) @register_query(MongoDBConnection) @@ -58,7 +64,10 @@ def get_transaction_from_block(conn, transaction_id, block_id): @register_query(MongoDBConnection) def get_transaction_from_backlog(conn, transaction_id): - return conn.db['backlog'].find_one({'id': transaction_id}) + return conn.db['backlog']\ + .find_one({'id': transaction_id}, + projection={'_id': False, 'assignee': False, + 'assignment_timestamp': False}) @register_query(MongoDBConnection) @@ -70,33 +79,83 @@ def get_blocks_status_from_transaction(conn, transaction_id): @register_query(MongoDBConnection) def get_txids_by_asset_id(conn, asset_id): - return conn.db['bigchain']\ - .find({'block.transactions.asset.id': asset_id}, - projection=['id']) + # get the txid of the create transaction for asset_id + cursor = conn.db['bigchain'].aggregate([ + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$project': {'block.transactions.id': True}} + ]) + create_tx_txids = (elem['block']['transactions']['id'] for elem in cursor) + + # get txids of transfer transaction with asset_id + cursor = conn.db['bigchain'].aggregate([ + {'$match': { + 'block.transactions.asset.id': asset_id + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.asset.id': asset_id + }}, + {'$project': {'block.transactions.id': True}} + ]) + transfer_tx_ids = (elem['block']['transactions']['id'] for elem in cursor) + + return chain(create_tx_txids, transfer_tx_ids) @register_query(MongoDBConnection) def get_asset_by_id(conn, asset_id): - return conn.db['bigchain']\ - .find_one({'block.transactions.asset.id': asset_id, - 'block.transactions.asset.operation': 'CREATE'}, - projection=['block.transactions.asset']) + cursor = conn.db['bigchain'].aggregate([ + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$project': {'block.transactions.asset': True}} + ]) + # we need to access some nested fields before returning so lets use a + # generator to avoid having to read all records on the cursor at this point + return (elem['block']['transactions'] for elem in cursor) @register_query(MongoDBConnection) -def get_spent(conn, transaction_id, condition_id): - return conn.db['bigchain']\ - .find_one({'block.transactions.fulfillments.input.txid': - transaction_id, - 'block.transactions.fulfillments.input.cid': - condition_id}) +def get_spent(conn, transaction_id, output): + cursor = conn.db['bigchain'].aggregate([ + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.inputs.fulfills.txid': transaction_id, + 'block.transactions.inputs.fulfills.output': output + }} + ]) + # we need to access some nested fields before returning so lets use a + # generator to avoid having to read all records on the cursor at this point + return (elem['block']['transactions'] for elem in cursor) @register_query(MongoDBConnection) def get_owned_ids(conn, owner): - return conn.db['bigchain']\ - .find({'block.transactions.transaction.conditions.owners_after': - owner}) + cursor = conn.db['bigchain'].aggregate([ + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.outputs.public_keys': { + '$elemMatch': {'$eq': owner} + } + }} + ]) + # we need to access some nested fields before returning so lets use a + # generator to avoid having to read all records on the cursor at this point + return (elem['block']['transactions'] for elem in cursor) @register_query(MongoDBConnection) @@ -121,7 +180,8 @@ def write_block(conn, block): @register_query(MongoDBConnection) def get_block(conn, block_id): - return conn.db['bigchain'].find_one({'id': block_id}) + return conn.db['bigchain'].find_one({'id': block_id}, + projection={'_id': False}) @register_query(MongoDBConnection) @@ -184,4 +244,18 @@ def get_last_voted_block(conn, node_pubkey): @register_query(MongoDBConnection) def get_unvoted_blocks(conn, node_pubkey): - pass + return conn.db['bigchain'].aggregate([ + {'$lookup': { + 'from': 'votes', + 'localField': 'id', + 'foreignField': 'vote.voting_for_block', + 'as': 'votes' + }}, + {'$match': { + 'votes.node_pubkey': {'$ne': node_pubkey}, + 'block.transactions.operation': {'$ne': 'GENESIS'} + }}, + {'$project': { + 'votes': False, '_id': False + }} + ]) diff --git a/bigchaindb/backend/mongodb/schema.py b/bigchaindb/backend/mongodb/schema.py index fed2d1e4..50674b12 100644 --- a/bigchaindb/backend/mongodb/schema.py +++ b/bigchaindb/backend/mongodb/schema.py @@ -72,6 +72,12 @@ def create_bigchain_secondary_index(conn, dbname): def create_backlog_secondary_index(conn, dbname): logger.info('Create `backlog` secondary index.') + # secondary index on the transaction id with a uniqueness constraint + # to make sure there are no duplicated transactions in the backlog + conn.conn[dbname]['backlog'].create_index('id', + name='transaction_id', + unique=True) + # compound index to read transactions from the backlog per assignee conn.conn[dbname]['backlog']\ .create_index([('assignee', ASCENDING), diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py new file mode 100644 index 00000000..90179ab8 --- /dev/null +++ b/tests/backend/mongodb/test_changefeed.py @@ -0,0 +1 @@ +"""MongoDB changefeed tests""" diff --git a/tests/backend/mongodb/test_schema.py b/tests/backend/mongodb/test_schema.py index 033d4113..33ddb719 100644 --- a/tests/backend/mongodb/test_schema.py +++ b/tests/backend/mongodb/test_schema.py @@ -25,7 +25,8 @@ def test_init_creates_db_tables_and_indexes(): 'transaction_id'] indexes = conn.conn[dbname]['backlog'].index_information().keys() - assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp'] + assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp', + 'transaction_id'] indexes = conn.conn[dbname]['votes'].index_information().keys() assert sorted(indexes) == ['_id_', 'block_and_voter'] @@ -85,26 +86,23 @@ def test_create_secondary_indexes(): # Backlog table indexes = conn.conn[dbname]['backlog'].index_information().keys() - assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp'] + assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp', + 'transaction_id'] # Votes table indexes = conn.conn[dbname]['votes'].index_information().keys() assert sorted(indexes) == ['_id_', 'block_and_voter'] -def test_drop(): +def test_drop(dummy_db): import bigchaindb from bigchaindb import backend from bigchaindb.backend import schema conn = backend.connect() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures - assert dbname in conn.conn.database_names() - - schema.drop_database(conn, dbname) - assert dbname not in conn.conn.database_names() + assert dummy_db in conn.conn.database_names() + schema.drop_database(conn, dummy_db) + assert dummy_db not in conn.conn.database_names() def test_get_replica_set_name_not_enabled(): diff --git a/tests/backend/test_changefeed.py b/tests/backend/rethinkdb/test_changefeed.py similarity index 100% rename from tests/backend/test_changefeed.py rename to tests/backend/rethinkdb/test_changefeed.py diff --git a/tests/conftest.py b/tests/conftest.py index 9a6b7a73..8b7c5cb1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -136,11 +136,18 @@ def _configure_bigchaindb(request): def _setup_database(_configure_bigchaindb): from bigchaindb import config from bigchaindb.backend import connect, schema + from bigchaindb.backend.mongodb.schema import initialize_replica_set from bigchaindb.common.exceptions import DatabaseDoesNotExist print('Initializing test db') dbname = config['database']['name'] conn = connect() + # if we are setting up mongodb for the first time we need to make sure + # that the replica set is initialized before doing any operation in the + # database + if config['database']['backend'] == 'mongodb': + initialize_replica_set(conn) + try: schema.drop_database(conn, dbname) except DatabaseDoesNotExist: @@ -315,10 +322,10 @@ def dummy_db(request): if xdist_suffix: dbname = '{}_{}'.format(dbname, xdist_suffix) try: - schema.create_database(conn, dbname) + schema.init_database(conn, dbname) except DatabaseAlreadyExists: schema.drop_database(conn, dbname) - schema.create_database(conn, dbname) + schema.init_database(conn, dbname) yield dbname try: schema.drop_database(conn, dbname) diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 2d4c48e5..241c12a0 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -97,18 +97,18 @@ class TestBigchainApi(object): tx = Transaction.create([b.me], [([b.me], 1)]) tx = tx.sign([b.me_private]) - monkeypatch.setattr('time.time', lambda: 1) + monkeypatch.setattr('time.time', lambda: 1000000000) block1 = b.create_block([tx]) b.write_block(block1) - monkeypatch.setattr('time.time', lambda: 2) + monkeypatch.setattr('time.time', lambda: 1000000020) transfer_tx = Transaction.transfer(tx.to_inputs(), [([b.me], 1)], asset_id=tx.id) transfer_tx = transfer_tx.sign([b.me_private]) block2 = b.create_block([transfer_tx]) b.write_block(block2) - monkeypatch.setattr('time.time', lambda: 3333333333) + monkeypatch.setattr('time.time', lambda: 1000000030) transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([b.me], 1)], asset_id=tx.id) transfer_tx2 = transfer_tx2.sign([b.me_private]) @@ -132,11 +132,11 @@ class TestBigchainApi(object): tx = Transaction.create([b.me], [([b.me], 1)]) tx = tx.sign([b.me_private]) - monkeypatch.setattr('time.time', lambda: 1) + monkeypatch.setattr('time.time', lambda: 1000000000) block1 = b.create_block([tx]) b.write_block(block1) - monkeypatch.setattr('time.time', lambda: 2222222222) + monkeypatch.setattr('time.time', lambda: 1000000020) block2 = b.create_block([tx]) b.write_block(block2) @@ -160,7 +160,7 @@ class TestBigchainApi(object): block1 = b.create_block([tx1]) b.write_block(block1) - monkeypatch.setattr('time.time', lambda: 2000000000) + monkeypatch.setattr('time.time', lambda: 1000000020) tx2 = Transaction.create([b.me], [([b.me], 1)], metadata={'msg': random.random()}) tx2 = tx2.sign([b.me_private]) @@ -180,6 +180,7 @@ class TestBigchainApi(object): @pytest.mark.usefixtures('inputs') def test_write_transaction(self, b, user_pk, user_sk): + from bigchaindb import Bigchain from bigchaindb.models import Transaction input_tx = b.get_owned_ids(user_pk).pop() @@ -190,12 +191,10 @@ class TestBigchainApi(object): tx = tx.sign([user_sk]) response = b.write_transaction(tx) - assert response['skipped'] == 0 - assert response['deleted'] == 0 - assert response['unchanged'] == 0 - assert response['errors'] == 0 - assert response['replaced'] == 0 - assert response['inserted'] == 1 + tx_from_db, status = b.get_transaction(tx.id, include_status=True) + + assert tx_from_db.to_dict() == tx.to_dict() + assert status == Bigchain.TX_IN_BACKLOG @pytest.mark.usefixtures('inputs') def test_read_transaction(self, b, user_pk, user_sk): diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 1c46c065..e0b27f50 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -279,7 +279,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, tx = Transaction.create([b.me], [([test_user_pub], 1)]) tx = tx.sign([b.me_private]) - monkeypatch.setattr('time.time', lambda: 1111111111) + monkeypatch.setattr('time.time', lambda: 1000000000) block = b.create_block([tx]) b.write_block(block) @@ -289,7 +289,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, asset_id=tx.id) tx2 = tx2.sign([test_user_priv]) - monkeypatch.setattr('time.time', lambda: 2222222222) + monkeypatch.setattr('time.time', lambda: 2000000000) block2 = b.create_block([tx2]) b.write_block(block2) @@ -314,7 +314,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, 'previous_block': genesis_block.id, 'is_block_valid': True, 'invalid_reason': None, - 'timestamp': '2222222222'} + 'timestamp': '2000000000'} serialized_vote = utils.serialize(vote_doc['vote']).encode() assert vote_doc['node_pubkey'] == b.me @@ -328,7 +328,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, 'previous_block': block.id, 'is_block_valid': True, 'invalid_reason': None, - 'timestamp': '2222222222'} + 'timestamp': '2000000000'} serialized_vote2 = utils.serialize(vote2_doc['vote']).encode() assert vote2_doc['node_pubkey'] == b.me @@ -498,15 +498,15 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): outpipe = Pipe() - monkeypatch.setattr('time.time', lambda: 1111111111) + monkeypatch.setattr('time.time', lambda: 1000000000) block_ids = [] # insert blocks in the database while the voter process is not listening # (these blocks won't appear in the changefeed) - monkeypatch.setattr('time.time', lambda: 2222222222) + monkeypatch.setattr('time.time', lambda: 1000000020) block_1 = dummy_block(b) block_ids.append(block_1.id) - monkeypatch.setattr('time.time', lambda: 3333333333) + monkeypatch.setattr('time.time', lambda: 1000000030) b.write_block(block_1) block_2 = dummy_block(b) block_ids.append(block_2.id) @@ -522,7 +522,7 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): outpipe.get() # create a new block that will appear in the changefeed - monkeypatch.setattr('time.time', lambda: 4444444444) + monkeypatch.setattr('time.time', lambda: 1000000040) block_3 = dummy_block(b) block_ids.append(block_3.id) b.write_block(block_3) @@ -546,15 +546,15 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): outpipe = Pipe() - monkeypatch.setattr('time.time', lambda: 1111111111) + monkeypatch.setattr('time.time', lambda: 1000000000) block_ids = [] - monkeypatch.setattr('time.time', lambda: 2222222222) + monkeypatch.setattr('time.time', lambda: 1000000020) block_1 = dummy_block(b) block_ids.append(block_1.id) b.write_block(block_1) - monkeypatch.setattr('time.time', lambda: 3333333333) + monkeypatch.setattr('time.time', lambda: 1000000030) block_2 = dummy_block(b) block_ids.append(block_2.id) b.write_block(block_2) @@ -588,9 +588,9 @@ def test_voter_checks_for_previous_vote(monkeypatch, b): inpipe = Pipe() outpipe = Pipe() - monkeypatch.setattr('time.time', lambda: 1111111111) + monkeypatch.setattr('time.time', lambda: 1000000000) - monkeypatch.setattr('time.time', lambda: 2222222222) + monkeypatch.setattr('time.time', lambda: 1000000020) block_1 = dummy_block(b) inpipe.put(block_1.to_dict()) assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 0 @@ -603,11 +603,11 @@ def test_voter_checks_for_previous_vote(monkeypatch, b): outpipe.get() # queue block for voting AGAIN - monkeypatch.setattr('time.time', lambda: 3333333333) + monkeypatch.setattr('time.time', lambda: 1000000030) inpipe.put(block_1.to_dict()) # queue another block - monkeypatch.setattr('time.time', lambda: 4444444444) + monkeypatch.setattr('time.time', lambda: 1000000040) block_2 = dummy_block(b) inpipe.put(block_2.to_dict()) diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index 2a326147..e190f622 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -102,7 +102,10 @@ def test_env_config(monkeypatch): def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): file_config = { - 'database': {'host': 'test-host'}, + 'database': { + 'host': 'test-host', + 'backend': request.config.getoption('--database-backend') + }, 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config)