Source code for pypushflow.tests.test_actors

import unittest

from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.ErrorHandler import ErrorHandler
from pypushflow.ForkActor import ForkActor
from pypushflow.JoinActor import JoinActor
from pypushflow.RouterActor import RouterActor
from pypushflow.ThreadCounter import ThreadCounter


[docs] class TestPythonActor(unittest.TestCase):
[docs] def setUp(self): self.thread_counter = ThreadCounter()
[docs] def test_PythonActor(self): script = "pypushflow.tests.tasks.pythonActorTest.py" name = "Python Actor Test" actor = PythonActor( script=script, name=name, thread_counter=self.thread_counter ) stopActor = StopActor(thread_counter=self.thread_counter) inData = {"name": "Ragnar"} actor.connect(stopActor) actor.trigger(inData) stopActor.join(timeout=10) outData = stopActor.outData self.assertIsNotNone(outData) self.assertEqual(outData["reply"], "Hello Ragnar!")
[docs] def test_PythonActorWithPositionalArguments(self): script = "pypushflow.tests.tasks.pythonActorTest.py" name = "Python Actor Test" actor = PythonActor( script=script, name=name, thread_counter=self.thread_counter ) stopActor = StopActor(thread_counter=self.thread_counter) inData = {0: "Ragnar"} actor.connect(stopActor) actor.trigger(inData) stopActor.join(timeout=10) outData = stopActor.outData self.assertIsNotNone(outData) self.assertEqual(outData["reply"], "Hello Ragnar!")
[docs] def test_ErrorHandler(self): script = "pypushflow.tests.tasks.pythonErrorHandlerTest.py" name = "Python Error Handler Test" actor = PythonActor( script=script, name=name, thread_counter=self.thread_counter ) stopActor = StopActor(thread_counter=self.thread_counter) errorHandler = ErrorHandler( name="Error handler", thread_counter=self.thread_counter ) inData = {"name": "Ragnar"} actor.connect(stopActor) actor.connectOnError(errorHandler) errorHandler.connect(stopActor) actor.trigger(inData) stopActor.join(timeout=5) outData = stopActor.outData self.assertIsNotNone(outData) self.assertTrue("WorkflowException" in outData)
[docs] def test_ForkAndJoinActors(self): start = StartActor(thread_counter=self.thread_counter) stop = StopActor(thread_counter=self.thread_counter) fork = ForkActor(thread_counter=self.thread_counter) joinActor = JoinActor(thread_counter=self.thread_counter) pythonActor1 = PythonActor( script="pypushflow.tests.tasks.pythonActor1.py", thread_counter=self.thread_counter, ) pythonActor2 = PythonActor( script="pypushflow.tests.tasks.pythonActor2.py", thread_counter=self.thread_counter, ) # Connections start.connect(fork) fork.connect(pythonActor1) fork.connect(pythonActor2) pythonActor1.connect(joinActor) joinActor.increaseNumberOfThreads() pythonActor2.connect(joinActor) joinActor.increaseNumberOfThreads() joinActor.connect(stop) # Run inData = {"a": 1} start.trigger(inData) stop.join(timeout=5) outData = stop.outData self.assertIsNotNone(outData)
[docs] def test_RouterActor(self): start = StartActor(thread_counter=self.thread_counter) stop = StopActor(thread_counter=self.thread_counter) router = RouterActor( parent=None, itemName="a", listPort=["other", "null"], thread_counter=self.thread_counter, ) pythonActor1 = PythonActor( script="pypushflow.tests.tasks.pythonActor1.py", thread_counter=self.thread_counter, ) pythonActor2 = PythonActor( script="pypushflow.tests.tasks.pythonActor2.py", thread_counter=self.thread_counter, ) # Connections start.connect(router) router.connect(pythonActor1, "other") router.connect(pythonActor2, "null") pythonActor1.connect(stop) pythonActor2.connect(stop) # Run inData = {"a": 1} start.trigger(inData) stop.join(timeout=5) outData = stop.outData self.assertIsNotNone(outData) self.assertTrue(outData["actor1"]) # Run 2 inData = {"a": None} start.trigger(inData) stop.join(timeout=5) outData = stop.outData self.assertIsNotNone(outData) self.assertTrue(outData["actor2"])