Skip to content

prefect.orion.schemas.states

State schemas.

State pydantic-model

Represents the state of a run.

Source code in prefect/orion/schemas/states.py
class State(IDBaseModel, Generic[R]):
    """Represents the state of a run."""

    class Config:
        orm_mode = True

    type: StateType
    name: str = None
    timestamp: datetime.datetime = Field(default_factory=lambda: pendulum.now("UTC"))
    message: str = Field(None, example="Run started")
    data: DataDocument[R] = Field(None)
    state_details: StateDetails = Field(default_factory=StateDetails)

    @overload
    def result(state_or_future: "State[R]", raise_on_failure: bool = True) -> R:
        ...

    @overload
    def result(
        state_or_future: "State[R]", raise_on_failure: bool = False
    ) -> Union[R, Exception]:
        ...

    def result(self, raise_on_failure: bool = True):
        """
        Convenience method for access the data on the state's data document.

        Args:
            raise_on_failure: a boolean specifying whether to raise an exception
                if the state is of type `FAILED` and the underlying data is an exception

        Raises:
            TypeError: if the state is failed but without an exception

        Returns:
            The underlying decoded data

        Examples:
            >>> from prefect import flow, task
            >>> @task
            >>> def my_task(x):
            >>>     return x

            Get the result from a task future in a flow

            >>> @flow
            >>> def my_flow():
            >>>     future = my_task("hello")
            >>>     state = future.wait()
            >>>     result = state.result()
            >>>     print(result)
            >>> my_flow()
            hello

            Get the result from a flow state

            >>> @flow
            >>> def my_flow():
            >>>     return "hello"
            >>> my_flow().result()
            hello

            Get the result from a failed state

            >>> @flow
            >>> def my_flow():
            >>>     raise ValueError("oh no!")
            >>> state = my_flow()  # Error is wrapped in FAILED state
            >>> state.result()  # Raises `ValueError`

            Get the result from a failed state without erroring

            >>> @flow
            >>> def my_flow():
            >>>     raise ValueError("oh no!")
            >>> state = my_flow()
            >>> result = state.result(raise_on_failure=False)
            >>> print(result)
            ValueError("oh no!")
        """
        data = None

        if self.data:
            if self.data.encoding == "blockstorage":
                return self.data
            data = self.data.decode()

        if self.is_failed() and raise_on_failure:
            if isinstance(data, Exception):
                raise data
            elif isinstance(data, BaseException):
                warnings.warn(
                    f"State result is a {type(data).__name__!r} type and is not safe "
                    "to re-raise, it will be returned instead."
                )
                return data
            elif isinstance(data, State):
                data.result()
            elif isinstance(data, Iterable) and all(
                [isinstance(o, State) for o in data]
            ):
                # raise the first failure we find
                for state in data:
                    state.result()

            # we don't make this an else in case any of the above conditionals doesn't raise
            raise TypeError(
                f"Unexpected result for failure state: {data!r} —— "
                f"{type(data).__name__} cannot be resolved into an exception"
            )

        return data

    @validator("name", always=True)
    def default_name_from_type(cls, v, *, values, **kwargs):
        """If a name is not provided, use the type"""

        # if `type` is not in `values` it means the `type` didn't pass its own
        # validation check and an error will be raised after this function is called
        if v is None and "type" in values:
            v = " ".join([v.capitalize() for v in values.get("type").value.split("_")])
        return v

    @root_validator
    def default_scheduled_start_time(cls, values):
        """
        TODO: This should throw an error instead of setting a default but is out of
              scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled
              into work refactoring state initialization
        """
        if values.get("type") == StateType.SCHEDULED:
            state_details = values.setdefault(
                "state_details", cls.__fields__["state_details"].get_default()
            )
            if not state_details.scheduled_time:
                state_details.scheduled_time = pendulum.now("utc")
        return values

    def is_scheduled(self):
        return self.type == StateType.SCHEDULED

    def is_pending(self):
        return self.type == StateType.PENDING

    def is_running(self):
        return self.type == StateType.RUNNING

    def is_completed(self):
        return self.type == StateType.COMPLETED

    def is_failed(self):
        return self.type == StateType.FAILED

    def is_cancelled(self):
        return self.type == StateType.CANCELLED

    def is_final(self):
        return self.type in TERMINAL_STATES

    def copy(self, *, update: dict = None, reset_fields: bool = False, **kwargs):
        """
        Copying API models should return an object that could be inserted into the
        database again. The 'timestamp' is reset using the default factory.
        """
        update = update or {}
        update.setdefault("timestamp", self.__fields__["timestamp"].get_default())
        return super().copy(reset_fields=reset_fields, update=update, **kwargs)

    def __repr__(self) -> str:
        """
        Generates a complete state representation appropriate for introspection
        and debugging, including the result:

        `MyCompletedState(message="my message", type=COMPLETED, result=...)`
        """

        display = dict(
            message=repr(self.message),
            type=self.type,
            result=repr(self.result(raise_on_failure=False)),
        )

        if self.state_details.task_run_id is not None:
            display["task_run_id"] = self.state_details.task_run_id
        elif self.state_details.flow_run_id is not None:
            display["flow_run_id"] = self.state_details.flow_run_id

        return f"{self.name}({', '.join(f'{k}={v}' for k, v in display.items())})"

    def __str__(self) -> str:
        """
        Generates a simple state representation appropriate for logging:

        `MyCompletedState("my message", type=COMPLETED)`
        """

        display_message = f"{self.message!r}" if self.message else ""

        display_type = (
            f", type={self.type}"
            if self.type.value.lower() != self.name.lower()
            else ""
        )

        return f"{self.name}({display_message}{display_type})"

    def __hash__(self) -> int:
        return hash(
            (
                getattr(self.state_details, "flow_run_id", None),
                getattr(self.state_details, "task_run_id", None),
                self.timestamp,
                self.type,
            )
        )

