Merge pull request #994 from bigchaindb/feat/927/implement-mongodb-changefeed

[WIP] Implement mongodb changefeed
This commit is contained in:
Rodolphe Marques 2017-01-04 17:10:46 +01:00 committed by GitHub
commit 8e99d18fd6
7 changed files with 139 additions and 20 deletions

View File

@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module.
"""
# Register the single dispatched modules on import.
from bigchaindb.backend.mongodb import schema, query # noqa no changefeed for now
from bigchaindb.backend.mongodb import schema, query, changefeed # noqa
# MongoDBConnection should always be accessed via
# ``bigchaindb.backend.connect()``.

View File

@ -0,0 +1,94 @@
import logging
import time
import pymongo
from pymongo.errors import ConnectionFailure, OperationFailure
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection
logger = logging.getLogger(__name__)
register_changefeed = module_dispatch_registrar(backend.changefeed)
class MongoDBChangeFeed(ChangeFeed):
    """This class implements a MongoDB changefeed.

    We emulate the behaviour of the RethinkDB changefeed by using a tailable
    cursor that listens for events on the oplog.
    """

    def run_forever(self):
        """Serve the prefeed, then tail the oplog until the process stops.

        Transient replica-set failures (``ConnectionFailure``,
        ``OperationFailure``) are logged and retried after a short back-off
        so that an election or network blip does not kill the pipeline.
        """
        # Emit any pre-computed items before listening for live changes.
        for element in self.prefeed:
            self.outqueue.put(element)

        while True:
            try:
                self.run_changefeed()
                break
            except (ConnectionFailure, OperationFailure) as exc:
                logger.exception(exc)
                # Brief back-off before reconnecting to the replica set.
                time.sleep(1)

    def run_changefeed(self):
        """Tail the oplog and put matching documents on ``self.outqueue``.

        Only operations selected by the ``self.operation`` bitmask
        (INSERT / DELETE / UPDATE) are emitted; everything else is skipped.
        """
        dbname = self.connection.dbname
        table = self.table
        namespace = '{}.{}'.format(dbname, table)

        # last timestamp in the oplog. We only care for operations happening
        # in the future.
        last_ts = self.connection.conn.local.oplog.rs.find()\
                      .sort('$natural', pymongo.DESCENDING).limit(1)\
                      .next()['ts']
        # tailable cursor. A tailable cursor will remain open even after the
        # last result was returned. ``TAILABLE_AWAIT`` will block for some
        # timeout after the last result was returned. If no result is received
        # in the meantime it will raise a StopIteration exception.
        cursor = self.connection.conn.local.oplog.rs.find(
            {'ns': namespace, 'ts': {'$gt': last_ts}},
            cursor_type=pymongo.CursorType.TAILABLE_AWAIT
        )

        while cursor.alive:
            try:
                record = cursor.next()
            except StopIteration:
                # TAILABLE_AWAIT timed out without new oplog entries; keep
                # polling for as long as the cursor stays alive.
                continue

            is_insert = record['op'] == 'i'
            is_delete = record['op'] == 'd'
            is_update = record['op'] == 'u'

            # mongodb documents use the `_id` field as the primary key.
            # We are not using this field at this point and we need to
            # remove it to prevent problems with schema validation.
            # See https://github.com/bigchaindb/bigchaindb/issues/992
            if is_insert and (self.operation & ChangeFeed.INSERT):
                record['o'].pop('_id', None)
                doc = record['o']
            elif is_delete and (self.operation & ChangeFeed.DELETE):
                # on delete it only returns the id of the document
                doc = record['o']
            elif is_update and (self.operation & ChangeFeed.UPDATE):
                # the oplog entry for updates only returns the update
                # operations to apply to the document and not the
                # document itself. So here we first read the document
                # and then return it.
                doc = self.connection.conn[dbname][table]\
                    .find_one(record['o2'], projection={'_id': False})
            else:
                # The record's operation was not requested by this changefeed
                # (or is an op type we do not handle, e.g. 'c' commands).
                # Without this guard `doc` would be unbound below and
                # `outqueue.put(doc)` would raise NameError (or emit a stale
                # document from the previous iteration).
                continue

            self.outqueue.put(doc)
@register_changefeed(MongoDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
    """Return a MongoDB changefeed.

    Args:
        connection: the :class:`~bigchaindb.backend.mongodb.connection.
            MongoDBConnection` instance to run the changefeed against.
        table (str): name of the table to listen to for changes.
        operation (int): bitmask of the operations to listen for
            (``ChangeFeed.INSERT`` / ``DELETE`` / ``UPDATE``).
        prefeed (iterable): optional data to be emitted before the live feed.

    Returns:
        An instance of
        :class:`~bigchaindb.backend.mongodb.MongoDBChangeFeed`.
    """
    return MongoDBChangeFeed(table, operation,
                             connection=connection, prefeed=prefeed)

View File

@ -20,10 +20,12 @@ def write_transaction(conn, signed_transaction):
@register_query(MongoDBConnection)
def update_transaction(conn, transaction_id, doc):
# with mongodb we need to add update operators to the doc
doc = {'$set': doc}
return conn.db['backlog']\
.find_one_and_update({'id': transaction_id},
doc,
return_document=ReturnDocument.AFTER)
.find_one_and_update({'id': transaction_id},
doc,
return_document=ReturnDocument.AFTER)
@register_query(MongoDBConnection)
@ -38,10 +40,20 @@ def get_stale_transactions(conn, reassign_delay):
@register_query(MongoDBConnection)
def get_transaction_from_block(conn, block_id, tx_id):
# this is definitely wrong, but it's something like this
return conn.db['bigchain'].find_one({'id': block_id,
'block.transactions.id': tx_id})
def get_transaction_from_block(conn, transaction_id, block_id):
return conn.db['bigchain'].aggregate([
{'$match': {'id': block_id}},
{'$project': {
'block.transactions': {
'$filter': {
'input': '$block.transactions',
'as': 'transaction',
'cond': {
'$eq': ['$$transaction.id', transaction_id]
}
}
}
}}]).next()['block']['transactions'][0]
@register_query(MongoDBConnection)
@ -90,14 +102,16 @@ def get_owned_ids(conn, owner):
@register_query(MongoDBConnection)
def get_votes_by_block_id(conn, block_id):
return conn.db['votes']\
.find({'vote.voting_for_block': block_id})
.find({'vote.voting_for_block': block_id},
projection={'_id': False})
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.db['votes']\
.find({'vote.voting_for_block': block_id,
'node_pubkey': node_pubkey})
'node_pubkey': node_pubkey},
projection={'_id': False})
@register_query(MongoDBConnection)
@ -133,8 +147,9 @@ def write_vote(conn, vote):
@register_query(MongoDBConnection)
def get_genesis_block(conn):
return conn.db['bigchain'].find_one({'block.transactions.0.operation' ==
'GENESIS'})
return conn.db['bigchain'].find_one({
'block.transactions.0.operation': 'GENESIS'
})
@register_query(MongoDBConnection)
@ -142,7 +157,10 @@ def get_last_voted_block(conn, node_pubkey):
last_voted = conn.db['votes']\
.find({'node_pubkey': node_pubkey},
sort=[('vote.timestamp', -1)])
if not last_voted:
# pymongo seems to return a cursor even if there are no results
# so we actually need to check the count
if last_voted.count() == 0:
return get_genesis_block(conn)
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']

View File

@ -61,13 +61,12 @@ def create_bigchain_secondary_index(conn, dbname):
# to query the bigchain for a transaction id, this field is unique
conn.conn[dbname]['bigchain'].create_index('block.transactions.id',
name='transaction_id',
unique=True)
name='transaction_id')
# secondary index for asset uuid, this field is unique
conn.conn[dbname]['bigchain']\
.create_index('block.transactions.transaction.asset.id',
name='asset_id', unique=True)
name='asset_id')
def create_backlog_secondary_index(conn, dbname):

View File

@ -48,6 +48,8 @@ class StaleTransactionMonitor:
Returns:
transaction
"""
# NOTE: Maybe this is too verbose?
logger.info('Reassigning transaction with id %s', tx['id'])
self.bigchain.reassign_transaction(tx)
return tx

