Magic Task

What is Magic Task

An AWS Step Functions state machine definition is essentially a JSON DSL (domain-specific language), the Amazon States Language. It provides special fields such as InputPath, Parameters, ResultSelector, ResultPath, OutputPath, and Choice Rules, which give you basic capabilities for manipulating input/output data and making conditional choices. Feedback from the user community shows that “Input/Output data handling” and “Conditional Choice” are hard to learn and inflexible to use.

Magic Task is a feature of the aws-stepfunction library. It lets developers implement “Input/Output data handling” and “Conditional Choice” as pure Python functions. Magic Task then automatically creates the backing Lambda Functions and hooks them up to your tasks. With Magic Task, you no longer need to write JSON notation or Choice Rules at all; you just write Python code.

Example

This example comes from the official AWS Step Functions documentation on InputPath:

State Input:

{
    "comment": "Example for InputPath.",
    "dataset1": {
        "val1": 1,
        "val2": 2,
        "val3": 3
    },
    "dataset2": {
        "val1": "a",
        "val2": "b",
        "val3": "c"
    }
}

InputPath:

{
    "InputPath": "$.dataset2"
}

With the previous InputPath, the following is the JSON that is passed as the input.

{
    "val1": "a",
    "val2": "b",
    "val3": "c"
}

With Magic Task

You just need to write:

def lambda_handler(event, context):
    return event["dataset2"]
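To check that this handler reproduces the InputPath behavior, you can call it locally with the state input from the official example. This is a plain-Python sketch. `context` is passed as `None` because the handler never uses it.

```python
def lambda_handler(event, context):
    # Equivalent of "InputPath": "$.dataset2"
    return event["dataset2"]


state_input = {
    "comment": "Example for InputPath.",
    "dataset1": {"val1": 1, "val2": 2, "val3": 3},
    "dataset2": {"val1": "a", "val2": "b", "val3": "c"},
}

# The Lambda context object is unused, so None is fine for a local test
result = lambda_handler(state_input, None)
print(result)  # {'val1': 'a', 'val2': 'b', 'val3': 'c'}
```

The result matches the JSON that the InputPath version passes to the state.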

You can also add a data schema definition to improve readability:

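import dataclasses


@dataclasses.dataclass
class InputData:
    # Schema of the state input
    comment: str
    dataset1: dict
    dataset2: dict


@dataclasses.dataclass
class OutputData:
    # Schema of the data passed on to the next state
    val1: str
    val2: str
    val3: str


def lambda_handler(event, context):
    # Parse the raw event into the input schema
    input_data = InputData(**event)
    # Select dataset2, the equivalent of "InputPath": "$.dataset2"
    output_data = OutputData(**input_data.dataset2)
    # Lambda must return JSON-serializable data, so convert back to a dict
    return dataclasses.asdict(output_data)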
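The schema also catches malformed input early. The sketch below feeds the handler an event that lacks `dataset2`. The dataclass constructor rejects it with a `TypeError` instead of silently passing bad data downstream. Note that plain dataclasses only check field names; they do not enforce the annotated types.

```python
import dataclasses


@dataclasses.dataclass
class InputData:
    comment: str
    dataset1: dict
    dataset2: dict


@dataclasses.dataclass
class OutputData:
    val1: str
    val2: str
    val3: str


def lambda_handler(event, context):
    input_data = InputData(**event)
    output_data = OutputData(**input_data.dataset2)
    return dataclasses.asdict(output_data)


good_event = {
    "comment": "Example for InputPath.",
    "dataset1": {"val1": 1, "val2": 2, "val3": 3},
    "dataset2": {"val1": "a", "val2": "b", "val3": "c"},
}
print(lambda_handler(good_event, None))  # {'val1': 'a', 'val2': 'b', 'val3': 'c'}

# An event without "dataset2" is rejected by the InputData constructor
bad_event = {"comment": "missing dataset2", "dataset1": {}}
try:
    lambda_handler(bad_event, None)
    rejected = False
except TypeError as exc:
    rejected = True
    print(f"rejected: {exc}")
```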


Learn Magic Task by Example

Consider the following E-Shop Order Processing use case. When a customer places an order, we need to perform these steps:

  1. generate the order details

  2. look up the item catalog to get the item prices and calculate the total price of the items

  3. look up the shipment catalog to get the shipment cost

  4. steps 2 and 3 can run in parallel

  5. add up all the costs to find the final balance of the order

  6. process the payment for the order
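The sketch below makes the data flow concrete. It runs the same steps in a single Python process, with steps 2 and 3 executed concurrently. The catalogs, the order fields (`items`, `ship_address`), and all function names are hypothetical and are not part of aws_stepfunction. In the real workflow, each of these functions becomes a separate Lambda Function task.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory catalogs standing in for real services
ITEM_CATALOG = {"item-1": 10.0, "item-2": 25.0}
SHIP_CATALOG = {"US": 5.0, "CA": 8.0}


def get_order_detail(order_id):
    # Step 1: a made-up order record
    return {
        "order_id": order_id,
        "items": [{"item_id": "item-1", "qty": 2}, {"item_id": "item-2", "qty": 1}],
        "ship_address": {"country": "US"},
    }


def get_item_cost(items):
    # Step 2: look up each price and total the items
    return sum(ITEM_CATALOG[i["item_id"]] * i["qty"] for i in items)


def get_ship_cost(ship_address):
    # Step 3: look up the shipment cost
    return SHIP_CATALOG[ship_address["country"]]


def process_payment(order_id, balance):
    # Step 6: pretend the charge always succeeds
    return {"order_id": order_id, "charged": balance, "status": "success"}


def process_order(order_id):
    order = get_order_detail(order_id)
    # Steps 2 and 3 are independent, so run them concurrently
    with ThreadPoolExecutor(max_workers=2) as pool:
        item_future = pool.submit(get_item_cost, order["items"])
        ship_future = pool.submit(get_ship_cost, order["ship_address"])
        item_cost, ship_cost = item_future.result(), ship_future.result()
    # Step 5: add up all the costs
    balance = item_cost + ship_cost
    return process_payment(order_id, balance)


print(process_order("order-1"))
# {'order_id': 'order-1', 'charged': 50.0, 'status': 'success'}
```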

Below is a classic State Machine definition for this workflow. In each task definition, you have to use InputPath, Parameters, etc. to control how data is passed from one state to another.

[Figure: E-Shop-Order-Processing-1, the classic State Machine definition]

With Magic Task, you only need to write pure Python functions to implement the data handling logic. These functions automatically become concrete Lambda Functions managed by the aws_stepfunction library, so you don't have to worry about deployment.

To make the discussion clearer, let’s use the following terminology:

  1. Business Logic Task (in orange): a Lambda Function task that focuses on processing the business logic.

  2. Data Handling Task (in red): a Lambda Function task that does not implement any business logic. It only manipulates the data to hook up Business Logic Tasks.

[Figure: E-Shop-Order-Processing-2, Business Logic Tasks (orange) and Data Handling Tasks (red)]

First, let’s import the required libraries:

import os
import json

from pathlib_mate import Path
import aws_stepfunction as sfn
from aws_stepfunction.magic import LambdaTask

from boto_session_manager import BotoSesManager
from rich import print as rprint

dir_here = Path(os.getcwd()).absolute()

bsm = BotoSesManager(
    profile_name="aws_data_lab_sanhe_us_east_1",
    region_name="us-east-1",
)

Use Magic Task to Create “Business Logic Tasks”

First, we prepare the “Business Logic Tasks”. aws_stepfunction provides a special LambdaTask state. It represents a Lambda Function that is managed (created / updated / deleted) by the state machine. In this example, we skip the effort of setting up the “Business Logic Task” Lambda Functions ourselves and let the aws_stepfunction library do the heavy lifting.

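Each LambdaTask below points to a Python script (lbd_package) and to the handler function inside it (lbd_handler). These scripts currently have the following limitations:

  • Each must be a single-file Python script.

  • It can only use the standard library and boto3-related libraries.

  • NOTE: we plan to add support for third-party libraries soon.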
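The content of the scripts is not reproduced on this page. The hypothetical s1_get_order_detail.py below illustrates what fits within these limitations: a single file, standard library only, exposing `lambda_handler`. The returned fields (`items`, `ship_address`) and their values are made up, not the demo's actual data model.

```python
# s1_get_order_detail.py (hypothetical content)
# Single file, standard library only.
import json


def lambda_handler(event, context):
    order_id = event["order_id"]
    # A real implementation would look the order up in a database;
    # this sketch returns a fixed, made-up order.
    return {
        "order_id": order_id,
        "items": [
            {"item_id": "item-1", "qty": 2},
            {"item_id": "item-2", "qty": 1},
        ],
        "ship_address": {"country": "US"},
    }


if __name__ == "__main__":
    # Local smoke test with the same payload used later in this notebook
    print(json.dumps(lambda_handler({"order_id": "order-1"}, None), indent=2))
```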

task1_get_order_detail = LambdaTask(
    id="Task1-Get-Order-Detail",
    lbd_func_name="aws_stepfunction_magic_task_demo-task1_get_order_detail",
    lbd_package="s1_get_order_detail.py",
    lbd_handler="s1_get_order_detail.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2a_2_get_item_cost = LambdaTask(
    id="Task2a-2-Get-Item-Cost",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2a_2_get_item_cost",
    lbd_package="s2a_2_get_item_cost.py",
    lbd_handler="s2a_2_get_item_cost.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2b_2_get_ship_cost = LambdaTask(
    id="Task2b-2-Get-Ship-Cost",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost",
    lbd_package="s2b_2_get_ship_cost.py",
    lbd_handler="s2b_2_get_ship_cost.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task4_process_payment = LambdaTask(
    id="Task4-Process-Payment",
    lbd_func_name="aws_stepfunction_magic_task_demo-task4_process_payment",
    lbd_package="s4_process_payment.py",
    lbd_handler="s4_process_payment.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
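The four declarations differ only in the task ID and the script name. If you prefer less repetition, a small helper can derive the keyword arguments from the naming convention used here. `lambda_task_kwargs` and `FUNC_PREFIX` are hypothetical helpers, not part of aws_stepfunction. The sketch only builds the dictionary you would pass to `LambdaTask(**...)`, and the account ID is a placeholder.

```python
FUNC_PREFIX = "aws_stepfunction_magic_task_demo"


def lambda_task_kwargs(task_id, step, aws_account_id, aws_region):
    # step is e.g. "1_get_order_detail": the script is s<step>.py and
    # the Lambda Function name is <FUNC_PREFIX>-task<step>
    module = f"s{step}"
    return dict(
        id=task_id,
        lbd_func_name=f"{FUNC_PREFIX}-task{step}",
        lbd_package=f"{module}.py",
        lbd_handler=f"{module}.lambda_handler",
        lbd_aws_account_id=aws_account_id,
        lbd_aws_region=aws_region,
    )


kwargs = lambda_task_kwargs(
    "Task1-Get-Order-Detail", "1_get_order_detail", "111122223333", "us-east-1"
)
print(kwargs["lbd_func_name"])  # aws_stepfunction_magic_task_demo-task1_get_order_detail

# In the notebook you could then write, for example:
# task1_get_order_detail = LambdaTask(**lambda_task_kwargs(
#     "Task1-Get-Order-Detail", "1_get_order_detail",
#     bsm.aws_account_id, bsm.aws_region,
# ))
```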

Use Magic Task to Create “Data Handling Tasks”

Next, we use the same Magic Task LambdaTask class to create three Data Handling Lambda Functions:

  • Use Task2a-1-Extract-Items to hook up Task1-Get-Order-Detail and Task2a-2-Get-Item-Cost.

  • Use Task2b-1-Extract-Ship-Address to hook up Task1-Get-Order-Detail and Task2b-2-Get-Ship-Cost.

  • Use Task3-Find-Balance to hook up both Task2a-2-Get-Item-Cost and Task2b-2-Get-Ship-Cost with the final Task4-Process-Payment.

Here is the code that declares them:

task2a_1_extract_items = LambdaTask(
    id="Task2a-1-Extract-Items",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2a_1_extract_items",
    lbd_package="s2a_1_extract_items.py",
    lbd_handler="s2a_1_extract_items.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2b_1_extract_ship_address = LambdaTask(
    id="Task2b-1-Extract-Ship-Address",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address",
    lbd_package="s2b_1_extract_ship_address.py",
    lbd_handler="s2b_1_extract_ship_address.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task3_find_balance = LambdaTask(
    id="Task3-Find-Balance",
    lbd_func_name="aws_stepfunction_magic_task_demo-task3_find_balance",
    lbd_package="s3_find_balance.py",
    lbd_handler="s3_find_balance.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
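Each branch of a Parallel state receives a copy of the Parallel state's input, which here is Task1's output. Each extraction task only needs to forward the part its branch cares about. Below is a hypothetical sketch of the two extraction scripts. It assumes Task1 returns an order with `items` and `ship_address` keys, which is a made-up shape, not the demo's actual one. In the real files, each function would be named `lambda_handler` to match the `lbd_handler` values above.

```python
# Hypothetical contents of the two extraction scripts, combined in one block.


# s2a_1_extract_items.py
def extract_items_handler(event, context):
    # Forward only the item list to Task2a-2-Get-Item-Cost
    return {"items": event["items"]}


# s2b_1_extract_ship_address.py
def extract_ship_address_handler(event, context):
    # Forward only the shipping address to Task2b-2-Get-Ship-Cost
    return {"ship_address": event["ship_address"]}


# Example: a made-up Task1 output, as each branch would receive it
order_detail = {
    "order_id": "order-1",
    "items": [{"item_id": "item-1", "qty": 2}],
    "ship_address": {"country": "US"},
}
print(extract_items_handler(order_detail, None))
print(extract_ship_address_handler(order_detail, None))
```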
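
Finally, chain the tasks into a workflow, with the two extraction branches running in parallel, and preview the generated definition:
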
workflow = sfn.Workflow()
(
    workflow.start_from(task1_get_order_detail)
    .parallel([
        (
            workflow.subflow_from(task2a_1_extract_items)
            .next_then(task2a_2_get_item_cost)
            .end()
        ),
        (
            workflow.subflow_from(task2b_1_extract_ship_address)
            .next_then(task2b_2_get_ship_cost)
            .end()
        ),
    ])
    .next_then(task3_find_balance)
    .next_then(task4_process_payment)
    .end()
)

print("preview workflow definition")
print(json.dumps(workflow.serialize(), indent=4))
preview workflow definition
{
    "StartAt": "Task1-Get-Order-Detail",
    "States": {
        "Task1-Get-Order-Detail": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Next": "Parallel-after-Task1-Get-Order-Detail",
            "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task1_get_order_detail"
            },
            "OutputPath": "$.Payload",
            "Retry": [
                {
                    "ErrorEquals": [
                        "Lambda.ServiceException",
                        "Lambda.AWSLambdaException",
                        "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "BackoffRate": 2,
                    "MaxAttempts": 3
                }
            ]
        },
        "Parallel-after-Task1-Get-Order-Detail": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "Task2a-1-Extract-Items",
                    "States": {
                        "Task2a-1-Extract-Items": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "Next": "Task2a-2-Get-Item-Cost",
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2a_1_extract_items"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        },
                        "Task2a-2-Get-Item-Cost": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "End": true,
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2a_2_get_item_cost"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        }
                    }
                },
                {
                    "StartAt": "Task2b-1-Extract-Ship-Address",
                    "States": {
                        "Task2b-1-Extract-Ship-Address": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "Next": "Task2b-2-Get-Ship-Cost",
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        },
                        "Task2b-2-Get-Ship-Cost": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "End": true,
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        }
                    }
                }
            ],
            "Next": "Task3-Find-Balance"
        },
        "Task3-Find-Balance": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Next": "Task4-Process-Payment",
            "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task3_find_balance"
            },
            "OutputPath": "$.Payload",
            "Retry": [
                {
                    "ErrorEquals": [
                        "Lambda.ServiceException",
                        "Lambda.AWSLambdaException",
                        "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "BackoffRate": 2,
                    "MaxAttempts": 3
                }
            ]
        },
        "Task4-Process-Payment": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "End": true,
            "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task4_process_payment"
            },
            "OutputPath": "$.Payload",
            "Retry": [
                {
                    "ErrorEquals": [
                        "Lambda.ServiceException",
                        "Lambda.AWSLambdaException",
                        "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "BackoffRate": 2,
                    "MaxAttempts": 3
                }
            ]
        }
    }
}
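In the serialized definition, Task3-Find-Balance follows the Parallel state directly. A Parallel state's output is an array with one element per branch. Here each element is the Payload returned by the last task in that branch. So Task3's handler receives a list rather than a single object. The hypothetical s3_find_balance.py below works under that assumption, with made-up `item_cost` and `ship_cost` keys. It merges the branch outputs so it does not depend on branch order.

```python
# s3_find_balance.py (hypothetical content)
def lambda_handler(event, context):
    # event is the Parallel state's output: one element per branch,
    # e.g. [{"item_cost": 45.0}, {"ship_cost": 5.0}]
    merged = {}
    for branch_output in event:
        merged.update(branch_output)
    return {"balance": merged["item_cost"] + merged["ship_cost"]}


print(lambda_handler([{"item_cost": 45.0}, {"ship_cost": 5.0}], None))
# {'balance': 50.0}
```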

Deploy a State Machine to AWS

Now, let’s deploy the State Machine together with all the necessary Magic Lambda Functions. The deployment also creates a default S3 Bucket (to store the Lambda deployment artifacts) and a default IAM Role (the basic Lambda execution role).

sfn_name = "aws_stepfunction_magic_task_demo"

state_machine = sfn.StateMachine(
    name=sfn_name,
    workflow=workflow,
    role_arn="arn:aws:iam::669508176277:role/sanhe-for-everything-admin",
)
state_machine.set_type_as_express()

deploy_result = state_machine.deploy(bsm, verbose=True)
detect whether the magic task is used ...
    yes
identify necessary S3 bucket and IAM role ...
    need to create S3 Bucket '669508176277-us-east-1-aws-stepfunction-python-sdk'
    we need a default IAM role for lambda function
    need to create IAM Role 'aws-stepfunction-python-sdk-magic-task-role'
    done
deploy S3 and IAM ...
    preview cloudformation stack status: https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks?filteringStatus=active&filteringText=aws-stepfunction-magic-task-demo&viewNested=true&hideStacks=false&stackId=
wait 'aws-stepfunction-magic-task-demo' stack to complete ...
    elapsed 0 seconds ...
    elapsed 5 seconds ...
    elapsed 10 seconds ...
    elapsed 15 seconds ...
    elapsed 20 seconds ...
    elapsed 25 seconds ...
    done
deploy Lambda Functions ...
    upload lambda deployment artifacts ...
        upload from /Users/sanhehu/tmp/15598fc58f34fc1f96961b2ac2bfa8d4.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/a545c26c83561866d4aba791976948d8.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task1_get_order_detail
        upload from /Users/sanhehu/tmp/e80c5b01037301fc9fc7505d74c5ff91.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/9c6078c9a0ed07b904b21dc8e710bebd.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task2a_1_extract_items
        upload from /Users/sanhehu/tmp/fa760dbc90fabef2e87d6513173ce8d9.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/b4077b1decf3da0ca2102077745c1d73.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task2a_2_get_item_cost
        upload from /Users/sanhehu/tmp/000dc7fe1ad3d583858d410068f75c6b.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/4ebd33b4da1e2dca0e2338b22ec4164f.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address
        upload from /Users/sanhehu/tmp/009d9755c81982f704e9ac16aee18c5a.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/bd0ae3f3dc396dac10a3bd61511bcddf.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost
        upload from /Users/sanhehu/tmp/fd78f0b11843a3b1b66551709b750844.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/c3b94e8b6666b94610f163818efdd567.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task3_find_balance
        upload from /Users/sanhehu/tmp/1d8e921ecf9d9e39751575d141476cbf.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/6a5f3157ee87d81c9a1a5baf4a43fb74.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task4_process_payment
deploy magic task Lambda Function ...
    preview cloudformation stack status: https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks?filteringStatus=active&filteringText=aws-stepfunction-magic-task-demo&viewNested=true&hideStacks=false&stackId=
wait 'aws-stepfunction-magic-task-demo' stack to complete ...
    elapsed 0 seconds ...
    elapsed 5 seconds ...
    elapsed 10 seconds ...
    elapsed 15 seconds ...
    done
deploy state machine to 'arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo' ...
  not exists, create state machine ...
  done, preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/visual-editor?stateMachineArn=arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo

You can click the preview link to view the workflow definition in the Step Functions visual editor:

[Figure: e-shop-order-processing-workflow]

Execute the State Machine with Custom Input Data

You can run your state machine with custom input data and see the output data immediately.

execute_result = state_machine.execute(
    bsm,
    payload={"order_id": "order-1"},
    sync=True,
)

# The execution input and output come back as JSON strings
input_data = json.loads(execute_result["input"])
output_data = json.loads(execute_result["output"])
print("\ninput:\n")
print(json.dumps(input_data))
print("\noutput:\n")
print(json.dumps(output_data))
execute state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo'
  preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/express-executions/details/arn:aws:states:us-east-1:669508176277:express:aws_stepfunction_magic_task_demo:34c29e53-13df-43d1-b859-6c527da193e8:6452de94-c837-45f5-8df2-8b6f6484e6a6?startDate=1668018377096

input:

{"order_id": "order-1"}

output:

{"status": "success"}

Click the link to inspect the final result, as well as the intermediate data of each step, in the Step Functions console.

[Figure: execute-result]
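The state machine is an Express workflow (set via set_type_as_express()), so it can be run synchronously. The sketch below does the same without aws_stepfunction, using boto3's Step Functions `start_sync_execution` API, whose response carries `input` and `output` as JSON strings like the `execute_result` above. Whether `state_machine.execute(..., sync=True)` calls this API internally is an assumption. `execute_sync` is a hypothetical helper name. The client is passed in so that the function can be exercised with a stub.

```python
import json


def execute_sync(sfn_client, state_machine_arn, payload):
    """Run an Express state machine synchronously and return its decoded output."""
    response = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(payload),  # the API expects a JSON string
    )
    # status is one of SUCCEEDED, FAILED or TIMED_OUT
    if response["status"] != "SUCCEEDED":
        raise RuntimeError(
            f"execution {response['status']}: "
            f"{response.get('error')} {response.get('cause')}"
        )
    return json.loads(response["output"])


# Usage against a real account (requires AWS credentials):
# import boto3
# sfn_client = boto3.client("stepfunctions", region_name="us-east-1")
# output = execute_sync(
#     sfn_client,
#     "arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo",
#     {"order_id": "order-1"},
# )
```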
