[docs]@attr.sclassWorkflow(StepFunctionObject):""" Workflow is a series of event-driven steps. This class defines the transition logic from one step to another. :param id: :param start_at: :param comment: :param states: :param version: :param timeout_seconds: Reference: - https://states-language.net/spec.html#toplevelfields """id:str=attr.ib(factory=lambda:f"Workflow-{short_uuid()}",validator=vs.instance_of(str),)comment:T.Optional[str]=attr.ib(default=None,validator=vs.optional(vs.instance_of(str)),)version:T.Optional[str]=attr.ib(default=None,validator=vs.optional(vs.instance_of(str)),)timeout_seconds:T.Optional[int]=attr.ib(default=None,validator=vs.optional(vs.instance_of(int)),)_start_at:T.Optional[str]=attr.ib(default=None,validator=vs.optional(vs.instance_of(str)),)_states:T.Dict[str,'StateType']=attr.ib(factory=dict,validator=vs.deep_mapping(key_validator=vs.instance_of(str),value_validator=vs.instance_of(StateType),),)_started:bool=attr.ib(default=False)_previous_state:T.Optional['StateType']=attr.ib(default=None)_field_order=[C.Version,C.StartAt,C.Comment,C.TimeoutSeconds,C.States,]def_add_state(self,state:'StateType',ignore_exists:bool=False,)->bool:""" Add a new state to this workflow. :return: indicate whether a new state is added """ifstate.idinself._states:ifignore_existsisFalse:raiseexc.WorkflowError.make(self,f"Cannot add State(ID={state.id!r}), "f"it is already defined!")returnFalseelse:self._states[state.id]=statereturnTruedef_remove_state(self,state:'StateType',ignore_not_exists:bool=False,)->bool:""" Remove a state from workflow. :return: indicate whether a state is removed """ifstate.idnotinself._states:ifignore_not_existsisFalse:raiseexc.WorkflowError.make(self,f"Cannot remove State(ID={state.id!r}), "f"it doesn't exist!")returnFalseelse:self._states.pop(state.id)returnTruedef_parallel(self,branches:T.Iterable['Workflow'],id:T.Optional[str]=None,)->'Parallel':""" Construct a :class:`aws_stepfunction.state.Parallel` task. """kwargs=dict(branches=branches)ifidisNone:ifself._previous_stateisnotNone:kwargs["id"]=f"{C.Parallel}-after-{self._previous_state.id}"else:kwargs["id"]=idparallel=Parallel(**kwargs)returnparalleldef_map(self,iterator:'Workflow',items_path:T.Optional[str]=None,max_concurrency:T.Optional[int]=None,id:T.Optional[str]=None,)->'Map':""" Construct a :class:`aws_stepfunction.state.Map` task. """kwargs=dict(iterator=iterator,items_path=items_path,max_concurrency=max_concurrency,)ifidisNone:ifself._previous_stateisnotNone:kwargs["id"]=f"{C.Map}-after-{self._previous_state.id}"else:kwargs["id"]=idmap_=Map(**kwargs)returnmap_def_choice(self,choices:T.List['ChoiceRule'],default:T.Optional['StateType']=None,id:T.Optional[str]=None)->'Choice':""" Construct a :class:`aws_stepfunction.state.Choice` task. """kwargs=dict(choices=choices)ifidisNone:ifself._previous_stateisnotNone:kwargs["id"]=f"{C.Choice}-by-{self._previous_state.id}"else:kwargs["id"]=idifdefaultisnotNone:kwargs["default"]=default.idchoice=Choice(**kwargs)returnchoice
[docs]defsubflow_from(self,state:'StateType')->'Workflow':""" Similar to :meth:`Workflow.start_from`, but it creates a new workflow instance and start from there. """workflow=Workflow()workflow.start_from(state)returnworkflow
[docs]defstart_from(self,state:'StateType')->'Workflow':""" Start the workflow from a state. """self._start_at=state.idself._add_state(state)self._started=Trueself._previous_state=statereturnself
def_check_started(self):ifself._startedisnotTrue:raiseexc.WorkflowError.make(self,"the workflow is not started yet, ""you should call one of the ""'Workflow.start()' ""'Workflow.start_from_parallel()' ""'Workflow.start_from_map()' ""'Workflow.start_from_choice()' ""first!")elifself._previous_stateisNone:raiseexc.WorkflowError.make(self,f"looks like you just defined a '{C.Choice}' state, ""you cannot call 'next_then()' immediately. because the workflow ""doesn't know which branch to continue from. ""you should call 'Workflow.continue_from()' for the first task ""of each choice branch to continue orchestration.")
[docs]defnext_then(self,state:'StateType')->'Workflow':""" Move from previous state to the next state. """self._check_started()self._previous_state.next=state.idifstate.idnotinself._states:self._add_state(state)self._previous_state=statereturnself
[docs]defparallel(self,branches:T.Iterable['Workflow'],id:T.Optional[str]=None,)->'Workflow':""" Create a :class:`~aws_stepfunction.state.Parallel` state and set it as the next. You definitely can manually create a ``Parallel`` state and pass to :meth:`next_then`. However, it damages the code readability and NOT RECOMMENDED. """self._check_started()parallel=self._parallel(branches=branches,id=id)self._previous_state.next=parallel.idself._add_state(parallel)self._started=Trueself._previous_state=parallelreturnself
[docs]defwait(self,id:T.Optional[str]=None,seconds:T.Optional[int]=None,timestamp:T.Optional[str]=None,seconds_path:T.Optional[str]=None,timestamp_path:T.Optional[str]=None,)->'Workflow':""" Create a :class:`~aws_stepfunction.state.Wait` state and set it as the next. """kwargs=dict(seconds=seconds,timestamp=timestamp,seconds_path=seconds_path,timestamp_path=timestamp_path,)ifidisNone:ifself._previous_stateisnotNone:kwargs["id"]=f"{C.Wait}-after-{self._previous_state.id}"else:kwargs["id"]=idwait=Wait(**kwargs)ifself._startedisFalse:self.start_from(wait)else:self._check_started()self._previous_state.next=wait.idself._add_state(wait)self._started=Trueself._previous_state=waitreturnself
[docs]defsucceed(self,id:T.Optional[str]=None,)->'Workflow':""" Create a :class:`~aws_stepfunction.state.Succeed` state and set it as the next. """kwargs=dict()ifidisNone:ifself._previous_stateisnotNone:kwargs["id"]=f"{C.Succeed}-after-{self._previous_state.id}"else:kwargs["id"]=idsucceed=Succeed(**kwargs)ifself._startedisFalse:self.start_from(succeed)else:self._check_started()self._previous_state.next=succeed.idself._add_state(succeed)self._started=Trueself._previous_state=succeedreturnself
[docs]deffail(self,cause:T.Optional[str]=None,error:T.Optional[str]=None,id:T.Optional[str]=None,)->'Workflow':""" Create a :class:`~aws_stepfunction.state.Fail` state and set it as the next. """kwargs=dict(cause=cause,error=error)ifidisNone:ifself._previous_stateisnotNone:kwargs["id"]=f"{C.Fail}-after-{self._previous_state.id}"else:kwargs["id"]=idfail=Fail(**kwargs)ifself._startedisFalse:self.start_from(fail)else:self._check_started()self._previous_state.next=fail.idself._add_state(fail)self._started=Trueself._previous_state=failreturnself
[docs]defend(self)->'Workflow':""" Mark the workflow is end, and also set the last state in the workflow End = True. """self._previous_state.end=Trueself._started=Falsereturnself
[docs]defcontinue_from(self,state:'StateType')->'Workflow':""" Continue workflow from a given state. """self._add_state(state,ignore_exists=True)self._started=Trueself._previous_state=statereturnself
def_pre_serialize_validation(self):ifnotself._start_at:raiseexc.WorkflowValidationError.make(self,f"'StartAt' cannot be empty string!")ifself._start_atnotinself._states:raiseexc.WorkflowValidationError(self,f"'StartAt' id {self._start_at!r} is not any of defined State ID")# this branch mostly will not hit, just put here for defensive programmingiflen(self._states)==0:# pragma: no coverraiseexc.WorkflowValidationError(self,"You have to define at least ONE state!")def_serialize(self)->dict:# set required fieldsdata={C.StartAt:self._start_at,C.States:{state_id:state.serialize()forstate_id,stateinself._states.items()},}# set optional fieldsifself.comment:data[C.Comment]=self.commentifself.version:data[C.Version]=self.versionifself.timeout_seconds:data[C.TimeoutSeconds]=self.timeout_seconds# sort the fieldsdata=self._sort_field(data)returndata