Source code for pypushflow.concurrent.multiprocessing
import logging
import sys
from multiprocessing import get_context
from multiprocessing.pool import AsyncResult as Future
from multiprocessing.pool import Pool as _Pool
from numbers import Number
from typing import Callable
from typing import Optional
from . import base
from .interrupt import process as interrupt
logger = logging.getLogger(__name__)
[docs]
class MProcessPool(base.BasePool):
"""Pool of daemonic processes (they cannot have sub-processes)."""
def __init__(
self, context: str = None, max_workers: Optional[int] = None, **kw
) -> None:
kwargs = dict()
if isinstance(context, str) or None:
context = get_context(context)
logger.info(f"pypushflow process pool context: '{type(context).__name__}'")
kwargs["context"] = context
if max_workers is not None:
kwargs["processes"] = max_workers
self._pool = _initialize_pool(kwargs)
super().__init__(**kw)
def __enter__(self):
self._pool.__enter__()
return super().__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
self._pool.__exit__(exc_type, exc_val, exc_tb)
[docs]
def shutdown(
self,
block: bool = False,
timeout: Optional[Number] = None,
interrupt: bool = False,
) -> None:
super().shutdown(block=block, timeout=timeout, interrupt=interrupt)
self._pool.terminate()
[docs]
def close(self):
self._pool.close()
[docs]
def join(self, timeout: Optional[Number] = None) -> bool:
self._pool.join()
return True
[docs]
def interrupt(self) -> None:
for p in list(self._pool._pool):
interrupt.interrupt_task(p.pid)
[docs]
def apply_async(
self,
fn: Callable,
callback: Optional[Callable] = None,
error_callback: Optional[Callable] = None,
args=tuple(),
kwargs=None,
) -> Future:
if kwargs is None:
kwargs = dict()
if args is None:
args = (fn,)
else:
args = (fn,) + args
return self._pool.apply_async(
interrupt.task_main,
args=args,
kwds=kwargs,
callback=callback,
error_callback=error_callback,
)
def _initialize_pool(kwargs) -> _Pool:
if sys.version_info >= (3, 7):
return _Pool(initializer=interrupt.worker_initializer, **kwargs)
pool = _Pool(**kwargs)
pool.apply(_worker_initializer, args=(0,))
pool.map(_worker_initializer, [1] * len(pool._pool))
return pool
def _worker_initializer(seconds):
from time import sleep
interrupt.worker_initializer()
sleep(seconds)