import pprint
import threading
import uuid
import warnings
from contextlib import contextmanager
from functools import wraps
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Tuple
from pypushflow.ActorInterface import ActorInterface
from pypushflow.logutils import PyPushflowLoggedObject
from pypushflow.ThreadCounter import ThreadCounter
# Signature-agnostic type of the success/failure callbacks handed to
# asynchronous actors (see ``ThreadCountingActor._async_execute_context``).
CallbackType = Callable[..., Any]
class ThreadCountingActor(PyPushflowLoggedObject, ActorInterface):
    """Actor whose `trigger` calls are accounted for by a ``ThreadCounter``.

    The `trigger` method will increase the thread counter
    at the start and decrease the thread counter at the end.

    Note: *workflow scheduler threads* have nothing to do with *system threads*.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        parent=None,
        thread_counter: Optional[ThreadCounter] = None,
    ):
        """
        :param name: actor name (required despite the ``None`` default).
        :param parent: optional parent; when given, ``parent.addActorRef(self)``
            is called to register this actor with it.
        :param thread_counter: scheduler-thread counter (required despite the
            ``None`` default).
        :raises RuntimeError: when ``name`` is ``None``.
        :raises ValueError: when ``thread_counter`` is ``None``.
        """
        if name is None:
            raise RuntimeError("Actor name is None!")
        if thread_counter is None:
            raise ValueError("Actor requires a 'thread_counter' argument")
        super().__init__(log_metadata={"actor": name}, parent=parent)
        self.name = name
        self.parent = parent
        if parent is not None:
            parent.addActorRef(self)
        self.__thread_counter = thread_counter
        # Active trigger scopes keyed by scope id. Scopes may be unregistered
        # from asynchronous callbacks (on finalization), so access is guarded
        # by a lock.
        self.__scopes: Dict[str, _TriggerScope] = {}
        self.__scopes_lock = threading.Lock()

    def trigger(self, inData: dict, **kwargs):
        """Run this actor for one set of input data.

        This method
        - starts a scheduler thread (increments the thread counter),
        - calls ``ActorInterface._execute`` (actor-specific logic + triggering downstream actors),
        - ends the scheduler thread (decrements the thread counter).

        When ``ActorInterface._execute`` is asynchronous, the end of the scheduler
        thread needs to be deferred and executed as a callback once the asynchronous
        job completes (see ``_async_execute_context``).

        :param inData: input data, forwarded to ``_execute``.
        :param kwargs: extra keyword arguments forwarded to ``_execute``;
            ``_scope_id`` is reserved and injected by this method.
        :raises RuntimeError: when ``_scope_id`` is passed in ``kwargs``.
        """
        self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
        if "_scope_id" in kwargs:
            raise RuntimeError("'_scope_id' is a reserved parameter name")
        with _TriggerScope(
            self.name,
            self.__thread_counter,
            self.__register_scope,
            self.__unregister_scope,
        ) as scope:
            self._execute(inData, _scope_id=scope.id, **kwargs)

    def __register_scope(self, scope: "_TriggerScope"):
        """Record ``scope`` as active so ``_async_execute_context`` can find it."""
        with self.__scopes_lock:
            self.__scopes[scope.id] = scope

    def __get_scope(self, scope_id: str) -> "_TriggerScope":
        """Return the active scope with id ``scope_id``.

        :raises RuntimeError: when no active scope has that id.
        """
        with self.__scopes_lock:
            scope = self.__scopes.get(scope_id)
        if scope is None:
            raise RuntimeError(f"Unknown actor trigger scope {scope_id}")
        return scope

    def __unregister_scope(self, scope: "_TriggerScope"):
        """Forget ``scope``; a no-op when it is not registered."""
        with self.__scopes_lock:
            _ = self.__scopes.pop(scope.id, None)

    @contextmanager
    def _async_execute_context(
        self,
        scope_id: str,
        async_success_callback: CallbackType,
        async_failure_callback: CallbackType,
    ) -> Generator[Tuple[CallbackType, CallbackType], None, None]:
        """
        Asynchronous actors use callbacks to defer triggering of
        downstream actors till the asynchronous job is finished.
        This context manager wraps the callbacks so that
        - the scheduler thread started by `trigger` is finalized once the
          first wrapped callback returns (if the body of the context raises
          before any callback is called, `trigger` finalizes it instead),
        - only the first wrapped callback that gets called is executed.

        :param scope_id: the ``_scope_id`` that `trigger` passed to ``_execute``.
        :param async_success_callback: callback for a successful job.
        :param async_failure_callback: callback for a failed job.
        :returns: a context yielding the wrapped
            ``(success_callback, failure_callback)`` tuple.
        :raises RuntimeError: for an unknown ``scope_id``, or when used more
            than once within the same `trigger` call.
        """
        scope = self.__get_scope(scope_id)
        with scope.async_execute_context():
            async_callbacks = (async_success_callback, async_failure_callback)
            wrapped = tuple(_wrap_async_callback(cb, scope) for cb in async_callbacks)
            yield wrapped

    def _wait_threads_finished(self, **kw):
        """Delegate to the thread counter's ``wait_threads_finished``.

        Keyword arguments are passed through and its result is returned.
        """
        return self.__thread_counter.wait_threads_finished(**kw)
def _wrap_async_callback(cb: CallbackType, scope: "_TriggerScope") -> CallbackType:
    """Return a wrapper that runs ``cb`` inside ``scope.callback_context``.

    The scope decides whether this call is the first callback of the trigger
    scope. Only then is ``cb`` invoked and its result returned. Any later call
    returns ``None`` without invoking ``cb``.
    """

    @wraps(cb)
    def guarded(*args, **kw):
        with scope.callback_context(cb) as is_first:
            if not is_first:
                return None
            return cb(*args, **kw)

    return guarded
class _TriggerScope:
    """
    Manage the scope of a single `trigger` call.

    Entering the scope starts a scheduler thread on the thread counter and
    registers the scope with its actor. Finalization (unregistering the scope
    and ending the scheduler thread) happens at most once. It happens either
    when the `trigger` call exits or, when deferred by `async_execute_context`,
    when the first asynchronous callback returns.
    """

    def __init__(
        self,
        name: str,
        thread_counter: ThreadCounter,
        register_scope: Callable[["_TriggerScope"], None],
        unregister_scope: Callable[["_TriggerScope"], None],
    ):
        """
        :param name: actor name, used in thread-counter messages and warnings.
        :param thread_counter: counter whose ``start_thread``/``end_thread``
            bracket this scope.
        :param register_scope: called with this scope on ``__enter__``.
        :param unregister_scope: called with this scope on finalization.
        """
        self._id = str(uuid.uuid4())
        self._name = name
        self._thread_counter = thread_counter
        # Guards the state flags below; callbacks may run in other threads.
        self._lock = threading.Lock()
        # True while finalization is delegated to the first async callback.
        self._defer_finalization = False
        # True once the scheduler thread has been ended.
        self._finalized = False
        # The first callback that was called; later callbacks are ignored.
        self._async_callback: Optional[CallbackType] = None
        self._register_scope = register_scope
        self._unregister_scope = unregister_scope

    @property
    def id(self) -> str:
        """Unique identifier of this scope (a random UUID4 string)."""
        return self._id

    @contextmanager
    def async_execute_context(self) -> Generator[None, None, None]:
        """
        Wrap the asynchronous actor execute call.

        Inside this context finalization is deferred to the first asynchronous
        callback. If the wrapped code raises, finalization falls back to the
        synchronous exit of the `trigger` call. The exception is when a callback
        has already been called; that callback then remains responsible for it.

        :raises RuntimeError: when used more than once in the same trigger scope.
        """
        with self._lock:
            if self._defer_finalization:
                # Mistake in the actor implementation logic
                raise RuntimeError(
                    "async_execute_context can be used only once in a trigger scope"
                )
            self._defer_finalization = True
        try:
            yield
        except BaseException:
            with self._lock:
                # The asynchronous calls themselves might have failed to start,
                # in which case callbacks might not be called at all, so do the
                # finalization synchronously when exiting the `trigger` call.
                # If a callback was already called, it finalizes the scope
                # itself when it returns. Keep deferring to it; otherwise the
                # thread would end while the callback is still running, or
                # `trigger` would warn about ending it a second time.
                if self._async_callback is None:
                    self._defer_finalization = False
            raise

    @contextmanager
    def callback_context(
        self, async_callback: CallbackType
    ) -> Generator[bool, None, None]:
        """
        Wrap a callback of the asynchronous actor execute call.

        Yields ``True`` for the first callback called in this scope. Any later
        callback gets ``False``, after a ``RuntimeWarning`` is emitted. When the
        first callback's context exits (even with an exception), the deferred
        finalization is attempted.
        """
        with self._lock:
            if self._async_callback is None:
                first_call = True
                self._async_callback = async_callback
            else:
                first_call = False
                # Reasons multiple callbacks get called:
                # - the execution pool does not ensure a single call
                # - mistake in the actor implementation logic
                warnings.warn(
                    f"callback {async_callback} ignored because "
                    f"{self._async_callback} was already called "
                    f"for {self._name!r} [{self._id}]",
                    RuntimeWarning,
                    stacklevel=2,
                )
        try:
            yield first_call
        finally:
            if first_call:
                self._deferred_exit()

    def __enter__(self):
        """Start the scheduler thread and register this scope."""
        self._thread_counter.start_thread(
            msg=f"Thread started for {self._name!r} [{self._id}]"
        )
        self._register_scope(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        """Finalize unless finalization is deferred to an async callback.

        Never suppresses exceptions.
        """
        with self._lock:
            if not self._defer_finalization:
                self._try_finalization()
        return False

    def _deferred_exit(self) -> None:
        """Finalize on behalf of the first async callback, if deferred."""
        with self._lock:
            if self._defer_finalization:
                self._try_finalization()

    def _try_finalization(self) -> None:
        """Unregister the scope and end the scheduler thread.

        Both callers hold ``self._lock`` while calling this.
        """
        # Skip and emit warning if finalization already happened
        if self._finalized:
            warnings.warn(
                f"Thread ended again for {self._name!r} [{self._id}]",
                RuntimeWarning,
                stacklevel=2,
            )
            return
        # Finalize
        self._unregister_scope(self)
        self._thread_counter.end_thread(
            msg=f"Thread ended for {self._name!r} [{self._id}]"
        )
        self._finalized = True