# Source code for pypushflow.concurrent.scaling
import time
from numbers import Number
from typing import Callable
from typing import Optional
from .base import BasePool
# [docs]
class ScalingPool(BasePool):
    """Pool that creates one dedicated single-worker pool per submitted task.

    Every call to :meth:`apply_async` instantiates a new underlying pool
    (obtained from ``get_pool(pool_type)`` with ``max_workers=1``), submits
    the task to it and closes it right away. Pools whose task is still
    pending are tracked in ``_running_pools``; once the task's result or
    error callback has fired they are moved to ``_finished_pools``.

    :param wait_on_exit: forwarded unchanged to ``BasePool.__init__``.
    :param wait_on_exit_timeout: forwarded unchanged to ``BasePool.__init__``.
    :param interrupt_on_exit: forwarded unchanged to ``BasePool.__init__``.
    :param pool_type: name of the underlying pool type passed to ``get_pool``.
    :param pool_options: extra keyword arguments for every underlying pool;
        ``max_workers`` is always overwritten with ``1``.
    :raises ValueError: when ``pool_type`` is ``"scaling"``.
    """

    def __init__(
        self,
        wait_on_exit: bool = True,
        wait_on_exit_timeout: Optional[Number] = None,
        interrupt_on_exit: bool = False,
        pool_type: Optional[str] = None,
        **pool_options,
    ):
        if pool_type == "scaling":
            raise ValueError("cannot nest scaling pools")
        # Pools with a pending task / pools whose task callback already fired.
        self._running_pools = list()
        self._finished_pools = list()
        self._pool_type = pool_type
        # Each underlying pool only ever receives a single task.
        pool_options["max_workers"] = 1
        self._pool_options = pool_options
        super().__init__(
            wait_on_exit=wait_on_exit,
            wait_on_exit_timeout=wait_on_exit_timeout,
            interrupt_on_exit=interrupt_on_exit,
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the base pool, then exit and forget every underlying pool."""
        super().__exit__(exc_type, exc_val, exc_tb)
        # Iterate over snapshots: task callbacks move pools between the two
        # lists through `_release_pool` while we are looping.
        for pool in list(self._running_pools):
            pool.__exit__(exc_type, exc_val, exc_tb)
        self._running_pools = list()
        for pool in list(self._finished_pools):
            pool.__exit__(exc_type, exc_val, exc_tb)
        self._finished_pools = list()

    def close(self):
        """Close all underlying pools (finished ones first, then running)."""
        for pool in list(self._finished_pools):
            pool.close()
        for pool in list(self._running_pools):
            pool.close()

    def join(self, timeout: Optional[Number] = None) -> bool:
        """Join all underlying pools, sharing a single overall timeout.

        :param timeout: maximum total number of seconds to wait for all
            pools together; ``None`` waits without limit.
        :returns: ``False`` as soon as joining one of the pools fails (its
            ``join`` returns a falsy value), ``True`` when every pool joined.
        """
        # One absolute deadline for the whole call, so that each pool gets
        # exactly the time that is left instead of a value from which the
        # cumulative elapsed time has been subtracted several times.
        deadline = None if timeout is None else time.monotonic() + timeout

        def remaining() -> Optional[Number]:
            if deadline is None:
                return None
            # Never hand a negative timeout to an underlying pool.
            return max(deadline - time.monotonic(), 0)

        for pool in list(self._running_pools):
            if not pool.join(timeout=remaining()):
                return False
        self._running_pools = list()

        for pool in list(self._finished_pools):
            if not pool.join(timeout=remaining()):
                return False
        self._finished_pools = list()
        return True

    def interrupt(self) -> None:
        """Interrupt all underlying pools (running ones first)."""
        for pool in list(self._running_pools):
            pool.interrupt()
        for pool in list(self._finished_pools):
            pool.interrupt()

    def _new_pool(self):
        """Create an underlying pool and register it as running."""
        # NOTE(review): imported here rather than at module level, presumably
        # to avoid a circular import with the factory module - confirm.
        from .factory import get_pool

        pool = get_pool(self._pool_type)(**self._pool_options)
        self._running_pools.append(pool)
        return pool

    def _release_pool(self, pool):
        """Move ``pool`` from the running list to the finished list.

        Does nothing when ``pool`` is not (or no longer) registered as
        running.
        """
        try:
            idx = self._running_pools.index(pool)
        except ValueError:
            return
        pool = self._running_pools.pop(idx)
        self._finished_pools.append(pool)

    def apply_async(
        self,
        fn: Callable,
        callback: Optional[Callable] = None,
        error_callback: Optional[Callable] = None,
        args=tuple(),
        kwargs=None,
    ):
        """Run ``fn`` in a freshly created single-worker pool.

        :param fn: callable to execute.
        :param callback: optional callable invoked with the return value.
        :param error_callback: optional callable invoked with the exception.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: whatever the underlying pool's ``apply_async`` returns.
        """
        pool = self._new_pool()

        def _callback(return_value):
            try:
                if callback is not None:
                    return callback(return_value)
            finally:
                # Release the pool even when the user callback raises.
                self._release_pool(pool)

        def _error_callback(exception):
            try:
                # Test `error_callback` itself (not `callback`): otherwise a
                # missing error callback gets called as `None(...)` and a
                # provided one is ignored whenever `callback` is None.
                if error_callback is not None:
                    return error_callback(exception)
            finally:
                self._release_pool(pool)

        future = pool.apply_async(
            fn,
            args=args,
            kwargs=kwargs,
            callback=_callback,
            error_callback=_error_callback,
        )
        # The pool receives no further tasks: close it immediately.
        pool.close()
        return future