Merge remote-tracking branch 'origin/master' into make-db-connection-more-robust

This commit is contained in:
vrde 2016-09-19 17:14:02 +02:00
commit 121c729504
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
17 changed files with 397 additions and 43 deletions

View File

@ -31,6 +31,7 @@ config = {
},
'api_endpoint': 'http://localhost:9984/api/v1',
'consensus_plugin': 'default',
'backlog_reassign_delay': 30
}
# We need to maintain a backup copy of the original config dict in case

View File

@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False):
input('Statsd {}? (default `{}`): '.format(key, val)) \
or val
val = conf['backlog_reassign_delay']
conf['backlog_reassign_delay'] = \
input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \
or val
if config_path != '-':
bigchaindb.config_utils.write_config(conf, config_path)
else:

View File

@ -2,6 +2,7 @@ import random
import math
import collections
from copy import deepcopy
from time import time
from itertools import compress
import rethinkdb as r
@ -29,7 +30,7 @@ class Bigchain(object):
def __init__(self, host=None, port=None, dbname=None,
public_key=None, private_key=None, keyring=[],
consensus_plugin=None):
consensus_plugin=None, backlog_reassign_delay=None):
"""Initialize the Bigchain instance
A Bigchain instance has several configuration parameters (e.g. host).
@ -57,6 +58,7 @@ class Bigchain(object):
self.me = public_key or bigchaindb.config['keypair']['public']
self.me_private = private_key or bigchaindb.config['keypair']['private']
self.nodes_except_me = keyring or bigchaindb.config['keyring']
self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
self.consensus = config_utils.load_consensus_plugin(consensus_plugin)
# change RethinkDB read mode to majority. This ensures consistency in query results
self.read_mode = 'majority'
@ -138,6 +140,7 @@ class Bigchain(object):
signed_transaction = deepcopy(signed_transaction)
# update the transaction
signed_transaction.update({'assignee': assignee})
signed_transaction.update({'assignment_timestamp': time()})
# write to the backlog
response = self.connection.run(
@ -145,6 +148,48 @@ class Bigchain(object):
.insert(signed_transaction, durability=durability))
return response
def reassign_transaction(self, transaction, durability='hard'):
    """Hand a backlog transaction over to a different federation node.

    The new assignee is chosen at random from the federation,
    excluding the transaction's current assignee. In a single-node
    federation the transaction is simply re-assigned to this node.

    Args:
        transaction (dict): the assigned transaction; must carry an
            ``assignee`` key and an ``id`` present in the backlog.
        durability (str): RethinkDB write durability for the update.

    Returns:
        dict: the database response for the update.
    """
    if not self.nodes_except_me:
        # There is no other node to assign to.
        new_assignee = self.me
    else:
        federation = self.nodes_except_me + [self.me]
        try:
            current = federation.index(transaction['assignee'])
            # Pick any federation member except the current assignee.
            candidates = federation[:current] + federation[current + 1:]
            new_assignee = random.choice(candidates)
        except ValueError:
            # Current assignee is not part of the federation; fall back
            # to any other known node.
            new_assignee = random.choice(self.nodes_except_me)

    return r.table('backlog')\
        .get(transaction['id'])\
        .update({'assignee': new_assignee,
                 'assignment_timestamp': time()},
                durability=durability)\
        .run(self.conn)
def get_stale_transactions(self):
    """Return a RethinkDB cursor over stale backlog transactions.

    A transaction counts as stale when it was assigned to a node more
    than ``self.backlog_reassign_delay`` seconds ago but is still
    sitting in the backlog.

    Returns:
        A cursor of stale transaction documents.
    """
    # time() is evaluated once, client-side, when the query is built;
    # the filter compares against that snapshot.
    cutoff = time() - self.backlog_reassign_delay
    return r.table('backlog')\
        .filter(lambda tx: tx['assignment_timestamp'] < cutoff)\
        .run(self.conn)
def get_transaction(self, txid, include_status=False):
"""Retrieve a transaction with `txid` from bigchain.
@ -540,12 +585,11 @@ class Bigchain(object):
r.table('bigchain')
.insert(r.json(block_serialized), durability=durability))
# TODO: Decide if we need this method
def transaction_exists(self, transaction_id):
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
r.table('bigchain', read_mode=self.read_mode)\
.get_all(transaction_id, index='transaction_id'))
return True if len(response.items) > 0 else False
return len(response.items) > 0
def prepare_genesis_block(self):
"""Prepare a genesis block."""

