Skip to content

prefect.client

Asynchronous client implementation for communicating with the Orion REST API.

Explore the client by communicating with an in-memory webserver - no setup required:

$ # start python REPL with native await functionality
$ python -m asyncio
>>> from prefect.client import get_client
>>> async with get_client() as client:
...     response = await client.hello()
...     print(response.json())
👋

OrionClient

An asynchronous client for interacting with the Orion REST API.

Parameters:

Name Description Type Default
api

the Orion API URL or FastAPI application to connect to

Union[str, fastapi.applications.FastAPI]
required
api_key

An optional API key for authentication.

str
None
api_version

The API version this client is compatible with.

str
'0.3.1'
httpx_settings

An optional dictionary of settings to pass to the underlying httpx.AsyncClient

dict
None

Examples:

Say hello to an Orion server

>>> async with get_client() as client:
...     response = await client.hello()
...
>>> print(response.json())
👋
Source code in prefect/client.py
class OrionClient:
    """
    An asynchronous client for interacting with the [Orion REST API](/api-ref/rest-api/).

    Args:
        api: the Orion API URL or FastAPI application to connect to
        api_key: An optional API key for authentication.
        api_version: The API version this client is compatible with.
        httpx_settings: An optional dictionary of settings to pass to the underlying
            `httpx.AsyncClient`

    Examples:

        Say hello to an Orion server

        >>> async with get_client() as client:
        >>>     response = await client.hello()
        >>>
        >>> print(response.json())
        👋
    """

    def __init__(
        self,
        api: Union[str, FastAPI],
        *,
        api_key: Optional[str] = None,
        api_version: str = ORION_API_VERSION,
        httpx_settings: Optional[dict] = None,
    ) -> None:
        """
        Configure the underlying HTTP client for either an API URL or an in-process
        FastAPI application.

        Args:
            api: the Orion API URL or FastAPI application to connect to
            api_key: An optional API key; when given it is sent as a `Bearer` token
                in the `Authorization` header unless that header is already set.
            api_version: The API version this client is compatible with; sent as the
                `X-PREFECT-API-VERSION` header unless that header is already set.
            httpx_settings: An optional dictionary of settings to pass to the
                underlying HTTP client. The dictionary and its `headers` entry are
                copied, so the caller's objects are never modified.

        Raises:
            ValueError: if `api` is a URL and `httpx_settings` also provides an `app`
            TypeError: if `api` is neither a string nor a FastAPI application
        """
        httpx_settings = httpx_settings.copy() if httpx_settings else {}

        # `dict.copy()` is shallow: copy the nested headers as well so the defaults
        # added below never leak into a mapping owned by the caller.
        headers = httpx_settings.get("headers")
        httpx_settings["headers"] = headers.copy() if headers else {}
        if api_version:
            httpx_settings["headers"].setdefault("X-PREFECT-API-VERSION", api_version)
        if api_key:
            httpx_settings["headers"].setdefault("Authorization", f"Bearer {api_key}")

        httpx_settings.setdefault("timeout", PREFECT_API_REQUEST_TIMEOUT.value())

        # Context management
        self._exit_stack = AsyncExitStack()
        self._ephemeral_app: Optional[FastAPI] = None
        self.manage_lifespan = True

        # Only set if this client started the lifespan of the application
        self._ephemeral_lifespan: Optional[LifespanManager] = None
        self._closed = False
        self._started = False

        # Connect to an external application
        if isinstance(api, str):
            if httpx_settings.get("app"):
                raise ValueError(
                    "Invalid httpx settings: `app` cannot be set when providing an "
                    "api url. `app` is only for use with ephemeral instances. Provide "
                    "it as the `api` parameter instead."
                )
            httpx_settings.setdefault("base_url", api)

        # Connect to an in-process application
        elif isinstance(api, FastAPI):
            self._ephemeral_app = api
            httpx_settings.setdefault("app", self._ephemeral_app)
            httpx_settings.setdefault("base_url", "http://ephemeral-orion/api")

        else:
            raise TypeError(
                f"Unexpected type {type(api).__name__!r} for argument `api`. Expected 'str' or 'FastAPI'"
            )

        self._client = PrefectHttpxClient(**httpx_settings)
        self.logger = get_logger("client")

    @property
    def api_url(self) -> httpx.URL:
        """
        The base URL configured on the underlying HTTP client.
        """
        base_url = self._client.base_url
        return base_url

    # API methods ----------------------------------------------------------------------

    async def api_healthcheck(self) -> Optional[Exception]:
        """
        Probe the `/health` route, allowing at most 10 seconds for the request.

        Returns:
            `None` when the request completes, otherwise the exception that was
            encountered (it is returned, not raised).
        """
        try:
            with anyio.fail_after(10):
                await self._client.get("/health")
        except Exception as error:
            # Any failure is handed back to the caller as a value
            return error
        return None

    async def hello(self) -> httpx.Response:
        """
        Issue a GET request against `/hello`; intended for testing.
        """
        response = await self._client.get("/hello")
        return response

    async def create_flow(self, flow: "Flow") -> UUID:
        """
        Register a flow with Orion by POSTing its name to `/flows/`.

        Args:
            flow: a [Flow][prefect.flows.Flow] object; only its `name` is sent

        Raises:
            httpx.RequestError: if the response does not contain an `id`

        Returns:
            the ID of the flow in the backend
        """
        payload = schemas.actions.FlowCreate(name=flow.name).dict(json_compatible=True)
        response = await self._client.post("/flows/", json=payload)

        created_id = response.json().get("id")
        if created_id:
            return UUID(created_id)

        # No usable id came back from the API
        raise httpx.RequestError(f"Malformed response: {response}")

    async def read_flow(self, flow_id: UUID) -> schemas.core.Flow:
        """
        Fetch a single flow from Orion using its ID.

        Args:
            flow_id: the flow ID of interest

        Returns:
            a [Flow model][prefect.orion.schemas.core.Flow] parsed from the response
        """
        flow_json = (await self._client.get(f"/flows/{flow_id}")).json()
        return schemas.core.Flow.parse_obj(flow_json)

    async def read_flows(
        self,
        *,
        flow_filter: schemas.filters.FlowFilter = None,
        flow_run_filter: schemas.filters.FlowRunFilter = None,
        task_run_filter: schemas.filters.TaskRunFilter = None,
        deployment_filter: schemas.filters.DeploymentFilter = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[schemas.core.Flow]:
        """
        Query Orion for flows by POSTing the given criteria to `/flows/filter`.

        Filters that are not provided are sent as `null`.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            limit: limit for the flow query
            offset: offset for the flow query

        Returns:
            a list of [Flow model][prefect.orion.schemas.core.Flow] representations
                of the flows
        """

        def _as_json(criteria):
            # Serialize a filter when one was supplied, otherwise send null
            return criteria.dict(json_compatible=True) if criteria else None

        query = {
            "flows": _as_json(flow_filter),
            "flow_runs": _as_json(flow_run_filter),
            "task_runs": _as_json(task_run_filter),
            "deployments": _as_json(deployment_filter),
            "limit": limit,
            "offset": offset,
        }

        response = await self._client.post("/flows/filter", json=query)
        flows_json = response.json()
        return pydantic.parse_obj_as(List[schemas.core.Flow], flows_json)

    async def read_flow_by_name(
        self,
        flow_name: str,
    ) -> schemas.core.Flow:
        """
        Query Orion for a flow by name.

        Args:
            flow_name: the name of a flow

        Returns:
            a fully hydrated [Flow model][prefect.orion.schemas.core.Flow]
        """
        response = await self._client.get(f"/flows/name/{flow_name}")
        # The route returns a flow, so parse it with the Flow schema (matching the
        # declared return type) rather than the Deployment schema.
        return schemas.core.Flow.parse_obj(response.json())

    async def create_flow_run_from_deployment(
        self,
        deployment_id: UUID,
        *,
        parameters: Dict[str, Any] = None,
        context: dict = None,
        state: schemas.states.State = None,
        flow_runner: "FlowRunner" = None,
    ) -> schemas.core.FlowRun:
        """
        Create a flow run for a deployment.

        Args:
            deployment_id: The ID of the deployment to create the flow run from
            parameters: Parameter overrides for this flow run; an empty dictionary is
                sent when omitted
            context: Optional run context data; an empty dictionary is sent when
                omitted
            state: The initial state for the run. If not provided, a `Scheduled`
                state is used.
            flow_runner: An optional flow runner to use to execute this flow run;
                its `to_settings()` output is what gets sent.

        Returns:
            The flow run model parsed from the response
        """
        run_parameters = parameters or {}
        run_context = context or {}
        initial_state = state or Scheduled()
        runner_settings = flow_runner.to_settings() if flow_runner else None

        request_body = schemas.actions.DeploymentFlowRunCreate(
            parameters=run_parameters,
            context=run_context,
            state=initial_state,
            flow_runner=runner_settings,
        ).dict(json_compatible=True)

        response = await self._client.post(
            f"/deployments/{deployment_id}/create_flow_run", json=request_body
        )
        return schemas.core.FlowRun.parse_obj(response.json())

    async def create_flow_run(
        self,
        flow: "Flow",
        name: str = None,
        parameters: Dict[str, Any] = None,
        context: dict = None,
        tags: Iterable[str] = None,
        parent_task_run_id: UUID = None,
        state: schemas.states.State = None,
        flow_runner: "FlowRunner" = None,
    ) -> schemas.core.FlowRun:
        """
        Create a flow run for a flow.

        The flow is first sent through `create_flow` to obtain its ID, then the run
        is POSTed to `/flow_runs/`.

        Args:
            flow: The flow model to create the flow run for
            name: An optional name for the flow run
            parameters: Parameter overrides for this flow run.
            context: Optional run context data
            tags: tags to apply to this flow run
            parent_task_run_id: if a subflow run is being created, the placeholder
                task run ID of the parent flow
            state: The initial state for the run. If not provided, a `Pending`
                state is used.
            flow_runner: An optional flow runner to use to execute this flow run.

        Raises:
            httpx.RequestError: if `create_flow` receives a response without an id

        Returns:
            The flow run model, with `parameters` set to the local Python objects
            rather than the values echoed back by the API
        """
        run_parameters = parameters if parameters else {}
        run_context = context if context else {}
        initial_state = schemas.states.Pending() if state is None else state

        # Retrieve the flow id
        flow_id = await self.create_flow(flow)

        request_body = schemas.actions.FlowRunCreate(
            flow_id=flow_id,
            flow_version=flow.version,
            name=name,
            parameters=run_parameters,
            context=run_context,
            tags=list(tags or []),
            parent_task_run_id=parent_task_run_id,
            state=initial_state,
            flow_runner=flow_runner.to_settings() if flow_runner else None,
        ).dict(json_compatible=True)

        response = await self._client.post("/flow_runs/", json=request_body)
        created_run = schemas.core.FlowRun.parse_obj(response.json())

        # Hand back the caller's own parameter objects so expectations about
        # Python objects are retained
        created_run.parameters = run_parameters
        return created_run

    async def update_flow_run(
        self,
        flow_run_id: UUID,
        flow_version: Optional[str] = None,
        parameters: Optional[dict] = None,
        name: Optional[str] = None,
    ) -> httpx.Response:
        """
        Update a flow run's details.

        Only the fields that are passed as non-`None` values are included in the
        PATCH body.

        Args:
            flow_run_id: the run ID to update
            flow_version: a new version string for the flow run
            parameters: a dictionary of updated parameter values for the run
            name: a new name for the flow run

        Returns:
            an `httpx.Response` object from the PATCH request
        """
        candidate_fields = {
            "flow_version": flow_version,
            "parameters": parameters,
            "name": name,
        }
        # Drop anything the caller did not supply so `exclude_unset` keeps it out
        # of the request body
        params = {
            field: value
            for field, value in candidate_fields.items()
            if value is not None
        }

        flow_run_data = schemas.actions.FlowRunUpdate(**params)

        return await self._client.patch(
            f"/flow_runs/{flow_run_id}",
            json=flow_run_data.dict(json_compatible=True, exclude_unset=True),
        )

    async def create_concurrency_limit(
        self,
        tag: str,
        concurrency_limit: int,
    ) -> UUID:
        """
        Create a tag concurrency limit in Orion by POSTing to
        `/concurrency_limits/`.

        Args:
            tag: a tag the concurrency limit is applied to
            concurrency_limit: the maximum number of concurrent task runs for a
                given tag

        Raises:
            httpx.RequestError: if the response does not contain an `id`

        Returns:
            the ID of the concurrency limit in the backend
        """
        request_body = schemas.actions.ConcurrencyLimitCreate(
            tag=tag, concurrency_limit=concurrency_limit
        ).dict(json_compatible=True)

        response = await self._client.post("/concurrency_limits/", json=request_body)

        created_id = response.json().get("id")
        if created_id:
            return UUID(created_id)

        raise httpx.RequestError(f"Malformed response: {response}")

    async def read_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> schemas.core.ConcurrencyLimit:
        """
        Read the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.RequestError: if the response does not contain an `id`

        Returns:
            the concurrency limit set on a specific tag
        """
        try:
            response = await self._client.get(
                f"/concurrency_limits/tag/{tag}",
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise

        # Decode the body once and reuse it for both validation and parsing
        payload = response.json()
        if not payload.get("id"):
            raise httpx.RequestError(f"Malformed response: {response}")

        return schemas.core.ConcurrencyLimit.parse_obj(payload)

    async def read_concurrency_limits(
        self,
        limit: int,
        offset: int,
    ):
        """
        List concurrency limits by POSTing to `/concurrency_limits/filter`.

        Args:
            limit: the maximum number of concurrency limits returned
            offset: the concurrency limit query offset

        Returns:
            a list of concurrency limits
        """
        paging = {"limit": limit, "offset": offset}
        response = await self._client.post("/concurrency_limits/filter", json=paging)

        limits_json = response.json()
        return pydantic.parse_obj_as(List[schemas.core.ConcurrencyLimit], limits_json)

    async def delete_concurrency_limit_by_tag(
        self,
        tag: str,
    ):
        """
        Remove the concurrency limit that is set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged

        """
        route = f"/concurrency_limits/tag/{tag}"
        try:
            await self._client.delete(route)
        except httpx.HTTPStatusError as http_error:
            # Only a 404 is translated; everything else propagates as-is
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(
                http_exc=http_error
            ) from http_error

    async def create_work_queue(
        self,
        name: str,
        tags: List[str] = None,
        deployment_ids: List[UUID] = None,
        flow_runner_types: List[str] = None,
    ) -> UUID:
        """
        Create a work queue by POSTing to `/work_queues/`.

        Empty filter lists are sent as `None`.

        Args:
            name: a unique name for the work queue
            tags: an optional list of tags to filter on; only work scheduled with
                these tags will be included in the queue
            deployment_ids: an optional list of deployment IDs to filter on; only
                work scheduled from these deployments will be included in the queue
            flow_runner_types: an optional list of FlowRunner types to filter on;
                only work scheduled with these FlowRunners will be included in the
                queue

        Raises:
            prefect.exceptions.ObjectAlreadyExists: If request returns 409
            httpx.RequestError: if the response does not contain an `id`

        Returns:
            UUID: The UUID of the newly created work queue
        """
        queue_filter = QueueFilter(
            tags=tags or None,
            deployment_ids=deployment_ids or None,
            flow_runner_types=flow_runner_types or None,
        )
        request_body = WorkQueueCreate(name=name, filter=queue_filter).dict(
            json_compatible=True
        )

        try:
            response = await self._client.post("/work_queues/", json=request_body)
        except httpx.HTTPStatusError as http_error:
            # Only a conflict is translated; everything else propagates as-is
            if http_error.response.status_code != status.HTTP_409_CONFLICT:
                raise
            raise prefect.exceptions.ObjectAlreadyExists(
                http_exc=http_error
            ) from http_error

        created_id = response.json().get("id")
        if created_id:
            return UUID(created_id)
        raise httpx.RequestError(str(response))

    async def read_work_queue_by_name(self, name: str) -> schemas.core.WorkQueue:
        """
        Fetch a work queue using its name.

        Args:
            name (str): a unique name for the work queue

        Raises:
            httpx.HTTPStatusError: errors from the underlying client are not
                translated here and propagate unchanged

        Returns:
            schemas.core.WorkQueue: a work queue API object
        """
        queue_json = (await self._client.get(f"/work_queues/name/{name}")).json()
        return schemas.core.WorkQueue.parse_obj(queue_json)

    async def update_work_queue(self, id: UUID, **kwargs):
        """
        Change properties of an existing work queue with a PATCH request.

        Args:
            id: the ID of the work queue to update
            **kwargs: the fields to update

        Raises:
            ValueError: if no kwargs are provided
            prefect.exceptions.ObjectNotFound: if request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged

        """
        if not kwargs:
            raise ValueError("No fields provided to update.")

        # Only fields that were explicitly set end up in the request body
        changes = WorkQueueUpdate(**kwargs).dict(
            json_compatible=True, exclude_unset=True
        )

        try:
            await self._client.patch(f"/work_queues/{id}", json=changes)
        except httpx.HTTPStatusError as http_error:
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(
                http_exc=http_error
            ) from http_error

    async def get_runs_in_work_queue(
        self,
        id: UUID,
        limit: int = 10,
        scheduled_before: datetime.datetime = None,
    ) -> List[schemas.core.FlowRun]:
        """
        Read flow runs off a work queue.

        Args:
            id: the id of the work queue to read from
            limit: a limit on the number of runs to return
            scheduled_before: a timestamp sent in ISO format; when omitted, the
                field is left out of the request entirely

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged

        Returns:
            List[schemas.core.FlowRun]: a list of FlowRun objects read from the queue
        """
        request_body = {"limit": limit}
        if scheduled_before:
            request_body["scheduled_before"] = scheduled_before.isoformat()

        try:
            response = await self._client.post(
                f"/work_queues/{id}/get_runs", json=request_body
            )
        except httpx.HTTPStatusError as http_error:
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(
                http_exc=http_error
            ) from http_error

        runs_json = response.json()
        return pydantic.parse_obj_as(List[schemas.core.FlowRun], runs_json)

    async def read_work_queue(
        self,
        id: UUID,
    ) -> schemas.core.WorkQueue:
        """
        Fetch a work queue using its ID.

        Args:
            id: the id of the work queue to load

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged

        Returns:
            WorkQueue: an instantiated WorkQueue object
        """
        route = f"/work_queues/{id}"
        try:
            response = await self._client.get(route)
        except httpx.HTTPStatusError as http_error:
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(
                http_exc=http_error
            ) from http_error

        queue_json = response.json()
        return schemas.core.WorkQueue.parse_obj(queue_json)

    async def read_work_queues(
        self,
        limit: int = None,
        offset: int = 0,
    ) -> List[schemas.core.WorkQueue]:
        """
        List work queues by POSTing to `/work_queues/filter`.

        Args:
            limit: a limit for the query
            offset: an offset for the query

        Returns:
            a list of [WorkQueue model][prefect.orion.schemas.core.WorkQueue]
                representations of the work queues
        """
        paging = {"limit": limit, "offset": offset}
        response = await self._client.post("/work_queues/filter", json=paging)

        queues_json = response.json()
        return pydantic.parse_obj_as(List[schemas.core.WorkQueue], queues_json)

    async def delete_work_queue_by_id(
        self,
        id: UUID,
    ):
        """
        Remove a work queue, identified by its ID.

        Args:
            id: the id of the work queue to delete

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged
        """
        route = f"/work_queues/{id}"
        try:
            await self._client.delete(route)
        except httpx.HTTPStatusError as http_error:
            # Only a 404 is translated; everything else propagates as-is
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(
                http_exc=http_error
            ) from http_error

    async def create_block(
        self,
        block: prefect.blocks.core.Block,
        block_spec_id: Optional[UUID] = None,
        name: Optional[str] = None,
    ) -> Optional[UUID]:
        """
        Create a block in Orion. This data is used to configure a corresponding
        Block.

        Args:
            block: the block to create; converted with `block.to_api_block`
            block_spec_id: an optional block spec ID passed to `to_api_block`
            name: an optional block name passed to `to_api_block`

        Raises:
            prefect.exceptions.ObjectAlreadyExists: If request returns 409
            httpx.RequestError: if the response does not contain an `id`

        Returns:
            the ID of the block in the backend
        """

        api_block = block.to_api_block(name=name, block_spec_id=block_spec_id)

        # Drop fields that are not compliant with `CreateBlock`
        payload = api_block.dict(
            json_compatible=True, exclude={"block_spec", "id"}, exclude_unset=True
        )

        try:
            response = await self._client.post(
                "/blocks/",
                json=payload,
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_409_CONFLICT:
                raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
            else:
                raise

        # Validate the id like the other create methods do, instead of letting
        # `UUID(None)` fail with an unrelated TypeError
        block_id = response.json().get("id")
        if not block_id:
            raise httpx.RequestError(f"Malformed response: {response}")

        return UUID(block_id)

    async def read_block_spec_by_name(
        self, name: str, version: str
    ) -> schemas.core.BlockSpec:
        """
        Look up a block spec by name and version

        Args:
            name: the block spec name
            version: the block spec version

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged

        Returns:
            a [BlockSpec model][prefect.orion.schemas.core.BlockSpec] parsed from
            the response
        """
        try:
            # Leading slash added so the route is written like every other route
            # in this client
            response = await self._client.get(
                f"/block_specs/{name}/versions/{version}"
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == status.HTTP_404_NOT_FOUND:
                raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
            else:
                raise
        return schemas.core.BlockSpec.parse_obj(response.json())

    async def read_block_specs(self, type: str) -> List[schemas.core.BlockSpec]:
        """
        List the block specs of a given type by POSTing to `/block_specs/filter`.

        Args:
            type: The name of the type of block spec

        Returns:
            a list of [BlockSpec model][prefect.orion.schemas.core.BlockSpec]
            representations
        """
        criteria = {"block_spec_type": type}
        response = await self._client.post("/block_specs/filter", json=criteria)

        specs_json = response.json()
        return pydantic.parse_obj_as(List[schemas.core.BlockSpec], specs_json)

    async def read_block(self, block_id: UUID):
        """
        Fetch a block using its ID and hydrate it.

        Args:
            block_id (UUID): the block id

        Returns:
            The result of `create_block_from_api_block` applied to the response
            JSON.
        """
        api_block = (await self._client.get(f"/blocks/{block_id}")).json()
        return create_block_from_api_block(api_block)

    async def read_block_by_name(
        self,
        name: str,
        block_spec_name: str,
        block_spec_version: str,
    ):
        """
        Fetch the block with the given name that belongs to a specific block spec
        name and version, and hydrate it.

        Args:
            name (str): The block name.
            block_spec_name (str): the block spec name
            block_spec_version (str): the block spec version

        Returns:
            The result of `create_block_from_api_block` applied to the response
            JSON.
        """
        route = (
            f"/block_specs/{block_spec_name}/versions/{block_spec_version}/block/{name}"
        )
        response = await self._client.get(route)

        api_block = response.json()
        return create_block_from_api_block(api_block)

    async def read_blocks(
        self,
        block_spec_type: str = None,
        offset: int = None,
        limit: int = None,
        as_json: bool = False,
    ) -> List[Union[prefect.blocks.core.Block, Dict[str, Any]]]:
        """
        List blocks by POSTing to `/blocks/filter`.

        Args:
            block_spec_type (str): an optional block spec type
            offset (int): an offset
            limit (int): the number of blocks to return
            as_json (bool): if False, each entry is passed through
                `create_block_from_api_block`. Otherwise, the JSON returned by the
                API is handed back untouched.

        Returns:
            A list of blocks
        """
        criteria = {
            "block_spec_type": block_spec_type,
            "offset": offset,
            "limit": limit,
        }
        response = await self._client.post("/blocks/filter", json=criteria)

        api_blocks = response.json()
        if not as_json:
            return [create_block_from_api_block(api_block) for api_block in api_blocks]
        return api_blocks

    async def create_deployment(
        self,
        flow_id: UUID,
        name: str,
        flow_data: DataDocument,
        schedule: schemas.schedules.SCHEDULE_TYPES = None,
        parameters: Dict[str, Any] = None,
        tags: List[str] = None,
        flow_runner: "FlowRunner" = None,
    ) -> UUID:
        """
        Create a flow deployment in Orion by POSTing to `/deployments/`.

        Args:
            flow_id: the flow ID to create a deployment for
            name: the name of the deployment
            flow_data: a data document that can be resolved into a flow object or
                script
            schedule: an optional schedule to apply to the deployment
            parameters: optional parameters for the deployment; copied into a new
                dictionary before being sent
            tags: an optional list of tags to apply to the deployment
            flow_runner: an optional flow runner to specify for this deployment

        Raises:
            httpx.RequestError: if the response does not contain an `id`

        Returns:
            the ID of the deployment in the backend
        """
        request_body = schemas.actions.DeploymentCreate(
            flow_id=flow_id,
            name=name,
            schedule=schedule,
            flow_data=flow_data,
            parameters=dict(parameters or {}),
            tags=list(tags or []),
            flow_runner=flow_runner.to_settings() if flow_runner else None,
        ).dict(json_compatible=True)

        response = await self._client.post("/deployments/", json=request_body)

        created_id = response.json().get("id")
        if created_id:
            return UUID(created_id)

        raise httpx.RequestError(f"Malformed response: {response}")

    async def read_deployment(
        self,
        deployment_id: UUID,
    ) -> schemas.core.Deployment:
        """
        Fetch a single deployment from Orion using its ID.

        Args:
            deployment_id: the deployment ID of interest

        Returns:
            a [Deployment model][prefect.orion.schemas.core.Deployment] parsed from
            the response
        """
        deployment_json = (
            await self._client.get(f"/deployments/{deployment_id}")
        ).json()
        return schemas.core.Deployment.parse_obj(deployment_json)

    async def read_deployment_by_name(
        self,
        name: str,
    ) -> schemas.core.Deployment:
        """
        Fetch a single deployment from Orion using its name.

        Args:
            name: the deployment name of interest

        Raises:
            prefect.exceptions.ObjectNotFound: If request returns 404
            httpx.HTTPStatusError: for any other error status, re-raised unchanged

        Returns:
            a [Deployment model][prefect.orion.schemas.core.Deployment] parsed from
            the response
        """
        route = f"/deployments/name/{name}"
        try:
            response = await self._client.get(route)
        except httpx.HTTPStatusError as http_error:
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(
                http_exc=http_error
            ) from http_error

        deployment_json = response.json()
        return schemas.core.Deployment.parse_obj(deployment_json)

    async def read_deployments(
        self,
        *,
        flow_filter: schemas.filters.FlowFilter = None,
        flow_run_filter: schemas.filters.FlowRunFilter = None,
        task_run_filter: schemas.filters.TaskRunFilter = None,
        deployment_filter: schemas.filters.DeploymentFilter = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[schemas.core.Deployment]:
        """
        Query Orion for deployments by POSTing the given criteria to
        `/deployments/filter`.

        Filters that are not provided are sent as `null`.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            limit: a limit for the deployment query
            offset: an offset for the deployment query

        Returns:
            a list of [Deployment model][prefect.orion.schemas.core.Deployment]
                representations of the deployments
        """
        body = {
            "flows": (flow_filter.dict(json_compatible=True) if flow_filter else None),
            "flow_runs": (
                flow_run_filter.dict(json_compatible=True) if flow_run_filter else None
            ),
            "task_runs": (
                task_run_filter.dict(json_compatible=True) if task_run_filter else None
            ),
            "deployments": (
                deployment_filter.dict(json_compatible=True)
                if deployment_filter
                else None
            ),
            "limit": limit,
            "offset": offset,
        }
        response = await self._client.post("/deployments/filter", json=body)
        # The response is parsed as a list, which the return annotation now reflects
        return pydantic.parse_obj_as(List[schemas.core.Deployment], response.json())

    async def delete_deployment(
        self,
        deployment_id: UUID,
    ):
        """
        Remove a deployment, identified by its id.

        Args:
            deployment_id: The id of the deployment to delete.
        Raises:
            prefect.exceptions.ObjectNotFound: If the request returns 404
            httpx.RequestError: If the request fails
        """
        try:
            await self._client.delete(f"/deployments/{deployment_id}")
        except httpx.HTTPStatusError as exc:
            # Any status other than 404 propagates unchanged.
            if exc.response.status_code != 404:
                raise
            raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

    async def read_flow_run(self, flow_run_id: UUID) -> schemas.core.FlowRun:
        """
        Fetch a single flow run from Orion using its id.

        Args:
            flow_run_id: the id of the flow run to fetch

        Returns:
            a [Flow Run model][prefect.orion.schemas.core.FlowRun] parsed from the
                API response
        """
        flow_run_response = await self._client.get(f"/flow_runs/{flow_run_id}")
        payload = flow_run_response.json()
        return schemas.core.FlowRun.parse_obj(payload)

    async def read_flow_runs(
        self,
        *,
        flow_filter: schemas.filters.FlowFilter = None,
        flow_run_filter: schemas.filters.FlowRunFilter = None,
        task_run_filter: schemas.filters.TaskRunFilter = None,
        deployment_filter: schemas.filters.DeploymentFilter = None,
        sort: schemas.sorting.FlowRunSort = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[schemas.core.FlowRun]:
        """
        Ask Orion for the flow runs that satisfy every supplied criterion.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            sort: sort criteria for the flow runs
            limit: limit for the flow run query
            offset: offset for the flow run query

        Returns:
            a list of [Flow Run model][prefect.orion.schemas.core.FlowRun] representation
                of the flow runs
        """

        def _serialize(criteria):
            # A missing filter is sent as an explicit null.
            return criteria.dict(json_compatible=True) if criteria else None

        request_body = dict(
            flows=_serialize(flow_filter),
            flow_runs=_serialize(flow_run_filter),
            task_runs=_serialize(task_run_filter),
            deployments=_serialize(deployment_filter),
            sort=sort,
            limit=limit,
            offset=offset,
        )

        reply = await self._client.post("/flow_runs/filter", json=request_body)
        return pydantic.parse_obj_as(List[schemas.core.FlowRun], reply.json())

    async def get_default_storage_block(
        self, as_json: bool = False
    ) -> Optional[Union[Block, Dict[str, Any]]]:
        """Fetch the default storage block from the API.

        Args:
            as_json (bool, optional): when True, hand back the raw JSON from the
                API rather than building a storage block from it. Defaults to
                False.

        Returns:
            Optional[Union[Block, Dict[str, Any]]]: `None` when the response has
                no content; otherwise the raw JSON or the block created from it.
        """
        reply = await self._client.post("/blocks/get_default_storage_block")
        if reply.content:
            api_block = reply.json()
            return api_block if as_json else create_block_from_api_block(api_block)
        # An empty response body yields no block.
        return None

    async def set_default_storage_block(self, block_id: UUID):
        """
        Request that the block with the given id become the default storage block.

        Args:
            block_id: the id of the block to set as the default

        Raises:
            prefect.exceptions.ObjectNotFound: If the request returns 404
        """
        endpoint = f"/blocks/{block_id}/set_default_storage_block"
        try:
            await self._client.post(endpoint)
        except httpx.HTTPStatusError as exc:
            # Only a 404 is translated; every other status error propagates.
            if exc.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

    async def clear_default_storage_block(self):
        """
        Post to the API endpoint that clears the default storage block.
        """
        endpoint = "/blocks/clear_default_storage_block"
        await self._client.post(endpoint)

    async def persist_data(
        self, data: bytes, block: StorageBlock = None
    ) -> DataDocument:
        """
        Write data to a storage block and return a data document pointing at it.

        Args:
            data: the data to persist
            block: the storage block to write to; when not given, the default
                storage block is fetched from the server

        Returns:
            A "blockstorage" encoded data document holding the value returned by
            the block's write and the block's id.

        Raises:
            ValueError: if no block was given and no default block is available
        """
        if not block:
            block = await self.get_default_storage_block()

        if not block:
            raise ValueError(
                "No storage block was provided and no default storage block is set "
                "on the server. Set a default or provide a block to use."
            )

        token = await block.write(data)
        pointer = {"data": token, "block_id": block._block_id}
        return DataDocument.encode(encoding="blockstorage", data=pointer)

    async def retrieve_data(
        self,
        data_document: DataDocument,
    ) -> bytes:
        """
        Read back the data that a storage data document points to.

        Args:
            data_document: The data document used to store data.

        Returns:
            The persisted data in bytes.
        """
        pointer = data_document.decode()
        stored_reference = pointer["data"]
        block_id = pointer["block_id"]

        # Without a block id, a temporary storage block is used for the read.
        if block_id is None:
            reader = TempStorageBlock()
        else:
            reader = await self.read_block(block_id)

        return await reader.read(stored_reference)

    async def persist_object(
        self, obj: Any, encoder: str = "cloudpickle", storage_block: StorageBlock = None
    ) -> DataDocument:
        """
        Encode an object into a data document and persist that document.

        Args:
            obj: the object to persist
            encoder: the encoding used for the inner data document
            storage_block: an optional storage block, forwarded to `persist_data`

        Returns:
            Data document pointing to persisted data.
        """
        inner_document = DataDocument.encode(encoding=encoder, data=obj)
        raw_bytes = inner_document.json().encode()
        return await self.persist_data(raw_bytes, block=storage_block)

    async def retrieve_object(self, storage_datadoc: DataDocument) -> Any:
        """
        Load the object that a storage data document refers to.

        Args:
            storage_datadoc: The storage data document to retrieve.

        Returns:
            the persisted object
        """
        raw_bytes = await self.retrieve_data(storage_datadoc)
        inner_document = DataDocument.parse_raw(raw_bytes)
        return inner_document.decode()

    async def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: schemas.states.State,
        force: bool = False,
        backend_state_data: schemas.data.DataDocument = None,
    ) -> OrchestrationResult:
        """
        Send a new state for a flow run to the API.

        Args:
            flow_run_id: the id of the flow run
            state: the state to set
            force: if True, disregard orchestration logic when setting the state,
                forcing the Orion API to accept the state
            backend_state_data: an optional data document representing the state's data,
                if provided it will override `state.data`

        Returns:
            a [OrchestrationResult model][prefect.orion.orchestration.rules.OrchestrationResult]
                representation of state orchestration output
        """
        state_create = schemas.actions.StateCreate(
            type=state.type,
            name=state.name,
            message=state.message,
            data=backend_state_data or state.data,
            state_details=state.state_details,
        )
        state_create.state_details.flow_run_id = flow_run_id

        try:
            serialized_state = state_create.dict(json_compatible=True)
        except TypeError:
            # The data could not be serialized; send the state without it.
            state_create.data = None
            serialized_state = state_create.dict(json_compatible=True)

        request_body = dict(state=serialized_state, force=force)
        reply = await self._client.post(
            f"/flow_runs/{flow_run_id}/set_state", json=request_body
        )
        return OrchestrationResult.parse_obj(reply.json())

    async def read_flow_run_states(
        self, flow_run_id: UUID
    ) -> List[schemas.states.State]:
        """
        Fetch the states recorded for a flow run.

        Args:
            flow_run_id: the id of the flow run

        Returns:
            a list of [State model][prefect.orion.schemas.states.State] representation
                of the flow run states
        """
        query = {"flow_run_id": flow_run_id}
        reply = await self._client.get("/flow_run_states/", params=query)
        return pydantic.parse_obj_as(List[schemas.states.State], reply.json())

    async def create_task_run(
        self,
        task: "Task",
        flow_run_id: UUID,
        dynamic_key: str,
        name: str = None,
        extra_tags: Iterable[str] = None,
        state: schemas.states.State = None,
        task_inputs: Dict[
            str,
            List[
                Union[
                    schemas.core.TaskRunResult,
                    schemas.core.Parameter,
                    schemas.core.Constant,
                ]
            ],
        ] = None,
    ) -> TaskRun:
        """
        Create a task run

        Args:
            task: The Task to run
            flow_run_id: The flow run id with which to associate the task run
            dynamic_key: A key unique to this particular run of a Task within the flow
            name: An optional name for the task run. If not provided, one is built
                from the task name, the first 8 characters of the task key, and
                the dynamic key.
            extra_tags: an optional list of extra tags to apply to the task run in
                addition to `task.tags`
            state: The initial state for the run. If not provided, defaults to
                `Pending` for now. Should always be a `Scheduled` type.
            task_inputs: the set of inputs passed to the task

        Returns:
            The task run model parsed from the API response
        """
        tags = set(task.tags).union(extra_tags or [])

        if state is None:
            state = schemas.states.Pending()

        task_run_data = schemas.actions.TaskRunCreate(
            name=name or f"{task.name}-{task.task_key[:8]}-{dynamic_key}",
            flow_run_id=flow_run_id,
            task_key=task.task_key,
            dynamic_key=dynamic_key,
            tags=list(tags),
            empirical_policy=schemas.core.TaskRunPolicy(
                max_retries=task.retries,
                retry_delay_seconds=task.retry_delay_seconds,
            ),
            state=state,
            task_inputs=task_inputs or {},
        )

        response = await self._client.post(
            "/task_runs/", json=task_run_data.dict(json_compatible=True)
        )
        # The full model is returned, not just its id.
        return TaskRun.parse_obj(response.json())

    async def read_task_run(self, task_run_id: UUID) -> schemas.core.TaskRun:
        """
        Fetch a single task run from Orion using its id.

        Args:
            task_run_id: the id of the task run to fetch

        Returns:
            a [Task Run model][prefect.orion.schemas.core.TaskRun] parsed from the
                API response
        """
        task_run_response = await self._client.get(f"/task_runs/{task_run_id}")
        payload = task_run_response.json()
        return schemas.core.TaskRun.parse_obj(payload)

    async def read_task_runs(
        self,
        *,
        flow_filter: schemas.filters.FlowFilter = None,
        flow_run_filter: schemas.filters.FlowRunFilter = None,
        task_run_filter: schemas.filters.TaskRunFilter = None,
        deployment_filter: schemas.filters.DeploymentFilter = None,
        sort: schemas.sorting.TaskRunSort = None,
        limit: int = None,
        offset: int = 0,
    ) -> List[schemas.core.TaskRun]:
        """
        Ask Orion for the task runs that satisfy every supplied criterion.

        Args:
            flow_filter: filter criteria for flows
            flow_run_filter: filter criteria for flow runs
            task_run_filter: filter criteria for task runs
            deployment_filter: filter criteria for deployments
            sort: sort criteria for the task runs
            limit: a limit for the task run query
            offset: an offset for the task run query

        Returns:
            a list of [Task Run model][prefect.orion.schemas.core.TaskRun] representation
                of the task runs
        """

        def _serialize(criteria):
            # A missing filter is sent as an explicit null.
            return criteria.dict(json_compatible=True) if criteria else None

        request_body = dict(
            flows=_serialize(flow_filter),
            flow_runs=_serialize(flow_run_filter),
            task_runs=_serialize(task_run_filter),
            deployments=_serialize(deployment_filter),
            sort=sort,
            limit=limit,
            offset=offset,
        )
        reply = await self._client.post("/task_runs/filter", json=request_body)
        return pydantic.parse_obj_as(List[schemas.core.TaskRun], reply.json())

    async def propose_state(
        self,
        state: schemas.states.State,
        backend_state_data: DataDocument = None,
        task_run_id: UUID = None,
        flow_run_id: UUID = None,
    ) -> schemas.states.State:
        """
        Propose a new state for a flow run or task run, invoking Orion
        orchestration logic.

        If the proposed state is accepted, the provided `state` will be
        augmented with details and returned.

        If the proposed state is rejected, a new state returned by the
        Orion API will be returned.

        If the proposed state results in a WAIT instruction from the Orion
        API, the function will sleep and attempt to propose the state again.

        If the proposed state results in an ABORT instruction from the Orion
        API, an error will be raised.

        Args:
            state: a new state for the task or flow run
            backend_state_data: an optional document to store with the state in the
                database instead of its local data field. This allows the original
                state object to be retained while storing a pointer to persisted data
                in the database.
            task_run_id: an optional task run id, used when proposing task run states.
                Takes precedence over `flow_run_id` when both are given.
            flow_run_id: an optional flow run id, used when proposing flow run states

        Returns:
            a [State model][prefect.orion.schemas.states.State] representation of the
                flow or task run state

        Raises:
            ValueError: if neither task_run_id or flow_run_id is provided, or if
                the API responds with an unexpected status
            prefect.exceptions.Abort: if an ABORT instruction is received from
                the Orion API
        """

        # Determine if working with a task run or flow run
        if not task_run_id and not flow_run_id:
            raise ValueError("You must provide either a `task_run_id` or `flow_run_id`")

        # Loop rather than recurse so repeated WAIT instructions do not grow
        # the call stack.
        while True:
            # Attempt to set the state; the guard above guarantees that a flow
            # run id is present whenever no task run id was given.
            if task_run_id:
                response = await self.set_task_run_state(
                    task_run_id, state, backend_state_data=backend_state_data
                )
            else:
                response = await self.set_flow_run_state(
                    flow_run_id, state, backend_state_data=backend_state_data
                )

            # Parse the response to return the new state
            if response.status == schemas.responses.SetStateStatus.ACCEPT:
                # Update the state with the details if provided
                if response.state.state_details:
                    state.state_details = response.state.state_details
                return state

            if response.status == schemas.responses.SetStateStatus.ABORT:
                raise prefect.exceptions.Abort(response.details.reason)

            if response.status == schemas.responses.SetStateStatus.WAIT:
                self.logger.debug(
                    f"Received wait instruction for {response.details.delay_seconds}s: "
                    f"{response.details.reason}"
                )
                await anyio.sleep(response.details.delay_seconds)
                continue

            if response.status == schemas.responses.SetStateStatus.REJECT:
                server_state = response.state
                # Swap a "blockstorage" pointer for the document it points to.
                if server_state.data and server_state.data.encoding == "blockstorage":
                    datadoc = DataDocument.parse_raw(
                        await self.retrieve_data(server_state.data)
                    )
                    server_state.data = datadoc

                return server_state

            raise ValueError(
                f"Received unexpected `SetStateStatus` from server: {response.status!r}"
            )

    async def set_task_run_state(
        self,
        task_run_id: UUID,
        state: schemas.states.State,
        force: bool = False,
        backend_state_data: schemas.data.DataDocument = None,
    ) -> OrchestrationResult:
        """
        Send a new state for a task run to the API.

        Args:
            task_run_id: the id of the task run
            state: the state to set
            force: if True, disregard orchestration logic when setting the state,
                forcing the Orion API to accept the state
            backend_state_data: an optional orion data document representing the state's data,
                if provided it will override `state.data`

        Returns:
            a [OrchestrationResult model][prefect.orion.orchestration.rules.OrchestrationResult]
                representation of state orchestration output
        """
        state_create = schemas.actions.StateCreate(
            name=state.name,
            type=state.type,
            message=state.message,
            data=backend_state_data or state.data,
            state_details=state.state_details,
        )
        state_create.state_details.task_run_id = task_run_id

        try:
            serialized_state = state_create.dict(json_compatible=True)
        except TypeError:
            # The data could not be serialized; send the state without it.
            state_create.data = None
            serialized_state = state_create.dict(json_compatible=True)

        request_body = dict(state=serialized_state, force=force)
        reply = await self._client.post(
            f"/task_runs/{task_run_id}/set_state", json=request_body
        )
        return OrchestrationResult.parse_obj(reply.json())

    async def read_task_run_states(
        self, task_run_id: UUID
    ) -> List[schemas.states.State]:
        """
        Fetch the states recorded for a task run.

        Args:
            task_run_id: the id of the task run

        Returns:
            a list of [State model][prefect.orion.schemas.states.State] representation
                of the task run states
        """
        query = {"task_run_id": task_run_id}
        reply = await self._client.get("/task_run_states/", params=query)
        return pydantic.parse_obj_as(List[schemas.states.State], reply.json())

    async def create_logs(self, logs: Iterable[Union[LogCreate, dict]]) -> None:
        """
        Submit log records for a flow or task run.

        Args:
            logs: An iterable of `LogCreate` objects or already json-compatible dicts
        """
        payload = []
        for entry in logs:
            # Only `LogCreate` instances are converted; anything else is sent as is.
            if isinstance(entry, LogCreate):
                entry = entry.dict(json_compatible=True)
            payload.append(entry)

        await self._client.post("/logs/", json=payload)

    async def read_logs(
        self, log_filter: LogFilter = None, limit: int = None, offset: int = None
    ) -> List[schemas.core.Log]:
        """
        Read flow and task run logs.

        Args:
            log_filter: optional filter criteria for the logs; sent as null when
                not provided
            limit: an optional limit for the log query
            offset: an optional offset for the log query

        Returns:
            a list of [Log model][prefect.orion.schemas.core.Log] objects parsed
                from the API response
        """
        body = {
            "filter": log_filter.dict(json_compatible=True) if log_filter else None,
            "limit": limit,
            "offset": offset,
        }

        response = await self._client.post("/logs/filter", json=body)
        return pydantic.parse_obj_as(List[schemas.core.Log], response.json())

    async def resolve_datadoc(self, datadoc: DataDocument) -> Any:
        """
        Unwrap a data document, including any data documents nested inside it.

        "blockstorage" encoded documents are read back through `retrieve_data`;
        every other encoding is decoded directly.

        Args:
            datadoc: The data document to resolve

        Returns:
            a decoded object, the innermost data

        Raises:
            TypeError: if `datadoc` is not a `DataDocument`
        """
        if not isinstance(datadoc, DataDocument):
            raise TypeError(
                f"`resolve_datadoc` received invalid type {type(datadoc).__name__}"
            )

        current = datadoc
        while True:
            if isinstance(current, bytes):
                # Bytes may hold a serialized document; bytes that do not
                # validate as one are the final result.
                try:
                    current = DataDocument.parse_raw(current)
                except pydantic.ValidationError:
                    return current

            if not isinstance(current, DataDocument):
                return current

            if current.encoding == "blockstorage":
                current = await self.retrieve_data(current)
            else:
                current = current.decode()

    async def __aenter__(self):
        """
        Start the client.

        Raises a `RuntimeError` if the client was already closed or is already
        started; a closed client cannot be reused, so retrieve a new one instead.
        """
        # httpx.AsyncClient does not allow reuse so we will not either.
        if self._closed:
            raise RuntimeError(
                "The client cannot be started again after closing. "
                "Retrieve a new client with `get_client()` instead."
            )

        # httpx.AsyncClient does not allow reentrancy so we will not either.
        if self._started:
            raise RuntimeError("The client cannot be started more than once.")

        stack = self._exit_stack
        await stack.__aenter__()

        # Enter a lifespan context if using an ephemeral application.
        # See https://github.com/encode/httpx/issues/350
        app = self._ephemeral_app
        if app and self.manage_lifespan:
            self._ephemeral_lifespan = await stack.enter_async_context(
                app_lifespan_context(app)
            )

        if app:
            message = (
                "Using ephemeral application with database at "
                f"{PREFECT_ORION_DATABASE_CONNECTION_URL.value()}"
            )
        else:
            message = f"Connecting to API at {self.api_url}"
        self.logger.debug(message)

        # Enter the httpx client's context
        await stack.enter_async_context(self._client)

        self._started = True
        return self

    async def __aexit__(self, *exc_info):
        """
        Mark the client as closed and unwind its exit stack.
        """
        self._closed = True
        outcome = await self._exit_stack.__aexit__(*exc_info)
        return outcome

    def __enter__(self):
        """
        Always raise: the client only supports the async context manager protocol.
        """
        message = (
            "The `OrionClient` must be entered with an async context. Use 'async "
            "with OrionClient(...)' not 'with OrionClient(...)'"
        )
        raise RuntimeError(message)

    def __exit__(self, *_):
        """
        Placeholder required alongside `__enter__`; always raises `AssertionError`.
        """
        # Raise explicitly: an `assert` statement is removed under `python -O`,
        # which would let this method return silently.
        raise AssertionError(
            "This should never be called but must be defined for __enter__"
        )

api_url property readonly

Type: URL

Get the base URL for the API.

OrionClient.__aenter__ async special

Start the client.

If the client is already started, this will raise an exception.

If the client is already closed, this will raise an exception. Use a new client instance instead.

Source code in prefect/client.py
async def __aenter__(self):
    """
    Start the client.

    Raises a `RuntimeError` if the client was already closed or is already
    started; a closed client cannot be reused, so retrieve a new one instead.
    """
    # httpx.AsyncClient does not allow reuse so we will not either.
    if self._closed:
        raise RuntimeError(
            "The client cannot be started again after closing. "
            "Retrieve a new client with `get_client()` instead."
        )

    # httpx.AsyncClient does not allow reentrancy so we will not either.
    if self._started:
        raise RuntimeError("The client cannot be started more than once.")

    stack = self._exit_stack
    await stack.__aenter__()

    # Enter a lifespan context if using an ephemeral application.
    # See https://github.com/encode/httpx/issues/350
    app = self._ephemeral_app
    if app and self.manage_lifespan:
        self._ephemeral_lifespan = await stack.enter_async_context(
            app_lifespan_context(app)
        )

    if app:
        message = (
            "Using ephemeral application with database at "
            f"{PREFECT_ORION_DATABASE_CONNECTION_URL.value()}"
        )
    else:
        message = f"Connecting to API at {self.api_url}"
    self.logger.debug(message)

    # Enter the httpx client's context
    await stack.enter_async_context(self._client)

    self._started = True
    return self

OrionClient.__aexit__ async special

Shutdown the client.

Source code in prefect/client.py
async def __aexit__(self, *exc_info):
    """
    Mark the client as closed and unwind its exit stack.
    """
    self._closed = True
    outcome = await self._exit_stack.__aexit__(*exc_info)
    return outcome

OrionClient.api_healthcheck async

Attempts to connect to the API and returns the encountered exception if not successful.

If successful, returns None.

Source code in prefect/client.py
async def api_healthcheck(self) -> Optional[Exception]:
    """
    Request the API's `/health` endpoint, allowing up to 10 seconds.

    Returns the exception that was raised if the attempt fails, or `None` if it
    succeeds.
    """
    try:
        with anyio.fail_after(10):
            await self._client.get("/health")
    except Exception as failure:
        # The failure is handed back to the caller rather than raised.
        return failure
    return None

OrionClient.create_block async

Create a block in Orion. This data is used to configure a corresponding Block.

Source code in prefect/client.py
async def create_block(
    self,
    block: prefect.blocks.core.Block,
    block_spec_id: UUID = None,
    name: str = None,
) -> Optional[UUID]:
    """
    Register a block with Orion; the stored data is used to configure a
    corresponding Block.

    Raises `prefect.exceptions.ObjectAlreadyExists` when the API answers with a
    409 conflict. Returns the id reported in the API response.
    """
    api_block = block.to_api_block(name=name, block_spec_id=block_spec_id)

    # Drop fields that are not compliant with `CreateBlock`
    excluded_fields = {"block_spec", "id"}
    payload = api_block.dict(
        json_compatible=True, exclude=excluded_fields, exclude_unset=True
    )

    try:
        response = await self._client.post("/blocks/", json=payload)
    except httpx.HTTPStatusError as exc:
        # Only a conflict is translated; other status errors propagate.
        if exc.response.status_code != status.HTTP_409_CONFLICT:
            raise
        raise prefect.exceptions.ObjectAlreadyExists(http_exc=exc) from exc

    created_id = response.json().get("id")
    return UUID(created_id)

OrionClient.create_concurrency_limit async

Create a tag concurrency limit in Orion. These limits govern concurrently running tasks.

Parameters:

Name Description Default
tag

a tag the concurrency limit is applied to

str
required
concurrency_limit

the maximum number of concurrent task runs for a given tag

int
required

Exceptions:

Type Description
httpx.RequestError

if the concurrency limit was not created for any reason

Returns:

Type Description
UUID

the ID of the concurrency limit in the backend

Source code in prefect/client.py
async def create_concurrency_limit(
    self,
    tag: str,
    concurrency_limit: int,
) -> UUID:
    """
    Register a tag-based concurrency limit with Orion. These limits govern
    concurrently running tasks.

    Args:
        tag: a tag the concurrency limit is applied to
        concurrency_limit: the maximum number of concurrent task runs for a given tag

    Raises:
        httpx.RequestError: if the concurrency limit was not created for any reason

    Returns:
        the ID of the concurrency limit in the backend
    """
    limit_create = schemas.actions.ConcurrencyLimitCreate(
        tag=tag,
        concurrency_limit=concurrency_limit,
    )
    request_body = limit_create.dict(json_compatible=True)
    response = await self._client.post("/concurrency_limits/", json=request_body)

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)

    # A response without an id is treated as a failed creation.
    raise httpx.RequestError(f"Malformed response: {response}")

OrionClient.create_deployment async

Create a flow deployment in Orion.

Parameters:

Name Description Default
flow_id

the flow ID to create a deployment for

UUID
required
name

the name of the deployment

str
required
flow_data

a data document that can be resolved into a flow object or script

DataDocument
required
schedule

an optional schedule to apply to the deployment

Union[prefect.orion.schemas.schedules.IntervalSchedule, prefect.orion.schemas.schedules.CronSchedule, prefect.orion.schemas.schedules.RRuleSchedule]
None
tags

an optional list of tags to apply to the deployment

List[str]
None
flow_runner

an optional flow runner to specify for this deployment

FlowRunner
None

Exceptions:

Type Description
httpx.RequestError

if the deployment was not created for any reason

Returns:

Type Description
UUID

the ID of the deployment in the backend

Source code in prefect/client.py
async def create_deployment(
    self,
    flow_id: UUID,
    name: str,
    flow_data: DataDocument,
    schedule: schemas.schedules.SCHEDULE_TYPES = None,
    parameters: Dict[str, Any] = None,
    tags: List[str] = None,
    flow_runner: "FlowRunner" = None,
) -> UUID:
    """
    Register a deployment for a flow with Orion.

    Args:
        flow_id: the flow ID to create a deployment for
        name: the name of the deployment
        flow_data: a data document that can be resolved into a flow object or script
        schedule: an optional schedule to apply to the deployment
        parameters: an optional dictionary of parameters for the deployment; an
            empty dictionary is sent when omitted
        tags: an optional list of tags to apply to the deployment
        flow_runner: an optional flow runner to specify for this deployment

    Raises:
        httpx.RequestError: if the deployment was not created for any reason

    Returns:
        the ID of the deployment in the backend
    """
    # Copy the caller's containers so the request holds its own instances.
    parameter_values = dict(parameters or {})
    tag_list = list(tags or [])
    runner_settings = flow_runner.to_settings() if flow_runner else None

    deployment_create = schemas.actions.DeploymentCreate(
        flow_id=flow_id,
        name=name,
        schedule=schedule,
        flow_data=flow_data,
        parameters=parameter_values,
        tags=tag_list,
        flow_runner=runner_settings,
    )

    request_body = deployment_create.dict(json_compatible=True)
    response = await self._client.post("/deployments/", json=request_body)

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)

    # A response without an id is treated as a failed creation.
    raise httpx.RequestError(f"Malformed response: {response}")

OrionClient.create_flow async

Create a flow in Orion.

Parameters:

Name Description Default
flow

a Flow object

Flow
required

Exceptions:

Type Description
httpx.RequestError

if a flow was not created for any reason

Returns:

Type Description
UUID

the ID of the flow in the backend

Source code in prefect/client.py
async def create_flow(self, flow: "Flow") -> UUID:
    """
    Register a flow with Orion by POSTing its name to `/flows/`.

    Args:
        flow: a [Flow][prefect.flows.Flow] object; only its `name` is sent

    Raises:
        httpx.RequestError: if the response body does not contain an `id`

    Returns:
        the ID of the flow in the backend
    """
    payload = schemas.actions.FlowCreate(name=flow.name).dict(json_compatible=True)
    response = await self._client.post("/flows/", json=payload)

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)

    # No id in the body means the server reply is unusable.
    raise httpx.RequestError(f"Malformed response: {response}")

OrionClient.create_flow_run async

Create a flow run for a flow.

Parameters:

Name Description Default
flow

The flow model to create the flow run for

Flow
required
name

An optional name for the flow run

str
None
parameters

Parameter overrides for this flow run.

Dict[str, Any]
None
context

Optional run context data

dict
None
tags

a list of tags to apply to this flow run

Iterable[str]
None
parent_task_run_id

if a subflow run is being created, the placeholder task run ID of the parent flow

UUID
None
state

The initial state for the run. If not provided, defaults to Pending for now. Should always be a Scheduled type.

State
None
flow_runner

An optional flow runner to use to execute this flow run.

FlowRunner
None

Exceptions:

Type Description
httpx.RequestError

if Orion does not successfully create a run for any reason

Returns:

Type Description
FlowRun

The flow run model

Source code in prefect/client.py
async def create_flow_run(
    self,
    flow: "Flow",
    name: str = None,
    parameters: Dict[str, Any] = None,
    context: dict = None,
    tags: Iterable[str] = None,
    parent_task_run_id: UUID = None,
    state: schemas.states.State = None,
    flow_runner: "FlowRunner" = None,
) -> schemas.core.FlowRun:
    """
    Create a flow run for a flow.

    The flow is first registered through `create_flow` to obtain its ID, then
    the run is created by POSTing to `/flow_runs/`.

    Args:
        flow: The flow model to create the flow run for
        name: An optional name for the flow run
        parameters: Parameter overrides for this flow run.
        context: Optional run context data
        tags: a list of tags to apply to this flow run
        parent_task_run_id: if a subflow run is being created, the placeholder task run ID
            of the parent flow
        state: The initial state for the run. If not provided, a `Pending`
            state is used.
        flow_runner: An optional flow runner to use to execute this flow run.

    Raises:
        httpx.RequestError: if Orion does not successfully create a run for any reason

    Returns:
        The flow run model; its `parameters` attribute is set to the local
        `parameters` object rather than the value echoed by the API
    """
    if not parameters:
        parameters = {}
    if not context:
        context = {}
    initial_state = schemas.states.Pending() if state is None else state

    # Registering the flow yields the id the run must reference.
    flow_id = await self.create_flow(flow)

    payload = schemas.actions.FlowRunCreate(
        flow_id=flow_id,
        flow_version=flow.version,
        name=name,
        parameters=parameters,
        context=context,
        tags=list(tags or []),
        parent_task_run_id=parent_task_run_id,
        state=initial_state,
        flow_runner=flow_runner.to_settings() if flow_runner else None,
    ).dict(json_compatible=True)

    response = await self._client.post("/flow_runs/", json=payload)
    created_run = schemas.core.FlowRun.parse_obj(response.json())

    # Hand back the caller's original Python objects instead of their
    # JSON round-tripped form.
    created_run.parameters = parameters
    return created_run

OrionClient.create_flow_run_from_deployment async

Create a flow run for a deployment.

Parameters:

Name Description Default
deployment_id

The ID of the deployment to create the flow run from

required
parameters

Parameter overrides for this flow run. Merged with the deployment defaults

Dict[str, Any]
None
context

Optional run context data

dict
None
state

The initial state for the run. If not provided, defaults to Scheduled for now. Should always be a Scheduled type.

State
None
flow_runner

An optional flow runner to use to execute this flow run.

FlowRunner
None

Exceptions:

Type Description
httpx.RequestError

if Orion does not successfully create a run for any reason

Returns:

Type Description
FlowRun

The flow run model

Source code in prefect/client.py
async def create_flow_run_from_deployment(
    self,
    deployment_id: UUID,
    *,
    parameters: Dict[str, Any] = None,
    context: dict = None,
    state: schemas.states.State = None,
    flow_runner: "FlowRunner" = None,
) -> schemas.core.FlowRun:
    """
    Create a flow run for a deployment.

    Args:
        deployment_id: The ID of the deployment to create the flow run from
        parameters: Parameter overrides for this flow run. Merged with the
            deployment defaults
        context: Optional run context data
        state: The initial state for the run. If not provided (or falsy), a
            `Scheduled` state is used.
        flow_runner: An optional flow runner to use to execute this flow run.

    Raises:
        httpx.RequestError: if Orion does not successfully create a run for any reason

    Returns:
        The flow run model parsed from the API response
    """
    payload = schemas.actions.DeploymentFlowRunCreate(
        parameters=parameters or {},
        context=context or {},
        state=state or Scheduled(),
        flow_runner=flow_runner.to_settings() if flow_runner else None,
    ).dict(json_compatible=True)

    endpoint = f"/deployments/{deployment_id}/create_flow_run"
    response = await self._client.post(endpoint, json=payload)

    return schemas.core.FlowRun.parse_obj(response.json())

OrionClient.create_logs async

Create logs for a flow or task run

Parameters:

Name Description Default
logs

An iterable of LogCreate objects or already json-compatible dicts

Iterable[Union[prefect.orion.schemas.actions.LogCreate, dict]]
required
Source code in prefect/client.py
async def create_logs(self, logs: Iterable[Union[LogCreate, dict]]) -> None:
    """
    Send a batch of logs for a flow or task run to `/logs/`.

    Args:
        logs: An iterable of `LogCreate` objects or already json-compatible
            dicts; `LogCreate` instances are serialized, anything else is
            passed through unchanged
    """
    payload = []
    for entry in logs:
        if isinstance(entry, LogCreate):
            entry = entry.dict(json_compatible=True)
        payload.append(entry)

    await self._client.post("/logs/", json=payload)

OrionClient.create_task_run async

Create a task run

Parameters:

Name Description Default
task

The Task to run

Task
required
flow_run_id

The flow run id with which to associate the task run

UUID
required
dynamic_key

A key unique to this particular run of a Task within the flow

str
required
name

An optional name for the task run

str
None
extra_tags

an optional list of extra tags to apply to the task run in addition to task.tags

Iterable[str]
None
state

The initial state for the run. If not provided, defaults to Pending for now. Should always be a Scheduled type.

State
None
task_inputs

the set of inputs passed to the task

Dict[str, List[Union[prefect.orion.schemas.core.TaskRunResult, prefect.orion.schemas.core.Parameter, prefect.orion.schemas.core.Constant]]]
None

Returns:

Type Description
UUID

The UUID of the newly created task run

Source code in prefect/client.py
async def create_task_run(
    self,
    task: "Task",
    flow_run_id: UUID,
    dynamic_key: str,
    name: str = None,
    extra_tags: Iterable[str] = None,
    state: schemas.states.State = None,
    task_inputs: Dict[
        str,
        List[
            Union[
                schemas.core.TaskRunResult,
                schemas.core.Parameter,
                schemas.core.Constant,
            ]
        ],
    ] = None,
) -> TaskRun:
    """
    Create a task run

    Args:
        task: The Task to run
        flow_run_id: The flow run id with which to associate the task run
        dynamic_key: A key unique to this particular run of a Task within the flow
        name: An optional name for the task run. When omitted, a name is built
            from the task name, the first 8 characters of the task key, and
            the dynamic key.
        extra_tags: an optional list of extra tags to apply to the task run in
            addition to `task.tags`
        state: The initial state for the run. If not provided, a `Pending`
            state is used.
        task_inputs: the set of inputs passed to the task

    Returns:
        The task run model parsed from the API response (not just its UUID)
    """
    # Union of the task's own tags and any extras; duplicates collapse.
    tags = set(task.tags).union(extra_tags or [])

    if state is None:
        state = schemas.states.Pending()

    task_run_data = schemas.actions.TaskRunCreate(
        name=name or f"{task.name}-{task.task_key[:8]}-{dynamic_key}",
        flow_run_id=flow_run_id,
        task_key=task.task_key,
        dynamic_key=dynamic_key,
        tags=list(tags),
        empirical_policy=schemas.core.TaskRunPolicy(
            max_retries=task.retries,
            retry_delay_seconds=task.retry_delay_seconds,
        ),
        state=state,
        task_inputs=task_inputs or {},
    )

    response = await self._client.post(
        "/task_runs/", json=task_run_data.dict(json_compatible=True)
    )
    return TaskRun.parse_obj(response.json())

OrionClient.create_work_queue async

Create a work queue.

Parameters:

Name Description Default
name

a unique name for the work queue

str
required
tags

an optional list of tags to filter on; only work scheduled with these tags will be included in the queue

List[str]
None
deployment_ids

an optional list of deployment IDs to filter on; only work scheduled from these deployments will be included in the queue

List[uuid.UUID]
None
flow_runner_types

an optional list of FlowRunner types to filter on; only work scheduled with these FlowRunners will be included in the queue

List[str]
None

Exceptions:

Type Description
prefect.exceptions.ObjectAlreadyExists

If request returns 409

httpx.RequestError

If request fails

Returns:

Type Description
UUID

The UUID of the newly created work queue

Source code in prefect/client.py
async def create_work_queue(
    self,
    name: str,
    tags: List[str] = None,
    deployment_ids: List[UUID] = None,
    flow_runner_types: List[str] = None,
) -> UUID:
    """
    Create a work queue.

    Args:
        name: a unique name for the work queue
        tags: an optional list of tags to filter on; only work scheduled with these tags
            will be included in the queue
        deployment_ids: an optional list of deployment IDs to filter on; only work
            scheduled from these deployments will be included in the queue
        flow_runner_types: an optional list of FlowRunner types to filter on; only work
            scheduled with these FlowRunners will be included in the queue

    Raises:
        prefect.exceptions.ObjectAlreadyExists: If request returns 409
        httpx.RequestError: If the response body does not contain an `id`

    Returns:
        UUID: The UUID of the newly created work queue
    """
    # Empty filter lists are sent as None, same as omitted ones.
    queue_filter = QueueFilter(
        tags=tags or None,
        deployment_ids=deployment_ids or None,
        flow_runner_types=flow_runner_types or None,
    )
    payload = WorkQueueCreate(name=name, filter=queue_filter).dict(
        json_compatible=True
    )

    try:
        response = await self._client.post("/work_queues/", json=payload)
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code != status.HTTP_409_CONFLICT:
            raise
        raise prefect.exceptions.ObjectAlreadyExists(http_exc=exc) from exc

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)
    raise httpx.RequestError(str(response))

OrionClient.delete_concurrency_limit_by_tag async

Delete the concurrency limit set on a specific tag.

Parameters:

Name Description Default
tag

a tag the concurrency limit is applied to

str
required

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

If request fails

Source code in prefect/client.py
async def delete_concurrency_limit_by_tag(
    self,
    tag: str,
):
    """
    Remove the concurrency limit registered for a tag.

    Issues a DELETE to `/concurrency_limits/tag/{tag}`.

    Args:
        tag: a tag the concurrency limit is applied to

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    """
    endpoint = f"/concurrency_limits/tag/{tag}"
    try:
        await self._client.delete(endpoint)
    except httpx.HTTPStatusError as exc:
        # Only a 404 is translated; every other status error propagates as-is.
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

OrionClient.delete_deployment async

Delete deployment by id.

Parameters:

Name Description Default
deployment_id

The deployment id of interest.

UUID
required

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

If request fails

Source code in prefect/client.py
async def delete_deployment(
    self,
    deployment_id: UUID,
):
    """
    Delete deployment by id.

    Issues a DELETE to `/deployments/{deployment_id}`.

    Args:
        deployment_id: The deployment id of interest.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    try:
        await self._client.delete(f"/deployments/{deployment_id}")
    except httpx.HTTPStatusError as e:
        # Use the named status constant like the other delete/read methods
        # instead of a bare 404 literal.
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise

OrionClient.delete_work_queue_by_id async

Delete a work queue by its ID.

Parameters:

Name Description Default
id

the id of the work queue to delete

UUID
required

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

If request fails

Source code in prefect/client.py
async def delete_work_queue_by_id(
    self,
    id: UUID,
):
    """
    Remove a work queue, addressed by its ID.

    Issues a DELETE to `/work_queues/{id}`.

    Args:
        id: the id of the work queue to delete

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails
    """
    endpoint = f"/work_queues/{id}"
    try:
        await self._client.delete(endpoint)
    except httpx.HTTPStatusError as exc:
        # Only a 404 is translated; every other status error propagates as-is.
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

OrionClient.get_default_storage_block async

Returns the default storage block

Parameters:

Name Description Default
as_json

if True, the raw JSON from the API is returned. This can avoid instantiating a storage block (and any side effects) Defaults to False.

bool
False

Returns:

Type Description
Optional[Block]
Source code in prefect/client.py
async def get_default_storage_block(
    self, as_json: bool = False
) -> Optional[Union[Block, Dict[str, Any]]]:
    """Fetch the default storage block from the API.

    Args:
        as_json (bool, optional): if True, the raw JSON from the API is
            returned. This can avoid instantiating a storage block (and any side
            effects) Defaults to False.

    Returns:
        Optional[Union[Block, Dict[str, Any]]]: `None` when the response body
            is empty; otherwise the decoded JSON if `as_json` is True, or the
            result of `create_block_from_api_block` on that JSON.
    """
    response = await self._client.post("/blocks/get_default_storage_block")

    # An empty body is reported as "no block" (presumably no default is set).
    if response.content:
        api_block = response.json()
        return api_block if as_json else create_block_from_api_block(api_block)
    return None

OrionClient.get_runs_in_work_queue async

Read flow runs off a work queue.

Parameters:

Name Description Default
id

the id of the work queue to read from

UUID
required
limit

a limit on the number of runs to return

int
10
scheduled_before

a timestamp; only runs scheduled before this time will be returned. Defaults to now.

datetime
None

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

If request fails

Returns:

Type Description
List[schemas.core.FlowRun]

a list of FlowRun objects read from the queue

Source code in prefect/client.py
async def get_runs_in_work_queue(
    self,
    id: UUID,
    limit: int = 10,
    scheduled_before: datetime.datetime = None,
) -> List[schemas.core.FlowRun]:
    """
    Read flow runs off a work queue.

    Args:
        id: the id of the work queue to read from
        limit: a limit on the number of runs to return
        scheduled_before: a timestamp; only runs scheduled before this time will
            be returned. When omitted, no cutoff is sent in the request and the
            server's own default applies.

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    Returns:
        List[schemas.core.FlowRun]: a list of FlowRun objects read from the queue
    """
    request_body = {"limit": limit}
    if scheduled_before:
        # Timestamps travel as ISO 8601 strings.
        request_body["scheduled_before"] = scheduled_before.isoformat()

    try:
        response = await self._client.post(
            f"/work_queues/{id}/get_runs", json=request_body
        )
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

    return pydantic.parse_obj_as(List[schemas.core.FlowRun], response.json())

OrionClient.hello async

Send a GET request to /hello for testing purposes.

Source code in prefect/client.py
async def hello(self) -> httpx.Response:
    """
    Issue a GET request against `/hello` and hand back the raw response.

    Intended for testing connectivity.
    """
    response = await self._client.get("/hello")
    return response

OrionClient.persist_data async

Persist data in orion and return the orion data document

Parameters:

Name Description Default
data

the data to persist

bytes
required

Returns:

Type Description
DataDocument

Orion data document pointing to persisted data.

Source code in prefect/client.py
async def persist_data(
    self, data: bytes, block: StorageBlock = None
) -> DataDocument:
    """
    Write data through a storage block and return a data document that
    points at it.

    Args:
        data: the data to persist
        block: the storage block to write with; when not given, the default
            storage block is fetched via `get_default_storage_block`

    Raises:
        ValueError: if no block was given and no default storage block is
            available

    Returns:
        A `blockstorage`-encoded data document holding the token returned by
        the block's `write` call and the block's id.
    """
    storage = block
    if not storage:
        storage = await self.get_default_storage_block()
    if not storage:
        raise ValueError(
            "No storage block was provided and no default storage block is set "
            "on the server. Set a default or provide a block to use."
        )

    token = await storage.write(data)

    # Record which block wrote the data alongside the token it returned.
    return DataDocument.encode(
        encoding="blockstorage",
        data={"data": token, "block_id": storage._block_id},
    )

OrionClient.persist_object async

Persist an object in orion and return the orion data document

Parameters:

Name Description Default
obj

the object to persist

Any
required
encoder

An optional encoder for the data document.

str
'cloudpickle'

Returns:

Type Description
DataDocument

Data document pointing to persisted data.

Source code in prefect/client.py
async def persist_object(
    self, obj: Any, encoder: str = "cloudpickle", storage_block: StorageBlock = None
) -> DataDocument:
    """
    Encode an object as a data document and persist it via `persist_data`.

    Args:
        obj: the object to persist
        encoder: the data document encoding used for `obj`
        storage_block: an optional storage block, forwarded to `persist_data`
            as its `block` argument

    Returns:
        Data document pointing to persisted data.
    """
    encoded = DataDocument.encode(encoding=encoder, data=obj)
    # The document's JSON text, as bytes, is what actually gets stored.
    raw = encoded.json().encode()
    return await self.persist_data(raw, block=storage_block)

OrionClient.propose_state async

Propose a new state for a flow run or task run, invoking Orion orchestration logic.

If the proposed state is accepted, the provided state will be augmented with details and returned.

If the proposed state is rejected, a new state returned by the Orion API will be returned.

If the proposed state results in a WAIT instruction from the Orion API, the function will sleep and attempt to propose the state again.

If the proposed state results in an ABORT instruction from the Orion API, an error will be raised.

Parameters:

Name Description Default
state

a new state for the task or flow run

State
required
backend_state_data

an optional document to store with the state in the database instead of its local data field. This allows the original state object to be retained while storing a pointer to persisted data in the database.

DataDocument
None
task_run_id

an optional task run id, used when proposing task run states

UUID
None
flow_run_id

an optional flow run id, used when proposing flow run states

UUID
None

Returns:

Type Description
State

a State model representation of the flow or task run state

Exceptions:

Type Description
ValueError

if neither task_run_id or flow_run_id is provided

prefect.exceptions.Abort

if an ABORT instruction is received from the Orion API

Source code in prefect/client.py
async def propose_state(
    self,
    state: schemas.states.State,
    backend_state_data: DataDocument = None,
    task_run_id: UUID = None,
    flow_run_id: UUID = None,
) -> schemas.states.State:
    """
    Propose a new state for a flow run or task run, invoking Orion
    orchestration logic.

    If the proposed state is accepted, the provided `state` will be
    augmented with details and returned.

    If the proposed state is rejected, a new state returned by the
    Orion API will be returned.

    If the proposed state results in a WAIT instruction from the Orion
    API, the function will sleep and attempt to propose the state again.

    If the proposed state results in an ABORT instruction from the Orion
    API, an error will be raised.

    Args:
        state: a new state for the task or flow run
        backend_state_data: an optional document to store with the state in the
            database instead of its local data field. This allows the original
            state object to be retained while storing a pointer to persisted data
            in the database.
        task_run_id: an optional task run id, used when proposing task run states.
            Takes precedence when both ids are given.
        flow_run_id: an optional flow run id, used when proposing flow run states

    Returns:
        a [State model][prefect.orion.schemas.states.State] representation of the
            flow or task run state

    Raises:
        ValueError: if neither task_run_id or flow_run_id is provided, or if
            the server answers with an unrecognized status
        prefect.exceptions.Abort: if an ABORT instruction is received from
            the Orion API
    """

    # Determine if working with a task run or flow run
    if not task_run_id and not flow_run_id:
        raise ValueError("You must provide either a `task_run_id` or `flow_run_id`")

    set_state_status = schemas.responses.SetStateStatus

    # Retry in a loop (rather than recursing) so that a long series of WAIT
    # instructions cannot grow the call stack.
    while True:
        # Attempt to set the state; the guard above guarantees that a flow
        # run id is present whenever no task run id was given.
        if task_run_id:
            response = await self.set_task_run_state(
                task_run_id, state, backend_state_data=backend_state_data
            )
        else:
            response = await self.set_flow_run_state(
                flow_run_id, state, backend_state_data=backend_state_data
            )

        # Parse the response to return the new state
        if response.status == set_state_status.ACCEPT:
            # Update the state with the details if provided
            if response.state.state_details:
                state.state_details = response.state.state_details
            return state

        if response.status == set_state_status.ABORT:
            raise prefect.exceptions.Abort(response.details.reason)

        if response.status == set_state_status.WAIT:
            self.logger.debug(
                f"Received wait instruction for {response.details.delay_seconds}s: "
                f"{response.details.reason}"
            )
            await anyio.sleep(response.details.delay_seconds)
            continue

        if response.status == set_state_status.REJECT:
            server_state = response.state
            if server_state.data:
                # Block-stored data is fetched and parsed so the returned
                # state carries the document itself, not a storage pointer.
                if server_state.data.encoding == "blockstorage":
                    datadoc = DataDocument.parse_raw(
                        await self.retrieve_data(server_state.data)
                    )
                    server_state.data = datadoc

            return server_state

        raise ValueError(
            f"Received unexpected `SetStateStatus` from server: {response.status!r}"
        )

OrionClient.read_block async

Read the block with the specified ID.

Parameters:

Name Description Default
block_id

the block id

UUID
required

Exceptions:

Type Description
httpx.RequestError

if the block was not found for any reason

Returns:

Type Description

A hydrated block or None.

Source code in prefect/client.py
async def read_block(self, block_id: UUID):
    """
    Read the block with the specified id.

    Issues a GET to `/blocks/{block_id}` and passes the decoded JSON to
    `create_block_from_api_block`.

    Args:
        block_id (UUID): the block id

    Raises:
        httpx.RequestError: if the block was not found for any reason

    Returns:
        The block built from the API response.
    """
    response = await self._client.get(f"/blocks/{block_id}")
    api_block = response.json()
    return create_block_from_api_block(api_block)

OrionClient.read_block_by_name async

Read the block with the specified name that corresponds to a specific block spec name and version.

Parameters:

Name Description Default
name

The block name.

str
required
block_spec_name

the block spec name

str
required
block_spec_version

the block spec version. If not provided, the most recent matching version will be returned.

str
required

Exceptions:

Type Description
httpx.RequestError

if the block was not found for any reason

Returns:

Type Description

A hydrated block or None.

Source code in prefect/client.py
async def read_block_by_name(
    self,
    name: str,
    block_spec_name: str,
    block_spec_version: str,
):
    """
    Look up a block by its name within a given block spec name and version.

    Args:
        name (str): The block name.
        block_spec_name (str): the block spec name
        block_spec_version (str): the block spec version (required by this
            signature; it is interpolated into the request path)

    Raises:
        httpx.RequestError: if the block was not found for any reason

    Returns:
        The block built from the API response by `create_block_from_api_block`.
    """
    endpoint = (
        f"/block_specs/{block_spec_name}/versions/{block_spec_version}/block/{name}"
    )
    response = await self._client.get(endpoint)
    api_block = response.json()
    return create_block_from_api_block(api_block)

OrionClient.read_block_spec_by_name async

Look up a block spec by name and version

Source code in prefect/client.py
async def read_block_spec_by_name(
    self, name: str, version: str
) -> schemas.core.BlockSpec:
    """
    Look up a block spec by name and version

    Args:
        name: the block spec name
        version: the block spec version

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: If request fails

    Returns:
        the block spec parsed from the API response
    """
    try:
        # Leading slash added for consistency with every other endpoint path
        # in this client.
        response = await self._client.get(f"/block_specs/{name}/versions/{version}")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == status.HTTP_404_NOT_FOUND:
            raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
        else:
            raise
    return schemas.core.BlockSpec.parse_obj(response.json())

OrionClient.read_block_specs async

Read all block specs with the given type

Parameters:

Name Description Default
type

The name of the type of block spec

str
required

Exceptions:

Type Description
httpx.RequestError

if the block was not found for any reason

Returns:

Type Description
List[prefect.orion.schemas.core.BlockSpec]

A list of block specs of the given type.

Source code in prefect/client.py
async def read_block_specs(self, type: str) -> List[schemas.core.BlockSpec]:
    """
    Read all block specs with the given type

    POSTs a `block_spec_type` filter to `/block_specs/filter`.

    Args:
        type: The name of the type of block spec

    Raises:
        httpx.RequestError: if the request fails for any reason

    Returns:
        A list of block specs parsed from the API response.
    """
    spec_filter = {"block_spec_type": type}
    response = await self._client.post("/block_specs/filter", json=spec_filter)
    raw_specs = response.json()
    return pydantic.parse_obj_as(List[schemas.core.BlockSpec], raw_specs)

OrionClient.read_blocks async

Read blocks

Parameters:

Name Description Default
block_spec_type

an optional block spec type

str
None
offset

an offset

int
None
limit

the number of blocks to return

int
None
as_json

if False, fully hydrated Blocks are loaded. Otherwise, JSON is returned from the API.

bool
False

Returns:

Type Description
List[Union[prefect.blocks.core.Block, Dict[str, Any]]]

A list of blocks

Source code in prefect/client.py
async def read_blocks(
    self,
    block_spec_type: str = None,
    offset: int = None,
    limit: int = None,
    as_json: bool = False,
) -> List[Union[prefect.blocks.core.Block, Dict[str, Any]]]:
    """
    Read blocks by POSTing a filter to `/blocks/filter`.

    Args:
        block_spec_type (str): an optional block spec type
        offset (int): an offset
        limit (int): the number of blocks to return
        as_json (bool): if False, each API block is passed through
            `create_block_from_api_block`. Otherwise, the decoded JSON is
            returned from the API untouched.

    Returns:
        A list of blocks
    """
    # All three filter keys are always sent, with None for unset ones.
    filters = {
        "block_spec_type": block_spec_type,
        "offset": offset,
        "limit": limit,
    }
    response = await self._client.post("/blocks/filter", json=filters)

    api_blocks = response.json()
    if not as_json:
        return [create_block_from_api_block(api_block) for api_block in api_blocks]
    return api_blocks

OrionClient.read_concurrency_limit_by_tag async

Read the concurrency limit set on a specific tag.

Parameters:

Name Description Default
tag

a tag the concurrency limit is applied to

str
required

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

if the response does not include a concurrency limit ID

Returns:

Type Description

the concurrency limit set on a specific tag

Source code in prefect/client.py
async def read_concurrency_limit_by_tag(
    self,
    tag: str,
):
    """
    Fetch the concurrency limit registered for a tag.

    Issues a GET to `/concurrency_limits/tag/{tag}`.

    Args:
        tag: a tag the concurrency limit is applied to

    Raises:
        prefect.exceptions.ObjectNotFound: If request returns 404
        httpx.RequestError: if the response body does not contain an `id`

    Returns:
        the concurrency limit set on a specific tag
    """
    endpoint = f"/concurrency_limits/tag/{tag}"
    try:
        response = await self._client.get(endpoint)
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

    # A body without an id cannot describe a concurrency limit.
    if not response.json().get("id"):
        raise httpx.RequestError(f"Malformed response: {response}")

    return schemas.core.ConcurrencyLimit.parse_obj(response.json())

OrionClient.read_concurrency_limits async

Lists concurrency limits set on task run tags.

Parameters:

Name Description Default
limit

the maximum number of concurrency limits returned

int
required
offset

the concurrency limit query offset

int
required

Returns:

Type Description

a list of concurrency limits

Source code in prefect/client.py
async def read_concurrency_limits(
    self,
    limit: int,
    offset: int,
):
    """
    Page through the concurrency limits configured on task run tags.

    Args:
        limit: the maximum number of concurrency limits to return
        offset: the offset at which the query starts

    Returns:
        a list of concurrency limits parsed from the response body
    """
    response = await self._client.post(
        "/concurrency_limits/filter",
        json={"limit": limit, "offset": offset},
    )
    limits_type = List[schemas.core.ConcurrencyLimit]
    return pydantic.parse_obj_as(limits_type, response.json())

OrionClient.read_deployment async

Query Orion for a deployment by id.

Parameters:

Name Description Default
deployment_id

the deployment ID of interest

UUID
required

Returns:

Type Description
Deployment

a Deployment model representation of the deployment

Source code in prefect/client.py
async def read_deployment(
    self,
    deployment_id: UUID,
) -> schemas.core.Deployment:
    """
    Look up a single deployment by its id.

    Args:
        deployment_id: the id of the deployment to fetch

    Returns:
        the [Deployment model][prefect.orion.schemas.core.Deployment] parsed from
            the response body
    """
    url = f"/deployments/{deployment_id}"
    result = await self._client.get(url)
    payload = result.json()
    return schemas.core.Deployment.parse_obj(payload)

OrionClient.read_deployment_by_name async

Query Orion for a deployment by name.

Parameters:

Name Description Default
name

the deployment name of interest

str
required

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

If request fails

Returns:

Type Description
Deployment

a Deployment model representation of the deployment

Source code in prefect/client.py
async def read_deployment_by_name(
    self,
    name: str,
) -> schemas.core.Deployment:
    """
    Look up a single deployment by its name.

    Args:
        name: the name of the deployment to fetch

    Raises:
        prefect.exceptions.ObjectNotFound: when the API answers with a 404
        httpx.HTTPStatusError: for any other HTTP error status, re-raised as-is

    Returns:
        the [Deployment model][prefect.orion.schemas.core.Deployment] parsed from
            the response body
    """
    try:
        result = await self._client.get(f"/deployments/name/{name}")
    except httpx.HTTPStatusError as exc:
        # Only a 404 is translated; every other HTTP error propagates untouched.
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

    payload = result.json()
    return schemas.core.Deployment.parse_obj(payload)

OrionClient.read_deployments async

Query Orion for deployments. Only deployments matching all the provided criteria will be returned.

Parameters:

Name Description Default
flow_filter

filter criteria for flows

FlowFilter
None
flow_run_filter

filter criteria for flow runs

FlowRunFilter
None
task_run_filter

filter criteria for task runs

TaskRunFilter
None
deployment_filter

filter criteria for deployments

DeploymentFilter
None
limit

a limit for the deployment query

int
None
offset

an offset for the deployment query

int
0

Returns:

Type Description
List[prefect.orion.schemas.core.Deployment]

a list of Deployment model representations of the deployments

Source code in prefect/client.py
async def read_deployments(
    self,
    *,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    limit: int = None,
    offset: int = 0,
) -> List[schemas.core.Deployment]:
    """
    Query Orion for deployments. Only deployments matching all
    the provided criteria will be returned.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        limit: a limit for the deployment query
        offset: an offset for the deployment query

    Returns:
        a list of [Deployment model][prefect.orion.schemas.core.Deployment]
            representations of the deployments
    """
    # Filters that were not provided are sent as explicit nulls.
    body = {
        "flows": (flow_filter.dict(json_compatible=True) if flow_filter else None),
        "flow_runs": (
            flow_run_filter.dict(json_compatible=True) if flow_run_filter else None
        ),
        "task_runs": (
            task_run_filter.dict(json_compatible=True) if task_run_filter else None
        ),
        "deployments": (
            deployment_filter.dict(json_compatible=True)
            if deployment_filter
            else None
        ),
        "limit": limit,
        "offset": offset,
    }
    response = await self._client.post("/deployments/filter", json=body)
    return pydantic.parse_obj_as(List[schemas.core.Deployment], response.json())

OrionClient.read_flow async

Query Orion for a flow by id.

Parameters:

Name Description Default
flow_id

the flow ID of interest

UUID
required

Returns:

Type Description
Flow

a Flow model representation of the flow

Source code in prefect/client.py
async def read_flow(self, flow_id: UUID) -> schemas.core.Flow:
    """
    Look up a single flow by its id.

    Args:
        flow_id: the id of the flow to fetch

    Returns:
        the [Flow model][prefect.orion.schemas.core.Flow] parsed from the response body
    """
    url = f"/flows/{flow_id}"
    result = await self._client.get(url)
    payload = result.json()
    return schemas.core.Flow.parse_obj(payload)

OrionClient.read_flow_by_name async

Query Orion for a flow by name.

Parameters:

Name Description Default
flow_name

the name of a flow

str
required

Returns:

Type Description
Flow

a fully hydrated Flow model

Source code in prefect/client.py
async def read_flow_by_name(
    self,
    flow_name: str,
) -> schemas.core.Flow:
    """
    Query Orion for a flow by name.

    Args:
        flow_name: the name of a flow

    Returns:
        a fully hydrated [Flow model][prefect.orion.schemas.core.Flow]
    """
    response = await self._client.get(f"/flows/name/{flow_name}")
    # Parse with the Flow schema, matching the declared return type and the
    # `/flows/...` route being queried.
    return schemas.core.Flow.parse_obj(response.json())

OrionClient.read_flow_run async

Query Orion for a flow run by id.

Parameters:

Name Description Default
flow_run_id

the flow run ID of interest

UUID
required

Returns:

Type Description
FlowRun

a Flow Run model representation of the flow run

Source code in prefect/client.py
async def read_flow_run(self, flow_run_id: UUID) -> schemas.core.FlowRun:
    """
    Look up a single flow run by its id.

    Args:
        flow_run_id: the id of the flow run to fetch

    Returns:
        the [Flow Run model][prefect.orion.schemas.core.FlowRun] parsed from the
            response body
    """
    url = f"/flow_runs/{flow_run_id}"
    result = await self._client.get(url)
    payload = result.json()
    return schemas.core.FlowRun.parse_obj(payload)

OrionClient.read_flow_run_states async

Query for the states of a flow run

Parameters:

Name Description Default
flow_run_id

the id of the flow run

UUID
required

Returns:

Type Description
List[prefect.orion.schemas.states.State]

a list of State model representation of the flow run states

Source code in prefect/client.py
async def read_flow_run_states(
    self, flow_run_id: UUID
) -> List[schemas.states.State]:
    """
    Fetch the states recorded for a flow run.

    Args:
        flow_run_id: the id of the flow run whose states are requested

    Returns:
        a list of [State models][prefect.orion.schemas.states.State] parsed from
            the response body
    """
    query = {"flow_run_id": flow_run_id}
    result = await self._client.get("/flow_run_states/", params=query)
    states_type = List[schemas.states.State]
    return pydantic.parse_obj_as(states_type, result.json())

OrionClient.read_flow_runs async

Query Orion for flow runs. Only flow runs matching all criteria will be returned.

Parameters:

Name Description Default
flow_filter

filter criteria for flows

FlowFilter
None
flow_run_filter

filter criteria for flow runs

FlowRunFilter
None
task_run_filter

filter criteria for task runs

TaskRunFilter
None
deployment_filter

filter criteria for deployments

DeploymentFilter
None
sort

sort criteria for the flow runs

FlowRunSort
None
limit

limit for the flow run query

int
None
offset

offset for the flow run query

int
0

Returns:

Type Description
List[prefect.orion.schemas.core.FlowRun]

a list of Flow Run model representation of the flow runs

Source code in prefect/client.py
async def read_flow_runs(
    self,
    *,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.FlowRunSort = None,
    limit: int = None,
    offset: int = 0,
) -> List[schemas.core.FlowRun]:
    """
    Fetch the flow runs that satisfy every supplied criterion.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        sort: sort criteria for the flow runs
        limit: limit for the flow run query
        offset: offset for the flow run query

    Returns:
        a list of [Flow Run models][prefect.orion.schemas.core.FlowRun] parsed
            from the response body
    """
    filters = {
        "flows": flow_filter,
        "flow_runs": flow_run_filter,
        "task_runs": task_run_filter,
        "deployments": deployment_filter,
    }
    # Filters that were not provided are sent as explicit nulls.
    payload = {
        key: criteria.dict(json_compatible=True) if criteria else None
        for key, criteria in filters.items()
    }
    payload.update(sort=sort, limit=limit, offset=offset)

    result = await self._client.post("/flow_runs/filter", json=payload)
    return pydantic.parse_obj_as(List[schemas.core.FlowRun], result.json())

OrionClient.read_flows async

Query Orion for flows. Only flows matching all criteria will be returned.

Parameters:

Name Description Default
flow_filter

filter criteria for flows

FlowFilter
None
flow_run_filter

filter criteria for flow runs

FlowRunFilter
None
task_run_filter

filter criteria for task runs

TaskRunFilter
None
deployment_filter

filter criteria for deployments

DeploymentFilter
None
limit

limit for the flow query

int
None
offset

offset for the flow query

int
0

Returns:

Type Description
List[prefect.orion.schemas.core.Flow]

a list of Flow model representation of the flows

Source code in prefect/client.py
async def read_flows(
    self,
    *,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    limit: int = None,
    offset: int = 0,
) -> List[schemas.core.Flow]:
    """
    Fetch the flows that satisfy every supplied criterion.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        limit: limit for the flow query
        offset: offset for the flow query

    Returns:
        a list of [Flow models][prefect.orion.schemas.core.Flow] parsed from the
            response body
    """
    filters = {
        "flows": flow_filter,
        "flow_runs": flow_run_filter,
        "task_runs": task_run_filter,
        "deployments": deployment_filter,
    }
    # Filters that were not provided are sent as explicit nulls.
    payload = {
        key: criteria.dict(json_compatible=True) if criteria else None
        for key, criteria in filters.items()
    }
    payload.update(limit=limit, offset=offset)

    result = await self._client.post("/flows/filter", json=payload)
    return pydantic.parse_obj_as(List[schemas.core.Flow], result.json())

OrionClient.read_logs async

Read flow and task run logs.

Source code in prefect/client.py
async def read_logs(
    self, log_filter: LogFilter = None, limit: int = None, offset: int = None
) -> List[schemas.core.Log]:
    """
    Read flow and task run logs.

    Args:
        log_filter: filter criteria for the logs; sent as null when not provided
        limit: a limit for the log query
        offset: an offset for the log query

    Returns:
        a list of `schemas.core.Log` objects parsed from the response body
    """
    body = {
        "filter": log_filter.dict(json_compatible=True) if log_filter else None,
        "limit": limit,
        "offset": offset,
    }

    response = await self._client.post("/logs/filter", json=body)
    return pydantic.parse_obj_as(List[schemas.core.Log], response.json())

OrionClient.read_task_run async

Query Orion for a task run by id.

Parameters:

Name Description Default
task_run_id

the task run ID of interest

UUID
required

Returns:

Type Description
TaskRun

a Task Run model representation of the task run

Source code in prefect/client.py
async def read_task_run(self, task_run_id: UUID) -> schemas.core.TaskRun:
    """
    Look up a single task run by its id.

    Args:
        task_run_id: the id of the task run to fetch

    Returns:
        the [Task Run model][prefect.orion.schemas.core.TaskRun] parsed from the
            response body
    """
    url = f"/task_runs/{task_run_id}"
    result = await self._client.get(url)
    payload = result.json()
    return schemas.core.TaskRun.parse_obj(payload)

OrionClient.read_task_run_states async

Query for the states of a task run

Parameters:

Name Description Default
task_run_id

the id of the task run

UUID
required

Returns:

Type Description
List[prefect.orion.schemas.states.State]

a list of State model representation of the task run states

Source code in prefect/client.py
async def read_task_run_states(
    self, task_run_id: UUID
) -> List[schemas.states.State]:
    """
    Fetch the states recorded for a task run.

    Args:
        task_run_id: the id of the task run whose states are requested

    Returns:
        a list of [State models][prefect.orion.schemas.states.State] parsed from
            the response body
    """
    query = {"task_run_id": task_run_id}
    result = await self._client.get("/task_run_states/", params=query)
    states_type = List[schemas.states.State]
    return pydantic.parse_obj_as(states_type, result.json())

OrionClient.read_task_runs async

Query Orion for task runs. Only task runs matching all criteria will be returned.

Parameters:

Name Description Default
flow_filter

filter criteria for flows

FlowFilter
None
flow_run_filter

filter criteria for flow runs

FlowRunFilter
None
task_run_filter

filter criteria for task runs

TaskRunFilter
None
deployment_filter

filter criteria for deployments

DeploymentFilter
None
sort

sort criteria for the task runs

TaskRunSort
None
limit

a limit for the task run query

int
None
offset

an offset for the task run query

int
0

Returns:

Type Description
List[prefect.orion.schemas.core.TaskRun]

a list of Task Run model representation of the task runs

Source code in prefect/client.py
async def read_task_runs(
    self,
    *,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.TaskRunSort = None,
    limit: int = None,
    offset: int = 0,
) -> List[schemas.core.TaskRun]:
    """
    Fetch the task runs that satisfy every supplied criterion.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        sort: sort criteria for the task runs
        limit: a limit for the task run query
        offset: an offset for the task run query

    Returns:
        a list of [Task Run models][prefect.orion.schemas.core.TaskRun] parsed
            from the response body
    """
    filters = {
        "flows": flow_filter,
        "flow_runs": flow_run_filter,
        "task_runs": task_run_filter,
        "deployments": deployment_filter,
    }
    # Filters that were not provided are sent as explicit nulls.
    payload = {
        key: criteria.dict(json_compatible=True) if criteria else None
        for key, criteria in filters.items()
    }
    payload.update(sort=sort, limit=limit, offset=offset)

    result = await self._client.post("/task_runs/filter", json=payload)
    return pydantic.parse_obj_as(List[schemas.core.TaskRun], result.json())

OrionClient.read_work_queue async

Read a work queue.

Parameters:

Name Description Default
id

the id of the work queue to load

UUID
required

Exceptions:

Type Description
prefect.exceptions.ObjectNotFound

If request returns 404

httpx.RequestError

If request fails

Returns:

Type Description
WorkQueue

an instantiated WorkQueue object

Source code in prefect/client.py
async def read_work_queue(
    self,
    id: UUID,
) -> schemas.core.WorkQueue:
    """
    Load a work queue by its id.

    Args:
        id: the id of the work queue to load

    Raises:
        prefect.exceptions.ObjectNotFound: when the API answers with a 404
        httpx.HTTPStatusError: for any other HTTP error status, re-raised as-is

    Returns:
        WorkQueue: the work queue parsed from the response body
    """
    try:
        result = await self._client.get(f"/work_queues/{id}")
    except httpx.HTTPStatusError as exc:
        # Only a 404 is translated; every other HTTP error propagates untouched.
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

    payload = result.json()
    return schemas.core.WorkQueue.parse_obj(payload)

OrionClient.read_work_queue_by_name async

Read a work queue by name.

Parameters:

Name Description Default
name

a unique name for the work queue

str
required

Exceptions:

Type Description
httpx.HTTPStatusError

if no work queue is found

Returns:

Type Description
schemas.core.WorkQueue

a work queue API object

Source code in prefect/client.py
async def read_work_queue_by_name(self, name: str) -> schemas.core.WorkQueue:
    """
    Load a work queue by its name.

    Args:
        name (str): the unique name of the work queue

    Raises:
        httpx.HTTPStatusError: if no work queue is found

    Returns:
        schemas.core.WorkQueue: the work queue parsed from the response body
    """
    url = f"/work_queues/name/{name}"
    result = await self._client.get(url)
    payload = result.json()
    return schemas.core.WorkQueue.parse_obj(payload)

OrionClient.read_work_queues async

Query Orion for work queues.

Parameters:

Name Description Default
limit

a limit for the query

int
None
offset

an offset for the query

int
0

Returns:

Type Description
List[prefect.orion.schemas.core.WorkQueue]

a list of WorkQueue model representations of the work queues

Source code in prefect/client.py
async def read_work_queues(
    self,
    limit: int = None,
    offset: int = 0,
) -> List[schemas.core.WorkQueue]:
    """
    Page through the work queues known to Orion.

    Args:
        limit: a limit for the query
        offset: an offset for the query

    Returns:
        a list of [WorkQueue models][prefect.orion.schemas.core.WorkQueue] parsed
            from the response body
    """
    result = await self._client.post(
        "/work_queues/filter",
        json={"limit": limit, "offset": offset},
    )
    queues_type = List[schemas.core.WorkQueue]
    return pydantic.parse_obj_as(queues_type, result.json())

OrionClient.resolve_datadoc async

Recursively decode possibly nested data documents.

"blockstorage" encoded documents will be retrieved from their storage block.

Parameters:

Name Description Default
datadoc

The data document to resolve

DataDocument
required

Returns:

Type Description
Any

a decoded object, the innermost data

Source code in prefect/client.py
async def resolve_datadoc(self, datadoc: DataDocument) -> Any:
    """
    Unwrap a data document layer by layer until the innermost data is reached.

    Documents with the "blockstorage" encoding are fetched through
    `retrieve_data`; every other encoding is decoded locally.

    Args:
        datadoc: The data document to resolve

    Raises:
        TypeError: if `datadoc` is not a `DataDocument`

    Returns:
        a decoded object, the innermost data
    """
    if not isinstance(datadoc, DataDocument):
        raise TypeError(
            f"`resolve_datadoc` received invalid type {type(datadoc).__name__}"
        )

    current = datadoc
    while True:
        if isinstance(current, bytes):
            # Bytes may hold another serialized document; if they do not parse
            # as one, they are the final payload.
            try:
                current = DataDocument.parse_raw(current)
            except pydantic.ValidationError:
                return current

        if not isinstance(current, DataDocument):
            # Nothing left to unwrap.
            return current

        if current.encoding == "blockstorage":
            current = await self.retrieve_data(current)
        else:
            current = current.decode()

OrionClient.retrieve_data async

Exchange a storage data document for the data previously persisted.

Parameters:

Name Description Default
data_document

The data document used to store data.

DataDocument
required

Returns:

Type Description
bytes

The persisted data in bytes.

Source code in prefect/client.py
async def retrieve_data(
    self,
    data_document: DataDocument,
) -> bytes:
    """
    Read back the data that a storage data document points to.

    The document is decoded into a mapping with a `data` entry and a `block_id`
    entry. When `block_id` is None a `TempStorageBlock` is used; otherwise the
    block is loaded with `read_block`.

    Args:
        data_document: The data document used to store data.

    Returns:
        The persisted data in bytes.
    """
    decoded = data_document.decode()
    stored_reference = decoded["data"]
    block_id = decoded["block_id"]

    storage_block = (
        TempStorageBlock() if block_id is None else await self.read_block(block_id)
    )
    return await storage_block.read(stored_reference)

OrionClient.retrieve_object async

Exchange a data document for the object previously persisted.

Parameters:

Name Description Default
storage_datadoc

The storage data document to retrieve.

DataDocument
required

Returns:

Type Description
Any

the persisted object

Source code in prefect/client.py
async def retrieve_object(self, storage_datadoc: DataDocument) -> Any:
    """
    Read back the object that a storage data document points to.

    The raw data is fetched with `retrieve_data`, parsed as a `DataDocument`,
    and decoded.

    Args:
        storage_datadoc: The storage data document to retrieve.

    Returns:
        the persisted object
    """
    raw = await self.retrieve_data(storage_datadoc)
    inner_document = DataDocument.parse_raw(raw)
    return inner_document.decode()

OrionClient.set_flow_run_state async

Set the state of a flow run.

Parameters:

Name Description Default
flow_run_id

the id of the flow run

UUID
required
state

the state to set

State
required
force

if True, disregard orchestration logic when setting the state, forcing the Orion API to accept the state

bool
False
backend_state_data

an optional data document representing the state's data, if provided it will override state.data

DataDocument
None

Returns:

Type Description
OrchestrationResult

a OrchestrationResult model representation of state orchestration output

Source code in prefect/client.py
async def set_flow_run_state(
    self,
    flow_run_id: UUID,
    state: schemas.states.State,
    force: bool = False,
    backend_state_data: schemas.data.DataDocument = None,
) -> OrchestrationResult:
    """
    Propose a new state for a flow run.

    Args:
        flow_run_id: the id of the flow run
        state: the state to set
        force: if True, disregard orchestration logic when setting the state,
            forcing the Orion API to accept the state
        backend_state_data: an optional data document representing the state's data,
            if provided it will override `state.data`

    Returns:
        a [OrchestrationResult model][prefect.orion.orchestration.rules.OrchestrationResult]
            representation of state orchestration output
    """
    create_request = schemas.actions.StateCreate(
        type=state.type,
        name=state.name,
        message=state.message,
        data=backend_state_data or state.data,
        state_details=state.state_details,
    )
    create_request.state_details.flow_run_id = flow_run_id

    try:
        serialized_state = create_request.dict(json_compatible=True)
    except TypeError:
        # The data could not be serialized: send the state without it.
        create_request.data = None
        serialized_state = create_request.dict(json_compatible=True)

    result = await self._client.post(
        f"/flow_runs/{flow_run_id}/set_state",
        json={"state": serialized_state, "force": force},
    )
    return OrchestrationResult.parse_obj(result.json())

OrionClient.set_task_run_state async

Set the state of a task run.

Parameters:

Name Description Default
task_run_id

the id of the task run

UUID
required
state

the state to set

State
required
force

if True, disregard orchestration logic when setting the state, forcing the Orion API to accept the state

bool
False
backend_state_data

an optional orion data document representing the state's data, if provided it will override state.data

DataDocument
None

Returns:

Type Description
OrchestrationResult

a OrchestrationResult model representation of state orchestration output

Source code in prefect/client.py
async def set_task_run_state(
    self,
    task_run_id: UUID,
    state: schemas.states.State,
    force: bool = False,
    backend_state_data: schemas.data.DataDocument = None,
) -> OrchestrationResult:
    """
    Propose a new state for a task run.

    Args:
        task_run_id: the id of the task run
        state: the state to set
        force: if True, disregard orchestration logic when setting the state,
            forcing the Orion API to accept the state
        backend_state_data: an optional orion data document representing the state's data,
            if provided it will override `state.data`

    Returns:
        a [OrchestrationResult model][prefect.orion.orchestration.rules.OrchestrationResult]
            representation of state orchestration output
    """
    create_request = schemas.actions.StateCreate(
        name=state.name,
        type=state.type,
        message=state.message,
        data=backend_state_data or state.data,
        state_details=state.state_details,
    )
    create_request.state_details.task_run_id = task_run_id

    try:
        serialized_state = create_request.dict(json_compatible=True)
    except TypeError:
        # The data could not be serialized: send the state without it.
        create_request.data = None
        serialized_state = create_request.dict(json_compatible=True)

    result = await self._client.post(
        f"/task_runs/{task_run_id}/set_state",
        json={"state": serialized_state, "force": force},
    )
    return OrchestrationResult.parse_obj(result.json())

OrionClient.update_flow_run async

Update a flow run's details.

Parameters:

Name Description Default
flow_run_id

the run ID to update

UUID
required
flow_version

a new version string for the flow run

str
None
parameters

a dictionary of updated parameter values for the run

dict
None
name

a new name for the flow run

str
None

Returns:

Type Description
httpx.Response

an httpx.Response object from the PATCH request

Source code in prefect/client.py
async def update_flow_run(
    self,
    flow_run_id: UUID,
    flow_version: str = None,
    parameters: dict = None,
    name: str = None,
) -> "httpx.Response":
    """
    Update a flow run's details.

    Only the arguments that are not None are included in the update; fields left
    as None are omitted from the PATCH body.

    Args:
        flow_run_id: the run ID to update
        flow_version: a new version string for the flow run
        parameters: a dictionary of updated parameter values for the run
        name: a new name for the flow run

    Returns:
        an `httpx.Response` object from the PATCH request
    """
    params = {}
    if flow_version is not None:
        params["flow_version"] = flow_version
    if parameters is not None:
        params["parameters"] = parameters
    if name is not None:
        params["name"] = name

    flow_run_data = schemas.actions.FlowRunUpdate(**params)

    # `exclude_unset` keeps fields that were never passed out of the PATCH body.
    return await self._client.patch(
        f"/flow_runs/{flow_run_id}",
        json=flow_run_data.dict(json_compatible=True, exclude_unset=True),
    )

OrionClient.update_work_queue async

Update properties of a work queue.

Parameters:

Name Description Default
id

the ID of the work queue to update

UUID
required
**kwargs

the fields to update

{}

Exceptions:

Type Description
ValueError

if no kwargs are provided

prefect.exceptions.ObjectNotFound

if request returns 404

httpx.RequestError

if the request fails

Source code in prefect/client.py
async def update_work_queue(self, id: UUID, **kwargs):
    """
    Change one or more fields of an existing work queue.

    Args:
        id: the ID of the work queue to update
        **kwargs: the fields to update

    Raises:
        ValueError: if no kwargs are provided
        prefect.exceptions.ObjectNotFound: if request returns 404
        httpx.HTTPStatusError: for any other HTTP error status, re-raised as-is

    """
    if len(kwargs) == 0:
        raise ValueError("No fields provided to update.")

    update_model = WorkQueueUpdate(**kwargs)
    # `exclude_unset` keeps fields that were never passed out of the PATCH body.
    payload = update_model.dict(json_compatible=True, exclude_unset=True)

    try:
        await self._client.patch(f"/work_queues/{id}", json=payload)
    except httpx.HTTPStatusError as exc:
        # Only a 404 is translated; every other HTTP error propagates untouched.
        if exc.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise prefect.exceptions.ObjectNotFound(http_exc=exc) from exc

PrefectHttpxClient

A Prefect wrapper for the async httpx client with support for CloudFlare-style rate limiting.

Additionally, this client will always call raise_for_status on responses.

For more details on rate limit headers, see: - https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI

Source code in prefect/client.py
class PrefectHttpxClient(httpx.AsyncClient):
    """
    A Prefect wrapper for the async httpx client with support for CloudFlare-style
    rate limiting.

    Additionally, this client will always call `raise_for_status` on responses.

    For more details on rate limit headers, see:
    - https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI
    """

    # Maximum number of times a rate-limited (429) request is re-sent.
    RETRY_MAX = 5

    async def send(self, *args, **kwargs) -> Response:
        """
        Send a request, retrying while the server answers 429 Too Many Requests.

        Up to `RETRY_MAX` retries are made. Each retry waits for the number of
        seconds given by the "Retry-After" header; when the header is missing or
        is not a plain number of seconds, an exponential back-off of
        `2**retry_count` seconds is used instead.

        Raises:
            httpx.HTTPStatusError: via `raise_for_status` when the final response
                has an error status

        Returns:
            the final response
        """
        retry_count = 0
        response = await super().send(*args, **kwargs)

        while (
            response.status_code == status.HTTP_429_TOO_MANY_REQUESTS
            and retry_count < self.RETRY_MAX
        ):
            retry_count += 1
            backoff_seconds = 2**retry_count

            # Respect the "Retry-After" header, falling back to an exponential back-off
            retry_after = response.headers.get("Retry-After")
            if retry_after:
                try:
                    retry_seconds = float(retry_after)
                except ValueError:
                    # The header may also be an HTTP-date (or otherwise
                    # non-numeric); do not fail the request over it.
                    retry_seconds = backoff_seconds
            else:
                retry_seconds = backoff_seconds

            await anyio.sleep(retry_seconds)
            response = await super().send(*args, **kwargs)

        # Always raise bad responses
        # NOTE: We may want to remove this and handle responses per route in the
        #       `OrionClient`
        response.raise_for_status()

        return response

PrefectHttpxClient.send async

Send a request.

The request is sent as-is, unmodified.

Typically you'll want to build one with AsyncClient.build_request() so that any client-level configuration is merged into the request, but passing an explicit httpx.Request() is supported as well.

See also: Request instances

Source code in prefect/client.py
async def send(self, *args, **kwargs) -> Response:
    """
    Send a request, sending it again while the server responds with a 429.

    All arguments are forwarded unchanged to the parent `send`. A request is
    retried at most `RETRY_MAX` times. The wait before each retry is taken from
    the "Retry-After" header, which may be either a number of seconds or an
    HTTP-date; if the header is missing or cannot be parsed, an exponential
    back-off of `2**retry_count` seconds is used instead.

    Returns:
        The last response received.

    Raises:
        Whatever `response.raise_for_status()` raises for the last response
        (for httpx this is an `httpx.HTTPStatusError` on error responses).
    """
    # Imported here so the method is self-contained; both are standard library
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    retry_count = 0
    response = await super().send(*args, **kwargs)

    while (
        response.status_code == status.HTTP_429_TOO_MANY_REQUESTS
        and retry_count < self.RETRY_MAX
    ):
        retry_count += 1

        # Exponential back-off, used unless "Retry-After" gives a usable value
        retry_seconds = 2**retry_count

        # Respect the "Retry-After" header when it is present
        retry_after = response.headers.get("Retry-After")
        if retry_after:
            try:
                retry_seconds = float(retry_after)
            except ValueError:
                # Not a number of seconds; the header may carry an HTTP-date
                try:
                    retry_at = parsedate_to_datetime(retry_after)
                except (TypeError, ValueError):
                    # Unparseable header: keep the exponential back-off
                    pass
                else:
                    if retry_at.tzinfo is None:
                        # Dates without an offset are interpreted as UTC
                        retry_at = retry_at.replace(tzinfo=timezone.utc)
                    retry_seconds = (
                        retry_at - datetime.now(timezone.utc)
                    ).total_seconds()

        # A date in the past or a negative value means "retry immediately"
        await anyio.sleep(max(0.0, retry_seconds))
        response = await super().send(*args, **kwargs)

    # Always raise bad responses
    # NOTE: We may want to remove this and handle responses per route in the
    #       `OrionClient`
    response.raise_for_status()

    return response

app_lifespan_context

A context manager that calls startup/shutdown hooks for the given application.

Lifespan contexts are cached per application to avoid calling the lifespan hooks more than once if the context is entered in nested code. A no-op context will be returned if the context for the given application is already being managed.

This manager is robust to concurrent access within the event loop. For example, if you have concurrent contexts for the same application, it is guaranteed that startup hooks will be called before their context starts and shutdown hooks will only be called after their context exits.

A reference count is used to support nested use of clients without running lifespan hooks excessively. The first client context entered will create and enter a lifespan context. Each subsequent client will increment a reference count but will not create a new lifespan context. When each client context exits, the reference count is decremented. When the last client context exits, the lifespan will be closed.

In simple nested cases, the first client context will be the one to exit the lifespan. However, if client contexts are entered concurrently they may not exit in a consistent order. If the first client context was responsible for closing the lifespan, it would have to wait until all other client contexts exit to avoid firing shutdown hooks while the application is in use. Waiting for the other clients to exit can introduce deadlocks, so, instead, the first client will exit without closing the lifespan context and reference counts will be used to ensure the lifespan is closed once all of the clients are done.

Source code in prefect/client.py
@asynccontextmanager
async def app_lifespan_context(app: FastAPI) -> ContextManager[None]:
    """
    A context manager that calls startup/shutdown hooks for the given application.

    Lifespan contexts are cached per application to avoid calling the lifespan hooks
    more than once if the context is entered in nested code. A no-op context will be
    returned if the context for the given application is already being managed.

    This manager is robust to concurrent access within the event loop. For example,
    if you have concurrent contexts for the same application, it is guaranteed that
    startup hooks will be called before their context starts and shutdown hooks will
    only be called after their context exits.

    A reference count is used to support nested use of clients without running
    lifespan hooks excessively. The first client context entered will create and enter
    a lifespan context. Each subsequent client will increment a reference count but will
    not create a new lifespan context. When each client context exits, the reference
    count is decremented. When the last client context exits, the lifespan will be
    closed.

    In simple nested cases, the first client context will be the one to exit the
    lifespan. However, if client contexts are entered concurrently they may not exit
    in a consistent order. If the first client context was responsible for closing
    the lifespan, it would have to wait until all other client contexts exit to
    avoid firing shutdown hooks while the application is in use. Waiting for the other
    clients to exit can introduce deadlocks, so, instead, the first client will exit
    without closing the lifespan context and reference counts will be used to ensure
    the lifespan is closed once all of the clients are done.

    If the startup hooks raise, the exception propagates to the caller and nothing is
    cached, so a later attempt for the same application runs the startup hooks again.
    """
    # TODO: A deadlock has been observed during multithreaded use of clients while this
    #       lifespan context is being used. This has only been reproduced on Python 3.7
    #       and while we hope to discourage using multiple event loops in threads, this
    #       bug may emerge again.
    #       See https://github.com/PrefectHQ/orion/pull/1696
    thread_id = threading.get_ident()

    # The id of the application is used instead of the hash so each application instance
    # is managed independently even if they share the same settings. We include the
    # thread id since applications are managed separately per thread.
    key = (thread_id, id(app))

    # On exception, this will be populated with exception details
    exc_info = (None, None, None)

    # Get a lock unique to this thread since anyio locks are not threadsafe
    lock = APP_LIFESPANS_LOCKS[thread_id]

    async with lock:
        if key in APP_LIFESPANS:
            # The lifespan is already being managed, just increment the reference count
            APP_LIFESPANS_REF_COUNTS[key] += 1
        else:
            # Create a new lifespan manager
            context = LifespanManager(app)

            # Ensure we enter the context before releasing the lock so startup hooks
            # are complete before another client can be used
            await context.__aenter__()

            # Register the lifespan only once startup has succeeded. Registering it
            # earlier would leave a stale entry behind if the startup hooks raise,
            # and later callers would then skip startup and never close the lifespan.
            APP_LIFESPANS[key] = context
            APP_LIFESPANS_REF_COUNTS[key] = 1

    try:
        yield
    except BaseException:
        exc_info = sys.exc_info()
        raise
    finally:
        # If we do not shield against anyio cancellation, the lock will return
        # immediately and the code in its context will not run, leaving the lifespan
        # open
        with anyio.CancelScope(shield=True):
            async with lock:
                # After the consumer exits the context, decrement the reference count
                APP_LIFESPANS_REF_COUNTS[key] -= 1

                # If this is the last context to exit, close the lifespan
                if APP_LIFESPANS_REF_COUNTS[key] <= 0:
                    APP_LIFESPANS_REF_COUNTS.pop(key)
                    context = APP_LIFESPANS.pop(key)
                    await context.__aexit__(*exc_info)

inject_client

Simple helper to provide a context-managed client to an asynchronous function.

The decorated function must take a client kwarg and if a client is passed when called it will be used instead of creating a new one, but it will not be context managed as it is assumed that the caller is managing the context.

Source code in prefect/client.py
def inject_client(fn):
    """
    Decorator that supplies a `client` keyword argument to an asynchronous function.

    The wrapped function _must_ accept a `client` kwarg. When the caller passes a
    non-null client it is forwarded untouched and its context is left to the caller
    to manage. Otherwise a client is obtained from `get_client()` and kept open for
    the duration of the call.
    """

    @wraps(fn)
    async def with_injected_client(*args, **kwargs):
        supplied = kwargs.get("client")

        if supplied is None:
            # Drop an explicit `client=None` so the new client can take its place
            kwargs.pop("client", None)
            manager = get_client()
        else:
            # The caller owns this client's context; use a placeholder context here
            manager = asyncnullcontext()

        async with manager as entered:
            # No effect when the caller supplied a client: the key is already set
            kwargs.setdefault("client", entered)
            return await fn(*args, **kwargs)

    return with_injected_client