Source code for pypushflow.tests.test_workflow11
import time
from pypushflow.PythonActor import PythonActor
from pypushflow.StartActor import StartActor
from pypushflow.StopActor import StopActor
from pypushflow.tests.workflowTestCase import WorkflowTestCase
from pypushflow.ThreadCounter import ThreadCounter
from pypushflow.Workflow import Workflow
[docs]
class Workflow11(Workflow):
def __init__(self, name):
super().__init__(name)
ctr = ThreadCounter(parent=self)
self.startActor = StartActor(self, thread_counter=ctr)
self.stopActor = StopActor(self, thread_counter=ctr)
self.add_actors = [
PythonActor(
parent=self,
script="pypushflow.tests.tasks.pythonActorAddWithoutSleep.py",
name="Add without sleep",
thread_counter=ctr,
)
for _ in range(20)
]
self.gather = PythonActor(
parent=self,
script="pypushflow.tests.test_workflow11.py",
name="Add without sleep",
thread_counter=ctr,
)
for actor in self.add_actors:
self.startActor.connect(actor)
actor.connect(self.gather)
self.gather.connect(self.stopActor)
[docs]
class TestWorkflow11(WorkflowTestCase):
[docs]
def test_workflow11(self):
"""
Test workflow with many parallel triggers of the same actor instance (gather actor).
"""
global NCALLS
NCALLS = 0
workflow11 = Workflow11("Test workflow 11")
nexpected = len(workflow11.add_actors)
outData = workflow11.run(
{"value": 1},
timeout=10,
raise_on_timeout=True,
scaling_workers=False,
max_workers=nexpected + 3,
pool_type="thread",
)
assert NCALLS == nexpected
self.assertTrue(outData["value"])
[docs]
def run(file_path=None, **_):
"""Called from the 'gather' actor."""
time.sleep(0.1)
global NCALLS
NCALLS += 1
return {}