View File

@ -42,36 +42,46 @@ class Block:
if tx['assignee'] == self.bigchain.me:
tx.pop('assignee')
tx.pop('assignment_timestamp')
return tx
def delete_tx(self, tx):
    """Remove a single transaction from the backlog.

    Args:
        tx (dict): the transaction to delete; must have an ``id``.

    Returns:
        The same transaction, unchanged.
    """
    query = r.table('backlog').get(tx['id']).delete(durability='hard')
    self.bigchain.connection.run(query)
    return tx
def validate_tx(self, tx):
"""Validate a transaction.
Also checks if the transaction already exists in the blockchain. If it
does, or it's invalid, it's deleted from the backlog immediately.
Args:
tx (dict): the transaction to validate.
Returns:
The transaction if valid, ``None`` otherwise.
"""
tx = self.bigchain.is_valid_transaction(tx)
if tx:
if self.bigchain.transaction_exists(tx['id']):
# if the transaction already exists, we must check whether
# it's in a valid or undecided block
tx, status = self.bigchain.get_transaction(tx['id'],
include_status=True)
if status == self.bigchain.TX_VALID \
or status == self.bigchain.TX_UNDECIDED:
# if the tx is already in a valid or undecided block,
# then it no longer should be in the backlog, or added
# to a new block. We can delete and drop it.
r.table('backlog').get(tx['id']) \
.delete(durability='hard') \
.run(self.bigchain.conn)
return None
tx_validated = self.bigchain.is_valid_transaction(tx)
if tx_validated:
return tx
else:
# if the transaction is not valid, remove it from the
# backlog
r.table('backlog').get(tx['id']) \
.delete(durability='hard') \
.run(self.bigchain.conn)
return None
def create(self, tx, timeout=False):
"""Create a block.
@ -112,6 +122,22 @@ class Block:
self.bigchain.write_block(block)
return block
def delete_tx(self, block):
    """Remove all of a block's transactions from the backlog.

    Args:
        block (dict): the block containing the transactions to delete.

    Returns:
        The block, unchanged.
    """
    tx_ids = [tx['id'] for tx in block['block']['transactions']]
    r.table('backlog')\
        .get_all(*tx_ids)\
        .delete(durability='hard')\
        .run(self.bigchain.conn)
    return block
def initial():
"""Return old transactions from the backlog."""
@ -132,7 +158,8 @@ def initial():
def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial())
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial())
def create_pipeline():
@ -143,10 +170,10 @@ def create_pipeline():
block_pipeline = Pipeline([
Node(block.filter_tx),
Node(block.delete_tx),
Node(block.validate_tx, fraction_of_cores=1),
Node(block.create, timeout=1),
Node(block.write),
Node(block.delete_tx),
])
return block_pipeline

View File

@ -0,0 +1,76 @@
"""This module monitors for stale transactions.
It reassigns transactions which have been assigned a node but
remain in the backlog past a certain amount of time.
"""
import logging
from multipipes import Pipeline, Node
from bigchaindb import Bigchain
from time import sleep
logger = logging.getLogger(__name__)
class StaleTransactionMonitor:
    """Re-assign transactions that linger too long in the backlog.

    A transaction is considered stale when it has been assigned to a
    node but is still in the backlog after some configurable delay.

    Note:
        Methods of this class will be executed in different processes.
    """

    def __init__(self, timeout=5, backlog_reassign_delay=None):
        """Set up the monitor.

        Args:
            timeout: how often to check for stale transactions (seconds).
            backlog_reassign_delay: how stale a transaction must be
                before reassignment (seconds). If supplied, overrides
                the Bigchain default value.
        """
        self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay)
        self.timeout = timeout

    def check_transactions(self):
        """Poll the backlog and yield each stale transaction.

        Sleeps ``self.timeout`` seconds before polling so the pipeline
        does not hammer the database.

        Yields:
            dict: one stale transaction at a time.
        """
        sleep(self.timeout)
        yield from self.bigchain.get_stale_transactions()

    def reassign_transactions(self, tx):
        """Put the transaction back in the backlog with a new assignee.

        Returns:
            dict: the reassigned transaction.
        """
        self.bigchain.reassign_transaction(tx)
        return tx
def create_pipeline(timeout=5, backlog_reassign_delay=5):
    """Build the stale-transaction monitoring pipeline.

    Args:
        timeout: polling interval for stale checks (seconds).
        backlog_reassign_delay: staleness threshold (seconds).

    Returns:
        Pipeline: the (not yet started) monitoring pipeline, whose
        stages run in separate processes.
    """
    monitor = StaleTransactionMonitor(
        timeout=timeout,
        backlog_reassign_delay=backlog_reassign_delay)
    return Pipeline([
        Node(monitor.check_transactions),
        Node(monitor.reassign_transactions),
    ])
def start(timeout=5, backlog_reassign_delay=5):
    """Create, set up, start, and return the stale-monitor pipeline.

    Args:
        timeout: polling interval for stale checks (seconds).
        backlog_reassign_delay: staleness threshold (seconds).

    Returns:
        Pipeline: the running pipeline.
    """
    pipeline = create_pipeline(
        timeout=timeout, backlog_reassign_delay=backlog_reassign_delay)
    pipeline.setup()
    pipeline.start()
    return pipeline

View File

@ -72,5 +72,5 @@ class ChangeFeed(Node):
elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(change['old_val'])
elif is_update and (self.operation & ChangeFeed.UPDATE):
self.outqueue.put(change)
self.outqueue.put(change['new_val'])

View File

@ -2,7 +2,7 @@ import logging
import multiprocessing as mp
import bigchaindb
from bigchaindb.pipelines import vote, block, election
from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.web import server
@ -31,6 +31,9 @@ def start():
logger.info('Starting voter')
vote.start()
logger.info('Starting stale transaction monitor')
stale.start()
logger.info('Starting election')
election.start()

View File

@ -1,6 +1,17 @@
# Notes for Firewall Setup
This is a page of notes on the ports used by BigchainDB nodes and the traffic they should expect, to help with firewall setup (or security group setup on AWS). This page is _not_ a firewall tutorial or step-by-step guide.
This is a page of notes on the ports potentially used by BigchainDB nodes and the traffic they should expect, to help with firewall setup (and security group setup on AWS). This page is _not_ a firewall tutorial or step-by-step guide.
## Expected Unsolicited Inbound Traffic
Assuming you aren't exposing the RethinkDB web interface on port 8080 (or any other port, because [there are more secure ways to access it](https://www.rethinkdb.com/docs/security/#binding-the-web-interface-port)), there are only three ports that should expect unsolicited inbound traffic:
1. **Port 22** can expect inbound SSH (TCP) traffic from the node administrator (i.e. a small set of IP addresses).
2. **Port 9984** can expect inbound HTTP (TCP) traffic from BigchainDB clients sending transactions to the BigchainDB HTTP API.
3. **Port 29015** can expect inbound TCP traffic from other RethinkDB nodes in the RethinkDB cluster (for RethinkDB intracluster communications).
All other ports should only get inbound traffic in response to specific requests from inside the node.
## Port 22

View File

@ -132,6 +132,7 @@ class TestBigchainApi(object):
response, status = b.get_transaction(tx_signed["id"], include_status=True)
response.pop('assignee')
response.pop('assignment_timestamp')
# add validity information, which will be returned
assert util.serialize(tx_signed) == util.serialize(response)
assert status == b.TX_IN_BACKLOG

View File

@ -14,9 +14,14 @@ def test_filter_by_assignee(b, user_vk):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['assignee'] = b.me
tx['assignment_timestamp'] = 111
# filter_tx has side effects on the `tx` instance by popping 'assignee'
assert block_maker.filter_tx(tx) == tx
# and 'assignment_timestamp'
filtered_tx = block_maker.filter_tx(tx)
assert filtered_tx == tx
assert 'assignee' not in filtered_tx
assert 'assignment_timestamp' not in filtered_tx
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
@ -69,21 +74,59 @@ def test_write_block(b, user_vk):
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
def test_duplicate_transaction(b, user_vk):
    """A tx already in a written block must be rejected and purged from the backlog."""
    block_maker = block.Block()

    # Build a block of 10 signed transactions and write it to bigchain.
    txs = []
    for i in range(10):
        tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
        tx = b.sign_transaction(tx, b.me_private)
        txs.append(tx)

    block_doc = b.create_block(txs)
    block_maker.write(block_doc)

    # block is in bigchain
    assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc

    # Re-submit one of the already-blocked transactions to the backlog.
    b.write_transaction(txs[0])

    # verify tx is in the backlog
    assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None

    # try to validate a transaction that's already in the chain; should not
    # work
    assert block_maker.validate_tx(txs[0]) is None

    # duplicate tx should be removed from backlog
    assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None
def test_delete_tx(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
block_maker.create(tx)
# make sure the tx appears in the backlog
b.write_transaction(tx)
tx_backlog = r.table('backlog').get(tx['id']).run(b.conn)
tx_backlog.pop('assignee')
assert tx_backlog == tx
# force the output triggering a `timeout`
block_doc = block_maker.create(None, timeout=True)
returned_tx = block_maker.delete_tx(tx)
for tx in block_doc['block']['transactions']:
returned_tx = r.table('backlog').get(tx['id']).run(b.conn)
returned_tx.pop('assignee')
returned_tx.pop('assignment_timestamp')
assert returned_tx == tx
assert returned_tx == tx
assert r.table('backlog').get(tx['id']).run(b.conn) is None
returned_block = block_maker.delete_tx(block_doc)
assert returned_block == block_doc
for tx in block_doc['block']['transactions']:
assert r.table('backlog').get(tx['id']).run(b.conn) is None
def test_prefeed(b, user_vk):
@ -115,6 +158,7 @@ def test_full_pipeline(b, user_vk):
tx = b.sign_transaction(tx, b.me_private)
assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
tx['assignee'] = assignee
tx['assignment_timestamp'] = time.time()
if assignee == b.me:
count_assigned_to_me += 1
r.table('backlog').insert(tx, durability='hard').run(b.conn)

View File

@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk):
e.requeue_transactions(test_block)
tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn)
tx_backlog.pop('assignee')
tx_backlog.pop('assignment_timestamp')
assert tx_backlog == tx1

View File

@ -0,0 +1,116 @@
import rethinkdb as r
from bigchaindb import Bigchain
from bigchaindb.pipelines import stale
from multipipes import Pipe, Pipeline
from unittest.mock import patch
from bigchaindb import config_utils
import time
import os
def test_get_stale(b, user_vk):
    """A backlogged tx should be reported stale once the reassign delay passes."""
    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
    tx = b.sign_transaction(tx, b.me_private)
    b.write_transaction(tx, durability='hard')

    # Tiny timeout/delay so the freshly written tx is stale immediately.
    stm = stale.StaleTransactionMonitor(timeout=0.001,
                                        backlog_reassign_delay=0.001)
    tx_stale = stm.check_transactions()
    for _tx in tx_stale:
        # The backlog copy carries assignment metadata the original tx
        # does not have; strip it before comparing.
        _tx.pop('assignee')
        _tx.pop('assignment_timestamp')
        assert tx == _tx
def test_reassign_transactions(b, user_vk):
    """Exercise reassignment in single-node, federation, and unknown-assignee cases."""
    # test with single node
    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
    tx = b.sign_transaction(tx, b.me_private)
    b.write_transaction(tx, durability='hard')

    stm = stale.StaleTransactionMonitor(timeout=0.001,
                                        backlog_reassign_delay=0.001)
    stm.reassign_transactions(tx)

    # test with federation
    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
    tx = b.sign_transaction(tx, b.me_private)
    b.write_transaction(tx, durability='hard')

    stm = stale.StaleTransactionMonitor(timeout=0.001,
                                        backlog_reassign_delay=0.001)
    # Fake a multi-node federation so a different assignee can be chosen.
    stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
    tx = list(r.table('backlog').run(b.conn))[0]
    stm.reassign_transactions(tx)

    # Reassignment must bump the timestamp and pick a different node.
    reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn)
    assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
    assert reassigned_tx['assignee'] != tx['assignee']

    # test with node not in federation
    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
    tx = b.sign_transaction(tx, b.me_private)
    tx.update({'assignee': 'lol'})
    tx.update({'assignment_timestamp': time.time()})
    r.table('backlog').insert(tx, durability='hard').run(b.conn)

    tx = list(r.table('backlog').run(b.conn))[0]
    stm.reassign_transactions(tx)
    # The unknown assignee must be replaced by a real federation member.
    assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol'
def test_full_pipeline(user_vk):
    """Run the real stale-monitor pipeline and check every tx gets reassigned."""
    CONFIG = {
        'database': {
            'name': 'bigchain_test_{}'.format(os.getpid())
        },
        'keypair': {
            'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA',
            'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8'
        },
        # Two extra nodes so reassignment can pick a different assignee.
        'keyring': ['aaa', 'bbb'],
        'backlog_reassign_delay': 0.01
    }
    config_utils.set_config(CONFIG)
    b = Bigchain()
    outpipe = Pipe()

    # Fill the backlog and remember each tx's original assignment.
    original_txs = {}

    for i in range(100):
        tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
        tx = b.sign_transaction(tx, b.me_private)
        b.write_transaction(tx)
        original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn)

    assert r.table('backlog').count().run(b.conn) == 100

    pipeline = stale.create_pipeline(backlog_reassign_delay=1,
                                     timeout=1)
    pipeline.setup(outdata=outpipe)
    pipeline.start()

    # timing should be careful -- test will fail if reassignment happens multiple times
    time.sleep(2)
    pipeline.terminate()

    # to terminate
    outpipe.get()

    # Reassignment must not add or drop transactions.
    assert r.table('backlog').count().run(b.conn) == 100

    reassigned_txs = list(r.table('backlog').run(b.conn))

    # check that every assignment timestamp has increased, and every tx has a new assignee
    for reassigned_tx in reassigned_txs:
        assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp']
        assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee']
@patch.object(Pipeline, 'start')
def test_start(mock_start):
    # TODO: `stale.start` is just a wrapper around `stale.create_pipeline`,
    # which is tested by `test_full_pipeline`.
    # If anyone has better ideas on how to test this, please do a PR :)
    stale.start()
    mock_start.assert_called_with()

View File

@ -43,8 +43,7 @@ def test_changefeed_update(mock_run):
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'}
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
@ -55,8 +54,7 @@ def test_changefeed_multiple_operations(mock_run):
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'}
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0

View File

@ -57,6 +57,7 @@ def mock_bigchaindb_backup_config(monkeypatch):
'keypair': {},
'database': {'host': 'host', 'port': 12345, 'name': 'adbname'},
'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1},
'backlog_reassign_delay': 5
}
monkeypatch.setattr('bigchaindb._config', config)

View File

@ -145,7 +145,8 @@ def test_env_config(monkeypatch):
def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
file_config = {
'database': {'host': 'test-host'}
'database': {'host': 'test-host'},
'backlog_reassign_delay': 5
}
monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config)
monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname',
@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
},
'api_endpoint': 'http://localhost:9984/api/v1',
'consensus_plugin': 'default',
'backlog_reassign_delay': 5
}

View File

@ -19,7 +19,8 @@ def config(request, monkeypatch):
},
'keyring': [],
'CONFIGURED': True,
'consensus_plugin': 'default'
'consensus_plugin': 'default',
'backlog_reassign_delay': 30
}
monkeypatch.setattr('bigchaindb.config', config)

23
tests/test_processes.py Normal file
View File

@ -0,0 +1,23 @@
from unittest.mock import patch
from multiprocessing import Process
from bigchaindb.pipelines import vote, block, election, stale
@patch.object(stale, 'start')
@patch.object(election, 'start')
@patch.object(block, 'start')
@patch.object(vote, 'start')
@patch.object(Process, 'start')
def test_processes_start(mock_process, mock_vote, mock_block, mock_election,
                         mock_stale):
    """Verify ``processes.start`` starts every pipeline and the web process.

    Note: stacked ``@patch`` decorators inject mocks bottom-up, so the
    innermost decorator (``Process.start``) maps to the FIRST parameter.
    The original parameter order bound every mock to the wrong patch.
    """
    from bigchaindb import processes

    processes.start()

    mock_vote.assert_called_with()
    mock_block.assert_called_with()
    mock_election.assert_called_with()
    mock_stale.assert_called_with()
    mock_process.assert_called_with()