Source code for pypushflow.tests.concurrent.test_interrupt

import os
import time
from numbers import Number
import pytest
from . import utils
from ...concurrent.interrupt import StopPypushflowTask


[docs] def create_file(sleep_time: Number, filename: str) -> str: time.sleep(sleep_time) with open(filename, "w"): pass return filename
[docs] @pytest.mark.parametrize("scaling", [True, False]) @pytest.mark.parametrize("max_workers", [None, 1]) @pytest.mark.parametrize("pool_type", utils.POOLS) def test_interrupt(scaling, max_workers, pool_type, tmpdir): if pool_type == "thread": pytest.skip("threads cannot be interrupted") if os.name == "nt" and pool_type in ( "process", "ndprocess", "multiprocessing", "ndmultiprocessing", "billiard", ): pytest.skip("not supported on windows") callback_event = utils.Event() exception = None result = None def reset(): nonlocal result nonlocal exception result = None exception = None callback_event.reset() def callback(r): nonlocal result result = r callback_event.set() def error_callback(e): nonlocal exception exception = e callback_event.set() def run_normal(filename): reset() pool.apply_async( create_file, args=(0, str(filename)), callback=callback, error_callback=error_callback, ) callback_event.wait(10) assert result == str(filename) assert filename.exists() def run_interrupt(filename): reset() pool.apply_async( create_file, args=(5, str(filename)), callback=callback, error_callback=error_callback, ) time.sleep(2) pool.interrupt() callback_event.wait(10) assert isinstance(exception, StopPypushflowTask), str(type(exception)) assert not filename.exists() with utils.pool_context(scaling, pool_type, max_workers=max_workers) as pool: run_normal(tmpdir / "test1.txt") run_interrupt(tmpdir / "test2.txt") run_interrupt(tmpdir / "test3.txt") run_normal(tmpdir / "test4.txt")