Source code for pypushflow.persistence
"""Persistent recording of a workflow executions."""
import os
import warnings
from typing import Optional, Callable
from .interface import WorkflowDbClient
from .pymongo import PyMongoWorkflowDbClient # noqa F401
from .besdb import BesWorkflowDbClient # noqa F401
from .mongita import MemoryWorkflowDbClient # noqa F401
from .dummy import DummyWorkflowDbClient # noqa F401
DEFAULT_DB_TYPE = "dummy"
[docs]
def init_db_client(*args, db_type: Optional[str] = None, **kwargs) -> WorkflowDbClient:
"""Initializes a database client based on the specified `db_type`.
:param db_type: The type of database client to initialize.
If not specified, defaults to `DEFAULT_DB_TYPE`.
Supported values include:
- **"besdb"**: Requires the following additional parameters:
- **url** (str): URL for connecting to the BES database.
- **initiator** (str): Initiator from which the request originates.
- **host** (str): Hostname from which the request originates.
- **port** (int): Port number from which the request originates.
- **request_id** (str): Unique identifier for the request.
- **"pymongo"**: Requires the following additional parameters:
- **url** (str): Connection URL for the MongoDB instance.
- **database** (str): Name of the database to access.
- **collection** (str): Name of the collection within the database.
- **"memory"**: An in-memory database type. No additional parameters required.
- **"dummy"**: A placeholder database type for testing or development purposes. No additional parameters required.
:param args: see `db_type`.
:param kwargs: see `db_type`.
:return: An instance of `WorkflowDbClient` specific to the specified `db_type`.
"""
if db_type is None:
url = os.environ.get("PYPUSHFLOW_MONGOURL", None)
if url:
warnings.warn(
"Using BESDB environment variables is deprecated", DeprecationWarning
)
db_type = "besdb"
kwargs = {
"url": url,
"initiator": os.environ.get("PYPUSHFLOW_INITIATOR"),
"host": os.environ.get("PYPUSHFLOW_HOST"),
"port": os.environ.get("PYPUSHFLOW_PORT"),
"request_id": os.environ.get("PYPUSHFLOW_OBJECTID"),
}
else:
db_type = DEFAULT_DB_TYPE
db_client_class = WorkflowDbClient.get_dbclient_class(db_type)
return db_client_class(*args, **kwargs)
[docs]
def register_actorinfo_filter(method: Callable):
WorkflowDbClient.register_actorinfo_filter(method)