Source code for pypushflow.JoinActor
from typing import Optional
from pypushflow.AbstractActor import AbstractActor
[docs]
class JoinActor(AbstractActor):
    """Actor that waits for several inputs and forwards their merged content.

    Each call to ``_execute`` stores the received dictionary.  When the
    number of stored dictionaries equals ``numberOfThreads``, they are merged
    into one dictionary (in arrival order, so later values overwrite earlier
    ones for the same key) and every actor in ``listDownStreamActor`` is
    triggered with that merged dictionary.

    NOTE(review): ``listInData`` is never emptied, so the join fires only on
    the call that makes the counts equal; later calls keep appending and do
    not trigger downstream again.  Confirm this single-shot behaviour is
    intended.
    """

    def __init__(self, parent=None, name="Join actor", **kw):
        """Forward ``parent``, ``name`` and extra keywords to the base class
        and start with no expected inputs and no collected data."""
        super().__init__(parent=parent, name=name, **kw)
        # Number of inputs to wait for; raised via increaseNumberOfThreads().
        self.numberOfThreads = 0
        # Input dictionaries received so far, in arrival order.
        self.listInData = []

    def increaseNumberOfThreads(self):
        """Expect one more input before the join fires."""
        self.numberOfThreads += 1

    def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None:
        """Record ``inData`` and fire downstream once all inputs are in.

        :param inData: dictionary received from one incoming branch.
        :param _scope_id: accepted but not used by this method.
        """
        # Both state changes are reported on every call, before the join
        # condition is evaluated (presumably inherited from AbstractActor).
        self.setStarted()
        self.setFinished()

        self.listInData.append(inData)
        if len(self.listInData) != self.numberOfThreads:
            # Either still waiting for inputs or already past the threshold.
            return

        merged = {}
        for received in self.listInData:
            merged.update(received)

        # Every downstream actor receives the same merged dict object.
        for downstream in self.listDownStreamActor:
            downstream.trigger(merged)