"""Parallelization utilities."""
import multiprocessing as mp
import multiprocessing.pool
import os
from typing import Any, Callable, Iterable, Optional
def is_macos() -> bool:
"""Return True if this is running on macOS."""
return os.uname()[0].lower() == "darwin"
def cpus(requested_num_processes: Optional[int] = None) -> int:
"""Return the usable number of worker processes for a multiprocessing Pool,
up to `requested_num_processes` (default = max available), considering SLURM
and any other environment-specific limitations.
`1` means do all work in-process rather than forking subprocesses.
On SLURM: This reads the environment variable 'SLURM_CPUS_PER_TASK'
containing the number of CPUs requested per task but since that's only set
if the --cpus-per-task option was specified, this falls back to
'SLURM_JOB_CPUS_PER_NODE' containing the number of processors available to
the job on this node.
By default, srun sets:
SLURM_CPUS_ON_NODE=1
SLURM_JOB_CPUS_PER_NODE=1
srun -p mcovert --cpus-per-task=2:
SLURM_CPUS_PER_TASK=2
SLURM_CPUS_ON_NODE=2
SLURM_JOB_CPUS_PER_NODE=2
srun --ntasks-per-node=3 --cpus-per-task=4:
SLURM_CPUS_PER_TASK=4
SLURM_CPUS_ON_NODE=12
SLURM_JOB_CPUS_PER_NODE=12
srun --ntasks-per-node=3:
SLURM_CPUS_ON_NODE=3
SLURM_JOB_CPUS_PER_NODE=3
Args:
requested_num_processes: the requested number of worker
processes; None or 0 means return the max usable number
Returns:
num_cpus: the usable number of worker processes for a Pool, as limited
by the hardware, OS, SLURM, and `requested_num_processes`.
==> 1 means DO NOT CREATE WORKER SUBPROCESSES.
See also `pool()`.
See https://slurm.schedmd.com/sbatch.html
See https://github.com/CovertLab/wcEcoli/issues/392
"""
    os_cpus = mp.cpu_count()
    value = os.environ.get(
        "SLURM_CPUS_PER_TASK", os.environ.get("SLURM_JOB_CPUS_PER_NODE", os_cpus)
    )
    slurm_cpus = int(value)
    available = min(os_cpus, slurm_cpus)

    if requested_num_processes is not None and requested_num_processes > 0:
        if requested_num_processes > available:
            print(
                "Warning: Request for {} worker processes got limited to {}".format(
                    requested_num_processes, available
                )
            )
        elif requested_num_processes < available:
            available = requested_num_processes

    return available
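
# Illustrative sketch (not part of the original module): how the SLURM
# environment variables cap `cpus()`, assuming a node where
# mp.cpu_count() == 8.
#
#   os.environ["SLURM_CPUS_PER_TASK"] = "2"
#   cpus()      # min(8 OS CPUs, 2 SLURM CPUs) -> 2
#   cpus(16)    # prints a warning, still returns 2
#   cpus(1)     # 1 => do the work in-process; see pool()
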
class ApplyResult(object):
"""
A substitute for multiprocessing.ApplyResult() to return with apply_async.
Will get created after a successful function call so ready() and
successful() are always True.
"""
def __init__(self, result):
self._result = result

    def ready(self):
        return True

    def successful(self):
        return True

    def wait(self, timeout=None):
        pass

    def get(self, timeout=None):
        return self._result

class InlinePool(object):
"""
A substitute for multiprocessing.Pool() that runs the work inline in the
current process. This is important because (1) a regular Pool worker cannot
construct a nested Pool (even with processes=1) since daemon processes are
not allowed to have children, and (2) it's easier to debug code running in
the main process.
"""
    def map(
        self,
        func: Callable[..., Any],
        iterable: Iterable[Any],
        chunksize: Optional[int] = None,
    ) -> list:
        """Map the function over the iterable."""
        return list(map(func, iterable))
    def apply_async(
        self,
        func: Callable[..., Any],
        args: Iterable[Any] = (),
        kwds: Optional[dict[str, Any]] = None,
        callback: Optional[Callable[..., None]] = None,
    ) -> ApplyResult:
        """
        Apply the function to the args serially (not asynchronously since
        only one process is available).
        """
        if kwds is None:
            kwds = {}
        result = func(*args, **kwds)
        if callback:
            callback(result)
        return ApplyResult(result)
    def terminate(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
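
# Illustrative sketch (not part of the original module): InlinePool mirrors
# the multiprocessing.Pool calls used here, so single-process callers need
# no special-case code path.
#
#   inline = InlinePool()
#   inline.map(abs, [-1, 2, -3])                 # -> [1, 2, 3]
#   promise = inline.apply_async(pow, (2, 10))
#   promise.ready() and promise.successful()     # -> True
#   promise.get()                                # -> 1024
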
class NoDaemonProcess(mp.Process):
    @property  # type: ignore[override]
    def daemon(self):
        """Always report non-daemonic so this process may have children."""
        return False

    @daemon.setter
    def daemon(self, value):
        pass

class NoDaemonContext(type(mp.get_context())):  # type: ignore
    Process = NoDaemonProcess

class NoDaemonPool(mp.pool.Pool):
"""
A substitute for multiprocessing.Pool() that creates a pool that is not a
daemonic process. This allows for nesting pool calls that would otherwise
be prevented with an assertion error (AssertionError: daemonic processes
are not allowed to have children).
"""
def __init__(self, *args, **kwargs):
kwargs["context"] = NoDaemonContext()
super().__init__(*args, **kwargs)
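
# Illustrative sketch (not part of the original module): a NoDaemonPool
# worker may itself open a pool, which a regular daemonic Pool worker can't.
# `_square` and `_sum_of_squares` are hypothetical helpers; with the 'spawn'
# start method they'd need to live at a module's top level to be picklable.
#
#   def _square(x):
#       return x * x
#
#   def _sum_of_squares(n):
#       with pool(num_processes=2) as inner:   # nested pool inside a worker
#           return sum(inner.map(_square, range(n)))
#
#   outer = NoDaemonPool(processes=2)
#   outer.map(_sum_of_squares, [3, 4])   # -> [5, 14]
#   outer.terminate()
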
def pool(
num_processes: Optional[int] = None, nestable: bool = False
) -> mp.pool.Pool | InlinePool:
"""Return an `InlinePool` if `cpus(num_processes) == 1`, else a
multiprocessing `Pool(cpus(num_processes))`, as suitable for the current
runtime environment.
This uses the 'spawn' process start method to create a fresh python
interpreter process, avoiding threading problems and cross-platform
inconsistencies.
nestable can create a pool of non-daemon worker processes that can spawn
nested processes and have have nested pools.
See `cpus()` on figuring the number of usable processes.
See `InlinePool` about why running in-process is important.
"""
usable = cpus(num_processes)
if usable == 1:
return InlinePool()
elif nestable:
return NoDaemonPool()
else:
return mp.get_context(method="spawn").Pool(processes=usable)
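
# Illustrative usage sketch (not part of the original module). `_double` is
# a hypothetical worker; under 'spawn' it must be defined at a module's top
# level, and the call belongs under an `if __name__ == "__main__":` guard.
#
#   def _double(x):
#       return 2 * x
#
#   if __name__ == "__main__":
#       with pool(num_processes=4) as p:  # InlinePool if only 1 CPU is usable
#           print(p.map(_double, range(8)))   # [0, 2, 4, ..., 14]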