Quick Start

In this tutorial, let’s learn how to use aws_stepfunction library to create / deploy / execute / delete an AWS StepFunction.

The test Lambda Function

First, let’s create a simple lambda function for testing. The logic is to json encode the input event and just return. This lambda function has 30 % chance to fail. The lambda execution won’t fail, however, it returns an 400 HTTP status code.

[4]:
# content of lambda_handler.py

import json
import random

def lambda_handler(event, context):
    if random.randint(1, 100) <= 70:
        return {
            "statusCode": 200,
            "body": json.dumps(event),
        }
    else:
        return {
            "statusCode": 400,
            "body": "failed!"
        }

Sample input output

[7]:
# succeed
lambda_handler({"message": "hello world"}, None)
[7]:
{'statusCode': 200, 'body': '{"message": "hello world"}'}
[17]:
# fail
lambda_handler({"message": "hello world"}, None)
[17]:
{'statusCode': 400, 'body': 'failed!'}

Create a Step Function

Now let’s learn the power of aws_stepfunction library.

[1]:
# ------------------------------------------------------------------------------
# Step 1. import, preparation
# ------------------------------------------------------------------------------\
# Import the "aws_stepfunction" library,
# all public API can be accessed from the "sfn" namespace
import aws_stepfunction as sfn
from boto_session_manager import BotoSesManager
from rich import print # for pretty print

# Create a boto3 session manager object
# the credential you are using should have STS.get_caller_identity permission
# it is for getting the AWS Account ID information
bsm = BotoSesManager(
    profile_name="aws_data_lab_sanhe_us_east_1",
    region_name="us-east-1",
)

# You could tell the task context about the aws_account_id and aws_region
# then you can only need to provide lambda function name without the full ARN.
sfn.task_context.aws_account_id = bsm.aws_account_id
sfn.task_context.aws_region = bsm.aws_region

# Declare a workflow object
workflow = sfn.Workflow(
    comment="The power of aws_stepfunction library!",  # put random comment here
)
print(workflow)
Workflow(
    id='Workflow-1a98433',
    comment='The power of aws_stepfunction library!',
    version=None,
    timeout_seconds=None,
    _start_at=None,
    _states=OrderedDict(),
    _started=False,
    _previous_state=None
)
[2]:
# ------------------------------------------------------------------------------
# Step 2. Define some tasks and states
# ------------------------------------------------------------------------------
# There are some helper functions to create common task.
# These helper functions are just the equivalent of
# The widget in Step Function Visual Editor

# define a lambda function invoke task
task_invoke_lambda = sfn.actions.lambda_invoke(func_name="stepfunction_quick_start")

# define a succeed state
succeed = sfn.Succeed()

# define a fail state
fail = sfn.Fail()

print(task_invoke_lambda)
print(succeed)
print(fail)
Task(
    comment=None,
    next=None,
    end=None,
    input_path=None,
    output_path='$.Payload',
    parameters={
        'Payload.$': '$',
        'FunctionName': 'arn:aws:lambda:us-east-1:669508176277:stepfunction_quick_start'
    },
    result_selector={},
    result_path=None,
    retry=[
        Retry(
            error_equals=['Lambda.ServiceException', 'Lambda.AWSLambdaException', 'Lambda.SdkClientException'],
            interval_seconds=2,
            backoff_rate=2,
            max_attempts=3
        )
    ],
    catch=[],
    id='Task-1824cda',
    type='Task',
    resource='arn:aws:states:::lambda:invoke',
    timeout_seconds_path=None,
    timeout_seconds=None,
    heartbeat_seconds_path=None,
    heartbeat_seconds=None
)
Succeed(comment=None, input_path=None, output_path=None, id='Succeed-4c067dd', type='Succeed')
Fail(comment=None, id='Fail-a6fa70c', type='Fail', cause=None, error=None)
[3]:
# ------------------------------------------------------------------------------
# Step 3. Orchestrate the Workflow
# ------------------------------------------------------------------------------
# We use this "Human-language alike", "Pythonic", "Objective Oriented"
# "Auto-complete empowered" code pattern to create a human-readable workflow
(
    workflow.start(task_invoke_lambda)
    .choice([
        # choice 1, succeed case
        (  # define condition
            sfn.not_(sfn.Var("$.body").string_equals("failed!"))
            # define next action
            .next_then(succeed)
        ),
        # choice 2, fail case
        (
            # define condition
            sfn.Var("$.body").string_equals("failed!")
            # define next action
            .next_then(fail)
        ),
    ])
)

