Source code for pypushflow.stop_workflows
"""Utilities to stop workflows on certain signals"""
import os
import signal
from weakref import WeakKeyDictionary
from typing import Optional, Sequence
from contextlib import contextmanager
# Signals that stop a workflow when the caller does not specify any:
# SIGINT when ``os.name`` is "nt", SIGTERM on every other platform.
DEFAULT_STOP_SIGNALS = (signal.SIGINT,) if os.name == "nt" else (signal.SIGTERM,)
[docs]
class StopPypushflowWorkflow(Exception):
    """Exception type exposed by the workflow-stopping utilities.

    It adds nothing to :class:`Exception` and is not raised in this module.
    NOTE(review): presumably raised by callers to interrupt a running
    workflow -- confirm against the code that uses it.
    """
[docs]
@contextmanager
def stop_on_signals_context(workflow, stop_signals: Optional[Sequence] = None):
    """Register ``workflow`` to be stopped on signals while the context is active.

    :param workflow: object used as a key in the module-level workflow
        registry; the installed signal handler calls its ``stop`` method.
    :param stop_signals: signal numbers to react to. ``None`` selects
        ``DEFAULT_STOP_SIGNALS``.

    Entering the context for a workflow that is already registered is a
    no-op: the outer context keeps ownership of the registration and of the
    clean-up.
    """
    already_registered = workflow in _WORKFLOWS
    if already_registered:
        # Nested use: leave registration and clean-up to the outer context.
        yield
        return

    watched_signals = DEFAULT_STOP_SIGNALS if stop_signals is None else stop_signals
    _WORKFLOWS[workflow] = watched_signals
    try:
        for sig in watched_signals:
            _stop_workflows_on_signal(sig)
        yield
    finally:
        # Unregister first so the handler reset sees which signals are
        # still wanted by the remaining workflows.
        _WORKFLOWS.pop(workflow)
        _reset_signals_handlers()
_WORKFLOWS = WeakKeyDictionary()
_OLD_HANDLERS = dict()
def _stop_workflows_on_signal(signum):
    """Install the workflow-stopping handler for ``signum`` once.

    Nothing happens when a handler for ``signum`` is already recorded in
    ``_OLD_HANDLERS``. Otherwise the value returned by ``_set_handler`` is
    stored there so it can be restored later.
    """
    if signum in _OLD_HANDLERS:
        return

    def _on_signal(received_signum, _frame):
        # Stop every registered workflow that listens to this signal.
        # Iterate over a snapshot because the registry may change meanwhile.
        reason = f"stop workflow due to signal {received_signum}"
        for workflow, watched in list(_WORKFLOWS.items()):
            if received_signum in watched:
                workflow.stop(reason)

    previous_handler = _set_handler(signum, _on_signal)
    _OLD_HANDLERS[signum] = previous_handler
def _reset_signals_handlers():
    """Restore the previous handler of every signal no workflow listens to.

    A signal stays hooked as long as at least one workflow in ``_WORKFLOWS``
    lists it; all other entries of ``_OLD_HANDLERS`` are removed and their
    stored handler is passed back to ``_set_handler``.
    """
    still_needed = set()
    for signals in list(_WORKFLOWS.values()):
        still_needed.update(signals)

    for signum in set(_OLD_HANDLERS).difference(still_needed):
        _set_handler(signum, _OLD_HANDLERS.pop(signum))
def _set_handler(signum, handler):
try:
old_handler = signal.signal(signum, handler)
except (OSError, AttributeError, ValueError, RuntimeError):
pass
if old_handler is None:
return signal.SIG_IGN
return old_handler