State.result

Convenience method for access the data on the state's data document.

Parameters:

Name Description Default
raise_on_failure

a boolean specifying whether to raise an exception if the state is of type FAILED and the underlying data is an exception

bool
True

Exceptions:

Type Description
TypeError

if the state is failed but without an exception

Returns:

Type Description

The underlying decoded data

Examples:

>>> from prefect import flow, task
>>> @task
>>> def my_task(x):
>>>     return x

Get the result from a task future in a flow

>>> @flow
>>> def my_flow():
>>>     future = my_task("hello")
>>>     state = future.wait()
>>>     result = state.result()
>>>     print(result)
>>> my_flow()
hello

Get the result from a flow state

>>> @flow
>>> def my_flow():
>>>     return "hello"
>>> my_flow().result()
hello

Get the result from a failed state

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow()  # Error is wrapped in FAILED state
>>> state.result()  # Raises `ValueError`

Get the result from a failed state without erroring

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow()
>>> result = state.result(raise_on_failure=False)
>>> print(result)
ValueError("oh no!")
Source code in prefect/orion/schemas/states.py
def result(self, raise_on_failure: bool = True):
    """
    Convenience method for access the data on the state's data document.

    Args:
        raise_on_failure: a boolean specifying whether to raise an exception
            if the state is of type `FAILED` and the underlying data is an exception

    Raises:
        TypeError: if the state is failed but without an exception

    Returns:
        The underlying decoded data

    Examples:
        >>> from prefect import flow, task
        >>> @task
        >>> def my_task(x):
        >>>     return x

        Get the result from a task future in a flow

        >>> @flow
        >>> def my_flow():
        >>>     future = my_task("hello")
        >>>     state = future.wait()
        >>>     result = state.result()
        >>>     print(result)
        >>> my_flow()
        hello

        Get the result from a flow state

        >>> @flow
        >>> def my_flow():
        >>>     return "hello"
        >>> my_flow().result()
        hello

        Get the result from a failed state

        >>> @flow
        >>> def my_flow():
        >>>     raise ValueError("oh no!")
        >>> state = my_flow()  # Error is wrapped in FAILED state
        >>> state.result()  # Raises `ValueError`

        Get the result from a failed state without erroring

        >>> @flow
        >>> def my_flow():
        >>>     raise ValueError("oh no!")
        >>> state = my_flow()
        >>> result = state.result(raise_on_failure=False)
        >>> print(result)
        ValueError("oh no!")
    """
    data = None

    if self.data:
        if self.data.encoding == "blockstorage":
            return self.data
        data = self.data.decode()

    if self.is_failed() and raise_on_failure:
        if isinstance(data, Exception):
            raise data
        elif isinstance(data, BaseException):
            warnings.warn(
                f"State result is a {type(data).__name__!r} type and is not safe "
                "to re-raise, it will be returned instead."
            )
            return data
        elif isinstance(data, State):
            data.result()
        elif isinstance(data, Iterable) and all(
            [isinstance(o, State) for o in data]
        ):
            # raise the first failure we find
            for state in data:
                state.result()

        # we don't make this an else in case any of the above conditionals doesn't raise
        raise TypeError(
            f"Unexpected result for failure state: {data!r} —— "
            f"{type(data).__name__} cannot be resolved into an exception"
        )

    return data

