Source code for pypushflow.tests.workflowTestCase
import logging
import unittest
import psutil
logger = logging.getLogger(__name__)
[docs]
class WorkflowTestCase(unittest.TestCase):
[docs]
def setUp(self) -> None:
self._current_process = psutil.Process()
self._subprocesses = {
child.pid for child in self._current_process.children(recursive=True)
}
self._fail_on_subprocesses = True
[docs]
def tearDown(self) -> None:
has_new_subprocesses = False
for child in self._current_process.children(recursive=True):
if child.pid not in self._subprocesses:
has_new_subprocesses = True
logger.error("dangling child %s", child)
try:
child.terminate()
except psutil.NoSuchProcess:
pass
if has_new_subprocesses and self._fail_on_subprocesses:
raise RuntimeError("not all subprocesses have finished")