Source code for pypushflow.concurrent.factory

import logging
import sys
from typing import Optional

from .process import ProcessPool
from .thread import ThreadPool

if sys.version_info >= (3, 7):
    from .ndprocess import NdProcessPool
else:
    NdProcessPool = None

try:
    from gevent.monkey import is_anything_patched

    if not is_anything_patched():
        raise ImportError("gevent did not patch anything")

    from .gevent import GreenletPool
except ImportError:
    # gevent is missing or didn't monkey patch the environment
    GreenletPool = None

    from .multiprocessing import MProcessPool
    from .ndmultiprocessing import NProcessPool

    try:
        from .billiard import BProcessPool
    except ImportError:
        BProcessPool = None
else:
    MProcessPool = None
    NProcessPool = None
    BProcessPool = None

from .scaling import ScalingPool

logger = logging.getLogger(__name__)

_POOLS = {
    "thread": ThreadPool,
    "process": ProcessPool,
    "ndprocess": NdProcessPool,
    "multiprocessing": MProcessPool,
    "ndmultiprocessing": NProcessPool,
    "billiard": BProcessPool,
    "gevent": GreenletPool,
    "scaling": ScalingPool,
}

_MESSAGES = {
    "ndprocess": "not supported in python < 3.6",
    "billiard": (
        "requires 'billiard'"
        if GreenletPool is None
        else "does not work with 'gevent' monkey patching"
    ),
    "gevent": "requires 'gevent' with monkey patching",
    "multiprocessing": "does not work with 'gevent' monkey patching",
    "ndmultiprocessing": "does not work with 'gevent' monkey patching",
}


[docs] def get_pool(pool_type: Optional[str] = None): if pool_type is None: if GreenletPool is None: pool_type = "process" else: pool_type = "gevent" pool = _POOLS[pool_type] if pool is None: raise ImportError( _MESSAGES.get(pool_type, f"pool type '{pool_type}' is unknown") ) logger.info(f"pypushflow concurrency: {pool_type}") return pool