StateType

Enumeration of state types.

Source code in prefect/orion/schemas/states.py
class StateType(AutoEnum):
    """Enumeration of state types."""

    SCHEDULED = AutoEnum.auto()
    PENDING = AutoEnum.auto()
    RUNNING = AutoEnum.auto()
    COMPLETED = AutoEnum.auto()
    FAILED = AutoEnum.auto()
    CANCELLED = AutoEnum.auto()

AwaitingRetry

Convenience function for creating AwaitingRetry states.

Returns:

Type Description
State

a AwaitingRetry state

Source code in prefect/orion/schemas/states.py
def AwaitingRetry(scheduled_time: datetime.datetime = None, **kwargs) -> State:
    """Convenience function for creating `AwaitingRetry` states.

    Returns:
        State: a AwaitingRetry state
    """
    return Scheduled(scheduled_time=scheduled_time, name="AwaitingRetry", **kwargs)

Cancelled

Convenience function for creating Cancelled states.

Returns:

Type Description
State

a Cancelled state

Source code in prefect/orion/schemas/states.py
def Cancelled(**kwargs) -> State:
    """Convenience function for creating `Cancelled` states.

    Returns:
        State: a Cancelled state
    """
    return State(type=StateType.CANCELLED, **kwargs)

Completed

Convenience function for creating Completed states.

Returns:

Type Description
State

a Completed state

Source code in prefect/orion/schemas/states.py
def Completed(**kwargs) -> State:
    """Convenience function for creating `Completed` states.

    Returns:
        State: a Completed state
    """
    return State(type=StateType.COMPLETED, **kwargs)

Failed

Convenience function for creating Failed states.

Returns:

Type Description
State

a Failed state

Source code in prefect/orion/schemas/states.py
def Failed(**kwargs) -> State:
    """Convenience function for creating `Failed` states.

    Returns:
        State: a Failed state
    """
    return State(type=StateType.FAILED, **kwargs)

Late

Convenience function for creating Late states.

Returns:

Type Description
State

a Late state

Source code in prefect/orion/schemas/states.py
def Late(scheduled_time: datetime.datetime = None, **kwargs) -> State:
    """Convenience function for creating `Late` states.

    Returns:
        State: a Late state
    """
    return Scheduled(scheduled_time=scheduled_time, name="Late", **kwargs)

Pending

Convenience function for creating Pending states.

Returns:

Type Description
State

a Pending state

Source code in prefect/orion/schemas/states.py
def Pending(**kwargs) -> State:
    """Convenience function for creating `Pending` states.

    Returns:
        State: a Pending state
    """
    return State(type=StateType.PENDING, **kwargs)

Retrying

Convenience function for creating Retrying states.

Returns:

Type Description
State

a Retrying state

Source code in prefect/orion/schemas/states.py
def Retrying(**kwargs) -> State:
    """Convenience function for creating `Retrying` states.

    Returns:
        State: a Retrying state
    """
    return State(type=StateType.RUNNING, name="Retrying", **kwargs)

Running

Convenience function for creating Running states.

Returns:

Type Description
State

a Running state

Source code in prefect/orion/schemas/states.py
def Running(**kwargs) -> State:
    """Convenience function for creating `Running` states.

    Returns:
        State: a Running state
    """
    return State(type=StateType.RUNNING, **kwargs)

Scheduled

Convenience function for creating Scheduled states.

Returns:

Type Description
State

a Scheduled state

Source code in prefect/orion/schemas/states.py
def Scheduled(scheduled_time: datetime.datetime = None, **kwargs) -> State:
    """Convenience function for creating `Scheduled` states.

    Returns:
        State: a Scheduled state
    """
    state_details = StateDetails.parse_obj(kwargs.pop("state_details", {}))
    if scheduled_time is None:
        scheduled_time = pendulum.now("UTC")
    elif state_details.scheduled_time:
        raise ValueError("An extra scheduled_time was provided in state_details")
    state_details.scheduled_time = scheduled_time

    return State(type=StateType.SCHEDULED, state_details=state_details, **kwargs)