Source code for pypushflow.persistence.interface
from typing import Callable
from typing import Optional
[docs]
class WorkflowDbClient:
"""Client interface of a database for storing workflow executions."""
_REGISTRY = dict()
_ACTORINFO_FILTERS = list()
def __init_subclass__(cls, register_name=None) -> None:
super().__init_subclass__()
if register_name:
WorkflowDbClient._REGISTRY[register_name] = cls
[docs]
@classmethod
def get_dbclient_class(cls, name):
return WorkflowDbClient._REGISTRY.get(name, None)
[docs]
@classmethod
def register_actorinfo_filter(cls, method: Callable[[dict], dict]):
if method not in cls._ACTORINFO_FILTERS:
WorkflowDbClient._ACTORINFO_FILTERS.append(method)
[docs]
@classmethod
def apply_actorinfo_filters(cls, info: dict) -> dict:
for method in WorkflowDbClient._ACTORINFO_FILTERS:
info = method(info)
return info
[docs]
def connect(self):
raise NotImplementedError
[docs]
def disconnect(self):
raise NotImplementedError
[docs]
def startWorkflow(self, name: str):
raise NotImplementedError
[docs]
def endWorkflow(self, status="finished") -> None:
raise NotImplementedError
[docs]
def ensureEndWorkflow(self) -> None:
raise NotImplementedError
[docs]
def updateWorkflowInfo(self, info: dict) -> None:
raise NotImplementedError
[docs]
def setWorkflowStatus(self, status: str) -> None:
self.updateWorkflowInfo({"status": status})
[docs]
def getWorkflowInfo(self) -> Optional[dict]:
raise NotImplementedError
[docs]
def startActor(
self, name: str, info: Optional[dict] = None, script: Optional[str] = None
):
raise NotImplementedError
[docs]
def endActor(self, actorId, status="finished") -> None:
raise NotImplementedError
[docs]
def updateActorInfo(self, actorId, info: dict) -> None:
raise NotImplementedError
[docs]
def setActorStatus(self, actorId, status: str) -> None:
self.updateActorInfo(actorId, {"status": status})
[docs]
def getActorInfo(self, actorId) -> Optional[None]:
raise NotImplementedError