From 7f05974f0f9da311baae73a05c2a40ae1f84b9fd Mon Sep 17 00:00:00 2001
From: Rodolphe Marques
Date: Fri, 6 Jan 2017 15:30:25 +0100
Subject: [PATCH] Added a secondary index with a uniqueness constraint on the
 backlog. Several test fixes.

---
 bigchaindb/backend/mongodb/changefeed.py |  6 ++---
 bigchaindb/backend/mongodb/query.py      |  6 ++++-
 bigchaindb/backend/mongodb/schema.py     |  6 +++++
 tests/backend/mongodb/test_changefeed.py |  1 +
 tests/backend/mongodb/test_schema.py     | 11 +++++++--
 .../{ => rethinkdb}/test_changefeed.py   |  0
 tests/db/test_bigchain_api.py            | 23 +++++++++----------
 7 files changed, 35 insertions(+), 18 deletions(-)
 create mode 100644 tests/backend/mongodb/test_changefeed.py
 rename tests/backend/{ => rethinkdb}/test_changefeed.py (100%)

diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py
index d52927b9..05ae7150 100644
--- a/bigchaindb/backend/mongodb/changefeed.py
+++ b/bigchaindb/backend/mongodb/changefeed.py
@@ -67,10 +67,10 @@ class MongoDBChangeFeed(ChangeFeed):
         # See https://github.com/bigchaindb/bigchaindb/issues/992
         if is_insert and (self.operation & ChangeFeed.INSERT):
             record['o'].pop('_id', None)
-            doc = record['o']
+            self.outqueue.put(record['o'])
         elif is_delete and (self.operation & ChangeFeed.DELETE):
             # on delete it only returns the id of the document
-            doc = record['o']
+            self.outqueue.put(record['o'])
         elif is_update and (self.operation & ChangeFeed.UPDATE):
             # the oplog entry for updates only returns the update
             # operations to apply to the document and not the
@@ -78,7 +78,7 @@
             # and then return it.
             doc = self.connection.conn[dbname][table]\
                 .find_one(record['o2'], projection={'_id': False})
-        self.outqueue.put(doc)
+            self.outqueue.put(doc)
 
 
 @register_changefeed(MongoDBConnection)
diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py
index 658eb3ca..fdccda68 100644
--- a/bigchaindb/backend/mongodb/query.py
+++ b/bigchaindb/backend/mongodb/query.py
@@ -4,6 +4,7 @@ from time import time
 from itertools import chain
 
 from pymongo import ReturnDocument
+from pymongo import errors
 
 from bigchaindb import backend
 from bigchaindb.common.exceptions import CyclicBlockchainError
@@ -16,7 +17,10 @@ register_query = module_dispatch_registrar(backend.query)
 
 @register_query(MongoDBConnection)
 def write_transaction(conn, signed_transaction):
-    return conn.db['backlog'].insert_one(signed_transaction)
+    try:
+        return conn.db['backlog'].insert_one(signed_transaction)
+    except errors.DuplicateKeyError:
+        return
 
 
 @register_query(MongoDBConnection)
diff --git a/bigchaindb/backend/mongodb/schema.py b/bigchaindb/backend/mongodb/schema.py
index fed2d1e4..50674b12 100644
--- a/bigchaindb/backend/mongodb/schema.py
+++ b/bigchaindb/backend/mongodb/schema.py
@@ -72,6 +72,12 @@ def create_bigchain_secondary_index(conn, dbname):
 def create_backlog_secondary_index(conn, dbname):
     logger.info('Create `backlog` secondary index.')
 
+    # secondary index on the transaction id with a uniqueness constraint
+    # to make sure there are no duplicate transactions in the backlog
+    conn.conn[dbname]['backlog'].create_index('id',
+                                              name='transaction_id',
+                                              unique=True)
+
     # compound index to read transactions from the backlog per assignee
     conn.conn[dbname]['backlog']\
         .create_index([('assignee', ASCENDING),
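
Note: the unique `transaction_id` index above is what makes the new
`write_transaction` behavior in query.py work: MongoDB itself rejects a
second insert of the same transaction id with a DuplicateKeyError, which
the query layer then swallows. Enforcing uniqueness in the index rather
than with a read-then-write check keeps the operation atomic under
concurrent writers. A minimal standalone sketch of the pattern, assuming
a local mongod and a scratch database named 'test' (both assumptions,
not part of the patch):

    from pymongo import MongoClient, errors

    # assumed: a local mongod and a throwaway 'test' database
    backlog = MongoClient('localhost', 27017)['test']['backlog']

    # uniqueness is enforced by the index itself, not by application code
    backlog.create_index('id', name='transaction_id', unique=True)

    def write_transaction(signed_transaction):
        try:
            return backlog.insert_one(signed_transaction)
        except errors.DuplicateKeyError:
            return None  # duplicate transaction: drop it silently, as in the patch

    assert write_transaction({'id': 'abc123'}) is not None  # first insert works
    assert write_transaction({'id': 'abc123'}) is None      # duplicate rejected
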
diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py
new file mode 100644
index 00000000..90179ab8
--- /dev/null
+++ b/tests/backend/mongodb/test_changefeed.py
@@ -0,0 +1 @@
+"""MongoDB changefeed tests"""
diff --git a/tests/backend/mongodb/test_schema.py b/tests/backend/mongodb/test_schema.py
index 033d4113..8a97ddd8 100644
--- a/tests/backend/mongodb/test_schema.py
+++ b/tests/backend/mongodb/test_schema.py
@@ -25,7 +25,8 @@ def test_init_creates_db_tables_and_indexes():
                               'transaction_id']
 
     indexes = conn.conn[dbname]['backlog'].index_information().keys()
-    assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp']
+    assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp',
+                               'transaction_id']
 
     indexes = conn.conn[dbname]['votes'].index_information().keys()
     assert sorted(indexes) == ['_id_', 'block_and_voter']
@@ -85,13 +86,19 @@ def test_create_secondary_indexes():
 
     # Backlog table
     indexes = conn.conn[dbname]['backlog'].index_information().keys()
-    assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp']
+    assert sorted(indexes) == ['_id_', 'assignee__transaction_timestamp',
+                               'transaction_id']
 
     # Votes table
     indexes = conn.conn[dbname]['votes'].index_information().keys()
     assert sorted(indexes) == ['_id_', 'block_and_voter']
 
 
+# The database is set up with a session scope.
+# If we run this test we will remove secondary indexes that are needed
+# for the rest of the tests.
+@pytest.mark.skip(reason='This will remove the secondary indexes needed'
+                         ' for the rest of the tests')
 def test_drop():
     import bigchaindb
     from bigchaindb import backend
diff --git a/tests/backend/test_changefeed.py b/tests/backend/rethinkdb/test_changefeed.py
similarity index 100%
rename from tests/backend/test_changefeed.py
rename to tests/backend/rethinkdb/test_changefeed.py
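
Note: the test_schema.py assertions above rely on pymongo's
`index_information()`, which returns a dict keyed by index name, so the
`transaction_id` name chosen in schema.py shows up next to the default
`_id_` index. A quick sketch of what the tests expect, under the same
assumed local connection as the previous example:

    from pymongo import MongoClient

    backlog = MongoClient('localhost', 27017)['test']['backlog']
    backlog.create_index('id', name='transaction_id', unique=True)

    # index_information() maps each index name to its definition; '_id_'
    # always exists, custom names come from create_index(..., name=...)
    print(sorted(backlog.index_information().keys()))
    # expected: ['_id_', 'transaction_id']
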
diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py
index 2d4c48e5..241c12a0 100644
--- a/tests/db/test_bigchain_api.py
+++ b/tests/db/test_bigchain_api.py
@@ -97,18 +97,18 @@ class TestBigchainApi(object):
         tx = Transaction.create([b.me], [([b.me], 1)])
         tx = tx.sign([b.me_private])
 
-        monkeypatch.setattr('time.time', lambda: 1)
+        monkeypatch.setattr('time.time', lambda: 1000000000)
         block1 = b.create_block([tx])
         b.write_block(block1)
 
-        monkeypatch.setattr('time.time', lambda: 2)
+        monkeypatch.setattr('time.time', lambda: 1000000020)
         transfer_tx = Transaction.transfer(tx.to_inputs(), [([b.me], 1)],
                                            asset_id=tx.id)
         transfer_tx = transfer_tx.sign([b.me_private])
         block2 = b.create_block([transfer_tx])
         b.write_block(block2)
 
-        monkeypatch.setattr('time.time', lambda: 3333333333)
+        monkeypatch.setattr('time.time', lambda: 1000000030)
         transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([b.me], 1)],
                                             asset_id=tx.id)
         transfer_tx2 = transfer_tx2.sign([b.me_private])
@@ -132,11 +132,11 @@ class TestBigchainApi(object):
         tx = Transaction.create([b.me], [([b.me], 1)])
         tx = tx.sign([b.me_private])
 
-        monkeypatch.setattr('time.time', lambda: 1)
+        monkeypatch.setattr('time.time', lambda: 1000000000)
         block1 = b.create_block([tx])
         b.write_block(block1)
 
-        monkeypatch.setattr('time.time', lambda: 2222222222)
+        monkeypatch.setattr('time.time', lambda: 1000000020)
         block2 = b.create_block([tx])
         b.write_block(block2)
 
@@ -160,7 +160,7 @@ class TestBigchainApi(object):
         block1 = b.create_block([tx1])
         b.write_block(block1)
 
-        monkeypatch.setattr('time.time', lambda: 2000000000)
+        monkeypatch.setattr('time.time', lambda: 1000000020)
         tx2 = Transaction.create([b.me], [([b.me], 1)],
                                  metadata={'msg': random.random()})
         tx2 = tx2.sign([b.me_private])
@@ -180,6 +180,7 @@ class TestBigchainApi(object):
 
     @pytest.mark.usefixtures('inputs')
     def test_write_transaction(self, b, user_pk, user_sk):
+        from bigchaindb import Bigchain
         from bigchaindb.models import Transaction
 
         input_tx = b.get_owned_ids(user_pk).pop()
@@ -190,12 +191,10 @@ class TestBigchainApi(object):
         tx = tx.sign([user_sk])
         response = b.write_transaction(tx)
 
-        assert response['skipped'] == 0
-        assert response['deleted'] == 0
-        assert response['unchanged'] == 0
-        assert response['errors'] == 0
-        assert response['replaced'] == 0
-        assert response['inserted'] == 1
+        tx_from_db, status = b.get_transaction(tx.id, include_status=True)
+
+        assert tx_from_db.to_dict() == tx.to_dict()
+        assert status == Bigchain.TX_IN_BACKLOG
 
     @pytest.mark.usefixtures('inputs')
     def test_read_transaction(self, b, user_pk, user_sk):
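
Note: the timestamp fixes above replace arbitrary values such as 1 and
3333333333 with a single increasing sequence, since these tests depend on
blocks being created at strictly increasing times; one plausible reason for
picking same-width values is that timestamps handled as strings then sort
the same way lexicographically as they do numerically. The mechanism is
stock pytest `monkeypatch`; a minimal sketch, where `make_timestamp` is a
hypothetical stand-in for the timestamp helper used by the code under test:

    import time

    def make_timestamp():
        # hypothetical stand-in for code that reads the wall clock
        return str(int(time.time()))

    def test_timestamps_increase(monkeypatch):
        # pin time.time so each call sees a known, increasing clock
        monkeypatch.setattr('time.time', lambda: 1000000000)
        first = make_timestamp()

        monkeypatch.setattr('time.time', lambda: 1000000020)
        second = make_timestamp()

        # same-width values compare correctly even as strings
        assert first < second
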