Source code for pypushflow.tests.test_workflow10

from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.RouterActor import RouterActor
from pypushflow.ThreadCounter import ThreadCounter
from pypushflow.tests.workflowTestCase import WorkflowTestCase


[docs] class Workflow10(Workflow): def __init__(self, name): super().__init__(name) ctr = ThreadCounter(parent=self) self.startActor = StartActor(self, thread_counter=ctr) self.pythonActorAddWithoutSleep = PythonActor( parent=self, script="pypushflow.tests.tasks.pythonActorAddWithoutSleep.py", name="Add without sleep", thread_counter=ctr, ) self.pythonActorCheck = PythonActor( parent=self, script="pypushflow.tests.tasks.pythonActorCheck.py", name="Check", thread_counter=ctr, ) self.check = RouterActor( parent=self, name="Check", itemName="doContinue", listPort=["true", "false"], thread_counter=ctr, ) self.stopActor = StopActor(self, thread_counter=ctr) self.startActor.connect(self.pythonActorAddWithoutSleep) self.pythonActorAddWithoutSleep.connect(self.pythonActorCheck) self.pythonActorCheck.connect(self.check) self.check.connect(self.pythonActorAddWithoutSleep, expectedValue="true") self.check.connect(self.stopActor, expectedValue="false")
[docs] class TestWorkflow10(WorkflowTestCase):
[docs] def test_workflow10(self): limit = 10 workflow10 = Workflow10(f"Test workflow {limit}") inData = {"value": 1, "limit": limit} outData = workflow10.run( inData, timeout=1200, scaling_workers=False, max_workers=-1 ) self.assertEqual(outData["value"], limit)