Source code for pypushflow.AbstractActor

import pprint
from pypushflow.ThreadCountingActor import ThreadCountingActor


[docs] class AbstractActor(ThreadCountingActor): def __init__(self, parent=None, name=None, **kw): super().__init__(name=name, parent=parent, **kw) self.listDownStreamActor = [] self.actorId = None self.started = False self.finished = False def __str__(self) -> str: return self.name
[docs] def connect(self, actor): self.logger.debug("connect to actor '%s'", actor.name) self.listDownStreamActor.append(actor)
[docs] def trigger(self, inData): self.logger.info("triggered with inData =\n %s", pprint.pformat(inData)) self.setStarted() self.setFinished() for actor in self.listDownStreamActor: actor.trigger(inData)
[docs] def uploadInDataToMongo(self, actorData=None, script=None): if self.parent is not None: name = self.getActorPath() + "/" + self.name if actorData: info = dict(actorData) else: info = dict() if script: info["script"] = script self.actorId = self.parent.db_client.startActor(name=name, info=info)
[docs] def uploadOutDataToMongo(self, actorData=None): if actorData and self.actorId is not None: self.parent.db_client.updateActorInfo(self.actorId, info=actorData)
[docs] def setMongoAttribute(self, attribute, value): if self.actorId is not None: self.parent.db_client.updateActorInfo(self.actorId, info={attribute: value})
[docs] def getActorPath(self): return self.parent.getActorPath()
[docs] def hasStarted(self): return self.started
[docs] def setStarted(self): self.logger.info("started") self.started = True
[docs] def hasFinished(self): return self.finished
[docs] def setFinished(self): self.logger.info("finished") self.finished = True