Source code for pypushflow.tests.test_threadcounteractor

import unittest
from time import sleep
from threading import Lock
from pypushflow.ThreadCountingActor import ThreadCountingActor
from pypushflow.ThreadCounter import ThreadCounter
from concurrent.futures import ThreadPoolExecutor


[docs] class Counter: def __init__(self): self.value = 0 self._lock = Lock()
[docs] def increment(self): with self._lock: self.value += 1
[docs] class MyThreadCountingActor(ThreadCountingActor): def __init__(self, thread_counter, downstream_actors=tuple()): super().__init__(thread_counter=thread_counter, name="MyThreadCountingActor") self.downstream_actors = downstream_actors
[docs] def trigger(self, state): sleep(0.01) if not self.downstream_actors: state["ntasks"].increment() for actor in self.downstream_actors: actor.trigger(state)
[docs] class TestThreadCountingActor(unittest.TestCase):
[docs] def setUp(self): self.thread_counter = ThreadCounter()
[docs] def test_multiple_threads(self): workers1 = [MyThreadCountingActor(self.thread_counter) for _ in range(3)] workers2 = [ MyThreadCountingActor(self.thread_counter, workers1) for _ in range(5) ] workers3 = [ MyThreadCountingActor(self.thread_counter, workers2) for _ in range(10) ] state = {"ntasks": Counter()} with ThreadPoolExecutor(max_workers=10) as executor: executor.map(lambda w: w.trigger(state), workers3) self.thread_counter.wait_threads_finished() self.assertEqual(state["ntasks"].value, 150)