# Source code for pypushflow.persistence.mongo

import logging
from datetime import datetime
from typing import Any
from typing import Mapping
from typing import Optional

from .interface import WorkflowDbClient

logger = logging.getLogger(__name__)


class MongoWorkflowDbClient(WorkflowDbClient):
    """Client interface of a Mongo database for storing workflow executions.

    A workflow execution is stored as a single document that carries an
    ``"actors"`` list; actor entries are looked up in that list by ``"_id"``.

    Persistence is disabled while ``self._collection`` is ``None``: every
    public method then returns without touching the database.

    ``generateWorkflowId``, ``generateActorId`` and ``_appendActorInfo`` raise
    ``NotImplementedError`` here and have to be provided by a subclass.
    """

    def __init__(self):
        # Collection handle used for all database calls. It is never assigned
        # a real collection in this class.
        # NOTE(review): presumably set by subclasses once connected - confirm.
        self._collection = None
        # Identifier of the workflow that is currently being logged.
        self._workflowId = None
        # Identifier of the most recently ended workflow; used as a fallback
        # when reading the workflow document after ``endWorkflow``.
        self._previousWorkflowId = None

    def generateWorkflowId(self):
        """Return a new workflow document identifier (subclass hook)."""
        raise NotImplementedError

    def generateActorId(self):
        """Return a new actor entry identifier (subclass hook)."""
        raise NotImplementedError

    def startWorkflow(self, name: str):
        """Insert a new workflow document with status ``"started"``.

        Does nothing when no collection is available. A database error during
        the insert is logged and disables persistence for this client by
        resetting the collection to ``None``.

        :param name: Name stored in the workflow document.
        :raises RuntimeError: When a workflow start was already logged.
        """
        if self._collection is None:
            return
        if self._workflowId is not None:
            raise RuntimeError("Workflow start already logged")
        workflowInfo = self._generateInitialWorkflowInfo()
        workflowInfo["name"] = name
        workflowInfo["status"] = "started"
        workflowInfo["startTime"] = datetime.now()
        try:
            self._collection.insert_one(workflowInfo)
        except Exception:
            # Best effort: a failing database must not break the workflow.
            self._collection = None
            logger.exception("Mongo database error")
        # NOTE(review): the id is recorded even when the insert failed. In
        # that case the collection is ``None`` so all later calls are skipped.
        self._workflowId = workflowInfo["_id"]

    def endWorkflow(self, status="finished") -> None:
        """Store the final status and stop time of the current workflow.

        A workflow whose status is already ``"error"`` keeps that status; the
        stop time is written in either case. Afterwards the current workflow
        id becomes the previous workflow id.

        :param status: Status to store unless the workflow is in error.
        :raises RuntimeError: When no workflow start was logged.
        """
        if self._skip:
            return
        workflowInfo = self._getWorkflowInfo()
        if workflowInfo["status"] != "error":
            workflowInfo["status"] = status
        workflowInfo["stopTime"] = datetime.now()
        self._setWorkflowInfo(workflowInfo)
        self._workflowId, self._previousWorkflowId = None, self._workflowId

    def ensureEndWorkflow(self) -> None:
        """End the current workflow if one is still being logged."""
        if self._workflowId is not None:
            self.endWorkflow()

    def updateWorkflowInfo(self, info: dict) -> None:
        """Merge ``info`` into the current workflow document.

        :param info: Fields to add or overwrite in the workflow document.
        :raises RuntimeError: When no workflow start was logged.
        """
        if self._skip:
            return
        workflowInfo = self._getWorkflowInfo()
        workflowInfo.update(info)
        self._setWorkflowInfo(workflowInfo)

    def getWorkflowInfo(self) -> Optional[dict]:
        """Return the current (or else the previous) workflow document.

        Unlike the other accessors this does not require a running workflow,
        so it can still be used after ``endWorkflow``.

        :returns: The document, or ``None`` when no collection is available
            or no document matches.
        """
        if self._collection is None:
            return None
        return self._getWorkflowInfo()

    def startActor(self, name: str, info: Optional[dict] = None):
        """Register a new actor with status ``"started"``.

        :param name: Name stored in the actor entry.
        :param info: Optional extra fields merged into the actor entry. These
            may overwrite ``"name"`` and ``"status"`` but not ``"startTime"``,
            which is set afterwards.
        :returns: The identifier of the new actor entry, or ``None`` when
            persistence is disabled.
        :raises RuntimeError: When no workflow start was logged.
        """
        if self._skip:
            return
        actorInfo = self._generateInitialActorInfo()
        actorInfo["name"] = name
        actorInfo["status"] = "started"
        if info:
            actorInfo.update(info)
        actorInfo["startTime"] = datetime.now()
        # ``apply_actorinfo_filters`` is defined outside this class body.
        actorInfo = self.apply_actorinfo_filters(actorInfo)
        self._appendActorInfo(actorInfo)
        return actorInfo["_id"]

    def _appendActorInfo(self, actorInfo: dict):
        """Persist a new actor entry (subclass hook)."""
        raise NotImplementedError

    def endActor(self, actorId, status="finished") -> None:
        """Store the final status and stop time of an actor.

        An actor whose status is already ``"error"`` keeps that status; the
        stop time is written in either case. Unknown actor ids are ignored.

        :param actorId: Identifier returned by ``startActor``.
        :param status: Status to store unless the actor is in error.
        :raises RuntimeError: When no workflow start was logged.
        """
        if self._skip:
            return
        workflowInfo = self._getWorkflowInfo()
        for actorInfo in workflowInfo["actors"]:
            if actorInfo["_id"] == actorId:
                if actorInfo["status"] != "error":
                    actorInfo["status"] = status
                actorInfo["stopTime"] = datetime.now()
                # The actor entry is part of the workflow document, so the
                # whole document is written back.
                self._setWorkflowInfo(workflowInfo)
                break

    def updateActorInfo(self, actorId, info: dict) -> None:
        """Merge ``info`` into the entry of an actor.

        Unknown actor ids are ignored.

        :param actorId: Identifier returned by ``startActor``.
        :param info: Fields to add or overwrite; passed through
            ``apply_actorinfo_filters`` first.
        :raises RuntimeError: When no workflow start was logged.
        """
        if self._skip:
            return
        info = self.apply_actorinfo_filters(info)
        workflowInfo = self._getWorkflowInfo()
        for actorInfo in workflowInfo["actors"]:
            if actorInfo["_id"] == actorId:
                actorInfo.update(info)
                self._setWorkflowInfo(workflowInfo)
                break

    def getActorInfo(self, actorId) -> Optional[dict]:
        """Return the entry of an actor.

        :param actorId: Identifier returned by ``startActor``.
        :returns: The actor entry, or ``None`` when persistence is disabled
            or the actor id is unknown.
        :raises RuntimeError: When no workflow start was logged.
        """
        if self._skip:
            return None
        workflowInfo = self._getWorkflowInfo()
        for actorInfo in workflowInfo["actors"]:
            if actorInfo["_id"] == actorId:
                return actorInfo
        return None

    @property
    def _skip(self):
        """Tell whether database access must be skipped.

        :returns: ``True`` when no collection is available, else ``False``.
        :raises RuntimeError: When a collection is available but no workflow
            start was logged.
        """
        if self._collection is None:
            return True
        if self._workflowId is None:
            raise RuntimeError("Workflow start not logged")
        return False

    def _generateInitialWorkflowInfo(self) -> dict:
        """Return a fresh workflow document with placeholder values."""
        oid = self.generateWorkflowId()
        return {
            "_id": oid,
            "Request ID": str(oid),
            "name": "unknown",
            "status": "unknown",
            "actors": [],
        }

    def _generateInitialActorInfo(self) -> dict:
        """Return a fresh actor entry with placeholder values."""
        oid = self.generateActorId()
        return {
            "_id": oid,
            "name": "unknown",
            "status": "unknown",
        }

    def _getWorkflowInfo(self) -> Optional[dict]:
        """Fetch the current workflow document, else the previous one.

        :returns: The matching document, or ``None`` when ``find_one`` finds
            no match. Callers in this class assume the document exists.
        """
        # Compare against ``None`` explicitly so that a falsy but valid id
        # (for example ``0``) does not trigger the fallback.
        workflowId = self._workflowId
        if workflowId is None:
            workflowId = self._previousWorkflowId
        return self._collection.find_one({"_id": workflowId})

    def _setWorkflowInfo(self, info: dict) -> None:
        """Write ``info`` into the current workflow document (no upsert)."""
        self._safe_update_one({"_id": self._workflowId}, {"$set": info}, upsert=False)

    def _safe_update_one(
        self, query: Mapping[str, Any], update: Mapping[str, Any], upsert: bool = False
    ) -> None:
        """Update one document, retrying once with a sanitized update.

        An exception raised by the retry is propagated to the caller.

        :param query: A query that matches the document to update.
        :param update: The modifications to apply.
        :param upsert (optional): If True, perform an insert if no documents match the filter.
        """
        try:
            self._collection.update_one(query, update, upsert=upsert)
        except Exception:
            self._collection.update_one(query, self._sanitize(update), upsert=upsert)

    def _sanitize(self, value: Any) -> Any:
        """Return ``value`` unchanged; hook used by the update retry."""
        return value