Source code for pypushflow.Submodel
import pprint
from typing import Optional
from pypushflow.logutils import PyPushflowLoggedObject
from pypushflow.ThreadCountingActor import ThreadCountingActor
[docs]
class Port(ThreadCountingActor):
def __init__(self, errorHandler, name, **kw):
super().__init__(name=errorHandler.name + "." + name, **kw)
self.errorHandler = errorHandler
self.listActor = []
self.inPortTrigger = None
[docs]
def connect(self, actor):
self.logger.debug("connectig to '%s'", actor.name)
self.listActor.append(actor)
[docs]
def setTrigger(self, trigger):
self.inPortTrigger = trigger
def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None:
self.logger.info(
"triggered with inData =\n %s",
pprint.pformat(inData),
)
if len(self.listActor) > 0:
for actor in self.listActor:
self.logger.debug(
"In trigger '%s' -> actorName '%s'",
self.errorHandler.name,
actor.name,
)
actor.trigger(inData)
if self.inPortTrigger is not None:
self.logger.debug(
"In '%s' trigger, trigger = '%s'",
self.errorHandler.name,
self.inPortTrigger,
)
self.inPortTrigger(inData)
[docs]
class Submodel(PyPushflowLoggedObject):
def __init__(
self,
parent=None,
errorHandler=None,
name=None,
portNames=["In", "Out"],
thread_counter=None,
):
super().__init__(log_metadata={"submodel": name}, parent=parent)
self.parent = parent
self.name = name
if errorHandler is None:
self.errorHandler = parent
else:
self.errorHandler = errorHandler
self.dictPort = {}
self.listOnErrorActor = []
for portName in portNames:
self.dictPort[portName] = Port(
self, portName, thread_counter=thread_counter
)
@property
def db_client(self):
return self.parent.db_client
[docs]
def setStatus(self, status):
self.parent.setStatus(status)
[docs]
def getActorPath(self):
return self.parent.getActorPath() + "/" + self.name.replace("%", " ")
[docs]
def getPort(self, portName):
return self.dictPort[portName]
[docs]
def connect(self, actor, portName="Out"):
self.dictPort[portName].connect(actor)
[docs]
def connectOnError(self, actor):
self.logger.debug("connect to error handler '%s'", actor.name)
self.listOnErrorActor.append(actor)
[docs]
def triggerOnError(self, inData):
self.logger.info(
"triggered due to error with inData =\n %s", pprint.pformat(inData)
)
for onErrorActor in self.listOnErrorActor:
onErrorActor.trigger(inData)
if self.errorHandler is not None:
self.errorHandler.triggerOnError(inData)
[docs]
def addActorRef(self, actorRef):
if self.parent is not None:
self.parent.addActorRef(actorRef)
@property
def pool(self):
if self.parent is not None:
return self.parent.pool
@property
def stop_exception(self) -> Optional[BaseException]:
if self.parent is not None:
return self.parent.stop_exception
[docs]
def stop(self, *args, **kw):
if self.parent is not None:
return self.parent.stop(*args, **kw)