Source code for pypushflow.persistence.pymongo
from typing import Any
from typing import Mapping
try:
import bson
import pymongo
from bson.objectid import ObjectId
except Exception:
bson = None
pymongo = None
ObjectId = None
from .mongo import MongoWorkflowDbClient
MAX_INT64 = 2**63 - 1
MIN_INT64 = -(2**63)
[docs]
class PyMongoWorkflowDbClient(MongoWorkflowDbClient, register_name="pymongo"):
"""Client of an external Mongo database for storing workflow executions."""
def __init__(self, url: str, database: str, collection: str):
super().__init__()
self._url = url
self._database = database
self._collection = collection
[docs]
def connect(self):
if pymongo is None:
return
client = pymongo.MongoClient(self._url, serverSelectionTimeoutMS=10000)
self._client = client
self._collection = client[self._database][self._collection]
[docs]
def disconnect(self):
self._collection = None
if self._client is not None:
self._client.close()
self._client = None
[docs]
def generateWorkflowId(self) -> ObjectId:
return ObjectId()
[docs]
def generateActorId(self) -> ObjectId:
return ObjectId()
def _appendActorInfo(self, actorInfo: dict):
self._safe_update_one(
{"_id": self._workflowId}, {"$push": {"actors": actorInfo}}
)
def _sanitize(self, value: Any) -> Any:
if isinstance(value, Mapping):
return {k: self._sanitize(v) for k, v in value.items()}
if isinstance(value, (list, tuple, set)):
return [self._sanitize(v) for v in value]
if isinstance(value, int):
if value > MAX_INT64 or value < MIN_INT64:
return str(value)
return value
if not any(isinstance(value, t) for t in bson._BUILT_IN_TYPES):
return repr(value)
return value