Source code for pypushflow.ThreadCountingActor
from functools import wraps
from contextlib import contextmanager
from pypushflow.logutils import PyPushflowLoggedObject
from pypushflow.ActorInterface import ActorInterface
[docs]
def with_thread_context(trigger):
"""Wraps the `trigger` method of all derived classes of ThreadCountingActor"""
@wraps(trigger)
def wrapper(self, *args, **kw):
with self._thread_context():
return trigger(self, *args, **kw)
return wrapper
[docs]
def callback_with_end_thread(async_callback, end_thread, log_msg):
"""Wraps a async_callback"""
@wraps(async_callback)
def wrapper(*args, **kw):
try:
return async_callback(*args, **kw)
finally:
end_thread(msg=log_msg)
return wrapper
[docs]
class ThreadCountingActor(PyPushflowLoggedObject, ActorInterface):
"""The `trigger` method will increase the thread counter
at the start and decrease the thread counter at the end.
"""
def __init__(self, name=None, parent=None, thread_counter=None):
if name is None:
raise RuntimeError("Actor name is None!")
if thread_counter is None:
raise ValueError("Actor requires a 'thread_counter' argument")
super().__init__(log_metadata={"actor": name}, parent=parent)
self.name = name
self.parent = parent
if parent is not None:
parent.addActorRef(self)
self.__thread_counter = thread_counter
self.__in_thread_context = False
self.__postpone_end_thread = False
def __init_subclass__(subcls, **kw):
"""Wrap the `trigger` method"""
super().__init_subclass__(**kw)
subcls.trigger = with_thread_context(subcls.trigger)
@contextmanager
def _thread_context(self):
"""Re-entrant context manager that starts a thread
on first entrance and ends a thread on last exit,
unless the thread ending is post-poned until after
and async callback.
"""
if self.__in_thread_context:
yield
return
self.__thread_counter.start_thread(msg="Thread started for " + repr(self.name))
try:
self.__in_thread_context = True
self.__postpone_end_thread = False
try:
yield
finally:
self.__in_thread_context = False
finally:
if self.__postpone_end_thread:
self.__postpone_end_thread = False
else:
self.__thread_counter.end_thread(
msg="Thread ended for " + repr(self.name)
)
@contextmanager
def _postpone_end_thread(self, *async_callbacks):
"""Post-pone thread ending until after an async callback is executed.
Only one of the async callbacks is expected to be called.
"""
if self.__in_thread_context:
self.__postpone_end_thread = True
try:
async_callbacks = tuple(
callback_with_end_thread(
async_callback,
self.__thread_counter.end_thread,
"Thread ended for " + repr(self.name),
)
for async_callback in async_callbacks
)
yield async_callbacks
except BaseException:
if self.__in_thread_context:
self.__postpone_end_thread = False
raise
def _wait_threads_finished(self, **kw):
return self.__thread_counter.wait_threads_finished(**kw)