Source code for pypushflow.tests.test_workflow5

from pypushflow.PythonActor import PythonActor
from pypushflow.StartActor import StartActor
from pypushflow.StopActor import StopActor
from pypushflow.Submodel import Submodel
from pypushflow.tests.workflowTestCase import WorkflowTestCase
from pypushflow.ThreadCounter import ThreadCounter
from pypushflow.Workflow import Workflow


[docs] class Submodel5(Submodel): """ Submodel containing one python actor which has a long execution time """ def __init__(self, parent, name, thread_counter): super().__init__(parent=parent, name=name, thread_counter=thread_counter) self.pythonActor = PythonActor( parent=self, script="pypushflow.tests.tasks.pythonLongExecutionTest.py", name="Python Long Execution Test", errorHandler=self, thread_counter=thread_counter, ) self.getPort("In").connect(self.pythonActor) self.pythonActor.connect(self.getPort("Out"))
[docs] class Workflow5(Workflow): """ Workflow containing one start actor, one submodel which has a long execution and one stop actor with short timeout. """ def __init__(self, name): super().__init__(name) ctr = ThreadCounter(parent=self) self.startActor = StartActor(self, thread_counter=ctr) self.submodel5 = Submodel5(self, name="Submodel 5", thread_counter=ctr) self.stopActor = StopActor(self, thread_counter=ctr) self.startActor.connect(self.submodel5.getPort("In")) self.submodel5.getPort("Out").connect(self.stopActor)
[docs] class TestWorkflow5(WorkflowTestCase):
[docs] def test_Workflow5(self): testWorkflow5 = Workflow5("Test workflow 5") inData = {"name": "Dog", "sleep": 5} outData = testWorkflow5.run( inData, timeout=1, scaling_workers=False, max_workers=-1 ) self.assertIsNone(outData) outData = testWorkflow5.run( inData, timeout=10, scaling_workers=False, max_workers=-1 ) self.assertIsNotNone(outData)