View File

@ -5,10 +5,10 @@ of actions to do on transactions is specified in the ``create_pipeline``
function.
"""
import logging
from collections import Counter
from multipipes import Pipeline, Node
from bigchaindb.common import exceptions
import bigchaindb
from bigchaindb import Bigchain
@ -16,6 +16,10 @@ from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Transaction, Block
from bigchaindb.common import exceptions
logger = logging.getLogger(__name__)
class Vote:
@ -132,7 +136,9 @@ class Vote:
Args:
vote: the vote to write.
"""
validity = 'valid' if vote['vote']['is_block_valid'] else 'invalid'
logger.info("Voting '%s' for block %s", validity,
vote['vote']['voting_for_block'])
self.bigchain.write_vote(vote)
return vote

View File

@ -153,14 +153,14 @@ class TestBigchainApi(object):
def test_get_transaction_in_invalid_and_valid_block(self, monkeypatch, b):
from bigchaindb.models import Transaction
monkeypatch.setattr('time.time', lambda: 1)
monkeypatch.setattr('time.time', lambda: 1000000000)
tx1 = Transaction.create([b.me], [([b.me], 1)],
metadata={'msg': random.random()})
tx1 = tx1.sign([b.me_private])
block1 = b.create_block([tx1])
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 2222222222)
monkeypatch.setattr('time.time', lambda: 2000000000)
tx2 = Transaction.create([b.me], [([b.me], 1)],
metadata={'msg': random.random()})
tx2 = tx2.sign([b.me_private])