Source code for aws_stepfunction.state

# -*- coding: utf-8 -*-

"""

"""

import typing as T

import attr
import attr.validators as vs

from . import exc
from .constant import (
    Constant as C,
    ErrorCodeEnum,
)
from .utils import short_uuid, is_json_path
from .model import StepFunctionObject
from .choice_rule import ChoiceRule

if T.TYPE_CHECKING:  # pragma: no cover
    from .workflow import Workflow


# ------------------------------------------------------------------------------
# State Data Model
# ------------------------------------------------------------------------------
[docs]@attr.s class State(StepFunctionObject): """ Represent a step / a state in a workflow. :param _uuid: for internal implementation. we track the associated state machine of each state object. Serialization Field Order: 1. 先展示信息量最大的, 例如 Type, Comment 2. 再展示跟当前 State 的逻辑紧密相关的, 例如 Task 就需要关注 Resource, Parallel 就需要关注 Branches, Map 就需要关注 Iterator 等等 3. 接下来展示跟流程相关的 Next, End 4. 最后展示详细的 Input Output 处理的细节 """ id: str = attr.ib( factory=lambda: short_uuid(), validator=vs.instance_of(str), ) type: str = attr.ib( default="UnknownState", metadata={C.ALIAS: C.Type}, ) comment: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Comment}, ) def _serialize(self) -> dict: data = self.to_dict() data = self._to_alias(data) data.pop("id") if data.get(C.ResultPath, None) == "null": data[C.ResultPath] = None return data def _short_repr(self) -> str: return f"{self.type}(id={self.id!r})" # since json path attribute is so common, # we should create a validator for that def _check_json_path(self, attr: str, value: str): if not is_json_path(value): raise exc.StateValidationError.make( self, ( f"State.{attr} = {value!r} is not a valid JSON path!" ) ) def _check_opt_json_path(self, attr: str, value: T.Optional[str]): if value is not None: self._check_json_path(attr, value) def _is_magic(self) -> bool: return False
@attr.s class _HasNextOrEnd(State): next: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Next} ) end: T.Optional[bool] = attr.ib( default=None, metadata={C.ALIAS: C.End} ) def _check_next_and_end(self): if self.end is True: if self.next: # when "End" is True, you can NOT have "Next" raise exc.StateValidationError.make( self, f"{C.End!r} is True, " f"but the {C.Next!r} is also defined!" ) else: if not self.next: # when "End" is not True, you HAVE TO have "Next" raise exc.StateValidationError.make( self, f"{C.End!r} is False, " f"but the {C.Next!r} is not defined!" ) @attr.s class _HasInputOutput(State): input_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.InputPath}, ) output_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.OutputPath}, ) def _check_input_output_path(self): self._check_opt_json_path(C.InputPath, self.input_path) self._check_opt_json_path(C.OutputPath, self.output_path) @attr.s class _HasParameters(State): parameters: T.Dict[str, T.Any] = attr.ib( factory=dict, metadata={C.ALIAS: C.Parameters}, ) @attr.s class _HasResultSelector(State): result_selector: T.Dict[str, T.Any] = attr.ib( factory=dict, metadata={C.ALIAS: C.ResultSelector}, ) @attr.s class _HasResultPath(State): result_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.ResultPath}, ) def _check_result_path(self): if self.result_path is not None: if self.result_path != "null": self._check_json_path(C.ResultPath, self.result_path) def use_task_result(self) -> T.Union[ 'Task', 'Parallel', 'Map', 'Pass' ]: self.result_path = "$" return self def discard_the_result_and_keep_original_input(self) -> T.Union[ 'Task', 'Parallel', 'Map', 'Pass' ]: self.result_path = "null" return self @attr.s class _RetryOrCatch(StepFunctionObject): error_equals: T.List[str] = attr.ib( factory=list, metadata={C.ALIAS: C.ErrorEquals}, ) @classmethod def new(cls) -> '_RetryOrCatch': return cls() def _check_error_codes(self): if len(self.error_equals) == 0: raise exc.StateValidationError( f"{C.ErrorEquals!r} has to be a NON EMPTY list!" ) for error_code in self.error_equals: if not ErrorCodeEnum.contains(error_code): raise exc.StateValidationError( f"{error_code!r} is not a valid Error Code!" ) def _add_error(self, error_code: str) -> '_RetryOrCatch': if error_code not in self.error_equals: self.error_equals.append(error_code) return self def if_all_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.AllError.value) def if_heartbeat_timeout_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.HeartbeatTimeoutError.value) def if_timeout_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.TimeoutError.value) def if_task_failed_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.TaskFailedError.value) def if_permissions_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.PermissionsError.value) def if_result_path_match_failure_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.ResultPathMatchFailureError.value) def if_parameter_path_failure_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.ParameterPathFailureError.value) def if_branch_failed_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.BranchFailedError.value) def if_no_choice_matched_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.NoChoiceMatchedError.value) def if_intrinsic_failure_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.IntrinsicFailureError.value) def if_data_limit_exceeded_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.DataLimitExceededError.value) def if_lambda_unknown_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.LambdaUnknownError.value) def if_lambda_service_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.LambdaServiceError.value) def if_lambda_aws_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.LambdaAWSError.value) def if_lambda_sdk_client_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.LambdaSdkClientError.value) def if_lambda_too_many_requests_error(self) -> '_RetryOrCatch': return self._add_error(ErrorCodeEnum.LambdaTooManyRequestsError.value) def _serialize(self) -> dict: data = self.to_dict() data = self._to_alias(data) return data
[docs]@attr.s class Retry(_RetryOrCatch): """ Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html#error-handling-retrying-after-an-error """ interval_seconds: T.Optional[int] = attr.ib( default=None, metadata={C.ALIAS: C.IntervalSeconds}, ) backoff_rate: T.Optional[T.Union[float, int]] = attr.ib( default=None, metadata={C.ALIAS: C.BackoffRate}, ) max_attempts: T.Optional[int] = attr.ib( default=None, metadata={C.ALIAS: C.MaxAttempts}, ) def with_interval_seconds(self, sec: int) -> 'Retry': self.interval_seconds = sec return self def with_back_off_rate(self, rate: T.Union[float, int]) -> 'Retry': self.backoff_rate = rate return self def with_max_attempts(self, attempts: int) -> 'Retry': self.max_attempts = attempts return self def _pre_serialize_validation(self): self._check_error_codes()
[docs]@attr.s class Catch(_RetryOrCatch): """ Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html#error-handling-fallback-states """ next: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Next}, ) result_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.ResultPath}, ) def next_then(self, state: 'StateType'): self.next = state.id return self
[docs] def with_result_path(self, result_path: str): """ A path that determines what input is sent to the state specified in the Next field. """ self.result_path = result_path return self
def _check_next(self): if self.next is None: raise exc.ValidationError(f"{C.Catch}.{C.Next} is not defined!") def _check_result(self): if self.result_path is not None: if not is_json_path(self.result_path): raise exc.ValidationError( f"{C.Catch}.{C.ResultPath} = {self.result_path!r} " f"is not a valid JSON path!" ) def _pre_serialize_validation(self): self._check_error_codes() self._check_next()
@attr.s class _HasRetryCatch(State): retry: T.List['Retry'] = attr.ib( factory=list, metadata={C.ALIAS: C.Retry}, ) catch: T.List['Catch'] = attr.ib( factory=list, metadata={C.ALIAS: C.Catch}, ) def _serialize_retry_catch_fields(self, data: dict) -> dict: if self.retry: data[C.Retry] = [ retry.serialize() for retry in self.retry ] if self.catch: data[C.Catch] = [ catch.serialize() for catch in self.catch ] return data
[docs]@attr.s class Task( _HasNextOrEnd, _HasInputOutput, _HasParameters, _HasResultSelector, _HasResultPath, _HasRetryCatch, ): """ A Task state ``("Type": "Task")`` represents a single unit of work performed by a state machine. All work in your state machine is done by tasks. A task performs work by using an activity or an AWS Lambda function, or by passing parameters to the API actions of other services. :param resource: A URI, especially an ARN that uniquely identifies the specific task to execute. :param timeout_seconds: If the task runs longer than the specified seconds, this state fails with a States.Timeout error name. Must be a positive, non-zero integer. If not provided, the default value is 99999999. The count begins after the task has been started, for example, when ``ActivityStarted`` or ``LambdaFunctionStarted`` are logged in the Execution event history. :param timeout_seconds_path: If you want to provide a timeout value dynamically from the state input using a reference path, use ``TimeoutSecondsPath``. When resolved, the reference path must select fields whose values are positive integers. :param heartbeat_seconds: If more time than the specified seconds elapses between heartbeats from the task, this state fails with a States. Timeout error name. Must be a positive, non-zero integer less than the number of seconds specified in the ``TimeoutSeconds`` field. If not provided, the default value is 99999999. For Activities, the count begins when ``GetActivityTask`` receives a token and ``ActivityStarted`` is logged in the Execution event history. :param heartbeat_seconds_path: If you want to provide a heartbeat value dynamically from the state input using a reference path, use ``HeartbeatSecondsPath``. When resolved, the reference path must select fields whose values are positive integers. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-task-state.html """ id: str = attr.ib( factory=lambda: f"{C.Task}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Task, metadata={C.ALIAS: C.Type}, ) resource: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Resource}, ) timeout_seconds: T.Optional[int] = attr.ib( default=None, metadata={C.ALIAS: C.TimeoutSeconds}, ) timeout_seconds_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.TimeoutSecondsPath}, ) heartbeat_seconds: T.Optional[int] = attr.ib( default=None, metadata={C.ALIAS: C.HeartbeatSeconds}, ) heartbeat_seconds_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.HeartbeatSecondsPath}, ) _field_order = [ # common C.Type, C.Comment, # state specific C.Resource, C.TimeoutSeconds, C.TimeoutSecondsPath, C.HeartbeatSeconds, C.HeartbeatSecondsPath, # flow C.Next, C.End, # input output C.InputPath, C.Parameters, C.ResultSelector, C.ResultPath, C.OutputPath, # error handling C.Retry, C.Catch, ] def update( self, id: T.Optional[str] = None, resource: T.Optional[str] = None, timeout_seconds_path: T.Optional[str] = None, timeout_seconds: T.Optional[int] = None, heartbeat_seconds_path: T.Optional[str] = None, heartbeat_seconds: T.Optional[int] = None, next: T.Optional[str] = None, end: T.Optional[bool] = None, input_path: T.Optional[str] = None, output_path: T.Optional[str] = None, parameters: T.Optional[str] = None, result_selector: T.Optional[str] = None, result_path: T.Optional[str] = None, retry: T.Optional[T.List['Retry']] = None, catch: T.Optional[T.List['Catch']] = None, ): for attr, value in [ ("id", id), ("resource", resource), ("timeout_seconds_path", timeout_seconds_path), ("timeout_seconds", timeout_seconds), ("heartbeat_seconds_path", heartbeat_seconds_path), ("heartbeat_seconds", heartbeat_seconds), ("next", next), ("end", end), ("input_path", input_path), ("output_path", output_path), ("parameters", parameters), ("result_selector", result_selector), ("result_path", result_path), ("retry", retry), ("catch", catch), ]: if value is not None: setattr(self, attr, value) return self def _check_resource(self): if self.resource is None: raise exc.StateValidationError.make( self, f"{C.Resource!r} is not defined!" ) def _check_timeout(self): if sum([ bool(self.timeout_seconds), bool(self.timeout_seconds_path), ]) == 2: raise exc.StateValidationError.make( self, "cannot include both " "'timeout_seconds' and 'timeout_seconds_path'!" ) def _check_heartbeat(self): if sum([ bool(self.heartbeat_seconds), bool(self.heartbeat_seconds_path), ]) == 2: raise exc.StateValidationError.make( self, "cannot include both " "'heartbeat_seconds' and 'heartbeat_seconds_path'!" ) def _pre_serialize_validation(self): self._check_next_and_end() self._check_input_output_path() self._check_result_path() self._check_resource() self._check_timeout() self._check_heartbeat() self._check_opt_json_path(C.TimeoutSecondsPath, self.timeout_seconds_path) self._check_opt_json_path(C.HeartbeatSecondsPath, self.heartbeat_seconds_path) def _serialize(self) -> dict: data = super()._serialize() data = self._serialize_retry_catch_fields(data) return data
[docs]@attr.s class Parallel( _HasNextOrEnd, _HasInputOutput, _HasParameters, _HasResultSelector, _HasResultPath, _HasRetryCatch, ): """ The Parallel state ``("Type": "Parallel")`` can be used to create parallel branches of execution in your state machine. In addition to the common state fields, Parallel states include these additional fields. :param branches: An array of objects that specify state machines to execute in parallel. Each such state machine object must have fields named States and StartAt, whose meanings are exactly like those in the top level of a state machine. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-parallel-state.html """ id: str = attr.ib( factory=lambda: f"{C.Parallel}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Parallel, metadata={C.ALIAS: C.Type}, ) branches: T.List['Workflow'] = attr.ib( factory=list, metadata={C.ALIAS: C.Branches}, ) _field_order = [ # common # state specific # flow # input output # error handling C.Type, C.Comment, C.Branches, C.Next, C.End, C.InputPath, C.Parameters, C.ResultSelector, C.ResultPath, C.OutputPath, C.Retry, C.Catch, ] def _check_branches(self): if len(self.branches) == 0: raise exc.StateValidationError( f"{C.Parallel!r} state can not have empty {C.Branches!r}!" ) def _pre_serialize_validation(self): self._check_next_and_end() self._check_input_output_path() self._check_result_path() self._check_branches() def _serialize(self) -> dict: data = super()._serialize() data = self._serialize_retry_catch_fields(data) branches = list() for state_machine in self.branches: dct = state_machine.serialize() branches.append(dct) data[C.Branches] = branches return data
[docs]@attr.s class Map( _HasNextOrEnd, _HasInputOutput, _HasParameters, _HasResultSelector, _HasResultPath, _HasRetryCatch, ): """ The Map state ``("Type": "Map")`` can be used to run a set of steps for each element of an input array. While the Parallel state executes multiple branches of steps using the same input, a Map state will execute the same steps for multiple entries of an array in the state input. :param iterator: The Iterator field’s value is an object that defines a state machine which will process each element of the array. :param items_path: The ItemsPath field’s value is a reference path identifying where in the effective input the array field is found. For more information, see ItemsPath. States within an Iterator field can only transition to each other, and no state outside the Iterator field can transition to a state within it. If any iteration fails, entire Map state fails, and all iterations are terminated. :param max_concurrency: The MaxConcurrencyfield’s value is an integer that provides an upper bound on how many invocations of the Iterator may run in parallel. For instance, a MaxConcurrency value of 10 will limit your Map state to 10 concurrent iterations running at one time. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html """ id: str = attr.ib( factory=lambda: f"{C.Map}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Map, metadata={C.ALIAS: C.Type}, ) iterator: T.Optional['Workflow'] = attr.ib( default=None, metadata={C.ALIAS: C.Iterator}, ) items_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.ItemsPath}, ) max_concurrency: T.Optional[int] = attr.ib( default=None, metadata={C.ALIAS: C.MaxConcurrency}, ) _field_order = [ # common C.Type, C.Comment, # state specific C.Iterator, C.ItemsPath, C.MaxConcurrency, # flow C.Next, C.End, # input output C.InputPath, C.Parameters, C.ResultSelector, C.ResultPath, C.OutputPath, # error handling C.Retry, C.Catch, ] def _pre_serialize_validation(self): self._check_next_and_end() self._check_input_output_path() self._check_result_path() self._check_opt_json_path(C.ItemsPath, self.items_path) def _serialize(self) -> dict: data = super()._serialize() data = self._serialize_retry_catch_fields(data) data[C.Iterator] = self.iterator.serialize() return data
[docs]@attr.s class Pass( _HasInputOutput, _HasNextOrEnd, _HasResultPath, _HasParameters, ): """ A Pass state ``("Type": "Pass")`` passes its input to its output, without performing work. Pass states are useful when constructing and debugging state machines. In addition to the common state fields, Pass states allow the following fields. :param result: refers to the output of a virtual task that is passed on to the next state. If you include the 'ResultPath' field in your state machine definition, 'Result' is placed as specified by 'ResultPath' and passed on to the next state. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-pass-state.html """ id: str = attr.ib( factory=lambda: f"{C.Pass}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Pass, metadata={C.ALIAS: C.Type}, ) result: T.Optional[dict] = attr.ib( default=None, metadata={C.ALIAS: C.Result}, ) _field_order = [ # common C.Type, C.Comment, # state specific C.Result, # flow C.Next, C.End, # input output C.InputPath, C.Parameters, C.ResultPath, C.OutputPath, ] def _pre_serialize_validation(self): self._check_next_and_end() self._check_input_output_path() self._check_result_path()
[docs]@attr.s class Wait( _HasInputOutput, _HasNextOrEnd, ): """ A Wait state ``("Type": "Wait")`` delays the state machine from continuing for a specified time. You can choose either a relative time, specified in seconds from when the state begins, or an absolute end time, specified as a timestamp. In addition to the common state fields, Wait states have one of the following fields. :param seconds: A time, in seconds, to wait before beginning the state specified in the Next field. You must specify time as a positive, integer value. :param timestamp: An absolute time to wait until beginning the state specified in the Next field. Timestamps must conform to the RFC3339 profile of ISO 8601, with the further restrictions that an uppercase T must separate the date and time portions, and an uppercase Z must denote that a numeric time zone offset is not present, for example, ``2016-08-18T17:33:00Z``. :param seconds_path: A time, in seconds, to wait before beginning the state specified in the Next field, specified using a path from the state's input data. You must specify an integer value for this field. :param timestamp_path: An absolute time to wait until beginning the state specified in the Next field, specified using a path from the state's input data. .. note:: You must specify exactly one of ``Seconds``, ``Timestamp``, ``SecondsPath`` or ``TimestampPath``. In addition, the maximum wait time that you can specify for Standard Workflows and Express workflows is one year and five minutes respectively. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-wait-state.html """ id: str = attr.ib( factory=lambda: f"{C.Wait}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Wait, metadata={C.ALIAS: C.Type}, ) seconds: T.Optional[int] = attr.ib( default=None, metadata={C.ALIAS: C.Seconds}, ) timestamp: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Timestamp}, ) seconds_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.SecondsPath}, ) timestamp_path: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.TimestampPath}, ) _field_order = [ # common C.Type, C.Comment, # state specific C.Seconds, C.Timestamp, C.SecondsPath, C.TimestampPath, # flow C.Next, C.End, # input output C.InputPath, C.OutputPath, ] def _check_argument(self): if sum([ bool(self.seconds), bool(self.timestamp), bool(self.seconds_path), bool(self.timestamp_path), ]) != 1: raise exc.StateValidationError.make( self, f"You have to specify exact one of " "'seconds', " "'timestamp', " "'seconds_path', " "'timestamp_path', " ) def _pre_serialize_validation(self): self._check_next_and_end() self._check_input_output_path() self._check_argument()
[docs]@attr.s class Choice( _HasInputOutput ): """ A Choice state ``("Type": "Choice")`` adds branching logic to a state machine. In addition to most of the common state fields, Choice states introduce the following additional fields. :param choices: An array of Choice Rules that determines which state the state machine transitions to next. :param default: The name of the state to transition to if none of the transitions in Choices is taken. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-choice-state.html """ id: str = attr.ib( factory=lambda: f"{C.Choice}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Choice, metadata={C.ALIAS: C.Type}, ) choices: T.List['ChoiceRule'] = attr.ib( factory=list, metadata={C.ALIAS: C.Choices}, ) default: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Default}, ) _field_order = [ # common C.Type, C.Comment, # state specific C.Choices, C.Default, # input output C.InputPath, C.OutputPath, ] def _check_choices(self): # has to have at least one choice if len(self.choices) == 0: raise exc.StateValidationError def _pre_serialize_validation(self): self._check_input_output_path() self._check_choices() def _serialize(self) -> dict: data = super()._serialize() choices = [ choice.serialize() for choice in self.choices ] data[C.Choices] = choices return data
[docs]@attr.s class Succeed( _HasInputOutput, ): """ A Succeed state ``("Type": "Succeed")`` stops an execution successfully. The Succeed state is a useful target for Choice state branches that don't do anything but stop the execution. Because Succeed states are terminal states, they have no Next field, and don't need an End field. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-succeed-state.html """ id: str = attr.ib( factory=lambda: f"{C.Succeed}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Succeed, metadata={C.ALIAS: C.Type}, ) _field_order = [ # common C.Type, C.Comment, # input output C.InputPath, C.OutputPath, ] def _pre_serialize_validation(self): self._check_input_output_path()
[docs]@attr.s class Fail( State, ): """ A Fail state ``("Type": "Fail")`` stops the execution of the state machine and marks it as a failure, unless it is caught by a Catch block. The Fail state only allows the use of Type and Comment fields from the set of common state fields. In addition, the Fail state allows the following fields. :param cause: A custom failure string that you can specify for operational or diagnostic purposes. :param error: An error name that you can provide for operational or diagnostic purposes. Reference: - https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-fail-state.html """ id: str = attr.ib( factory=lambda: f"{C.Fail}-{short_uuid()}", validator=vs.instance_of(str), ) type: str = attr.ib( default=C.Fail, metadata={C.ALIAS: C.Type}, ) cause: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Cause}, ) error: T.Optional[str] = attr.ib( default=None, metadata={C.ALIAS: C.Error}, ) _field_order = [ # common C.Type, C.Comment, # state specific C.Cause, C.Error, ]
StateType = T.Union[ Task, Parallel, Map, Pass, Wait, Choice, Succeed, Fail, ]