# Source code for pypushflow.tests.test_actors

import unittest
from unittest import mock

from pypushflow.ErrorHandler import ErrorHandler
from pypushflow.ForkActor import ForkActor
from pypushflow.JoinActor import JoinActor
from pypushflow.PythonActor import PythonActor
from pypushflow.RouterActor import RouterActor
from pypushflow.StartActor import StartActor
from pypushflow.StopActor import StopActor
from pypushflow.ThreadCounter import ThreadCounter


class ConditionalErrorHandler(ErrorHandler):
    """``ErrorHandler`` variant with the ``trigger_on_error`` flag switched on.

    Used by the tests in this module as the target of an actor's on-error
    link.
    """

    # NOTE(review): how ``trigger_on_error`` is interpreted is decided by
    # code outside this module (presumably ErrorHandler or the actors that
    # connect to it) -- confirm there before relying on it.
    trigger_on_error = True
class TestPythonActor(unittest.TestCase):
    """Run small actor workflows and check the data that reaches the end.

    Every test wires a handful of actors that share one ``ThreadCounter``,
    triggers the first actor with an input dictionary, waits on the final
    ``StopActor`` and inspects its ``outData``.
    """

    def setUp(self):
        """Create a fresh ``ThreadCounter`` for the actors of each test."""
        self.thread_counter = ThreadCounter()

    def _check_greeting(self, inData):
        """Run the ``pythonActorTest`` task on ``inData`` and check the reply.

        Shared by the keyword and positional argument tests, which differ
        only in the input dictionary.
        """
        script = "pypushflow.tests.tasks.pythonActorTest.py"
        name = "Python Actor Test"
        actor = PythonActor(
            script=script, name=name, thread_counter=self.thread_counter
        )
        stopActor = StopActor(thread_counter=self.thread_counter)
        actor.connect(stopActor)
        actor.trigger(inData)
        stopActor.join(timeout=10)
        outData = stopActor.outData
        self.assertIsNotNone(outData)
        self.assertEqual(outData["reply"], "Hello Ragnar!")

    def _check_error_routing(self, handler_class, handler_name):
        """Run the ``pythonErrorHandlerTest`` task with an on-error handler.

        The actor is connected to the stop actor directly and, through an
        instance of ``handler_class``, via its on-error link. The test
        expects ``"WorkflowException"`` among the keys of the data reaching
        the stop actor, and expects that the actor's logger was asked to
        report neither an error nor an exception.
        """
        script = "pypushflow.tests.tasks.pythonErrorHandlerTest.py"
        name = "Python Error Handler Test"
        actor = PythonActor(
            script=script, name=name, thread_counter=self.thread_counter
        )
        stopActor = StopActor(thread_counter=self.thread_counter)
        errorHandler = handler_class(
            name=handler_name, thread_counter=self.thread_counter
        )
        inData = {"name": "Ragnar"}
        actor.connect(stopActor)
        actor.connectOnError(errorHandler)
        errorHandler.connect(stopActor)
        # Patch through context managers so the logger's real methods are
        # restored afterwards; a logger object may be shared with other
        # actors or tests, and plain attribute assignment would leak the
        # mocks into them.
        with mock.patch.object(actor.logger, "error") as log_error:
            with mock.patch.object(actor.logger, "exception") as log_exception:
                actor.trigger(inData)
                stopActor.join(timeout=5)
                outData = stopActor.outData
                self.assertIsNotNone(outData)
                # assertIn reports the container contents on failure.
                self.assertIn("WorkflowException", outData)
                log_error.assert_not_called()
                log_exception.assert_not_called()

    def test_PythonActor(self):
        """A task fed a ``name`` keyword replies with the greeting."""
        self._check_greeting({"name": "Ragnar"})

    def test_PythonActorWithPositionalArguments(self):
        """A task fed its input under the integer key ``0`` replies the same."""
        self._check_greeting({0: "Ragnar"})

    def test_ErrorHandler(self):
        """The plain ``ErrorHandler`` forwards the failure without logging."""
        self._check_error_routing(ErrorHandler, "Error handler")

    def test_ErrorHandlerOnErrorLinkSkipsLogging(self):
        """``ConditionalErrorHandler`` forwards the failure without logging."""
        self._check_error_routing(
            ConditionalErrorHandler, "Conditional error handler"
        )

    def test_ForkAndJoinActors(self):
        """Data sent through a fork, two tasks and a join reaches the stop."""
        start = StartActor(thread_counter=self.thread_counter)
        stop = StopActor(thread_counter=self.thread_counter)
        fork = ForkActor(thread_counter=self.thread_counter)
        joinActor = JoinActor(thread_counter=self.thread_counter)
        pythonActor1 = PythonActor(
            script="pypushflow.tests.tasks.pythonActor1.py",
            thread_counter=self.thread_counter,
        )
        pythonActor2 = PythonActor(
            script="pypushflow.tests.tasks.pythonActor2.py",
            thread_counter=self.thread_counter,
        )
        # Connections: start -> fork -> (actor1, actor2) -> join -> stop
        start.connect(fork)
        fork.connect(pythonActor1)
        fork.connect(pythonActor2)
        # increaseNumberOfThreads() is called once per branch connected to
        # the join actor (presumably so it knows how many inputs to expect).
        pythonActor1.connect(joinActor)
        joinActor.increaseNumberOfThreads()
        pythonActor2.connect(joinActor)
        joinActor.increaseNumberOfThreads()
        joinActor.connect(stop)
        # Run
        inData = {"a": 1}
        start.trigger(inData)
        stop.join(timeout=5)
        outData = stop.outData
        self.assertIsNotNone(outData)

    def test_RouterActor(self):
        """The router picks a branch depending on the value of item ``a``."""
        start = StartActor(thread_counter=self.thread_counter)
        stop = StopActor(thread_counter=self.thread_counter)
        router = RouterActor(
            parent=None,
            itemName="a",
            listPort=["other", "null"],
            thread_counter=self.thread_counter,
        )
        pythonActor1 = PythonActor(
            script="pypushflow.tests.tasks.pythonActor1.py",
            thread_counter=self.thread_counter,
        )
        pythonActor2 = PythonActor(
            script="pypushflow.tests.tasks.pythonActor2.py",
            thread_counter=self.thread_counter,
        )
        # Connections: actor1 sits on the "other" port, actor2 on "null"
        start.connect(router)
        router.connect(pythonActor1, "other")
        router.connect(pythonActor2, "null")
        pythonActor1.connect(stop)
        pythonActor2.connect(stop)
        # Run: a == 1 is expected to end up in actor1
        inData = {"a": 1}
        start.trigger(inData)
        stop.join(timeout=5)
        outData = stop.outData
        self.assertIsNotNone(outData)
        self.assertTrue(outData["actor1"])
        # Run 2: a is None is expected to end up in actor2
        inData = {"a": None}
        start.trigger(inData)
        stop.join(timeout=5)
        outData = stop.outData
        self.assertIsNotNone(outData)
        self.assertTrue(outData["actor2"])