Source code for pypushflow.PythonActor

import os
import pprint
import datetime
import importlib

from .AbstractActor import AbstractActor
from .concurrent import exceptions


[docs] class PythonActor(AbstractActor): def __init__( self, parent=None, name="Python Actor", errorHandler=None, script=None, **kw ): super().__init__(parent=parent, name=name, **kw) self.parentErrorHandler = errorHandler self.listErrorHandler = [] self.script = script self.inData = None
[docs] def connectOnError(self, actor): self.logger.debug("connect to error handler '%s'", actor.name) self.listErrorHandler.append(actor)
[docs] def trigger(self, inData: dict): self.logger.info("triggered with inData =\n %s", pprint.pformat(inData)) self.setStarted() self.inData = dict(inData) self.uploadInDataToMongo(actorData={"inData": inData}, script=self.script) try: module = importlib.import_module(os.path.splitext(self.script)[0]) except Exception as e: self.logger.error("Error when trying to import script '%s'", self.script) self.errorHandler(e) return with self._postpone_end_thread(self.resultHandler, self.errorHandler) as ( resultHandler, errorHandler, ): target = module.run self.logger.debug( "%s execution of '%s.%s'", "synchronous" if self.pool is None else "asynchronous", target.__module__, target.__name__, ) args = {k: v for k, v in self.inData.items() if not isinstance(k, str)} kwargs = {k: v for k, v in self.inData.items() if isinstance(k, str)} if args: args = tuple(args.get(i) for i in range(max(args) + 1)) else: args = tuple() if self.parent is not None: stop_exception = self.parent.stop_exception if stop_exception: self.logger.error(str(stop_exception)) errorHandler(stop_exception) return if self.pool is None: try: result = target(*args, **kwargs) except BaseException as e: errorHandler(e) else: resultHandler(result) else: try: self.pool.apply_async( target, args=args, kwargs=kwargs, callback=resultHandler, error_callback=errorHandler, ) except RuntimeError: if self.parent is not None: stop_exception = self.parent.stop_exception if stop_exception: self.logger.error(str(stop_exception)) errorHandler(stop_exception) return raise
@property def pool(self): if self.parent is not None: return self.parent.pool @property def pool_resources(self): return 1
[docs] def resultHandler(self, result: dict): """Async callback in case of success""" try: # Handle the result self._finishedSuccess(result) # Trigger actors downstreamData = self.compileDownstreamData(result) self._triggerDownStreamActors(downstreamData) except Exception as e: self.errorHandler(e)
[docs] def compileDownstreamData(self, result: dict) -> dict: return {**self.inData, **result}
[docs] def errorHandler(self, exception: Exception): """Async callback in case of exception""" try: # Handle the result workflow_exception = exceptions.serialize_exception(exception) self._logException(exception) self._finishedFailure(workflow_exception) # Trigger actors result = { "WorkflowException": workflow_exception, "WorkflowExceptionInstance": exception, } downstreamData = self.compileDownstreamErrorData(result) self._triggerErrorHandlers(downstreamData) except Exception: self.logger.exception("In errorHandler for '%s'", self.name)
[docs] def compileDownstreamErrorData(self, result: dict) -> dict: return {**self.inData, **result}
def _triggerDownStreamActors(self, downstreamData: dict): for downStreamActor in self.listDownStreamActor: downStreamActor.trigger(downstreamData) def _triggerErrorHandlers(self, downstreamData: dict): for errorHandler in self.listErrorHandler: errorHandler.trigger(downstreamData) if self.parentErrorHandler is not None: self.parentErrorHandler.triggerOnError(inData=downstreamData) def _finishedSuccess(self, result: dict): self.setFinished() self.uploadOutDataToMongo( actorData={ "stopTime": datetime.datetime.now(), "status": "finished", "outData": result, } ) if "workflowLogFile" in result: self.setMongoAttribute("logFile", result["workflowLogFile"]) if "workflowDebugLogFile" in result: self.setMongoAttribute("debugLogFile", result["workflowDebugLogFile"]) def _finishedFailure(self, result: dict): self.setFinished() self.uploadOutDataToMongo( actorData={ "stopTime": datetime.datetime.now(), "status": "error", "outData": result, } ) def _logException(self, exception: Exception) -> None: if exception.__traceback__ is None: logfunc = self.logger.error else: logfunc = self.logger.exception logfunc( "Error in python actor '%s'!\n Not running down stream actors %s\n Exception:%s", self.name, [actor.name for actor in self.listDownStreamActor], exception, )