State Machine Deployment¶
In this section, we demonstrate the best practice for using the aws_stepfunction library to manage the deployment, update, execution, and deletion of an AWS State Machine.
Define Workflow, State and StateMachine¶
First, let’s import the required libraries.
[49]:
# import the aws_stepfunction top level namespace
import aws_stepfunction as sfn
# import the boto_session_manager library
from boto_session_manager import BotoSesManager
# import the pretty printer library
from rich import print
Then, let’s define a simple workflow. We hard-code the input argument in the result field of the first task, “Input Argument Handling”, and expect to see this value in the execution result.
[50]:
workflow = sfn.Workflow()
input_argument_handling = sfn.Pass(
    id="Input Argument Handling",
    result={"message": "Hello Alice"},  # <=== input argument
)
(
    workflow.start_from(input_argument_handling)
    .next_then(sfn.Pass(id="Task 1"))
    .next_then(sfn.Pass(id="Task 2"))
    .end()
)
print(workflow.serialize())
{ 'StartAt': 'Input Argument Handling', 'States': { 'Input Argument Handling': {'Type': 'Pass', 'Result': {'message': 'Hello Alice'}, 'Next': 'Task 1'}, 'Task 1': {'Type': 'Pass', 'Next': 'Task 2'}, 'Task 2': {'Type': 'Pass', 'End': True} } }
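The serialized dictionary follows the Amazon States Language layout: StartAt names the first state, and each state either points to its successor with Next or closes the workflow with End. As a minimal sketch using only the standard library, the following walks that chain and dumps the definition as the JSON string that AWS expects. The helper name walk_linear_chain is hypothetical and is not part of aws_stepfunction.

```python
import json

# The definition printed above, copied as a plain Python dict.
definition = {
    "StartAt": "Input Argument Handling",
    "States": {
        "Input Argument Handling": {
            "Type": "Pass",
            "Result": {"message": "Hello Alice"},
            "Next": "Task 1",
        },
        "Task 1": {"Type": "Pass", "Next": "Task 2"},
        "Task 2": {"Type": "Pass", "End": True},
    },
}


def walk_linear_chain(definition: dict) -> list:
    """Follow ``Next`` pointers from ``StartAt`` until a state has ``End``.

    Hypothetical helper for illustration; it only handles linear workflows
    (no Choice, Parallel, or Map states).
    """
    order = []
    state_id = definition["StartAt"]
    while True:
        if state_id in order:
            raise ValueError(f"cycle detected at state {state_id!r}")
        order.append(state_id)
        state = definition["States"][state_id]
        if state.get("End"):
            return order
        state_id = state["Next"]


execution_order = walk_linear_chain(definition)
definition_json = json.dumps(definition, indent=4)
print(execution_order)
```

A check like this can be handy in a unit test, since it catches a missing Next or End before anything is sent to AWS.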
A minimal State Machine should include the following required arguments:
name: the name of the state machine
workflow: the aws_stepfunction.workflow.Workflow object
role_arn: the IAM role to assume to grant the state machine necessary permission
By default, a State Machine only supports the async execution mode, because a State Machine can take a very long time to finish. If you want to execute a State Machine in sync mode, you have to set the State Machine type to EXPRESS. However, this introduces the limitation that the workflow has to finish within 5 minutes.
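To make the sync and async distinction concrete, here is a rough sketch of how a caller might choose between the two boto3 Step Functions client methods, start_execution (async) and start_sync_execution (sync, EXPRESS only). The function run_state_machine and the RecordingClient stub are assumptions made for illustration; they are not how aws_stepfunction is implemented, and the stub makes no AWS calls.

```python
class RecordingClient:
    """In-memory stand-in for a boto3 Step Functions client (no AWS calls)."""

    def __init__(self):
        self.calls = []

    def start_execution(self, **kwargs):
        # Async API: returns immediately, the execution keeps running.
        self.calls.append(("start_execution", kwargs))
        return {"executionArn": "stub-execution-arn"}

    def start_sync_execution(self, **kwargs):
        # Sync API: blocks until the Express workflow finishes.
        self.calls.append(("start_sync_execution", kwargs))
        return {"status": "SUCCEEDED"}


def run_state_machine(client, state_machine_arn, machine_type, payload="{}", sync=False):
    """Hypothetical helper: pick the API that matches the requested mode."""
    if sync:
        if machine_type != "EXPRESS":
            raise ValueError("sync execution requires an EXPRESS state machine")
        return client.start_sync_execution(
            stateMachineArn=state_machine_arn, input=payload
        )
    return client.start_execution(stateMachineArn=state_machine_arn, input=payload)


client = RecordingClient()
run_state_machine(client, "arn:demo", "EXPRESS", sync=True)
print(client.calls[-1][0])
```

Failing early on a STANDARD state machine gives a clearer error than letting the service reject the request.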
[51]:
state_machine = sfn.StateMachine(
    name="stepfunction_deployment_example",
    workflow=workflow,
    role_arn="arn:aws:iam::669508176277:role/sanhe-for-everything-admin",
    tags=dict(Creator="alice@example.com"),
)
# this also works: state_machine = sfn.StateMachine(..., type="EXPRESS", ...)
state_machine.set_type_as_express()
pass
boto_session_manager is a thin wrapper around the official boto3 library. It caches the created AWS service clients in memory and avoids re-authenticating within a short period of time.
[52]:
bsm = BotoSesManager(
    profile_name="aws_data_lab_sanhe_us_east_1",
    region_name="us-east-1",
)
Deploy¶
Deploy is just a combination of create_state_machine and update_state_machine. If the State Machine doesn’t exist, we create it; otherwise, we update it.
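The create-or-update decision can be sketched as follows. This is an illustration under stated assumptions, not the library's actual code: the deploy function and the InMemorySfnClient stub are hypothetical, the account ID 111122223333 is a placeholder, and only the method names and keyword arguments (describe_state_machine, create_state_machine, update_state_machine) mirror the boto3 Step Functions client.

```python
import json


class _StateMachineDoesNotExist(Exception):
    """Stand-in for the error boto3 raises when the ARN is unknown."""


class InMemorySfnClient:
    """Tiny stub mimicking three boto3 Step Functions client methods."""

    class exceptions:
        StateMachineDoesNotExist = _StateMachineDoesNotExist

    def __init__(self):
        self.machines = {}

    def describe_state_machine(self, stateMachineArn):
        if stateMachineArn not in self.machines:
            raise _StateMachineDoesNotExist(stateMachineArn)
        return self.machines[stateMachineArn]

    def create_state_machine(self, name, definition, roleArn):
        arn = f"arn:aws:states:us-east-1:111122223333:stateMachine:{name}"
        self.machines[arn] = {"definition": definition, "roleArn": roleArn}
        return {"stateMachineArn": arn}

    def update_state_machine(self, stateMachineArn, definition, roleArn):
        self.machines[stateMachineArn].update(definition=definition, roleArn=roleArn)
        return {}


def deploy(client, arn, name, definition, role_arn):
    """Hypothetical helper: create the state machine if missing, else update it."""
    definition_json = json.dumps(definition)
    try:
        client.describe_state_machine(stateMachineArn=arn)
    except client.exceptions.StateMachineDoesNotExist:
        response = client.create_state_machine(
            name=name, definition=definition_json, roleArn=role_arn
        )
        action = "create"
    else:
        response = client.update_state_machine(
            stateMachineArn=arn, definition=definition_json, roleArn=role_arn
        )
        action = "update"
    # Record which branch ran, similar to the '_deploy_action' key in the output.
    return {**response, "_deploy_action": action}


client = InMemorySfnClient()
arn = "arn:aws:states:us-east-1:111122223333:stateMachine:demo"
definition = {"StartAt": "A", "States": {"A": {"Type": "Pass", "End": True}}}
first = deploy(client, arn, "demo", definition, "arn:aws:iam::111122223333:role/demo")
second = deploy(client, arn, "demo", definition, "arn:aws:iam::111122223333:role/demo")
print(first["_deploy_action"], second["_deploy_action"])
```

Because the same call handles both branches, a deployment script can be rerun safely without checking for the State Machine first.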
[53]:
result = state_machine.deploy(bsm)
print(result)
deploy state machine to 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example' ...
already exists, update state machine ...
done, preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/visual-editor?stateMachineArn=arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example
{ 'updateDate': datetime.datetime(2022, 8, 7, 11, 11, 43, 181000, tzinfo=tzlocal()), 'ResponseMetadata': { 'RequestId': '433346d0-90a3-4628-8fe4-4273b6add896', 'HTTPStatusCode': 200, 'HTTPHeaders': { 'x-amzn-requestid': '433346d0-90a3-4628-8fe4-4273b6add896', 'date': 'Sun, 07 Aug 2022 15:11:43 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '31' }, 'RetryAttempts': 0 }, '_deploy_action': 'update' }
Execution¶
You can execute a State Machine from Python code. The output field in the API response is the final State Machine output.
[54]:
result = state_machine.execute(bsm, sync=True) # explicitly use sync mode
print(result)
execute state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example'
preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:669508176277:execution:stepfunction_deployment_example:ddea193b-2c78-4fc9-9c1b-acade9d5a699
{ 'executionArn': 'arn:aws:states:us-east-1:669508176277:express:stepfunction_deployment_example:c2794158-4597-47e7-b6d3-d86ba6c21da6 :ddea193b-2c78-4fc9-9c1b-acade9d5a699', 'stateMachineArn': 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example', 'name': 'c2794158-4597-47e7-b6d3-d86ba6c21da6', 'startDate': datetime.datetime(2022, 8, 7, 11, 11, 44, 958000, tzinfo=tzlocal()), 'stopDate': datetime.datetime(2022, 8, 7, 11, 11, 44, 962000, tzinfo=tzlocal()), 'status': 'SUCCEEDED', 'input': '{}', 'inputDetails': {'included': True}, 'output': '{"message":"Hello Alice"}', 'outputDetails': {'included': True}, 'billingDetails': {'billedMemoryUsedInMB': 64, 'billedDurationInMilliseconds': 100}, 'ResponseMetadata': { 'RequestId': 'df85be9b-8ada-4f55-98b2-60d50f2b4d68', 'HTTPStatusCode': 200, 'HTTPHeaders': { 'x-amzn-requestid': 'df85be9b-8ada-4f55-98b2-60d50f2b4d68', 'date': 'Sun, 07 Aug 2022 15:11:44 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '752' }, 'RetryAttempts': 0 } }
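Note that the output field is a JSON string rather than a dictionary. A minimal sketch of decoding it, using a trimmed copy of the response above (only the fields needed here are kept, and the variable names are illustrative):

```python
import json

# Trimmed copy of the response printed above; only the fields used here.
response = {
    "status": "SUCCEEDED",
    "input": "{}",
    "output": '{"message":"Hello Alice"}',
}

# Only decode the output when the execution succeeded.
if response["status"] == "SUCCEEDED":
    output = json.loads(response["output"])
else:
    output = None

print(output)
```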
Deploy a New Version and Execute Again¶
We change the initial input from {"message": "Hello Alice"} to {"message": "Hello Bob"} and deploy the update.
[55]:
input_argument_handling.result = {"message": "Hello Bob"}
result = state_machine.deploy(bsm)
print(result)
deploy state machine to 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example' ...
already exists, update state machine ...
done, preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/visual-editor?stateMachineArn=arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example
{ 'updateDate': datetime.datetime(2022, 8, 7, 11, 11, 46, 64000, tzinfo=tzlocal()), 'ResponseMetadata': { 'RequestId': '0cff91cc-1688-4b3e-a573-22bdaed2b38f', 'HTTPStatusCode': 200, 'HTTPHeaders': { 'x-amzn-requestid': '0cff91cc-1688-4b3e-a573-22bdaed2b38f', 'date': 'Sun, 07 Aug 2022 15:11:46 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '31' }, 'RetryAttempts': 0 }, '_deploy_action': 'update' }
You can see that the output field in the execution result has changed.
[56]:
result = state_machine.execute(bsm, sync=True) # explicitly use sync mode
print(result)
execute state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example'
preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:669508176277:execution:stepfunction_deployment_example:02a8a9a0-40b8-485f-8720-da1d86c063bb
{ 'executionArn': 'arn:aws:states:us-east-1:669508176277:express:stepfunction_deployment_example:bb47dd82-6e97-4dfa-a1fc-1eb5843f5d7b :02a8a9a0-40b8-485f-8720-da1d86c063bb', 'stateMachineArn': 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example', 'name': 'bb47dd82-6e97-4dfa-a1fc-1eb5843f5d7b', 'startDate': datetime.datetime(2022, 8, 7, 11, 11, 47, 462000, tzinfo=tzlocal()), 'stopDate': datetime.datetime(2022, 8, 7, 11, 11, 47, 466000, tzinfo=tzlocal()), 'status': 'SUCCEEDED', 'input': '{}', 'inputDetails': {'included': True}, 'output': '{"message":"Hello Bob"}', 'outputDetails': {'included': True}, 'billingDetails': {'billedMemoryUsedInMB': 64, 'billedDurationInMilliseconds': 100}, 'ResponseMetadata': { 'RequestId': '46d8ec41-9dcf-41cc-93ff-f440bb0dc4f9', 'HTTPStatusCode': 200, 'HTTPHeaders': { 'x-amzn-requestid': '46d8ec41-9dcf-41cc-93ff-f440bb0dc4f9', 'date': 'Sun, 07 Aug 2022 15:11:47 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '750' }, 'RetryAttempts': 0 } }
Delete the State Machine¶
[57]:
result = state_machine.delete(bsm)
print(result)
delete state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example'
done, exam at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/statemachines/view/arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_deployment_example
{ 'ResponseMetadata': { 'RequestId': 'c1db49bc-de2d-4404-9485-58a93b023f5a', 'HTTPStatusCode': 200, 'HTTPHeaders': { 'x-amzn-requestid': 'c1db49bc-de2d-4404-9485-58a93b023f5a', 'date': 'Sun, 07 Aug 2022 15:11:51 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2' }, 'RetryAttempts': 0 } }
Final Thought¶
Immutable Deployment and Semantic Versioning
Since State Machine, Workflow, and State are all just Python objects, you can use semantic versioning in your Python code to track deployment history, and you can use Git to roll back your deployment to any historical version.
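One possible way to apply this, sketched under assumptions: keep a version constant next to the workflow code and attach it to the State Machine as a tag, so each deployment records which code version produced it. The names __version__, version_tags, and the Version tag key are hypothetical; the resulting dictionary could be passed as the tags argument shown earlier.

```python
import re

# Hypothetical version constant, bumped and committed with each workflow change.
__version__ = "0.2.0"

# Plain MAJOR.MINOR.PATCH, without pre-release or build metadata.
SEMVER_PATTERN = re.compile(r"(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)")


def version_tags(version: str, creator: str) -> dict:
    """Build a tags dict that records the code version of a deployment."""
    if SEMVER_PATTERN.fullmatch(version) is None:
        raise ValueError(f"not a MAJOR.MINOR.PATCH version: {version!r}")
    return {"Creator": creator, "Version": version}


tags = version_tags(__version__, "alice@example.com")
print(tags)
```

Checking out the Git commit for an older version and deploying again would then restore that version's definition.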