print(workflow.serialize())
{
    'StartAt': 'Task-1824cda',
    'Comment': 'The power of aws_stepfunction library!',
    'States': {
        'Task-1824cda': {
            'Type': 'Task',
            'Resource': 'arn:aws:states:::lambda:invoke',
            'Next': 'Choice-by-Task-1824cda',
            'Parameters': {
                'Payload.$': '$',
                'FunctionName': 'arn:aws:lambda:us-east-1:669508176277:stepfunction_quick_start'
            },
            'OutputPath': '$.Payload',
            'Retry': [
                {
                    'ErrorEquals': [
                        'Lambda.ServiceException',
                        'Lambda.AWSLambdaException',
                        'Lambda.SdkClientException'
                    ],
                    'IntervalSeconds': 2,
                    'BackoffRate': 2,
                    'MaxAttempts': 3
                }
            ]
        },
        'Choice-by-Task-1824cda': {
            'Type': 'Choice',
            'Choices': [
                {'Not': {'Variable': '$.body', 'StringEquals': 'failed!'}, 'Next': 'Succeed-4c067dd'},
                {'Variable': '$.body', 'StringEquals': 'failed!', 'Next': 'Fail-a6fa70c'}
            ]
        },
        'Succeed-4c067dd': {'Type': 'Succeed'},
        'Fail-a6fa70c': {'Type': 'Fail'}
    }
}
[4]:
# ------------------------------------------------------------------------------
# Step 4. Declare an instance of AWS State Machine for AWS console
# ------------------------------------------------------------------------------
# This is the metadata of the concrete AWS State Machine resource
state_machine = sfn.StateMachine(
    name="stepfunction_quick_start",
    workflow=workflow,
    role_arn="arn:aws:iam::669508176277:role/state-machine-role",
)
print(state_machine.get_state_machine_arn(bsm))
print(state_machine.get_state_machine_console_url(bsm))
arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start
https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/statemachines/view/arn:aws:states:us-east-1:
669508176277:stateMachine:stepfunction_quick_start
[5]:
# ------------------------------------------------------------------------------
# Step 5. Deploy / Execute / Delete State Machine
# ------------------------------------------------------------------------------
# deploy (create / update)
state_machine.deploy(bsm)
deploy state machine to 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start' ...
  not exists, create state machine ...
  done, preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/visual-editor?stateMachineArn=arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start
[5]:
{'stateMachineArn': 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start',
 'creationDate': datetime.datetime(2022, 8, 5, 19, 23, 49, 785000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': 'df4a6cf2-3448-4bb0-9e7d-c06a28622de2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'df4a6cf2-3448-4bb0-9e7d-c06a28622de2',
   'date': 'Fri, 05 Aug 2022 23:23:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '129'},
  'RetryAttempts': 0},
 '_deploy_action': 'create'}

image0

[6]:
# execute state machine with custom payload
state_machine.execute(bsm, payload={"name": "alice"})
execute state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start'
  preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:669508176277:execution:stepfunction_quick_start:8f26d437-1806-498f-941a-447b390ccdac
[6]:
{'executionArn': 'arn:aws:states:us-east-1:669508176277:execution:stepfunction_quick_start:8f26d437-1806-498f-941a-447b390ccdac',
 'startDate': datetime.datetime(2022, 8, 5, 19, 24, 14, 476000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': '14025e4c-5ac3-4e7b-87ae-2eff8dc7cd9b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '14025e4c-5ac3-4e7b-87ae-2eff8dc7cd9b',
   'date': 'Fri, 05 Aug 2022 23:24:14 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '157'},
  'RetryAttempts': 0}}

[7]:
# delete state machine
state_machine.delete(bsm)
delete state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start'
  done, exam at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/statemachines/view/arn:aws:states:us-east-1:669508176277:stateMachine:stepfunction_quick_start
[7]:
{'ResponseMetadata': {'RequestId': '16346f8e-6341-4c1d-a174-f9107a0e5c27',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '16346f8e-6341-4c1d-a174-f9107a0e5c27',
   'date': 'Fri, 05 Aug 2022 23:25:53 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2'},
  'RetryAttempts': 0}}

Summary

The objective of aws_stepfunction library is to provide developer a “smooth”, “interruption free”, “enjoyable” development experience. The orchestration code itself is just like human-language, and tells the story without any comments.

In addition, the API and type hint are designed for static check and auto-complete. If you use any modern IDE / Code Editor like PyCharm, VSCode, Sublime, Eclipse, the learning curve should be minimal and the IDE will tell you all the syntax you need for coding.

[ ]: