xmaking multprocessing usage explicit and easily identifiable

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-01-17 21:52:30 +01:00
parent 4472a1a3ee
commit 41b965e13b
No known key found for this signature in database
5 changed files with 17 additions and 17 deletions

View File

@ -10,7 +10,7 @@ for ``argparse.ArgumentParser``.
import argparse import argparse
import builtins import builtins
import functools import functools
import multiprocessing as mp from multiprocessing import cpu_count
import sys import sys
import planetmint import planetmint
import planetmint.config_utils import planetmint.config_utils
@ -132,7 +132,7 @@ def start(parser, argv, scope):
if args.multiprocess is False: if args.multiprocess is False:
args.multiprocess = 1 args.multiprocess = 1
elif args.multiprocess is None: elif args.multiprocess is None:
args.multiprocess = mp.cpu_count() args.multiprocess = cpu_count()
return func(args) return func(args)

View File

@ -5,7 +5,7 @@
from queue import Empty from queue import Empty
from collections import defaultdict from collections import defaultdict
from multiprocessing import Queue import multiprocessing
POISON_PILL = "POISON_PILL" POISON_PILL = "POISON_PILL"
@ -46,8 +46,8 @@ class Exchange:
"""Dispatch events to subscribers.""" """Dispatch events to subscribers."""
def __init__(self): def __init__(self):
self.publisher_queue = Queue() self.publisher_queue = multiprocessing.Queue()
self.started_queue = Queue() self.started_queue = multiprocessing.Queue()
# Map <event_types -> queues> # Map <event_types -> queues>
self.queues = defaultdict(list) self.queues = defaultdict(list)
@ -80,7 +80,7 @@ class Exchange:
if event_types is None: if event_types is None:
event_types = EventTypes.ALL event_types = EventTypes.ALL
queue = Queue() queue = multiprocessing.Queue()
self.queues[event_types].append(queue) self.queues[event_types].append(queue)
return queue return queue

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0 # Code is Apache-2.0 and docs are CC-BY-4.0
import multiprocessing as mp import multiprocessing
from collections import defaultdict from collections import defaultdict
from planetmint import App from planetmint import App
@ -44,17 +44,17 @@ EXIT = "exit"
class ParallelValidator: class ParallelValidator:
def __init__(self, number_of_workers=mp.cpu_count()): def __init__(self, number_of_workers=multiprocessing.cpu_count()):
self.number_of_workers = number_of_workers self.number_of_workers = number_of_workers
self.transaction_index = 0 self.transaction_index = 0
self.routing_queues = [mp.Queue() for _ in range(self.number_of_workers)] self.routing_queues = [multiprocessing.Queue() for _ in range(self.number_of_workers)]
self.workers = [] self.workers = []
self.results_queue = mp.Queue() self.results_queue = multiprocessing.Queue()
def start(self): def start(self):
for routing_queue in self.routing_queues: for routing_queue in self.routing_queues:
worker = ValidationWorker(routing_queue, self.results_queue) worker = ValidationWorker(routing_queue, self.results_queue)
process = mp.Process(target=worker.run) process = multiprocessing.Process(target=worker.run)
process.start() process.start()
self.workers.append(process) self.workers.append(process)

View File

@ -6,7 +6,7 @@
import contextlib import contextlib
import threading import threading
import queue import queue
import multiprocessing as mp import multiprocessing
import json import json
import setproctitle import setproctitle
@ -19,7 +19,7 @@ from transactions.common.crypto import key_pair_from_ed25519_key
class ProcessGroup(object): class ProcessGroup(object):
def __init__(self, concurrency=None, group=None, target=None, name=None, args=None, kwargs=None, daemon=None): def __init__(self, concurrency=None, group=None, target=None, name=None, args=None, kwargs=None, daemon=None):
self.concurrency = concurrency or mp.cpu_count() self.concurrency = concurrency or multiprocessing.cpu_count()
self.group = group self.group = group
self.target = target self.target = target
self.name = name self.name = name
@ -30,7 +30,7 @@ class ProcessGroup(object):
def start(self): def start(self):
for i in range(self.concurrency): for i in range(self.concurrency):
proc = mp.Process( proc = multiprocessing.Process(
group=self.group, group=self.group,
target=self.target, target=self.target,
name=self.name, name=self.name,
@ -42,7 +42,7 @@ class ProcessGroup(object):
self.processes.append(proc) self.processes.append(proc)
class Process(mp.Process): class Process(multiprocessing.Process):
"""Wrapper around multiprocessing.Process that uses """Wrapper around multiprocessing.Process that uses
setproctitle to set the name of the process when running setproctitle to set the name of the process when running
the target task. the target task.

View File

@ -9,7 +9,7 @@ The application is implemented in Flask and runs using Gunicorn.
""" """
import copy import copy
import multiprocessing from multiprocessing import cpu_count
import gunicorn.app.base import gunicorn.app.base
from flask import Flask from flask import Flask
@ -102,7 +102,7 @@ def create_server(settings, log_config=None, planetmint_factory=None):
settings = copy.deepcopy(settings) settings = copy.deepcopy(settings)
if not settings.get("workers"): if not settings.get("workers"):
settings["workers"] = (multiprocessing.cpu_count() * 2) + 1 settings["workers"] = (cpu_count() * 2) + 1
if not settings.get("threads"): if not settings.get("threads"):
# Note: Threading is not recommended currently, as the frontend workload # Note: Threading is not recommended currently, as the frontend workload