Source code for pypushflow.AbstractActor
from typing import Optional
from pypushflow.ThreadCountingActor import ThreadCountingActor
[docs]
class AbstractActor(ThreadCountingActor):
def __init__(self, parent=None, name=None, **kw):
super().__init__(name=name, parent=parent, **kw)
self.listDownStreamActor = []
self.actorId = None
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f"{type(self).__name__}(name={self.name})"
[docs]
def connect(self, actor):
self.logger.debug("connect to actor '%s'", actor.name)
self.listDownStreamActor.append(actor)
def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None:
self.setStarted()
self.setFinished()
for actor in self.listDownStreamActor:
actor.trigger(inData)
[docs]
def uploadInDataToMongo(self, actorData=None, script=None):
if self.parent is not None:
name = self.getActorPath() + "/" + self.name
if actorData:
info = dict(actorData)
else:
info = dict()
if script:
info["script"] = script
self.actorId = self.parent.db_client.startActor(name=name, info=info)
[docs]
def uploadOutDataToMongo(self, actorData=None):
if actorData and self.actorId is not None:
self.parent.db_client.updateActorInfo(self.actorId, info=actorData)
[docs]
def setMongoAttribute(self, attribute, value):
if self.actorId is not None:
self.parent.db_client.updateActorInfo(self.actorId, info={attribute: value})
[docs]
def getActorPath(self):
return self.parent.getActorPath()
[docs]
def setStarted(self):
self.logger.info("started")
[docs]
def setFinished(self):
self.logger.info("finished")