Source code for pypushflow.concurrent.thread
from concurrent.futures import ThreadPoolExecutor, Future
from numbers import Number
from typing import Callable, Optional
from . import base
[docs]
class ThreadPool(base.BasePool):
def __init__(self, max_workers: Optional[int] = None, **kw) -> None:
kwargs = dict()
if max_workers is not None:
kwargs["max_workers"] = max_workers
self._pool = ThreadPoolExecutor(**kwargs)
self._closed = False
super().__init__(**kw)
def __enter__(self):
self._pool.__enter__()
return super().__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
self._pool.__exit__(exc_type, exc_val, exc_tb)
[docs]
def close(self):
self._closed = True
[docs]
def join(self, timeout: Optional[Number] = None) -> bool:
return False
[docs]
def interrupt(self) -> None:
raise RuntimeError("Cannot stop tasks of a thread pool")
[docs]
def apply_async(
self,
fn: Callable,
callback: Optional[Callable] = None,
error_callback: Optional[Callable] = None,
args=tuple(),
kwargs=None,
) -> Future:
if self._closed:
raise RuntimeError("the pool is closed")
def cb(future):
try:
result = future.result()
except Exception as e:
if error_callback is not None:
error_callback(e)
else:
if callback is not None:
callback(result)
if kwargs is None:
kwargs = dict()
future = self._pool.submit(fn, *args, **kwargs)
future.add_done_callback(cb)
return future