Source code for pypushflow.tests.test_workflow11

import time

from pypushflow.PythonActor import PythonActor
from pypushflow.StartActor import StartActor
from pypushflow.StopActor import StopActor
from pypushflow.tests.workflowTestCase import WorkflowTestCase
from pypushflow.ThreadCounter import ThreadCounter
from pypushflow.Workflow import Workflow


[docs] class Workflow11(Workflow): def __init__(self, name): super().__init__(name) ctr = ThreadCounter(parent=self) self.startActor = StartActor(self, thread_counter=ctr) self.stopActor = StopActor(self, thread_counter=ctr) self.add_actors = [ PythonActor( parent=self, script="pypushflow.tests.tasks.pythonActorAddWithoutSleep.py", name="Add without sleep", thread_counter=ctr, ) for _ in range(20) ] self.gather = PythonActor( parent=self, script="pypushflow.tests.test_workflow11.py", name="Add without sleep", thread_counter=ctr, ) for actor in self.add_actors: self.startActor.connect(actor) actor.connect(self.gather) self.gather.connect(self.stopActor)
[docs] class TestWorkflow11(WorkflowTestCase):
[docs] def test_workflow11(self): """ Test workflow with many parallel triggers of the same actor instance (gather actor). """ global NCALLS NCALLS = 0 workflow11 = Workflow11("Test workflow 11") nexpected = len(workflow11.add_actors) outData = workflow11.run( {"value": 1}, timeout=10, raise_on_timeout=True, scaling_workers=False, max_workers=nexpected + 3, pool_type="thread", ) assert NCALLS == nexpected self.assertTrue(outData["value"])
[docs] def run(file_path=None, **_): """Called from the 'gather' actor.""" time.sleep(0.1) global NCALLS NCALLS += 1 return {}