From 41b965e13b5a90833491759fde07ea975ee2d1e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Eckel?= Date: Tue, 17 Jan 2023 21:52:30 +0100 Subject: [PATCH] xmaking multprocessing usage explicit and easily identifiable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jürgen Eckel --- planetmint/commands/utils.py | 4 ++-- planetmint/events.py | 8 ++++---- planetmint/parallel_validation.py | 10 +++++----- planetmint/utils.py | 8 ++++---- planetmint/web/server.py | 4 ++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/planetmint/commands/utils.py b/planetmint/commands/utils.py index 6c9a9b7..fb0bd7f 100644 --- a/planetmint/commands/utils.py +++ b/planetmint/commands/utils.py @@ -10,7 +10,7 @@ for ``argparse.ArgumentParser``. import argparse import builtins import functools -import multiprocessing as mp +from multiprocessing import cpu_count import sys import planetmint import planetmint.config_utils @@ -132,7 +132,7 @@ def start(parser, argv, scope): if args.multiprocess is False: args.multiprocess = 1 elif args.multiprocess is None: - args.multiprocess = mp.cpu_count() + args.multiprocess = cpu_count() return func(args) diff --git a/planetmint/events.py b/planetmint/events.py index 6157138..cd9a2f8 100644 --- a/planetmint/events.py +++ b/planetmint/events.py @@ -5,7 +5,7 @@ from queue import Empty from collections import defaultdict -from multiprocessing import Queue +import multiprocessing POISON_PILL = "POISON_PILL" @@ -46,8 +46,8 @@ class Exchange: """Dispatch events to subscribers.""" def __init__(self): - self.publisher_queue = Queue() - self.started_queue = Queue() + self.publisher_queue = multiprocessing.Queue() + self.started_queue = multiprocessing.Queue() # Map queues> self.queues = defaultdict(list) @@ -80,7 +80,7 @@ class Exchange: if event_types is None: event_types = EventTypes.ALL - queue = Queue() + queue = multiprocessing.Queue() self.queues[event_types].append(queue) return queue diff --git a/planetmint/parallel_validation.py b/planetmint/parallel_validation.py index e33436d..00a2b6e 100644 --- a/planetmint/parallel_validation.py +++ b/planetmint/parallel_validation.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) # Code is Apache-2.0 and docs are CC-BY-4.0 -import multiprocessing as mp +import multiprocessing from collections import defaultdict from planetmint import App @@ -44,17 +44,17 @@ EXIT = "exit" class ParallelValidator: - def __init__(self, number_of_workers=mp.cpu_count()): + def __init__(self, number_of_workers=multiprocessing.cpu_count()): self.number_of_workers = number_of_workers self.transaction_index = 0 - self.routing_queues = [mp.Queue() for _ in range(self.number_of_workers)] + self.routing_queues = [multiprocessing.Queue() for _ in range(self.number_of_workers)] self.workers = [] - self.results_queue = mp.Queue() + self.results_queue = multiprocessing.Queue() def start(self): for routing_queue in self.routing_queues: worker = ValidationWorker(routing_queue, self.results_queue) - process = mp.Process(target=worker.run) + process = multiprocessing.Process(target=worker.run) process.start() self.workers.append(process) diff --git a/planetmint/utils.py b/planetmint/utils.py index d62d73f..59e6d54 100644 --- a/planetmint/utils.py +++ b/planetmint/utils.py @@ -6,7 +6,7 @@ import contextlib import threading import queue -import multiprocessing as mp +import multiprocessing import json import setproctitle @@ -19,7 +19,7 @@ from transactions.common.crypto import key_pair_from_ed25519_key class ProcessGroup(object): def __init__(self, concurrency=None, group=None, target=None, name=None, args=None, kwargs=None, daemon=None): - self.concurrency = concurrency or mp.cpu_count() + self.concurrency = concurrency or multiprocessing.cpu_count() self.group = group self.target = target self.name = name @@ -30,7 +30,7 @@ class ProcessGroup(object): def start(self): for i in range(self.concurrency): - proc = mp.Process( + proc = multiprocessing.Process( group=self.group, target=self.target, name=self.name, @@ -42,7 +42,7 @@ class ProcessGroup(object): self.processes.append(proc) -class Process(mp.Process): +class Process(multiprocessing.Process): """Wrapper around multiprocessing.Process that uses setproctitle to set the name of the process when running the target task. diff --git a/planetmint/web/server.py b/planetmint/web/server.py index 8e86026..2f49dac 100644 --- a/planetmint/web/server.py +++ b/planetmint/web/server.py @@ -9,7 +9,7 @@ The application is implemented in Flask and runs using Gunicorn. """ import copy -import multiprocessing +from multiprocessing import cpu_count import gunicorn.app.base from flask import Flask @@ -102,7 +102,7 @@ def create_server(settings, log_config=None, planetmint_factory=None): settings = copy.deepcopy(settings) if not settings.get("workers"): - settings["workers"] = (multiprocessing.cpu_count() * 2) + 1 + settings["workers"] = (cpu_count() * 2) + 1 if not settings.get("threads"): # Note: Threading is not recommended currently, as the frontend workload