[docs]@attr.sclassStateMachine(StepFunctionObject):""" Represent an instance of State Machine in AWS Console. :param name: :param workflow: :class:`~aws_stepfunction.workflow.Workflow` :param role_arn: :param type: :param logging_configuration: :param tracing_configuration: :param tags: """name:str=attr.ib()workflow:'Workflow'=attr.ib()role_arn:str=attr.ib(metadata={C.ALIAS:"roleArn"},)type:T.Optional[str]=attr.ib(default="STANDARD")logging_configuration:T.Optional[dict]=attr.ib(default=None,metadata={C.ALIAS:"loggingConfiguration"},)tracing_configuration:T.Optional[dict]=attr.ib(default=None,metadata={C.ALIAS:"tracingConfiguration"},)tags:T.Optional[dict]=attr.ib(default=None,validator=vs.optional(vs.deep_mapping(key_validator=vs.instance_of(str),value_validator=vs.instance_of(str),)))defset_type_as_standard(self)->'StateMachine':# pragma: no coverself.type="STANDARD"returnselfdefset_type_as_express(self)->'StateMachine':# pragma: no coverself.type="EXPRESS"returnselfdefis_express(self)->bool:returnself.type=="EXPRESS"def_convert_tags(self)->T.List[T.Dict[str,str]]:return[dict(key=key,value=value)forkey,valueinself.tags.items()]defget_state_machine_arn(self,bsm:'BotoSesManager')->str:return(f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:"f"stateMachine:{self.name}")defget_state_machine_console_url(self,bsm:'BotoSesManager')->str:return(f"https://{bsm.aws_region}.console.aws.amazon.com/states/"f"home?region={bsm.aws_region}#/statemachines/view/"f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:"f"stateMachine:{self.name}")defget_state_machine_visual_editor_console_url(self,bsm:'BotoSesManager')->str:return(f"https://{bsm.aws_region}.console.aws.amazon.com/states/"f"home?region={bsm.aws_region}#/visual-editor?stateMachineArn="f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:"f"stateMachine:{self.name}")defdescribe(self,bsm:'BotoSesManager'):sfn_client=bsm.get_client(AwsServiceEnum.SFN)returnsfn_client.describe_state_machine(stateMachineArn=self.get_state_machine_arn(bsm))
[docs]defexists(self,bsm:'BotoSesManager')->bool:""" Check if the state machine exists. """try:self.describe(bsm)returnTrueexceptExceptionase:if"StateMachineDoesNotExist"ine.__class__.__name__:returnFalseelse:# pragma: no coverraisee
[docs]@logger.decoratordefexecute(self,bsm:'BotoSesManager',payload:T.Optional[dict]=None,name:T.Optional[str]=None,sync:bool=False,trace_header:T.Optional[str]=None,):""" Execute state machine with custom payload. :param payload: custom payload in python dictionary :param name: the execution name, recommend to leave it empty and let step function to generate an uuid for you. :param sync: if true, you need to wait for the execution to finish otherwise, it returns immediately, and you can check the status in the console Reference: - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.start_execution - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.start_sync_execution """state_machine_arn=self.get_state_machine_arn(bsm)logger.info(f"execute state machine {state_machine_arn!r}")sfn_client=bsm.get_client(AwsServiceEnum.SFN)kwargs=dict(stateMachineArn=state_machine_arn)ifpayloadisnotNone:kwargs["input"]=json.dumps(payload)ifnameisnotNone:# pragma: no coverkwargs["name"]=nameiftrace_headerisnotNone:# pragma: no coverkwargs["traceHeader"]=trace_headerifsync:# pragma: no coverres=sfn_client.start_sync_execution(**kwargs)else:res=sfn_client.start_execution(**kwargs)execution_arn=res["executionArn"]ifself.is_express():execution_id=":".join(execution_arn.split(":")[-2:])start_date_ts=int(res["startDate"].timestamp()*1000)execution_console_url=(f"https://{bsm.aws_region}.console.aws.amazon.com/states/"f"home?region={bsm.aws_region}#/express-executions/details/"f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:"f"express:{self.name}:{execution_id}?startDate={start_date_ts}")else:execution_id=execution_arn.split(":")[-1]execution_console_url=(f"https://{bsm.aws_region}.console.aws.amazon.com/states/"f"home?region={bsm.aws_region}#/v2/executions/details/"f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:"f"execution:{self.name}:{execution_id}")logger.info(f" preview at: {execution_console_url}")returnres
@logger.decoratordefdeploy(self,bsm:'BotoSesManager')->dict:self._deploy_magic(bsm)logger.info(f"deploy state machine to {self.get_state_machine_arn(bsm)!r} ...")ifself.exists(bsm):logger.info(" already exists, update state machine ...")res=self.update(bsm)logger.info(f" done, preview at: {self.get_state_machine_visual_editor_console_url(bsm)}")res["_deploy_action"]="update"else:logger.info(" not exists, create state machine ...")res=self.create(bsm)res["_deploy_action"]="create"logger.info(f" done, preview at: {self.get_state_machine_visual_editor_console_url(bsm)}")returnres# -------------------------------------------------------------------------# Magic Task# -------------------------------------------------------------------------@propertydef_stack_name(self)->str:""" Magic task cloudFormation stack name. """returnslugify(self.name)@logger.decoratordef_deploy_magic(self,bsm:'BotoSesManager'):""" Deploy magic tasks (if available) """boto_man=BotoMan(bsm=bsm)# detect whether the magic task is usedlogger.info("detect whether the magic task is used ...")defiterate_task_state(workflow:'Workflow')->T.Iterable[Task]:for_,stateinworkflow._states.items():ifisinstance(state,Task):yieldstateelifisinstance(state,Parallel):forsub_workflowinstate.branches:forstate_initerate_task_state(sub_workflow):yieldstate_else:passlbd_task_list:T.List[BaseLambdaTask]=list()forstateiniterate_task_state(self.workflow):ifstate._is_magic():ifisinstance(state,BaseLambdaTask):lbd_task_list.append(state)has_magic_task:bool=len(lbd_task_list)>0ifhas_magic_task:logger.info("yes",1)else:logger.info("no",1)# First create the necessary S3 bucket,tpl=cf.Template()need_to_deploy_s3_and_iam=FalseDEFAULT_CREATE_BY="aws-stepfunction-python-sdk"ifhas_magic_task:logger.info("identify necessary S3 bucket and IAM role ...")# create necessary S3 Buckets3_bucket_set:T.Set[str]=set()forstateinlbd_task_list:ifstate.lbd_code_s3_bucketisNone:bucket_name=boto_man.default_s3_bucket_artifactselse:bucket_name=state.lbd_code_s3_buckets3_bucket_set.add(bucket_name)forbucket_nameins3_bucket_set:try:tags=boto_man.get_s3_bucket_tags(bucket_name)iftags.get("CreatedBy","unknown")==DEFAULT_CREATE_BY:need_to_declare_this_bucket=Trueelse:need_to_declare_this_bucket=FalseexceptBucketNotExist:need_to_declare_this_bucket=Trueneed_to_deploy_s3_and_iam=Truelogger.info(f"need to create S3 Bucket {bucket_name!r}",1)ifneed_to_declare_this_bucket:s3_bucket=s3.Bucket(f"S3Bucket{camel_case(bucket_name)}",p_BucketName=bucket_name,)# logger.info(f"declare S3 Bucket {s3_bucket.p_BucketName}")tpl.add(s3_bucket)# create necessary IAM roleneed_default_iam_role=Falseforstateinlbd_task_list:ifstate.lbd_roleisNone:need_default_iam_role=Truelogger.info("we need a default IAM role for lambda function",1)breakifneed_default_iam_role:try:tags=boto_man.get_iam_role_tags(boto_man.default_iam_role_magic_task)iftags.get("CreatedBy","unknown")==DEFAULT_CREATE_BY:need_to_declare_default_iam_role=Trueelse:need_to_declare_default_iam_role=FalseexceptIamRoleNotExist:need_to_declare_default_iam_role=Trueneed_to_deploy_s3_and_iam=Truelogger.info(f"need to create IAM Role {boto_man.default_iam_role_magic_task!r}",1)ifneed_to_declare_default_iam_role:default_role=iam.Role("DefaultLambdaRole",rp_AssumeRolePolicyDocument=cf.helpers.iam.AssumeRolePolicyBuilder(cf.helpers.iam.ServicePrincipal.awslambda(),).build(),p_RoleName=boto_man.default_iam_role_magic_task,p_ManagedPolicyArns=[cf.helpers.iam.AwsManagedPolicy.AWSLambdaBasicExecutionRole])# print(f"declare IAM Role {default_role.p_RoleName}")tpl.add(default_role)logger.info("done",1)ifneed_to_deploy_s3_and_iam:self._deploy_cft(bsm=bsm,tpl=tpl,msg="deploy S3 and IAM ...",period=5,retry=12,)logger.info("deploy Lambda Functions ...")context.attach_boto_session(bsm.boto_ses)dir_home=Path.home()dir_home_tmp=dir_home/"tmp"logger.info("upload lambda deployment artifacts ...",1)forstateinlbd_task_list:path=dir_home_tmp.joinpath(f"{state.path_lbd_script.md5}.zip")state.path_lbd_script.make_zip_archive(dst=path.abspath,makedirs=True,include_dir=True,overwrite=True,compress=True,verbose=False,)# Don't update the state object directly!ifstate.lbd_roleisNone:lbd_role=boto_man.default_iam_role_arn_magic_taskelse:lbd_role=state.lbd_roleifstate.lbd_code_s3_bucketisNone:lbd_code_s3_bucket=boto_man.default_s3_bucket_artifactslbd_code_s3_key=f"{boto_man.default_s3_bucket_artifacts_prefix}/{path.md5}.zip"else:lbd_code_s3_bucket=state.lbd_code_s3_bucketlbd_code_s3_key=state.lbd_code_s3_keys3path=S3Path(lbd_code_s3_bucket,lbd_code_s3_key)logger.info(f"upload from {path} to {s3path.uri}",2)s3path.upload_file(path.abspath,overwrite=True)new_state=attr.evolve(state,lbd_role=lbd_role,lbd_code_s3_bucket=lbd_code_s3_bucket,lbd_code_s3_key=lbd_code_s3_key,)lbd_func=new_state.lambda_function()lbd_func.update_tags(overwrite_existing=True,hash=state.path_lbd_script.md5,)logger.info(f"declare Lambda Function {lbd_func.p_FunctionName}",2)tpl.add(lbd_func)self._deploy_cft(bsm=bsm,tpl=tpl,msg="deploy magic task Lambda Function ...",period=5,retry=12,)def_deploy_cft(self,bsm:BotoSesManager,tpl:'cf.Template',msg:str,period:int,retry:int,):""" A syntax sugar that deploy ``cottonformation.Template``. """logger.info(msg)tpl.batch_tagging(overwrite_existing=True,CreatedBy="aws-stepfunction-python-sdk",)env=cf.Env(bsm=bsm)boto_man=BotoMan(bsm=bsm)try:stack_console_url=(f"https://console.aws.amazon.com/cloudformation/home?"f"region={bsm.aws_region}#/stacks?"f"filteringStatus=active&"f"filteringText={self._stack_name}&"f"viewNested=true&"f"hideStacks=false&"f"stackId=")logger.info(f"preview cloudformation stack status: {stack_console_url}",1)env.deploy(template=tpl,stack_name=self._stack_name,include_iam=True,verbose=False,)boto_man.wait_cloudformation_stack_success(name=self._stack_name,period=period,retry=retry,)logger.info("done",1)exceptExceptionase:if"No updates are to be performed"instr(e):logger.info("no updates are to be performed",1)else:raisee