Source code for pypushflow.RouterActor

from typing import Optional

from pypushflow.AbstractActor import AbstractActor


[docs] class RouterActor(AbstractActor): def __init__( self, parent=None, errorHandler=None, name="Router", itemName=None, listPort=None, **kw, ): super().__init__(parent=parent, name=name, **kw) self.errorHandler = errorHandler self.name = name self.itemName = itemName if listPort is None: self.listPort = [] else: self.listPort = listPort self.dictValues = {}
[docs] def connect(self, actor, expectedValue="other"): self.logger.debug( "connect to actor '%s' (output port %s)", actor.name, expectedValue ) if expectedValue != "other" and expectedValue not in self.listPort: raise RuntimeError( f"Port {expectedValue} not defined for router actor {self.name}!" ) if expectedValue in self.dictValues: self.dictValues[expectedValue].append(actor) else: self.dictValues[expectedValue] = [actor]
def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None: self.setStarted() self.setFinished() listActor = None if self.itemName in inData: self.logger.debug("router item = '%s'", self.itemName) value = inData[self.itemName] self.logger.debug("router item = '%s' value = %s", self.itemName, value) if value in [None, "None", "null"]: value = "null" elif isinstance(value, bool): if value: value = "true" else: value = "false" if not isinstance(value, dict) and value in self.dictValues: listActor = self.dictValues[value] if listActor is None: self.logger.debug("no router destinations for inData") if "other" in self.dictValues: listActor = self.dictValues["other"] else: raise RuntimeError(f"No 'other' port for router actor '{self.name}'") for actor in listActor: actor.trigger(inData)