Logic Flow

The purpose of AWS Step Functions is to orchestrate many tasks that work together and to transition from one task to another, conditionally when needed. Overall, there are four generic flow patterns, and every other pattern is just a combination of them:

  • Simple straight chain

  • Execute sub-workflows in parallel

  • Execute the same sub-workflow many times with different parameters

  • Choice branch based on condition

In this tutorial, we use the ‘Pass’ state to simulate the ‘Task’ state, so we can focus on orchestrating the logic flow.
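The following sketch gives a rough mental model of why ‘Pass’ is a convenient stand-in. It models how the Amazon States Language treats a Pass state and assumes InputPath, ResultPath and OutputPath keep their default value of "$". With no Result, the input is forwarded unchanged. With a Result, the Result replaces the input. The function `run_pass` is hypothetical and is not part of aws_stepfunction.

```python
import copy
from typing import Any


def run_pass(state: dict, state_input: Any) -> Any:
    """Return a Pass state's output, assuming InputPath/ResultPath/OutputPath keep their "$" defaults."""
    if state.get("Type") != "Pass":
        raise ValueError("run_pass only models Pass states")
    if "Result" in state:
        # With ResultPath = "$", the static Result replaces the entire input
        return copy.deepcopy(state["Result"])
    # Without a Result, a Pass state forwards its input unchanged
    return copy.deepcopy(state_input)


# A Pass state standing in for a Task that would return {"action": ...}
simulated_task = {"Type": "Pass", "Result": {"action": "run lambda job"}, "End": True}
# A Pass state with no Result, i.e. a no-op placeholder
placeholder = {"Type": "Pass", "End": True}

print(run_pass(simulated_task, {"dataset": "sales"}))  # {'action': 'run lambda job'}
print(run_pass(placeholder, {"dataset": "sales"}))     # {'dataset': 'sales'}
```

This is why the examples below can give a Pass state a `result=...`: it behaves like a Task that always returns that fixed value.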

import aws_stepfunction as sfn
from rich import print

Simple Straight Chain

Let’s create a simple chain workflow with aws_stepfunction: three steps that run one after another, with a 3-second wait between consecutive steps.

logic-flow-chain

workflow = sfn.Workflow()

lambda_1 = sfn.Pass(id="Lambda 1")
lambda_2 = sfn.Pass(id="Lambda 2")
lambda_3 = sfn.Pass(id="Lambda 3")
# wrap the whole chain in parentheses
# so you don't need a trailing "\" at the end of each line
(
    workflow.start_from(lambda_1)
    .wait(seconds=3) # no need to define a 'Wait' state explicitly
    .next_then(lambda_2)
    .next_then(sfn.Wait(seconds=3)) # but you can still add one explicitly
    .next_then(lambda_3)
    .end()
)

print(workflow.serialize())
{
    'StartAt': 'Lambda 1',
    'States': {
        'Lambda 1': {'Type': 'Pass', 'Next': 'Wait-after-Lambda 1'},
        'Wait-after-Lambda 1': {'Type': 'Wait', 'Seconds': 3, 'Next': 'Lambda 2'},
        'Lambda 2': {'Type': 'Pass', 'Next': 'Wait-077b428'},
        'Wait-077b428': {'Type': 'Wait', 'Seconds': 3, 'Next': 'Lambda 3'},
        'Lambda 3': {'Type': 'Pass', 'End': True}
    }
}
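The serialized definition is plain Amazon States Language. Each state names its successor in `Next`, and the last state sets `End`. The sketch below uses a hypothetical helper, `execution_order` (not part of aws_stepfunction). It follows those links from `StartAt` to confirm the order of the chain. The definition is the chain portion of the output above.

```python
import json

definition = {
    "StartAt": "Lambda 1",
    "States": {
        "Lambda 1": {"Type": "Pass", "Next": "Wait-after-Lambda 1"},
        "Wait-after-Lambda 1": {"Type": "Wait", "Seconds": 3, "Next": "Lambda 2"},
        "Lambda 2": {"Type": "Pass", "Next": "Wait-077b428"},
        "Wait-077b428": {"Type": "Wait", "Seconds": 3, "Next": "Lambda 3"},
        "Lambda 3": {"Type": "Pass", "End": True},
    },
}


def execution_order(defn: dict) -> list:
    """Follow Next links from StartAt until a state with End: true (straight chains only)."""
    order = []
    name = defn["StartAt"]
    while True:
        if name in order:
            raise ValueError(f"cycle detected at {name!r}")
        order.append(name)
        state = defn["States"][name]
        if state.get("End"):
            return order
        name = state["Next"]


print(execution_order(definition))
# ['Lambda 1', 'Wait-after-Lambda 1', 'Lambda 2', 'Wait-077b428', 'Lambda 3']

# A state machine definition is submitted to Step Functions as a JSON string
asl_json = json.dumps(definition, indent=2)
```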

Execute sub-workflows in parallel

From a single point, we run multiple sub-workflows concurrently, then merge back and continue with the next step.

logic-flow-parallel

workflow = sfn.Workflow()

input_parameter_handling = sfn.Pass(id="Input Parameter Handling")
subflow1_step1 = sfn.Pass(id="Subflow 1 - Step 1")
subflow1_step2 = sfn.Pass(id="Subflow 1 - Step 2")
subflow2_step1 = sfn.Pass(id="Subflow 2 - Step 1")
subflow2_step2 = sfn.Pass(id="Subflow 2 - Step 2")
clean_up = sfn.Pass(id="Clean Up")
(
    workflow.start_from(input_parameter_handling)
    .parallel([
        (   # create a subflow in parallel
            workflow.subflow_from(subflow1_step1)
            .next_then(subflow1_step2)
            .end() # every parallel subflow must finish with .end()
        ),
        (
            workflow.subflow_from(subflow2_step1)
            .next_then(subflow2_step2)
            .end()
        ),
    ])
    .next_then(clean_up)
    .end()
)

print(workflow.serialize())
{
    'StartAt': 'Input Parameter Handling',
    'States': {
        'Input Parameter Handling': {'Type': 'Pass', 'Next': 'Parallel-after-Input Parameter Handling'},
        'Parallel-after-Input Parameter Handling': {
            'Type': 'Parallel',
            'Branches': [
                {
                    'StartAt': 'Subflow 1 - Step 1',
                    'States': {
                        'Subflow 1 - Step 1': {'Type': 'Pass', 'Next': 'Subflow 1 - Step 2'},
                        'Subflow 1 - Step 2': {'Type': 'Pass', 'End': True}
                    }
                },
                {
                    'StartAt': 'Subflow 2 - Step 1',
                    'States': {
                        'Subflow 2 - Step 1': {'Type': 'Pass', 'Next': 'Subflow 2 - Step 2'},
                        'Subflow 2 - Step 2': {'Type': 'Pass', 'End': True}
                    }
                }
            ],
            'Next': 'Clean Up'
        },
        'Clean Up': {'Type': 'Pass', 'End': True}
    }
}
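The sketch below makes the Parallel semantics concrete with a local, hypothetical simulation. It is not how Step Functions or aws_stepfunction actually execute a workflow. Every branch receives the same input, and the branches run concurrently. The output of the Parallel state is an array with one entry per branch, in branch order. The sketch assumes every state is a Pass state, as in this tutorial. Each branch's last step is given a hypothetical `Result` so that the two outputs can be told apart.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def run_branch(branch: dict, state_input: Any) -> Any:
    """Walk one branch from StartAt to End, assuming every state is a Pass state."""
    data = state_input
    name = branch["StartAt"]
    while True:
        state = branch["States"][name]
        if "Result" in state:
            # default ResultPath "$": the Result replaces the current data
            data = state["Result"]
        if state.get("End"):
            return data
        name = state["Next"]


def run_parallel(parallel_state: dict, state_input: Any) -> list:
    """Give every branch the same input, run branches concurrently, keep branch order."""
    branches = parallel_state["Branches"]
    with ThreadPoolExecutor(max_workers=max(len(branches), 1)) as pool:
        futures = [pool.submit(run_branch, branch, state_input) for branch in branches]
        # collecting results in submission order aligns each output with its branch
        return [future.result() for future in futures]


# Same shape as the serialized Parallel state above, plus a hypothetical Result per branch
parallel_state = {
    "Type": "Parallel",
    "Branches": [
        {
            "StartAt": "Subflow 1 - Step 1",
            "States": {
                "Subflow 1 - Step 1": {"Type": "Pass", "Next": "Subflow 1 - Step 2"},
                "Subflow 1 - Step 2": {"Type": "Pass", "Result": {"subflow": 1}, "End": True},
            },
        },
        {
            "StartAt": "Subflow 2 - Step 1",
            "States": {
                "Subflow 2 - Step 1": {"Type": "Pass", "Next": "Subflow 2 - Step 2"},
                "Subflow 2 - Step 2": {"Type": "Pass", "Result": {"subflow": 2}, "End": True},
            },
        },
    ],
    "Next": "Clean Up",
}

print(run_parallel(parallel_state, {"job": "demo"}))  # [{'subflow': 1}, {'subflow': 2}]
```

Because the merged output is a list, the state after the Parallel state ("Clean Up" here) receives the results of all branches at once.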

Execute the same sub-workflow many times with different parameters

We run the same sub-workflow logic multiple times, each time with a different input argument. For example, a data processing pipeline may need to handle several data sources with identical read, process, and write steps.

logic-flow-map

workflow = sfn.Workflow()

input_parameter_handling = sfn.Pass(
    id="Input Parameter Handling",
    result={"dataset_name_list": ["customers", "sales", "orders"]}
)
read_data = sfn.Pass(id="Read Data")
process_data = sfn.Pass(id="Process Data")
write_data = sfn.Pass(id="Write Data")
clean_up = sfn.Pass(id="Clean Up")
(
    workflow.start_from(input_parameter_handling)
    .map(
        (   # create a subflow for mapping
            workflow.subflow_from(read_data)
            .next_then(process_data)
            .next_then(write_data)
            .end()
        ),
        items_path="$.dataset_name_list", # each item in this list becomes the input of one subflow run
    )
    .next_then(clean_up)
    .end()
)

print(workflow.serialize())
{
    'StartAt': 'Input Parameter Handling',
    'States': {
        'Input Parameter Handling': {
            'Type': 'Pass',
            'Result': {'dataset_name_list': ['customers', 'sales', 'orders']},
            'Next': 'Map-after-Input Parameter Handling'
        },
        'Map-after-Input Parameter Handling': {
            'Type': 'Map',
            'Iterator': {
                'StartAt': 'Read Data',
                'States': {
                    'Read Data': {'Type': 'Pass', 'Next': 'Process Data'},
                    'Process Data': {'Type': 'Pass', 'Next': 'Write Data'},
                    'Write Data': {'Type': 'Pass', 'End': True}
                }
            },
            'ItemsPath': '$.dataset_name_list',
            'Next': 'Clean Up'
        },
        'Clean Up': {'Type': 'Pass', 'End': True}
    }
}
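The sketch below is a hypothetical local simulation of the Map state above, not the service itself. It reads the list at `ItemsPath` and runs the iterator once per item, with each item as that iteration's input. It then collects the outputs in item order. The helper `resolve_path` handles only simple dotted paths such as "$.dataset_name_list", not full JSONPath. The iterations run sequentially for simplicity, whereas the real service can run them concurrently (bounded by `MaxConcurrency`).

```python
from typing import Any


def resolve_path(path: str, data: Any) -> Any:
    """Resolve a simple dotted path like "$.a.b"; arrays, wildcards and filters are not supported."""
    if path == "$":
        return data
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    for key in path[2:].split("."):
        data = data[key]
    return data


def run_iterator(iterator: dict, item: Any) -> Any:
    """Run the Map iterator on one item, assuming every state is a Pass state."""
    data = item
    name = iterator["StartAt"]
    while True:
        state = iterator["States"][name]
        if "Result" in state:
            data = state["Result"]
        if state.get("End"):
            return data
        name = state["Next"]


def run_map(map_state: dict, state_input: Any) -> list:
    """Run the iterator once per item found at ItemsPath and collect outputs in item order."""
    items = resolve_path(map_state.get("ItemsPath", "$"), state_input)
    return [run_iterator(map_state["Iterator"], item) for item in items]


map_state = {
    "Type": "Map",
    "Iterator": {
        "StartAt": "Read Data",
        "States": {
            "Read Data": {"Type": "Pass", "Next": "Process Data"},
            "Process Data": {"Type": "Pass", "Next": "Write Data"},
            "Write Data": {"Type": "Pass", "End": True},
        },
    },
    "ItemsPath": "$.dataset_name_list",
    "Next": "Clean Up",
}

# the output of "Input Parameter Handling" becomes the Map state's input
state_input = {"dataset_name_list": ["customers", "sales", "orders"]}
print(run_map(map_state, state_input))  # ['customers', 'sales', 'orders']
```

Since every iterator step is a Pass without a Result, each iteration simply returns its dataset name. A real pipeline would replace these steps with Task states that return per-dataset results.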

Choice branch based on condition

We jump to a different branch based on a condition test. Each branch simply starts at another state in the same workflow; it is NOT a sub-workflow.

logic-flow-choice

workflow = sfn.Workflow()

input_parameter_handling = sfn.Pass(
    id="Input Parameter Handling",
    result={"action": "run lambda job"},
)
run_lambda_job = sfn.Pass(id="Run Lambda Job")
run_glue_job = sfn.Pass(id="Run Glue Job")
fail = sfn.Fail(id="Fail")
send_notification = sfn.Pass(id="Send Notification")
(
    workflow.start_from(input_parameter_handling)
    .choice(
        [
            # you don't have to memorize every data-test expression;
            # the IDE's auto-completion suggests the correct syntax
            sfn.Var("$.action").string_equals("run lambda job").next_then(run_lambda_job),
            sfn.Var("$.action").string_equals("run glue job").next_then(run_glue_job),
        ],
        default=fail, # set the default state
    )
)

# both branches converge on 'Send Notification'; a single .end() marks it as the final state
workflow.continue_from(run_lambda_job).next_then(send_notification)
workflow.continue_from(run_glue_job).next_then(send_notification).end()

print(workflow.serialize())
{
    'StartAt': 'Input Parameter Handling',
    'States': {
        'Input Parameter Handling': {
            'Type': 'Pass',
            'Result': {'action': 'run lambda job'},
            'Next': 'Choice-by-Input Parameter Handling'
        },
        'Choice-by-Input Parameter Handling': {
            'Type': 'Choice',
            'Choices': [
                {'Variable': '$.action', 'StringEquals': 'run lambda job', 'Next': 'Run Lambda Job'},
                {'Variable': '$.action', 'StringEquals': 'run glue job', 'Next': 'Run Glue Job'}
            ],
            'Default': 'Fail'
        },
        'Run Lambda Job': {'Type': 'Pass', 'Next': 'Send Notification'},
        'Run Glue Job': {'Type': 'Pass', 'Next': 'Send Notification'},
        'Fail': {'Type': 'Fail'},
        'Send Notification': {'Type': 'Pass', 'End': True}
    }
}
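The sketch below is a hypothetical local evaluator for the Choice state above. The rules are tested in order, and the first match decides the `Next` state. If no rule matches, `Default` is used. Only the `StringEquals` comparison is modeled, and `resolve_path` supports only simple dotted paths. If there is no `Default`, the evaluator raises an error named after the Amazon States Language error `States.NoChoiceMatched`.

```python
from typing import Any


def resolve_path(path: str, data: Any) -> Any:
    """Resolve a simple dotted path like "$.action"; full JSONPath is not supported."""
    if path == "$":
        return data
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    for key in path[2:].split("."):
        data = data[key]
    return data


def evaluate_choice(choice_state: dict, state_input: Any) -> str:
    """Return the next state's name: the first matching rule wins, otherwise Default."""
    for rule in choice_state["Choices"]:
        value = resolve_path(rule["Variable"], state_input)
        # only StringEquals is modeled; it requires a string value
        if "StringEquals" in rule and isinstance(value, str) and value == rule["StringEquals"]:
            return rule["Next"]
    if "Default" in choice_state:
        return choice_state["Default"]
    # mirrors the States.NoChoiceMatched error raised when no rule matches and there is no Default
    raise RuntimeError("States.NoChoiceMatched")


choice_state = {
    "Type": "Choice",
    "Choices": [
        {"Variable": "$.action", "StringEquals": "run lambda job", "Next": "Run Lambda Job"},
        {"Variable": "$.action", "StringEquals": "run glue job", "Next": "Run Glue Job"},
    ],
    "Default": "Fail",
}

for action in ["run lambda job", "run glue job", "run emr job"]:
    print(action, "->", evaluate_choice(choice_state, {"action": action}))
# run lambda job -> Run Lambda Job
# run glue job -> Run Glue Job
# run emr job -> Fail
```

The value "run emr job" is a made-up input that matches no rule, so it falls through to the ‘Fail’ state.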