Source code for pypushflow.persistence.mongita
try:
from bson.objectid import ObjectId
from mongita import MongitaClientMemory
except Exception:
ObjectId = None
MongitaClientMemory = None
from .mongo import MongoWorkflowDbClient
[docs]
class MemoryWorkflowDbClient(MongoWorkflowDbClient, register_name="memory"):
"""Client of an in-memory Mongo database for storing workflow executions.
Used for testing purposes.
"""
[docs]
def connect(self):
if MongitaClientMemory is None:
return
self._collection = MongitaClientMemory()["ppf"]["ppf"]
[docs]
def disconnect(self):
self._collection = None
[docs]
def generateWorkflowId(self) -> ObjectId:
return ObjectId()
[docs]
def generateActorId(self) -> ObjectId:
return ObjectId()
def _appendActorInfo(self, actorInfo: dict):
workflowInfo = self._getWorkflowInfo()
workflowInfo["actors"].append(actorInfo)
self._setWorkflowInfo(workflowInfo)