Source code for pypushflow.AbstractActor

from typing import Optional

from pypushflow.ThreadCountingActor import ThreadCountingActor


class AbstractActor(ThreadCountingActor):
    """Actor that can be chained to downstream actors.

    Each instance keeps a list of downstream actors (``listDownStreamActor``)
    that are triggered with the incoming data when this actor executes, and
    an ``actorId`` that is filled in once the actor has been registered with
    the parent's ``db_client`` (see :meth:`uploadInDataToMongo`).
    """

    def __init__(self, parent=None, name=None, **kw):
        """Initialise the actor.

        :param parent: forwarded to the base class; also used by the
            database helpers of this class (may be ``None``).
        :param name: forwarded to the base class.
        :param kw: extra keyword arguments forwarded to the base class.
        """
        super().__init__(name=name, parent=parent, **kw)
        # Actors to trigger once this actor has executed.
        self.listDownStreamActor = []
        # Identifier returned by ``parent.db_client.startActor``; stays
        # ``None`` until ``uploadInDataToMongo`` has registered the actor.
        self.actorId = None

    def __str__(self) -> str:
        """Return the actor name."""
        return self.name

    def __repr__(self) -> str:
        """Return ``ClassName(name=<actor name>)``."""
        return f"{type(self).__name__}(name={self.name})"

    def connect(self, actor):
        """Register ``actor`` as a downstream actor of this one.

        :param actor: object exposing ``name`` and, at execution time,
            ``trigger(inData)``.
        """
        self.logger.debug("connect to actor '%s'", actor.name)
        self.listDownStreamActor.append(actor)

    def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None:
        """Mark this actor as started and finished, then trigger downstream.

        :param inData: data passed unchanged to every downstream actor.
        :param _scope_id: accepted but not used by this implementation.
        """
        # This base implementation does no work of its own: both state
        # hooks fire before any downstream actor is triggered.
        self.setStarted()
        self.setFinished()
        for downstream in self.listDownStreamActor:
            downstream.trigger(inData)

    def uploadInDataToMongo(self, actorData=None, script=None):
        """Register this actor through the parent's ``db_client``.

        Does nothing when the actor has no parent. Otherwise calls
        ``parent.db_client.startActor`` with the actor's full path as name
        and stores the returned value in ``self.actorId``.

        :param actorData: optional mapping copied into the ``info`` payload.
        :param script: optional value stored under the ``"script"`` key of
            the payload when truthy.
        """
        if self.parent is None:
            return

        name = self.getActorPath() + "/" + self.name
        # Work on a copy so the caller's mapping is never modified.
        info = dict(actorData) if actorData else {}
        if script:
            info["script"] = script
        self.actorId = self.parent.db_client.startActor(name=name, info=info)

    def uploadOutDataToMongo(self, actorData=None):
        """Send ``actorData`` as updated info for this actor.

        Nothing happens when ``actorData`` is empty/``None`` or when the
        actor has not been registered (``actorId`` is ``None``).

        :param actorData: payload passed as ``info`` to
            ``parent.db_client.updateActorInfo``.
        """
        if not actorData or self.actorId is None:
            return
        self.parent.db_client.updateActorInfo(self.actorId, info=actorData)

    def setMongoAttribute(self, attribute, value):
        """Update a single info entry for this actor.

        Nothing happens when the actor has not been registered
        (``actorId`` is ``None``).

        :param attribute: key of the entry to update.
        :param value: value stored under ``attribute``.
        """
        if self.actorId is None:
            return
        single_entry = {attribute: value}
        self.parent.db_client.updateActorInfo(self.actorId, info=single_entry)

    def getActorPath(self):
        """Return the actor path reported by the parent.

        Delegates to ``self.parent.getActorPath()``; the parent must
        therefore not be ``None`` when this is called.
        """
        return self.parent.getActorPath()

    def setStarted(self):
        """Log that the actor has started (info level)."""
        self.logger.info("started")

    def setFinished(self):
        """Log that the actor has finished (info level)."""
        self.logger.info("finished")