Source code for pypushflow.tests.conftest
import os
import pytest
import psutil
import logging
logger = logging.getLogger(__name__)
[docs]
@pytest.fixture()
def workflow_cleanup():
current_process = psutil.Process()
subprocesses = {child.pid for child in current_process.children(recursive=True)}
fail_on_subprocesses = True
yield
has_new_subprocesses = False
for child in current_process.children(recursive=True):
if child.pid not in subprocesses:
has_new_subprocesses = True
logger.error("dangling child %s", child)
try:
child.terminate()
except psutil.NoSuchProcess:
pass
if has_new_subprocesses and fail_on_subprocesses:
raise RuntimeError("not all subprocesses have finished")
[docs]
def gevent_patched() -> bool:
try:
from gevent.monkey import is_anything_patched
except ImportError:
return False
return is_anything_patched()
[docs]
@pytest.fixture(scope="session")
def skip_when_gevent():
if gevent_patched():
pytest.skip("does not work with 'gevent' monkey patching")
[docs]
@pytest.fixture(scope="session")
def skip_on_windows():
if os.name == "nt":
pytest.skip("does not work on windows")