Source code for pypushflow.StopActor

from typing import Optional

from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor


[docs] class StopActor(ThreadCountingActor): def __init__(self, parent=None, errorHandler=None, name="Stop actor", **kw): super().__init__(name=name, parent=parent, **kw) self.errorHandler = errorHandler self._outData = None def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None: if self.parent is not None and not isinstance(self.parent, Submodel.Submodel): # Parent is a Workflow self.outData = inData elif self.errorHandler is not None: self.errorHandler.errorHandler.stopActor.trigger(inData) else: self.outData = inData
[docs] def reset(self): self._outData = None
@property def outData(self): return self._outData @outData.setter def outData(self, data): if data is None: self._outData = None return if self._outData is None: self._outData = data return inerror = data.get("WorkflowException") outerror = self._outData.get("WorkflowException") if inerror and outerror: pass # keep the first error elif inerror: pass # keep the non-error result elif outerror: # keep the non-error result self._outData = data else: # keep the last result self._outData = data
[docs] def join(self, timeout=7200): if self.parent is not None: self.logger.debug("wait for scheduler threads to be finished") success = self._wait_threads_finished(timeout=timeout) if self.parent is not None: self.logger.debug("scheduler threads are finished") self._finalizeInMongo(success) return success
def _finalizeInMongo(self, success): if self.parent is None: return if success: self.logger.debug("finished") self.parent.endWorkflow("finished") else: self.logger.error("timeout detected") self.parent.endWorkflow("timeout")