Source code for pypushflow.persistence.interface

from typing import Callable, Optional


class WorkflowDbClient:
    """Client interface of a database for storing workflow executions.

    Most methods raise ``NotImplementedError`` here and are expected to be
    provided by subclasses. Only ``setWorkflowStatus`` and ``setActorStatus``
    have a default implementation, each delegating to the matching
    ``update*Info`` method.

    Subclasses may register themselves under a name by passing
    ``register_name`` in their class statement::

        class MyClient(WorkflowDbClient, register_name="my"):
            ...
    """

    # Maps a registration name to the client class registered under it.
    # Shared by the whole hierarchy: always accessed via WorkflowDbClient.
    _REGISTRY = {}

    # Actor-info filters, applied in registration order.
    # Shared by the whole hierarchy: always accessed via WorkflowDbClient.
    _ACTORINFO_FILTERS = []

    def __init_subclass__(cls, register_name=None) -> None:
        """Record the new subclass in the registry when it is given a name.

        :param register_name: key under which ``cls`` is stored; a falsy
            value (the default) leaves the subclass unregistered.
        """
        super().__init_subclass__()
        if register_name:
            WorkflowDbClient._REGISTRY[register_name] = cls

    @classmethod
    def get_dbclient_class(cls, name):
        """Return the class registered under ``name``, or ``None`` if absent."""
        return WorkflowDbClient._REGISTRY.get(name)

    @classmethod
    def register_actorinfo_filter(cls, method: Callable[[dict], dict]):
        """Add ``method`` to the shared filter list unless already present.

        :param method: callable taking an info ``dict`` and returning one.
        """
        # Check and append against the same shared list. Checking
        # ``cls._ACTORINFO_FILTERS`` instead would let a subclass that shadows
        # the attribute register the same filter several times.
        filters = WorkflowDbClient._ACTORINFO_FILTERS
        if method not in filters:
            filters.append(method)

    @classmethod
    def apply_actorinfo_filters(cls, info: dict) -> dict:
        """Pass ``info`` through every registered filter, in order.

        Each filter receives the result of the previous one.

        :param info: the initial info dictionary.
        :returns: the output of the last filter, or ``info`` itself when no
            filter is registered.
        """
        for method in WorkflowDbClient._ACTORINFO_FILTERS:
            info = method(info)
        return info

    def connect(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def disconnect(self):
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def startWorkflow(self, name: str):
        """Not implemented in the base class.

        :param name: presumably the workflow name -- confirm with subclasses.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def endWorkflow(self, status="finished") -> None:
        """Not implemented in the base class.

        :param status: defaults to ``"finished"``.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def ensureEndWorkflow(self) -> None:
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def updateWorkflowInfo(self, info: dict) -> None:
        """Not implemented in the base class.

        :param info: dictionary of workflow fields; ``setWorkflowStatus``
            calls this with ``{"status": status}``.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def setWorkflowStatus(self, status: str) -> None:
        """Set the workflow status via ``updateWorkflowInfo``.

        :param status: value stored under the ``"status"`` key.
        """
        self.updateWorkflowInfo({"status": status})

    def getWorkflowInfo(self) -> Optional[dict]:
        """Not implemented in the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def startActor(
        self, name: str, info: Optional[dict] = None, script: Optional[str] = None
    ):
        """Not implemented in the base class.

        :param name: presumably the actor name -- confirm with subclasses.
        :param info: optional info dictionary.
        :param script: optional string.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def endActor(self, actorId, status="finished") -> None:
        """Not implemented in the base class.

        :param actorId: identifier of the actor; its type is
            implementation-defined.
        :param status: defaults to ``"finished"``.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def updateActorInfo(self, actorId, info: dict) -> None:
        """Not implemented in the base class.

        :param actorId: identifier of the actor.
        :param info: dictionary of actor fields; ``setActorStatus`` calls this
            with ``{"status": status}``.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def setActorStatus(self, actorId, status: str) -> None:
        """Set an actor's status via ``updateActorInfo``.

        :param actorId: identifier of the actor.
        :param status: value stored under the ``"status"`` key.
        """
        self.updateActorInfo(actorId, {"status": status})

    def getActorInfo(self, actorId) -> Optional[dict]:
        """Not implemented in the base class.

        The return annotation mirrors ``getWorkflowInfo``.

        :param actorId: identifier of the actor.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError