prefect.states
exception_to_crashed_state
Takes an exception that occurs outside of user code and converts it to a 'Crash' exception with a 'Crashed' state.
Source code in prefect/states.py
def _describe_crash(exc: BaseException) -> str:
    """
    Pick the message that explains why execution crashed, based on the
    type of the given exception.
    """
    if isinstance(exc, anyio.get_cancelled_exc_class()):
        return "Execution was cancelled by the runtime environment."
    if isinstance(exc, KeyboardInterrupt):
        return "Execution was aborted by an interrupt signal."
    if isinstance(exc, SystemExit):
        return "Execution was aborted by Python system exit call."
    if not isinstance(exc, httpx.TimeoutException):
        return "Execution was interrupted by an unexpected exception."

    # Only httpx timeouts reach this point
    try:
        request = exc.request
    except RuntimeError:
        # The request property is not set
        return "Request timed out while attempting to contact the server."

    # TODO: We can check if this is actually our API url
    return f"Request to {request.url} timed out."


def exception_to_crashed_state(exc: BaseException) -> State:
    """
    Convert an exception raised _outside_ of user code into a 'Crashed' state.

    The state is created with `Failed`, named "Crashed", given a message that
    depends on the exception type, and its data is set to the result of
    `safe_encode_exception(exc)`.
    """
    return Failed(
        name="Crashed",
        message=_describe_crash(exc),
        data=safe_encode_exception(exc),
    )
is_state
Check if the given object is a state type
Source code in prefect/states.py
def is_state(obj: Any) -> TypeGuard[State]:
    """
    Return `True` when `obj` is an instance of `State`, `False` otherwise.
    """
    obj_is_state = isinstance(obj, State)
    return obj_is_state
is_state_iterable
Check if the given object is an iterable of state types.
Supported iterables are: set, list, and tuple.
Other iterables will return False even if they contain states.
Source code in prefect/states.py
def is_state_iterable(obj: Any) -> TypeGuard[Iterable[State]]:
    """
    Check if the given object is an iterable of state types

    Supported iterables are:
    - set
    - list
    - tuple

    Other iterables will return `False` even if they contain states. An empty
    set, list, or tuple also returns `False`.
    """
    # We do not check for arbitrary iterables because this is not intended to be used
    # for things like dictionaries, dataframes, or pydantic models
    if not isinstance(obj, (list, set, tuple)) or not obj:
        return False

    # A generator lets `all` stop at the first non-state element instead of
    # first building a list with the check result for every element
    return all(is_state(item) for item in obj)
raise_failed_state
async
Given a FAILED state, raise the contained exception.
If not given a FAILED state, this function will return immediately.
If the state contains a result of multiple states, the first FAILED state will be raised.
If the state is FAILED but does not contain an exception type result, a TypeError
will be raised.
Source code in prefect/states.py
@sync_compatible
@inject_client
async def raise_failed_state(state: State, client: "OrionClient") -> None:
    """
    Raise the exception contained in a FAILED state.

    States that are not FAILED are ignored and the function returns immediately.

    The state's data is resolved with the client and handled by type:
    - an exception is raised directly
    - a single state is checked recursively
    - a set, list, or tuple of states is checked in iteration order, so the first
      FAILED state found is the one that raises

    A `TypeError` is raised when the state is FAILED but its result is none of the
    above.
    """
    if not state.is_failed():
        return

    result = await client.resolve_datadoc(state.data)

    if isinstance(result, BaseException):
        raise result

    if isinstance(result, State):
        # Raise the failure in the inner state
        await raise_failed_state(result)
        return

    if is_state_iterable(result):
        # Raise the first failure
        for inner_state in result:
            await raise_failed_state(inner_state)
        return

    raise TypeError(
        f"Unexpected result for failure state: {result!r} —— "
        f"{type(result).__name__} cannot be resolved into an exception"
    )
return_value_to_state
async
Given a return value from a user's function, create a State the run should be placed in.
- If data is returned, we create a 'COMPLETED' state with the data
- If a single, manually created state is returned, we use that state as given (manual creation is determined by the lack of ids)
- If an upstream state or iterable of upstream states is returned, we apply the aggregate rule
- If a future or iterable of futures is returned, we resolve it into states then apply the aggregate rule
The aggregate rule says that given multiple states we will determine the final state such that:
- If any states are not COMPLETED the final state is FAILED
- If all of the states are COMPLETED the final state is COMPLETED
- The states will be placed in the final state's data attribute
The aggregate rule is applied to single futures to distinguish from returning a single state. This prevents a flow from assuming the state of a single returned task future.
Source code in prefect/states.py
def _aggregate_state_message(states) -> str:
    """
    Generate a nice message summarizing a group of states.
    """
    if states.all_completed():
        return "All states completed."
    if states.any_failed():
        return f"{states.fail_count}/{states.total_count} states failed."
    if not states.all_final():
        return (
            f"{states.not_final_count}/{states.total_count} states are not final."
        )
    return "Given states: " + states.counts_message()


async def return_value_to_state(result: Any, serializer: str = "cloudpickle") -> State:
    """
    Create the `State` a run should be placed in from the return value of a user's
    function.

    - A single, manually created state is used as given (a state counts as manually
      created when it has neither a flow run id nor a task run id)
    - A future or iterable of futures is resolved into states first
    - An upstream state or iterable of upstream states is combined with the
      aggregate rule
    - Anything else is treated as data and wrapped in a 'COMPLETED' state

    The aggregate rule determines the final state from multiple states such that:

    - If all of the states are COMPLETED the final state is COMPLETED
    - If any states are not COMPLETED the final state is FAILED
    - The states are placed in the final state `data` attribute

    The aggregate rule is also applied to _single_ futures to distinguish them from
    a returned _single_ state. This prevents a flow from assuming the state of a
    single returned task future.
    """
    # Check for manual creation
    if is_state(result):
        details = result.state_details
        if not details.flow_run_id and not details.task_run_id:
            return result

    # Ensure any futures are resolved
    result = await resolve_futures_to_states(result)

    if not (is_state(result) or is_state_iterable(result)):
        # They just gave data and this is a completed result
        return Completed(data=DataDocument.encode(serializer, result))

    # We resolved a task future or futures into states, so the new state is
    # determined from their aggregate
    states = StateGroup(ensure_iterable(result))

    # Determine the new state type
    if states.all_completed():
        new_state_type = StateType.COMPLETED
    else:
        new_state_type = StateType.FAILED

    message = _aggregate_state_message(states)

    # TODO: We may actually want to set the data to a `StateGroup` object and just
    #       allow it to be unpacked into a tuple and such so users can interact with
    #       it
    return State(
        type=new_state_type,
        message=message,
        data=DataDocument.encode(serializer, result),
    )