Source code for pypushflow.concurrent.interrupt.process
"""Utilities to terminate process pool tasks"""
import os
import signal
from contextlib import contextmanager
from . import StopPypushflowTask
_TERMINATE_SIGNAL = signal.SIGINT
[docs]
def task_main(fn, *args, **kwargs):
with _task_context():
return fn(*args, **kwargs)
[docs]
def interrupt_task(pid) -> None:
# TODO: on windows this kills the subprocess
# despite that fact that we handle the
# signal in the subprocess
try:
os.kill(pid, _TERMINATE_SIGNAL)
except ProcessLookupError:
pass # process already ended
[docs]
def worker_initializer():
_set_terminate_handler(signal.SIG_IGN)
@contextmanager
def _task_context():
def stop_task_handler(signum, frame):
nonlocal old_handler
if old_handler is None:
old_handler = signal.SIG_IGN
signal.signal(signum, old_handler)
raise KeyboardInterrupt(f"stop task due to signal {signum}")
old_handler = _set_terminate_handler(stop_task_handler)
try:
yield
except KeyboardInterrupt as e:
raise StopPypushflowTask(str(e)) from None
finally:
_set_terminate_handler(old_handler)
def _set_terminate_handler(handler):
try:
return signal.signal(_TERMINATE_SIGNAL, handler)
except (OSError, AttributeError, ValueError, RuntimeError):
pass