Source code for pypushflow.tests.concurrent.test_apply_async
import pytest
from . import utils
[docs]
def add(a, b):
return a + b
[docs]
def error(a, b):
raise RuntimeError("intentional error")
FUNCTIONS = {"add": add, "error": error}
[docs]
@pytest.mark.parametrize("scaling", [True, False])
@pytest.mark.parametrize("max_workers", [None, 1])
@pytest.mark.parametrize("pool_type", utils.POOLS)
@pytest.mark.parametrize("func", ["add", "error"])
def test_apply_async(scaling, max_workers, pool_type, func):
callback_event = utils.Event()
failed_msg = ""
def result_callback(return_value):
nonlocal failed_msg
try:
if return_value != 2:
failed_msg = f"{return_value} != 2"
finally:
callback_event.set()
def error_callback(exception):
nonlocal failed_msg
try:
if not isinstance(exception, RuntimeError):
failed_msg = f"{exception} is not a RuntimeError"
elif str(exception) != "intentional error":
failed_msg = f"'{exception}' != 'intentional error'"
finally:
callback_event.set()
with utils.pool_context(scaling, pool_type, max_workers=max_workers) as pool:
pool.apply_async(
FUNCTIONS[func],
args=(1, 1),
callback=result_callback,
error_callback=error_callback,
)
callback_event.wait(10)
assert not failed_msg, failed_msg