pypushflow.tests.test_stop_workflow.WorkflowSleep#

class pypushflow.tests.test_stop_workflow.WorkflowSleep(name, **kw)[source]#

Bases: Workflow

addActorRef(actorRef)#
connectOnError(actor)#
endWorkflow(status)#
getActorPath()#
getListActorRef()#
property pool#
run(inData, timeout=None, max_workers=None, scaling_workers=True, pool_type=None, **pool_options)#
Parameters:
  • timeout (Optional[float])

  • max_workers (Optional[int])

  • scaling_workers (bool)

  • pool_type (Optional[str])

Return type:

dict

setStatus(status)#
stop(reason='interrupt workflow', forced_interruption=None)#
Parameters:
  • reason (str)

  • forced_interruption (Optional[bool])

property stop_exception: BaseException | None#
triggerOnError(inData)#