Magic Task
What is Magic Task
An Amazon State Machine Definition is essentially a JSON DSL (domain-specific language). It uses special syntax such as InputPath, Parameters, ResultSelector, ResultPath, OutputPath and ChoiceRule to provide basic capabilities for manipulating input/output data and making conditional choices. Research on the user community shows that “Input/Output data handling” and “Conditional Choice” are difficult to learn and inflexible to use.
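To make the order of these fields concrete, here is a simplified Python model of how a Task state processes data: InputPath selects part of the state input, Parameters builds the task input, the task runs, ResultSelector reshapes the raw result, ResultPath places the result into the original state input, and OutputPath filters what is passed on. This is a sketch for illustration, not how Step Functions is implemented: it assumes only "$" and simple dotted paths like "$.a.b" (no array indexes, filters, intrinsic functions, nested templates, or null paths), and all helper names are hypothetical.

```python
import copy
from typing import Any, Callable, Optional


def get_path(data: Any, path: str) -> Any:
    """Resolve "$" or a simple dotted path like "$.a.b" against data."""
    if path == "$":
        return data
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    for key in path[2:].split("."):
        data = data[key]
    return data


def set_path(data: Any, path: str, value: Any) -> Any:
    """Return a copy of data with value placed at path ("$" replaces everything)."""
    if path == "$":
        return value
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    data = copy.deepcopy(data)  # never mutate the caller's state input
    keys = path[2:].split(".")
    node = data
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return data


def render(template: dict, data: Any) -> dict:
    """Build a new object: keys ending in ".$" take their value from a path into data."""
    rendered = {}
    for key, value in template.items():
        if key.endswith(".$"):
            rendered[key[:-2]] = get_path(data, value)
        else:
            rendered[key] = value
    return rendered


def run_task_state(
    state_input: Any,
    task: Callable[[Any], Any],
    input_path: str = "$",
    parameters: Optional[dict] = None,
    result_selector: Optional[dict] = None,
    result_path: str = "$",
    output_path: str = "$",
) -> Any:
    effective_input = get_path(state_input, input_path)      # 1. InputPath
    if parameters is not None:                               # 2. Parameters
        effective_input = render(parameters, effective_input)
    result = task(effective_input)                           # 3. run the task
    if result_selector is not None:                          # 4. ResultSelector
        result = render(result_selector, result)
    combined = set_path(state_input, result_path, result)    # 5. ResultPath, applied to the raw state input
    return get_path(combined, output_path)                   # 6. OutputPath


state_input = {"comment": "demo", "dataset2": {"val1": "a", "val2": "b", "val3": "c"}}
output = run_task_state(
    state_input,
    task=lambda data: {"count": len(data), "raw": data},
    input_path="$.dataset2",
    result_selector={"n.$": "$.count"},
    result_path="$.summary",
    output_path="$.summary",
)
print(output)  # {'n': 3}
```

Even in this reduced model, a reader has to track three different "current documents" (state input, effective input, task result) to predict the output, which is the kind of difficulty Magic Task aims to replace with a plain function.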
Magic Task is a feature of the aws-stepfunction library that lets developers implement “Input/Output data handling” and “Conditional Choice” as pure Python functions. It automatically creates the backing Lambda Functions and hooks them up to your tasks. With Magic Task, you no longer need to write JSON notation or ChoiceRule at all; you just write Python code.
Example
This is an example from the official AWS Step Functions documentation:
State Input:
{
    "comment": "Example for InputPath.",
    "dataset1": {
        "val1": 1,
        "val2": 2,
        "val3": 3
    },
    "dataset2": {
        "val1": "a",
        "val2": "b",
        "val3": "c"
    }
}
InputPath:
{
    "InputPath": "$.dataset2"
}
With the previous InputPath, the following is the JSON that is passed as the input.
{
    "val1": "a",
    "val2": "b",
    "val3": "c"
}
With Magic Task
You just need to write:
def lambda_handler(event, context):
    return event["dataset2"]
You can also add a data schema definition to improve readability:
import dataclasses

@dataclasses.dataclass
class InputData:
    comment: str
    dataset1: dict
    dataset2: dict

@dataclasses.dataclass
class OutputData:
    val1: str
    val2: str
    val3: str

def lambda_handler(event, context):
    input_data = InputData(**event)
    output_data = OutputData(**input_data.dataset2)
    return dataclasses.asdict(output_data)
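Because the handler is a plain Python function, you can exercise it locally before anything is deployed. The sketch below repeats the dataclass version above (so the block runs on its own) and feeds it the state input from the official example. Passing None as the Lambda context is an assumption that only works because this handler never uses it.

```python
import dataclasses


@dataclasses.dataclass
class InputData:
    comment: str
    dataset1: dict
    dataset2: dict


@dataclasses.dataclass
class OutputData:
    val1: str
    val2: str
    val3: str


def lambda_handler(event, context):
    input_data = InputData(**event)
    output_data = OutputData(**input_data.dataset2)
    return dataclasses.asdict(output_data)


state_input = {
    "comment": "Example for InputPath.",
    "dataset1": {"val1": 1, "val2": 2, "val3": 3},
    "dataset2": {"val1": "a", "val2": "b", "val3": "c"},
}

# Same result as "InputPath": "$.dataset2"
result = lambda_handler(state_input, None)
print(result)  # {'val1': 'a', 'val2': 'b', 'val3': 'c'}

# The schema also rejects malformed input early: an unexpected key raises TypeError.
rejected = False
try:
    lambda_handler({**state_input, "extra": 1}, None)
except TypeError as err:
    rejected = True
    print("rejected:", err)
```

This is one practical advantage over JSON paths: a mismatch between what a state produces and what the next one expects shows up as an ordinary Python exception in a unit test.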
Reference:
Learn Magic Task by Example
Consider the following e-shop order processing use case. When a customer places an order, we need to perform these steps in sequence:
1. Generate the order details.
2. Look up the item catalog to get the prices and calculate the total price of the items.
3. Look up the shipment catalog to get the shipping cost.
#2 and #3 can run in parallel.
4. Add up all the costs to find the final balance of the order.
5. Process the payment for the order.
Below is a classic State Machine definition for this workflow. You have to use InputPath, Parameters, etc. in the task definitions to specify how data is passed from one state to another.
With Magic Task, you just write pure Python functions to implement the data handling logic, and they automatically become concrete Lambda Functions managed by the aws_stepfunction library, so you don't need to worry about deployment.
To make the discussion clearer, let’s use the following terminology:
Business Logic Task (in orange): a Lambda Function task that focuses on processing the business logic.
Data Handling Task (in red): a Lambda Function task that contains no business logic; it only manipulates the data and hooks up two Business Logic Tasks.
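Before wiring anything to AWS, the data flow between these two kinds of tasks can be prototyped in plain Python. The sketch below is hypothetical throughout: the catalog prices, shipping rates and field names (items, ship_address, item_cost, ship_cost, balance) are invented for illustration and are not the contents of the demo's s*.py scripts. Steps 2 and 3 run concurrently with concurrent.futures, loosely mirroring a Parallel state, whose output is a list with one entry per branch, in branch order.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical catalogs, for illustration only.
ITEM_PRICES = {"item-1": 10.0, "item-2": 25.0}
SHIP_RATES = {"US": 5.0, "CA": 8.0}


def get_order_detail(event):        # Task1 (business logic)
    return {
        "order_id": event["order_id"],
        "items": [
            {"item_id": "item-1", "quantity": 2},
            {"item_id": "item-2", "quantity": 1},
        ],
        "ship_address": {"country": "US"},
    }


def extract_items(event):           # Task2a-1 (data handling)
    return {"items": event["items"]}


def get_item_cost(event):           # Task2a-2 (business logic)
    total = sum(ITEM_PRICES[i["item_id"]] * i["quantity"] for i in event["items"])
    return {"item_cost": total}


def extract_ship_address(event):    # Task2b-1 (data handling)
    return {"ship_address": event["ship_address"]}


def get_ship_cost(event):           # Task2b-2 (business logic)
    return {"ship_cost": SHIP_RATES[event["ship_address"]["country"]]}


def find_balance(branch_outputs):   # Task3 (data handling): receives the list of branch outputs
    item_part, ship_part = branch_outputs
    return {"balance": item_part["item_cost"] + ship_part["ship_cost"]}


def process_payment(event):         # Task4 (business logic)
    return {"status": "success" if event["balance"] > 0 else "failed"}


def run_order_workflow(payload):
    order = get_order_detail(payload)
    branches = [
        lambda data: get_item_cost(extract_items(data)),
        lambda data: get_ship_cost(extract_ship_address(data)),
    ]
    # Like a Parallel state: every branch gets the same input; map() keeps branch order.
    with ThreadPoolExecutor() as pool:
        branch_outputs = list(pool.map(lambda branch: branch(order), branches))
    return process_payment(find_balance(branch_outputs))


print(run_order_workflow({"order_id": "order-1"}))  # {'status': 'success'}
```

The business logic functions never need to know the shape of the whole order; the small data handling functions in between do all the reshaping, which is exactly the role the red tasks play in the state machine.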
First, let’s import the required libraries:
import os
import json
from pathlib_mate import Path
import aws_stepfunction as sfn
from aws_stepfunction.magic import LambdaTask
from boto_session_manager import BotoSesManager
from rich import print as rprint
dir_here = Path(os.getcwd()).absolute()
bsm = BotoSesManager(
    profile_name="aws_data_lab_sanhe_us_east_1",
    region_name="us-east-1",
)
Use Magic Task to create “Business Logic Task”
First, we prepare the “Business Logic Tasks”. aws_stepfunction provides a special LambdaTask state. It represents a Lambda Function that is managed (created / updated / deleted) by the state machine. In this example, we want to skip the effort of setting up the “Business Logic Task” Lambda Functions ourselves and let the aws_stepfunction library do the heavy lifting.
Here are the scripts:
Limitations:
Each script must be a single-file Python script.
It can only use the standard library and boto3-related libraries.
NOTE: we plan to add support for third-party libraries soon.
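To illustrate these limitations, here is a hypothetical single-file script in the spirit of s1_get_order_detail.py. Its contents are an assumption (the demo's actual script is not reproduced here); it only follows the rules listed above: one file, standard library only, and a module-level lambda_handler(event, context) matching the lbd_handler="s1_get_order_detail.lambda_handler" setting used below. The in-memory ORDERS table is a made-up stand-in for a real data store.

```python
# s1_get_order_detail.py (hypothetical contents)
import json

# Made-up stand-in for a real order database; a real function might query DynamoDB via boto3.
ORDERS = {
    "order-1": {
        "items": [
            {"item_id": "item-1", "quantity": 2},
            {"item_id": "item-2", "quantity": 1},
        ],
        "ship_address": {"country": "US"},
    },
}


def lambda_handler(event, context):
    order_id = event["order_id"]
    if order_id not in ORDERS:
        # An unhandled exception fails the Lambda invocation, and therefore the Task state.
        raise KeyError(f"order not found: {order_id}")
    return {"order_id": order_id, **ORDERS[order_id]}


if __name__ == "__main__":
    # Local smoke test with the same payload the state machine is executed with later.
    print(json.dumps(lambda_handler({"order_id": "order-1"}, None)))
```

Keeping everything in one stdlib-only file means the deployment artifact is just this file zipped up, which is presumably why the current limitation exists.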
task1_get_order_detail = LambdaTask(
    id="Task1-Get-Order-Detail",
    lbd_func_name="aws_stepfunction_magic_task_demo-task1_get_order_detail",
    lbd_package="s1_get_order_detail.py",
    lbd_handler="s1_get_order_detail.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
task2a_2_get_item_cost = LambdaTask(
    id="Task2a-2-Get-Item-Cost",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2a_2_get_item_cost",
    lbd_package="s2a_2_get_item_cost.py",
    lbd_handler="s2a_2_get_item_cost.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
task2b_2_get_ship_cost = LambdaTask(
    id="Task2b-2-Get-Ship-Cost",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost",
    lbd_package="s2b_2_get_ship_cost.py",
    lbd_handler="s2b_2_get_ship_cost.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
task4_process_payment = LambdaTask(
    id="Task4-Process-Payment",
    lbd_func_name="aws_stepfunction_magic_task_demo-task4_process_payment",
    lbd_package="s4_process_payment.py",
    lbd_handler="s4_process_payment.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
Use Magic Task to create “Data Handling Task”
Then we can use the Magic Task LambdaTask class to create three data handling Lambda Functions:
Use Task2a 1 Extract Items to hook up Task1 Get Order Detail and Task2a 2 Get Item Cost.
Use Task2b 1 Extract Ship Address to hook up Task1 Get Order Detail and Task2b 2 Get Ship Cost.
Use Task3 Find Balance to hook up both Task2a 2 Get Item Cost and Task2b 2 Get Ship Cost with the final Task4 Process Payment.
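Data handling functions like these are usually only a few lines each. The sketch below shows what s2a_1_extract_items.py, s2b_1_extract_ship_address.py and s3_find_balance.py might contain; the field names are hypothetical, and the three handlers are given distinct names here only so they fit in one block (in the demo each file would define its own lambda_handler). One behavior worth relying on: the output of a Parallel state is an array with one element per branch, in branch order, so Task3 Find Balance receives a list rather than a dict.

```python
# Hypothetical handler bodies; in the demo each lives in its own single-file script.

def extract_items_handler(event, context):
    # s2a_1_extract_items.py: keep only what Task2a 2 needs (similar to InputPath "$.items").
    return {"items": event["items"]}


def extract_ship_address_handler(event, context):
    # s2b_1_extract_ship_address.py: keep only the shipping address.
    return {"ship_address": event["ship_address"]}


def find_balance_handler(event, context):
    # s3_find_balance.py: the Parallel state passes a list of branch outputs, in branch order.
    item_branch, ship_branch = event
    return {"balance": item_branch["item_cost"] + ship_branch["ship_cost"]}


order = {
    "order_id": "order-1",
    "items": [{"item_id": "item-1", "quantity": 2}],
    "ship_address": {"country": "US"},
}
print(extract_items_handler(order, None))         # {'items': [{'item_id': 'item-1', 'quantity': 2}]}
print(extract_ship_address_handler(order, None))  # {'ship_address': {'country': 'US'}}
print(find_balance_handler([{"item_cost": 20.0}, {"ship_cost": 5.0}], None))  # {'balance': 25.0}
```

In a pure-JSON definition, merging the two branch outputs would typically need ResultSelector or Parameters with positional paths; here it is a tuple unpacking.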
Here are the scripts:
task2a_1_extract_items = LambdaTask(
    id="Task2a-1-Extract-Items",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2a_1_extract_items",
    lbd_package="s2a_1_extract_items.py",
    lbd_handler="s2a_1_extract_items.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
task2b_1_extract_ship_address = LambdaTask(
    id="Task2b-1-Extract-Ship-Address",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address",
    lbd_package="s2b_1_extract_ship_address.py",
    lbd_handler="s2b_1_extract_ship_address.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
task3_find_balance = LambdaTask(
    id="Task3-Find-Balance",
    lbd_func_name="aws_stepfunction_magic_task_demo-task3_find_balance",
    lbd_package="s3_find_balance.py",
    lbd_handler="s3_find_balance.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)
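All seven LambdaTask declarations above follow one naming convention, so the repetitive keyword arguments can be derived from a step label and a snake_case name. The lambda_task_kwargs function below is a hypothetical helper, not part of aws_stepfunction; it only builds the dict you could pass as LambdaTask(**kwargs, lbd_aws_account_id=bsm.aws_account_id, lbd_aws_region=bsm.aws_region), and it assumes the file layout used in this demo.

```python
PREFIX = "aws_stepfunction_magic_task_demo"


def lambda_task_kwargs(step: str, name: str, prefix: str = PREFIX) -> dict:
    """Derive the LambdaTask id, function name, package and handler for one step.

    step: step label with "_" separating sub-steps, e.g. "1" or "2a_2" (hypothetical convention).
    name: snake_case action, e.g. "get_item_cost".
    """
    module = f"s{step}_{name}"
    title = "-".join(word.capitalize() for word in name.split("_"))
    return {
        "id": f"Task{step.replace('_', '-')}-{title}",
        "lbd_func_name": f"{prefix}-task{step}_{name}",
        "lbd_package": f"{module}.py",
        "lbd_handler": f"{module}.lambda_handler",
    }


print(lambda_task_kwargs("2a_2", "get_item_cost"))
```

Deriving the names from one place makes it harder for the id, function name, package file and handler to drift apart when a step is renamed.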
workflow = sfn.Workflow()
(
    workflow.start_from(task1_get_order_detail)
    .parallel([
        (
            workflow.subflow_from(task2a_1_extract_items)
            .next_then(task2a_2_get_item_cost)
            .end()
        ),
        (
            workflow.subflow_from(task2b_1_extract_ship_address)
            .next_then(task2b_2_get_ship_cost)
            .end()
        ),
    ])
    .next_then(task3_find_balance)
    .next_then(task4_process_payment)
    .end()
)
print("preview workflow definition")
print(json.dumps(workflow.serialize(), indent=4))
preview workflow definition
{
    "StartAt": "Task1-Get-Order-Detail",
    "States": {
        "Task1-Get-Order-Detail": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Next": "Parallel-after-Task1-Get-Order-Detail",
            "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task1_get_order_detail"
            },
            "OutputPath": "$.Payload",
            "Retry": [
                {
                    "ErrorEquals": [
                        "Lambda.ServiceException",
                        "Lambda.AWSLambdaException",
                        "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "BackoffRate": 2,
                    "MaxAttempts": 3
                }
            ]
        },
        "Parallel-after-Task1-Get-Order-Detail": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "Task2a-1-Extract-Items",
                    "States": {
                        "Task2a-1-Extract-Items": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "Next": "Task2a-2-Get-Item-Cost",
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2a_1_extract_items"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        },
                        "Task2a-2-Get-Item-Cost": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "End": true,
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2a_2_get_item_cost"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        }
                    }
                },
                {
                    "StartAt": "Task2b-1-Extract-Ship-Address",
                    "States": {
                        "Task2b-1-Extract-Ship-Address": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "Next": "Task2b-2-Get-Ship-Cost",
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        },
                        "Task2b-2-Get-Ship-Cost": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::lambda:invoke",
                            "End": true,
                            "Parameters": {
                                "Payload.$": "$",
                                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost"
                            },
                            "OutputPath": "$.Payload",
                            "Retry": [
                                {
                                    "ErrorEquals": [
                                        "Lambda.ServiceException",
                                        "Lambda.AWSLambdaException",
                                        "Lambda.SdkClientException"
                                    ],
                                    "IntervalSeconds": 2,
                                    "BackoffRate": 2,
                                    "MaxAttempts": 3
                                }
                            ]
                        }
                    }
                }
            ],
            "Next": "Task3-Find-Balance"
        },
        "Task3-Find-Balance": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Next": "Task4-Process-Payment",
            "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task3_find_balance"
            },
            "OutputPath": "$.Payload",
            "Retry": [
                {
                    "ErrorEquals": [
                        "Lambda.ServiceException",
                        "Lambda.AWSLambdaException",
                        "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "BackoffRate": 2,
                    "MaxAttempts": 3
                }
            ]
        },
        "Task4-Process-Payment": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "End": true,
            "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:us-east-1:669508176277:function:aws_stepfunction_magic_task_demo-task4_process_payment"
            },
            "OutputPath": "$.Payload",
            "Retry": [
                {
                    "ErrorEquals": [
                        "Lambda.ServiceException",
                        "Lambda.AWSLambdaException",
                        "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "BackoffRate": 2,
                    "MaxAttempts": 3
                }
            ]
        }
    }
}
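Since workflow.serialize() returns a plain dict (the code above passes it straight to json.dumps), you can inspect it with ordinary Python before deploying. The hypothetical check_transitions helper below walks the States, recursing into Parallel branches, and reports any StartAt or Next that points at a missing state, plus any non-terminal state with neither Next nor End. It runs here against a trimmed copy of the definition printed above; it does not check Choice rules or Catch targets.

```python
def check_transitions(definition: dict, scope: str = "root") -> list:
    """Return a list of problems found in the StartAt / Next / End wiring."""
    problems = []
    states = definition["States"]
    if definition["StartAt"] not in states:
        problems.append(f"{scope}: StartAt -> missing state {definition['StartAt']!r}")
    for name, state in states.items():
        target = state.get("Next")
        if target is not None and target not in states:
            problems.append(f"{scope}/{name}: Next -> missing state {target!r}")
        # Choice, Succeed and Fail states legitimately have neither Next nor End.
        if target is None and not state.get("End") and state["Type"] not in ("Choice", "Succeed", "Fail"):
            problems.append(f"{scope}/{name}: neither Next nor End")
        for i, branch in enumerate(state.get("Branches", [])):
            # Each Parallel branch is its own mini state machine with its own StartAt / States.
            problems.extend(check_transitions(branch, f"{scope}/{name}/branch{i}"))
    return problems


# Trimmed copy of the definition above: only the fields this check looks at.
definition = {
    "StartAt": "Task1-Get-Order-Detail",
    "States": {
        "Task1-Get-Order-Detail": {"Type": "Task", "Next": "Parallel-after-Task1-Get-Order-Detail"},
        "Parallel-after-Task1-Get-Order-Detail": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "Task2a-1-Extract-Items",
                    "States": {
                        "Task2a-1-Extract-Items": {"Type": "Task", "Next": "Task2a-2-Get-Item-Cost"},
                        "Task2a-2-Get-Item-Cost": {"Type": "Task", "End": True},
                    },
                },
                {
                    "StartAt": "Task2b-1-Extract-Ship-Address",
                    "States": {
                        "Task2b-1-Extract-Ship-Address": {"Type": "Task", "Next": "Task2b-2-Get-Ship-Cost"},
                        "Task2b-2-Get-Ship-Cost": {"Type": "Task", "End": True},
                    },
                },
            ],
            "Next": "Task3-Find-Balance",
        },
        "Task3-Find-Balance": {"Type": "Task", "Next": "Task4-Process-Payment"},
        "Task4-Process-Payment": {"Type": "Task", "End": True},
    },
}

print(check_transitions(definition))  # []
```

With the chained builder API these mistakes are unlikely, but the same check is handy if you ever post-process or hand-edit the serialized dict.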
Deploy a State Machine to AWS
Now, let’s deploy the State Machine and all the necessary Magic Task Lambda Functions. The deployment also creates a default S3 Bucket (to store Lambda deployment artifacts) and a default IAM Role (a basic Lambda execution role).
sfn_name = "aws_stepfunction_magic_task_demo"
state_machine = sfn.StateMachine(
    name=sfn_name,
    workflow=workflow,
    role_arn="arn:aws:iam::669508176277:role/sanhe-for-everything-admin",
)
state_machine.set_type_as_express()
deploy_result = state_machine.deploy(bsm, verbose=True)
detect whether the magic task is used ...
yes
identify necessary S3 bucket and IAM role ...
need to create S3 Bucket '669508176277-us-east-1-aws-stepfunction-python-sdk'
we need a default IAM role for lambda function
need to create IAM Role 'aws-stepfunction-python-sdk-magic-task-role'
done
deploy S3 and IAM ...
preview cloudformation stack status: https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks?filteringStatus=active&filteringText=aws-stepfunction-magic-task-demo&viewNested=true&hideStacks=false&stackId=
wait 'aws-stepfunction-magic-task-demo' stack to complete ...
elapsed 0 seconds ...
elapsed 5 seconds ...
elapsed 10 seconds ...
elapsed 15 seconds ...
elapsed 20 seconds ...
elapsed 25 seconds ...
done
deploy Lambda Functions ...
upload lambda deployment artifacts ...
upload from /Users/sanhehu/tmp/15598fc58f34fc1f96961b2ac2bfa8d4.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/a545c26c83561866d4aba791976948d8.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task1_get_order_detail
upload from /Users/sanhehu/tmp/e80c5b01037301fc9fc7505d74c5ff91.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/9c6078c9a0ed07b904b21dc8e710bebd.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task2a_1_extract_items
upload from /Users/sanhehu/tmp/fa760dbc90fabef2e87d6513173ce8d9.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/b4077b1decf3da0ca2102077745c1d73.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task2a_2_get_item_cost
upload from /Users/sanhehu/tmp/000dc7fe1ad3d583858d410068f75c6b.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/4ebd33b4da1e2dca0e2338b22ec4164f.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address
upload from /Users/sanhehu/tmp/009d9755c81982f704e9ac16aee18c5a.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/bd0ae3f3dc396dac10a3bd61511bcddf.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost
upload from /Users/sanhehu/tmp/fd78f0b11843a3b1b66551709b750844.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/c3b94e8b6666b94610f163818efdd567.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task3_find_balance
upload from /Users/sanhehu/tmp/1d8e921ecf9d9e39751575d141476cbf.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/6a5f3157ee87d81c9a1a5baf4a43fb74.zip
declare Lambda Function aws_stepfunction_magic_task_demo-task4_process_payment
deploy magic task Lambda Function ...
preview cloudformation stack status: https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks?filteringStatus=active&filteringText=aws-stepfunction-magic-task-demo&viewNested=true&hideStacks=false&stackId=
wait 'aws-stepfunction-magic-task-demo' stack to complete ...
elapsed 0 seconds ...
elapsed 5 seconds ...
elapsed 10 seconds ...
elapsed 15 seconds ...
done
deploy state machine to 'arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo' ...
not exists, create state machine ...
done, preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/visual-editor?stateMachineArn=arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo
You can click the preview link to view the workflow definition on the visual canvas.
Execute the State Machine with Custom Input Data
You can run your state machine with custom input data and see the output data immediately.
execute_result = state_machine.execute(
    bsm,
    payload={"order_id": "order-1"},
    sync=True,
)
input = json.loads(execute_result["input"])
output = json.loads(execute_result["output"])
print(f"\ninput:\n")
print(json.dumps(input))
print(f"\noutput:\n")
print(json.dumps(output))
execute state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo'
preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/express-executions/details/arn:aws:states:us-east-1:669508176277:express:aws_stepfunction_magic_task_demo:34c29e53-13df-43d1-b859-6c527da193e8:6452de94-c837-45f5-8df2-8b6f6484e6a6?startDate=1668018377096
input:
{"order_id": "order-1"}
output:
{"status": "success"}
Click the link to inspect the result, along with the intermediate data of each state, in the console GUI.
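In scripts or tests you may prefer to fail fast instead of checking the console. For an Express state machine executed synchronously, execute_result appears to follow the shape of the Step Functions StartSyncExecution response (the code above reads its input and output JSON strings); assuming that, the hypothetical helper below checks status and decodes output, surfacing error and cause on failure. The sample dicts are made up for illustration.

```python
import json


def unpack_sync_execution(result: dict) -> dict:
    """Return the decoded output of a synchronous execution, or raise if it did not succeed."""
    status = result.get("status")  # StartSyncExecution reports SUCCEEDED, FAILED or TIMED_OUT
    if status != "SUCCEEDED":
        raise RuntimeError(
            f"execution {status}: error={result.get('error')!r} cause={result.get('cause')!r}"
        )
    return json.loads(result["output"])


# Made-up responses shaped like a StartSyncExecution result.
ok = {"status": "SUCCEEDED", "input": '{"order_id": "order-1"}', "output": '{"status": "success"}'}
failed = {"status": "FAILED", "error": "KeyError", "cause": "order not found"}

print(unpack_sync_execution(ok))  # {'status': 'success'}
try:
    unpack_sync_execution(failed)
except RuntimeError as err:
    print(err)
```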