mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Merge remote-tracking branch 'remotes/origin/master' into fix/260/add-output-condition-to-fulfillment-message
This commit is contained in:
commit
76eb18329e
@ -2,6 +2,10 @@ import multiprocessing as mp
|
||||
import uuid
|
||||
import json
|
||||
import argparse
|
||||
import csv
|
||||
import time
|
||||
import logging
|
||||
import rethinkdb as r
|
||||
|
||||
from os.path import expanduser
|
||||
|
||||
@ -10,6 +14,10 @@ from bigchaindb.util import ProcessGroup
|
||||
from bigchaindb.commands import utils
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_write_transaction(tx_left):
|
||||
b = Bigchain()
|
||||
while tx_left > 0:
|
||||
@ -36,6 +44,60 @@ def run_set_statsd_host(args):
|
||||
json.dump(conf, f)
|
||||
|
||||
|
||||
def run_gather_metrics(args):
|
||||
# setup a rethinkdb connection
|
||||
conn = r.connect(args.bigchaindb_host, 28015, 'bigchain')
|
||||
|
||||
# setup csv writer
|
||||
csv_file = open(args.csvfile, 'w')
|
||||
csv_writer = csv.writer(csv_file)
|
||||
|
||||
# query for the number of transactions on the backlog
|
||||
num_transactions = r.table('backlog').count().run(conn)
|
||||
num_transactions_received = 0
|
||||
initial_time = None
|
||||
logger.info('Starting gathering metrics. {} transasctions in the backlog'.format(num_transactions))
|
||||
logger.info('This process should exit automatically. '
|
||||
'If this does not happen you can exit at any time using Ctrl-C'
|
||||
' saving all the metrics gathered up to this point.')
|
||||
|
||||
logger.info('\t{:<20} {:<20} {:<20} {:<20}'.format('timestamp', 'tx in block',
|
||||
'tx/s', '% complete'))
|
||||
|
||||
# listen to the changefeed
|
||||
try:
|
||||
for change in r.table('bigchain').changes().run(conn):
|
||||
# check only for new blocks
|
||||
if change['old_val'] is None:
|
||||
block_num_transactions = len(change['new_val']['block']['transactions'])
|
||||
time_now = time.time()
|
||||
csv_writer.writerow([str(time_now), str(block_num_transactions)])
|
||||
|
||||
# log statistics
|
||||
if initial_time is None:
|
||||
initial_time = time_now
|
||||
|
||||
num_transactions_received += block_num_transactions
|
||||
elapsed_time = time_now - initial_time
|
||||
percent_complete = round((num_transactions_received / num_transactions) * 100)
|
||||
|
||||
if elapsed_time != 0:
|
||||
transactions_per_second = round(num_transactions_received / elapsed_time)
|
||||
else:
|
||||
transactions_per_second = float('nan')
|
||||
|
||||
logger.info('\t{:<20} {:<20} {:<20} {:<20}'.format(time_now, block_num_transactions,
|
||||
transactions_per_second, percent_complete))
|
||||
|
||||
if (num_transactions - num_transactions_received) == 0:
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
logger.info('Interrupted. Exiting early...')
|
||||
finally:
|
||||
# close files
|
||||
csv_file.close()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='BigchainDB benchmarking utils')
|
||||
subparsers = parser.add_subparsers(title='Commands', dest='command')
|
||||
@ -52,6 +114,18 @@ def main():
|
||||
statsd_parser.add_argument('statsd_host', metavar='statsd_host', default='localhost',
|
||||
help='Hostname of the statsd server')
|
||||
|
||||
# metrics
|
||||
metrics_parser = subparsers.add_parser('gather-metrics',
|
||||
help='Gather metrics to a csv file')
|
||||
|
||||
metrics_parser.add_argument('-b', '--bigchaindb-host',
|
||||
required=True,
|
||||
help='Bigchaindb node hostname to connect to gather cluster metrics')
|
||||
|
||||
metrics_parser.add_argument('-c', '--csvfile',
|
||||
required=True,
|
||||
help='Filename to save the metrics')
|
||||
|
||||
utils.start(parser, globals())
|
||||
|
||||
|
||||
|
@ -7,6 +7,7 @@ import rethinkdb as r
|
||||
import bigchaindb
|
||||
from bigchaindb import Bigchain
|
||||
from bigchaindb.monitor import Monitor
|
||||
from bigchaindb.util import ProcessGroup
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -180,7 +181,9 @@ class Block(object):
|
||||
# add results to the queue
|
||||
for result in initial_results:
|
||||
q_initial.put(result)
|
||||
q_initial.put('stop')
|
||||
|
||||
for i in range(mp.cpu_count()):
|
||||
q_initial.put('stop')
|
||||
|
||||
return q_initial
|
||||
|
||||
@ -203,17 +206,21 @@ class Block(object):
|
||||
self._start()
|
||||
logger.info('exiting block module...')
|
||||
|
||||
def kill(self):
|
||||
for i in range(mp.cpu_count()):
|
||||
self.q_new_transaction.put('stop')
|
||||
|
||||
def _start(self):
|
||||
"""
|
||||
Initialize, spawn, and start the processes
|
||||
"""
|
||||
|
||||
# initialize the processes
|
||||
p_filter = mp.Process(name='filter_transactions', target=self.filter_by_assignee)
|
||||
p_validate = mp.Process(name='validate_transactions', target=self.validate_transactions)
|
||||
p_blocks = mp.Process(name='create_blocks', target=self.create_blocks)
|
||||
p_write = mp.Process(name='write_blocks', target=self.write_blocks)
|
||||
p_delete = mp.Process(name='delete_transactions', target=self.delete_transactions)
|
||||
p_filter = ProcessGroup(name='filter_transactions', target=self.filter_by_assignee)
|
||||
p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions)
|
||||
p_blocks = ProcessGroup(name='create_blocks', target=self.create_blocks)
|
||||
p_write = ProcessGroup(name='write_blocks', target=self.write_blocks)
|
||||
p_delete = ProcessGroup(name='delete_transactions', target=self.delete_transactions)
|
||||
|
||||
# start the processes
|
||||
p_filter.start()
|
||||
@ -222,9 +229,3 @@ class Block(object):
|
||||
p_write.start()
|
||||
p_delete.start()
|
||||
|
||||
# join processes
|
||||
p_filter.join()
|
||||
p_validate.join()
|
||||
p_blocks.join()
|
||||
p_write.join()
|
||||
p_delete.join()
|
||||
|
@ -16,6 +16,7 @@ import copy
|
||||
import json
|
||||
import logging
|
||||
import collections
|
||||
from functools import lru_cache
|
||||
|
||||
from pkg_resources import iter_entry_points, ResolutionError
|
||||
|
||||
@ -218,6 +219,7 @@ def autoconfigure(filename=None, config=None, force=False):
|
||||
set_config(newconfig) # sets bigchaindb.config
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def load_consensus_plugin(name=None):
|
||||
"""Find and load the chosen consensus plugin.
|
||||
|
||||
|
@ -31,7 +31,7 @@ A transaction is an operation between the `current_owner` and the `new_owner` ov
|
||||
and a _crypto fulfillment_ that satisfies a spending condition set on the unspent digital asset. A _fulfillment_
|
||||
is usually a signature proving the ownership of the digital asset.
|
||||
See [conditions and fulfillments](models.md#conditions-and-fulfillments)
|
||||
- `conditions`: List of conditions. Each _condition_ a _crypto condition_ that needs to be fulfilled by the
|
||||
- `conditions`: List of conditions. Each _condition_ is a _crypto condition_ that needs to be fulfilled by the
|
||||
new owner in order to spend the digital asset.
|
||||
See [conditions and fulfillments](models.md#conditions-and-fulfillments)
|
||||
- `operation`: String representation of the operation being performed (`CREATE`, `TRANSFER`, ...) this will define how
|
||||
|
@ -19,3 +19,7 @@ def speedtest_validate_transaction():
|
||||
b.validate_transaction(tx_signed)
|
||||
|
||||
profiler.print_stats()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
speedtest_validate_transaction()
|
||||
|
@ -714,8 +714,8 @@ class TestBigchainBlock(object):
|
||||
# run bootstrap
|
||||
initial_results = block.bootstrap()
|
||||
|
||||
# we should have gotten a queue with 100 results
|
||||
assert initial_results.qsize() - 1 == 100
|
||||
# we should have gotten a queue with 100 results minus the poison pills
|
||||
assert initial_results.qsize() - mp.cpu_count() == 100
|
||||
|
||||
def test_start(self, b, user_vk):
|
||||
# start with 100 transactions in the backlog and 100 in the changefeed
|
||||
@ -736,7 +736,9 @@ class TestBigchainBlock(object):
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
b.write_transaction(tx)
|
||||
new_transactions.put(tx)
|
||||
new_transactions.put('stop')
|
||||
|
||||
for i in range(mp.cpu_count()):
|
||||
new_transactions.put('stop')
|
||||
|
||||
# create a block instance
|
||||
block = Block(new_transactions)
|
||||
@ -744,6 +746,8 @@ class TestBigchainBlock(object):
|
||||
# start the block processes
|
||||
block.start()
|
||||
|
||||
time.sleep(6)
|
||||
|
||||
assert new_transactions.qsize() == 0
|
||||
assert r.table('backlog').count() == 0
|
||||
assert r.table('bigchain').count() == 2
|
||||
@ -755,20 +759,14 @@ class TestBigchainBlock(object):
|
||||
# create block instance
|
||||
block = Block(new_transactions)
|
||||
|
||||
# create block_process
|
||||
p_block = mp.Process(target=block.start)
|
||||
|
||||
# start block process
|
||||
p_block.start()
|
||||
block.start()
|
||||
|
||||
# wait for 6 seconds to give it time for an empty queue exception to occur
|
||||
time.sleep(6)
|
||||
|
||||
# send the poison pill
|
||||
new_transactions.put('stop')
|
||||
|
||||
# join the process
|
||||
p_block.join()
|
||||
block.kill()
|
||||
|
||||
def test_duplicated_transactions(self):
|
||||
pytest.skip('We may have duplicates in the initial_results and changefeed')
|
||||
|
@ -58,12 +58,15 @@ def test_load_consensus_plugin_raises_with_invalid_subclass(monkeypatch):
|
||||
# Monkeypatch entry_point.load to return something other than a
|
||||
# ConsensusRules instance
|
||||
from bigchaindb import config_utils
|
||||
import time
|
||||
monkeypatch.setattr(config_utils,
|
||||
'iter_entry_points',
|
||||
lambda *args: [type('entry_point', (object), {'load': lambda: object})])
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
config_utils.load_consensus_plugin()
|
||||
# Since the function is decorated with `lru_cache`, we need to
|
||||
# "miss" the cache using a name that has not been used previously
|
||||
config_utils.load_consensus_plugin(str(time.time()))
|
||||
|
||||
|
||||
def test_map_leafs_iterator():
|
||||
|
Loading…
x
Reference in New Issue
Block a user