Source code for pypushflow.tests.test_workflow4

from pypushflow.PythonActor import PythonActor
from pypushflow.StartActor import StartActor
from pypushflow.StopActor import StopActor
from pypushflow.Submodel import Submodel
from pypushflow.tests.workflowTestCase import WorkflowTestCase
from pypushflow.ThreadCounter import ThreadCounter
from pypushflow.Workflow import Workflow


[docs] class Submodel4(Submodel): """ Submodel containing one python actor which throws an exception. """ def __init__(self, parent, name, thread_counter): super().__init__(parent=parent, name=name, thread_counter=thread_counter) self.pythonActor = PythonActor( parent=self, script="pypushflow.tests.tasks.pythonErrorHandlerTest.py", name="Python Error Handler Test", errorHandler=self, thread_counter=thread_counter, ) self.getPort("In").connect(self.pythonActor) self.pythonActor.connect(self.getPort("Out"))
[docs] class Workflow4(Workflow): """ Workflow containing one start actor, one submodel which throws an exception and one stop actor. """ def __init__(self, name): super().__init__(name) ctr = ThreadCounter(parent=self) self.startActor = StartActor(thread_counter=ctr) self.submodel4 = Submodel4(parent=self, name="Submodel 4", thread_counter=ctr) self.stopActor = StopActor(thread_counter=ctr) self.startActor.connect(self.submodel4.getPort("In")) self.submodel4.getPort("Out").connect(self.stopActor) self.connectOnError(self.stopActor)
[docs] class TestWorkflow4(WorkflowTestCase):
[docs] def test_workflow4(self): workflow4 = Workflow4("Test workflow 4") inData = {"name": "Dog"} outData = workflow4.run( inData, timeout=5, scaling_workers=False, max_workers=-1 ) self.assertIsNotNone(outData) self.assertTrue("WorkflowException" in outData)