Source code for pypushflow.tests.concurrent.utils

import gc
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor as CfPool
from contextlib import contextmanager
from multiprocessing import get_all_start_methods
from multiprocessing import get_context as get_mp_context
from multiprocessing.pool import Pool as MpPool
from typing import Any
from typing import Optional
from typing import Tuple

import pytest

try:
    from billiard import get_context as get_billiard_context
    from billiard.pool import Pool as BPool
except ImportError:
    BPool = None
    BProcess = None

from ...concurrent import exceptions
from ...concurrent import get_pool
from ...concurrent.factory import _POOLS

CONTEXTS = [None] + get_all_start_methods()
POOLS = [s for s in _POOLS if s and s != "scaling"]
TIMEOUT = 10


[docs] @contextmanager def pool_context(scaling, pool_type, **pool_options): cpool_type = pool_type if scaling: cpool_type = "scaling" try: get_pool(pool_type) except ImportError as e: pytest.skip(str(e)) try: pool_cls = get_pool(cpool_type) except ImportError as e: pytest.skip(str(e)) pool_options.setdefault("wait_on_exit_timeout", TIMEOUT) try: with pool_cls(pool_type=pool_type, **pool_options) as pool: yield pool finally: # Avoid implicit finalizer GC during pool shutdown # which can cause ReentrantCallError which leads # to a UserWarning warning about a potential object leak. # # It is unclear how this actually solves the issue. # # An alternative "solution" is to just ignore the warning: # # @pytest.mark.filterwarnings("ignore:ResourceTracker called reentrantly:UserWarning") # gc.collect()
[docs] def assert_callback(pool, name): libname, fn, result = SUCCESS[name] if fn is None: pytest.skip(f"requires {libname}") value, exception = _submit_with_cb(pool, fn) if exception is not None: raise exception assert value == result, str(value)
[docs] def assert_error_callback(pool, name): libname, fn, result = FAILURE[name] if fn is None: pytest.skip(f"requires {libname}") value, exception = _submit_with_cb(pool, fn) assert value is None assert str(exception) == str(result), str(value) assert isinstance(exception, type(result)) tb = exceptions.serialize_exception(exception)["traceBack"] msg = "\n\n" + "".join(tb) assert any("in _fn_failure" in s for s in tb), msg assert any('raise RuntimeError("expected failure")' in s for s in tb), msg
def _submit_with_cb(pool, fn) -> Tuple[Any, Optional[Exception]]: value = None exception = None ev = Event() def cb(result): nonlocal value value = result ev.set() def ecb(result): nonlocal exception exception = result ev.set() pool.apply_async(fn, callback=cb, error_callback=ecb) assert ev.wait(timeout=TIMEOUT) return value, exception def _fn_success() -> int: return 10 def _fn_failure(): raise RuntimeError("expected failure") def _fn_subprocess_success() -> int: return int(subprocess.check_output(["echo", "10"]).decode().strip()) def _fn_pool_success(pool_cls) -> int: with pool_cls() as pool: return pool.apply(_fn_success) def _fn_process_success(process_ctx) -> int: with process_ctx.Manager() as manager: result = manager.Value("i", 0) p = process_ctx.Process(target=_fn_process_success_target, args=(result,)) p.start() p.join() return result.value def _fn_process_success_target(result) -> None: result.value = 10 def _fn_mppool_success() -> int: return _fn_pool_success(MpPool) def _fn_mpprocess_success() -> int: return _fn_process_success(get_mp_context()) def _fn_cfpool_success() -> int: with CfPool() as pool: return pool.submit(_fn_success).result()
[docs] class Event: def __init__(self) -> None: self._is_set = False
[docs] def set(self): self._is_set = True
[docs] def wait(self, timeout=None): t0 = time.time() while not self._is_set: if timeout is not None: if (time.time() - t0) > timeout: return False time.sleep(0.2) return True
[docs] def reset(self): self._is_set = False
if BPool is None: _fn_bpool_success = None _fn_bprocess_success = None else: def _fn_bpool_success() -> int: return _fn_pool_success(BPool) def _fn_bprocess_success() -> int: return _fn_process_success(get_billiard_context()) SUCCESS = { "simple": ("builtins", _fn_success, 10), "subprocess": ("subprocess", _fn_subprocess_success, 10), "cfpool": ("concurrent.futures", _fn_cfpool_success, 10), "mppool": ("multiprocessing", _fn_mppool_success, 10), "mpprocess": ("multiprocessing", _fn_mpprocess_success, 10), "bpool": ("billiard", _fn_bpool_success, 10), "bprocess": ("billiard", _fn_bprocess_success, 10), } FAILURE = {"simple": ("builtins", _fn_failure, RuntimeError("expected failure"))}