# ------------------------------------------------------------------------------
# Boto3
# ------------------------------------------------------------------------------
def create_log_group(
    bsm: BotoSesManager,
    log_group_name: str,
):
    """
    Create a CloudWatch log group unless one with exactly this name exists.

    :param bsm: session manager; its ``logs_client`` is used for both calls.
    :param log_group_name: full name of the log group to ensure.
    """
    response = bsm.logs_client.describe_log_groups(
        logGroupNamePrefix=log_group_name,
    )
    # The API filters by *prefix*, so a hit may be a different group that only
    # shares the prefix (e.g. "<name>-old"). Require an exact name match before
    # deciding that creation can be skipped.
    # NOTE(review): only the first page of results is inspected; AWS documents
    # the listing as ASCII-sorted by name, so an exact match should come first
    # -- confirm if accounts with many similarly-prefixed groups matter.
    already_exists = any(
        log_group.get("logGroupName") == log_group_name
        for log_group in response["logGroups"]
    )
    if not already_exists:
        bsm.logs_client.create_log_group(
            logGroupName=log_group_name,
        )


def create_logging_configuration(
    aws_account_id: str,
    aws_region: str,
    state_machine_name: str,
    level: str = StateMachineLoggingLevelEnum.ALL.value,
    include_execution_data: bool = True,
    log_group_arn: str = NOTHING,
) -> dict:
    """
    Build the ``loggingConfiguration`` dict for a state machine.

    :param aws_account_id: account id used when deriving the default log
        group ARN.
    :param aws_region: region used when deriving the default log group ARN.
    :param state_machine_name: used to derive the default log group name
        ``/aws/vendedlogs/states/{state_machine_name}-Logs``.
    :param level: logging level value; when it equals the ``OFF`` level no
        ``destinations`` entry is added.
    :param include_execution_data: stored under ``includeExecutionData``.
    :param log_group_arn: explicit log group ARN; when left as ``NOTHING``
        the default ARN (ending in ``:*``) is generated.

    :return: a dict with ``level`` and ``includeExecutionData``, plus
        ``destinations`` whenever logging is not ``OFF``.
    """
    logging_configuration = {
        "level": level,
        "includeExecutionData": include_execution_data,
    }
    if level != StateMachineLoggingLevelEnum.OFF.value:
        # has to define destination
        if log_group_arn is NOTHING:
            log_group_arn = (
                f"arn:aws:logs:{aws_region}:{aws_account_id}:log-group:"
                f"/aws/vendedlogs/states/{state_machine_name}-Logs:*"
            )
        logging_configuration["destinations"] = [
            {
                "cloudWatchLogsLogGroup": {
                    "logGroupArn": log_group_arn,
                }
            },
        ]
    return logging_configuration
def create_state_machine(
    bsm: BotoSesManager,
    name: str,
    definition: str,
    role_arn: str,
    type: str = StateMachineTypeEnum.STANDARD.value,
    logging_configuration: dict = NOTHING,
    tracing_configuration: dict = NOTHING,
    tags: T.Dict[str, str] = NOTHING,
) -> dict:
    """
    Create a state machine, first ensuring its CloudWatch log group exists
    when a logging configuration with a non-``OFF`` level is supplied.

    :param bsm: session manager providing ``stepfunctions_client``.
    :param name: state machine name.
    :param definition: state machine definition string.
    :param role_arn: IAM role ARN, passed as ``roleArn``.
    :param type: state machine type value.
    :param logging_configuration: optional ``loggingConfiguration`` dict; when
        its ``level`` is not ``OFF``, the first destination's ``logGroupArn``
        is read and that log group is created if missing.
    :param tracing_configuration: optional ``tracingConfiguration`` dict.
    :param tags: optional key/value tags, converted with ``to_tag_list``.

    :return: the response of ``stepfunctions_client.create_state_machine``.

    Ref:

    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.create_state_machine
    """
    if logging_configuration is not NOTHING:
        logging_level = logging_configuration["level"]
        if logging_level != StateMachineLoggingLevelEnum.OFF.value:
            # has to define destination
            destination = logging_configuration["destinations"][0]
            log_group_arn = destination["cloudWatchLogsLogGroup"]["logGroupArn"]
            # ARN layout: arn:aws:logs:<region>:<account>:log-group:<name>[:*]
            # The name is always field 6; counting from the end would return
            # "log-group" whenever the optional ":*" suffix is absent.
            log_group_name = log_group_arn.split(":")[6]
            create_log_group(bsm=bsm, log_group_name=log_group_name)
    if tags is not NOTHING:
        tags = to_tag_list(tags)
    # NOTE(review): resolve_kwargs presumably drops NOTHING-valued arguments
    # before the boto3 call -- confirm against its definition.
    return bsm.stepfunctions_client.create_state_machine(
        **resolve_kwargs(
            name=name,
            definition=definition,
            roleArn=role_arn,
            type=type,
            loggingConfiguration=logging_configuration,
            tracingConfiguration=tracing_configuration,
            tags=tags,
        )
    )
