Skip to content

prefect.states

exception_to_crashed_state

Takes an exception that occurs outside of user code and converts it to a 'Crash' exception with a 'Crashed' state.

Source code in prefect/states.py
def _crash_message_for(exc: BaseException) -> str:
    """Pick the human-readable crash message that matches the kind of `exc`."""
    if isinstance(exc, anyio.get_cancelled_exc_class()):
        return "Execution was cancelled by the runtime environment."

    if isinstance(exc, KeyboardInterrupt):
        return "Execution was aborted by an interrupt signal."

    if isinstance(exc, SystemExit):
        return "Execution was aborted by Python system exit call."

    if isinstance(exc, httpx.TimeoutException):
        try:
            timed_out_request: httpx.Request = exc.request
        except RuntimeError:
            # Accessing `request` raises when it was never attached to the exception
            return "Request timed out while attempting to contact the server."
        # TODO: We can check if this is actually our API url
        return f"Request to {timed_out_request.url} timed out."

    # Anything else is reported generically
    return "Execution was interrupted by an unexpected exception."


def exception_to_crashed_state(exc: BaseException) -> State:
    """
    Convert an exception raised _outside_ of user code into a 'Crashed' state.

    The state message is selected from the exception type (runtime cancellation,
    keyboard interrupt, system exit, HTTP timeout, or a generic fallback). The
    exception itself is encoded with `safe_encode_exception` and stored as the
    state's data.

    Args:
        exc: The exception that interrupted execution.

    Returns:
        A state built with `Failed` and named "Crashed".
    """
    crash_message = _crash_message_for(exc)
    return Failed(
        name="Crashed",
        message=crash_message,
        data=safe_encode_exception(exc),
    )

is_state

Check if the given object is a state type

Source code in prefect/states.py
def is_state(obj: Any) -> TypeGuard[State]:
    """
    Check if the given object is a state type

    Returns `True` when `obj` is an instance of `State` and `False` otherwise.
    The `TypeGuard` return annotation lets static type checkers treat `obj` as
    a `State` wherever this check succeeds.
    """
    return isinstance(obj, State)

is_state_iterable

Check if the given object is an iterable of state types

Supported iterables are: set, list, and tuple.

Other iterables will return False even if they contain states.

Source code in prefect/states.py
def is_state_iterable(obj: Any) -> TypeGuard[Iterable[State]]:
    """
    Check if the given object is an iterable of state types

    Supported iterables are:
    - set
    - list
    - tuple

    Other iterables will return `False` even if they contain states. An empty
    supported iterable also returns `False`.
    """
    # We do not check for arbitrary iterables because this is not intended to be used
    # for things like dictionaries, dataframes, or pydantic models
    if not isinstance(obj, (list, set, tuple)) or not obj:
        return False

    # A generator (rather than a list) lets `all` stop at the first non-state
    # element instead of checking every element up front
    return all(is_state(item) for item in obj)

raise_failed_state async

Given a FAILED state, raise the contained exception.

If not given a FAILED state, this function will return immediately.

If the state contains a result of multiple states, the first FAILED state will be raised.

If the state is FAILED but does not contain an exception type result, a TypeError will be raised.

Source code in prefect/states.py
@sync_compatible
@inject_client
async def raise_failed_state(state: State, client: "OrionClient") -> None:
    """
    Raise the exception contained in a FAILED state.

    States that are not FAILED are ignored and the function returns immediately.

    The state's data is resolved through the client and handled by type:

    - An exception is raised directly.
    - A single state is checked recursively.
    - A list, set, or tuple of states is checked in order, so the first FAILED
      state encountered is the one that raises.
    - Anything else results in a `TypeError`.
    """
    if not state.is_failed():
        return

    resolved = await client.resolve_datadoc(state.data)

    if isinstance(resolved, BaseException):
        raise resolved

    if isinstance(resolved, State):
        # The failure lives in the nested state
        await raise_failed_state(resolved)
        return

    if is_state_iterable(resolved):
        # Walk the nested states; the first failed one raises
        for inner_state in resolved:
            await raise_failed_state(inner_state)
        return

    raise TypeError(
        f"Unexpected result for failure state: {resolved!r} —— "
        f"{type(resolved).__name__} cannot be resolved into an exception"
    )

return_value_to_state async

Given a return value from a user's function, create a State the run should be placed in.

  • If data is returned, we create a 'COMPLETED' state with the data
  • If a single, manually created state is returned, we use that state as given (manual creation is determined by the lack of ids)
  • If an upstream state or iterable of upstream states is returned, we apply the aggregate rule
  • If a future or iterable of futures is returned, we resolve it into states then apply the aggregate rule

The aggregate rule says that given multiple states we will determine the final state such that:

  • If any states are not COMPLETED the final state is FAILED
  • If all of the states are COMPLETED the final state is COMPLETED
  • The states will be placed in the final state data attribute

The aggregate rule is applied to single futures to distinguish from returning a single state. This prevents a flow from assuming the state of a single returned task future.

Source code in prefect/states.py
def _describe_state_group(states: "StateGroup") -> str:
    """Build the summary message for an aggregate of states."""
    if states.all_completed():
        return "All states completed."
    if states.any_failed():
        return f"{states.fail_count}/{states.total_count} states failed."
    if not states.all_final():
        return (
            f"{states.not_final_count}/{states.total_count} states are not final."
        )
    return "Given states: " + states.counts_message()


async def return_value_to_state(result: Any, serializer: str = "cloudpickle") -> State:
    """
    Translate the return value of a user's function into the `State` the run
    should be placed in.

    - Plain data produces a 'COMPLETED' state holding the data
    - A single, manually created state is returned unchanged (a state counts as
        manually created when it has neither a flow run id nor a task run id)
    - An upstream state or iterable of upstream states goes through the aggregate
        rule
    - A future or iterable of futures is first resolved into states, then goes
        through the aggregate rule

    The aggregate rule determines the final state from multiple states:

    - If any states are not COMPLETED the final state is FAILED
    - If all of the states are COMPLETED the final state is COMPLETED
    - The states will be placed in the final state `data` attribute

    The aggregate rule is also applied to a _single_ future so that it is treated
    differently from a _single_ returned state. This prevents a flow from assuming
    the state of a single returned task future.
    """
    if is_state(result):
        details = result.state_details
        # No run ids means the user constructed this state by hand
        if not (details.flow_run_id or details.task_run_id):
            return result

    # Ensure any futures are resolved
    resolved = await resolve_futures_to_states(result)

    # Plain data: this is simply a completed result
    if not is_state(resolved) and not is_state_iterable(resolved):
        return Completed(data=DataDocument.encode(serializer, resolved))

    # One or more states were returned or resolved from futures, so the new state
    # is derived from their aggregate
    group = StateGroup(ensure_iterable(resolved))
    if group.all_completed():
        aggregate_type = StateType.COMPLETED
    else:
        aggregate_type = StateType.FAILED
    summary = _describe_state_group(group)

    # TODO: We may actually want to set the data to a `StateGroup` object and just
    #       allow it to be unpacked into a tuple and such so users can interact with
    #       it
    return State(
        type=aggregate_type,
        message=summary,
        data=DataDocument.encode(serializer, resolved),
    )