def delete_state_machine(
    bsm: BotoSesManager,
    name_or_arn: str,
) -> bool:
    """
    Delete a state machine if it can currently be described.

    :param bsm: session manager providing ``stepfunctions_client``.
    :param name_or_arn: state machine name or ARN; resolved to an ARN first.

    :return: a boolean flag that indicates whether a deletion happened or not.

    Ref:

    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.delete_state_machine
    """
    sfn_arn = _ensure_state_machine_arn(bsm=bsm, name_or_arn=name_or_arn)
    # The delete call is only issued when the describe lookup returned
    # something; a None result means there is nothing to delete.
    found = describe_state_machine(bsm=bsm, name_or_arn=sfn_arn) is not None
    if found:
        bsm.stepfunctions_client.delete_state_machine(stateMachineArn=sfn_arn)
    return found
def list_state_machines(
    bsm: BotoSesManager,
    max_items: int = 1000,
    page_size: int = 100,
) -> StateMachineIterProxy:
    """
    Wrap the ``_list_state_machines`` iterator in a ``StateMachineIterProxy``.

    :param bsm: session manager forwarded to ``_list_state_machines``.
    :param max_items: forwarded as ``max_items``.
    :param page_size: forwarded as ``page_size``.
    """
    return StateMachineIterProxy(
        _list_state_machines(
            bsm=bsm,
            max_items=max_items,
            page_size=page_size,
        )
    )


def wait_delete_state_machine_to_finish(
    bsm: BotoSesManager,
    name_or_arn: str,
    delays: int = 5,
    timeout: int = 60,
    verbose: bool = True,
):
    """
    Poll a state machine until it can no longer be described.

    :param bsm: session manager used for the describe calls.
    :param name_or_arn: state machine name or ARN; resolved to an ARN first.
    :param delays: forwarded to ``Waiter`` as ``delays``.
    :param timeout: forwarded to ``Waiter`` as ``timeout``.
    :param verbose: when true, progress messages are printed.

    :return: ``False`` as soon as the state machine is no longer found;
        ``None`` if the waiter loop ends without that happening.
    :raises WaiterError: if the state machine reports the ``ACTIVE`` status
        while being polled.
    """
    arn = _ensure_state_machine_arn(bsm=bsm, name_or_arn=name_or_arn)
    if verbose:  # pragma: no cover
        print(f"wait for delete state machine {arn} to finish ...")

    for _ in Waiter(delays=delays, timeout=timeout, verbose=verbose):
        state_machine = describe_state_machine(bsm=bsm, name_or_arn=arn)
        if state_machine is None:
            if verbose:
                print("state machine doesn't exist.")
            return False
        status = state_machine.status
        # An ACTIVE machine is treated as a failure of the wait; any other
        # status just keeps polling.
        if status == StateMachineStatusEnum.ACTIVE.value:
            raise WaiterError(f"failed with status {status!r}")

    # NOTE(review): only reached if Waiter stops iterating without raising;
    # Waiter's exhaustion behaviour is not visible here -- confirm.
    if verbose:  # pragma: no cover
        print("state machine is deleted.")