Skip to content

prefect.agent

The agent is responsible for checking for flow runs that are ready to run and starting their execution.

OrionAgent

Source code in prefect/agent.py
class OrionAgent:
    def __init__(
        self,
        work_queue_id: UUID = None,
        work_queue_name: str = None,
        prefetch_seconds: int = None,
    ) -> None:
        if not work_queue_id and not work_queue_name:
            raise ValueError(
                "Either work_queue_id or work_queue_name must be provided."
            )
        if work_queue_id and work_queue_name:
            raise ValueError("Provide only one of work_queue_id and work_queue_name.")
        self.work_queue_id = work_queue_id
        self.work_queue_name = work_queue_name
        self.prefetch_seconds = prefetch_seconds
        self.submitting_flow_run_ids = set()
        self.started = False
        self.logger = get_logger("agent")
        self.task_group: Optional[TaskGroup] = None
        self.client: Optional[OrionClient] = None

    async def work_queue_id_from_name(self) -> Optional[UUID]:
        """
        For agents that were provided a work_queue_name, rather than a work_queue_id,
        this function will retrieve the work queue ID corresponding to that name.
        If no matching queue is found, a warning is logged and `None` is returned.
        """
        if not self.work_queue_name:
            raise ValueError("No work queue name provided.")
        try:
            work_queue = await self.client.read_work_queue_by_name(self.work_queue_name)
            return work_queue.id
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == status.HTTP_404_NOT_FOUND:
                self.logger.warning(
                    f"No work queue found named {self.work_queue_name!r}"
                )
                return None
            else:
                raise

    async def get_and_submit_flow_runs(self) -> List[FlowRun]:
        """
        The principle method on agents. Queries for scheduled flow runs and submits them for execution in parallel.
        """
        if not self.started:
            raise RuntimeError("Agent is not started. Use `async with OrionAgent()...`")

        self.logger.debug("Checking for flow runs...")

        before = pendulum.now("utc").add(
            seconds=self.prefetch_seconds or PREFECT_AGENT_PREFETCH_SECONDS.value()
        )

        # Use the work queue id or load one from the name
        work_queue_id = self.work_queue_id or await self.work_queue_id_from_name()
        if not work_queue_id:
            return []

        try:
            submittable_runs = await self.client.get_runs_in_work_queue(
                id=work_queue_id, limit=10, scheduled_before=before
            )
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == status.HTTP_404_NOT_FOUND:
                raise ValueError(
                    f"No work queue found with id '{work_queue_id}'"
                ) from None
            else:
                raise

        for flow_run in submittable_runs:
            self.logger.info(f"Submitting flow run '{flow_run.id}'")

            # don't resubmit a run
            if flow_run.id in self.submitting_flow_run_ids:
                continue

            self.submitting_flow_run_ids.add(flow_run.id)
            self.task_group.start_soon(
                self.submit_run,
                flow_run,
            )
        return submittable_runs

    def get_flow_runner(self, flow_run: FlowRun):
        # TODO: Here, the agent may merge settings with those contained in the
        #       flow_run.flow_runner settings object

        flow_runner_settings = flow_run.flow_runner.copy() or FlowRunnerSettings()
        if not flow_runner_settings.type or flow_runner_settings.type == "universal":
            flow_runner_settings.type = "subprocess"

        return FlowRunner.from_settings(flow_runner_settings)

    async def submit_run(self, flow_run: FlowRun) -> None:
        """
        Submit a flow run to the flow runner
        """
        ready_to_submit = await self._propose_pending_state(flow_run)

        if ready_to_submit:
            # Successfully entered a pending state; submit to flow runner
            flow_runner = self.get_flow_runner(flow_run)

            try:
                # Wait for submission to be completed. Note that the submission function
                # may continue to run in the background after this exits.
                await self.task_group.start(flow_runner.submit_flow_run, flow_run)
                self.logger.info(f"Completed submission of flow run '{flow_run.id}'")
            except Exception as exc:
                self.logger.error(
                    f"Flow runner failed to submit flow run '{flow_run.id}'",
                    exc_info=True,
                )
                await self._propose_failed_state(flow_run, exc)

        self.submitting_flow_run_ids.remove(flow_run.id)

    async def _propose_pending_state(self, flow_run: FlowRun) -> bool:
        state = flow_run.state
        try:
            state = await self.client.propose_state(Pending(), flow_run_id=flow_run.id)
        except Abort as exc:
            self.logger.info(
                f"Aborted submission of flow run '{flow_run.id}'. "
                f"Server sent an abort signal: {exc}",
            )
            return False
        except Exception as exc:
            self.logger.error(
                f"Failed to update state of flow run '{flow_run.id}'",
                exc_info=True,
            )
            return False

        if not state.is_pending():
            self.logger.info(
                f"Aborted submission of flow run '{flow_run.id}': "
                f"Server returned a non-pending state {state.type.value!r}",
            )
            return False

        return True

    async def _propose_failed_state(self, flow_run: FlowRun, exc: Exception) -> None:
        try:
            await self.client.propose_state(
                Failed(
                    message="Submission failed.",
                    data=DataDocument.encode("cloudpickle", exc),
                ),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # We've already failed, no need to note the abort but we don't want it to
            # raise in the agent process
            pass
        except Exception:
            self.logger.error(
                f"Failed to update state of flow run '{flow_run.id}'",
                exc_info=True,
            )

    # Context management ---------------------------------------------------------------

    async def start(self):
        self.started = True
        self.task_group = anyio.create_task_group()
        self.client = get_client()
        await self.client.__aenter__()
        await self.task_group.__aenter__()

    async def shutdown(self, *exc_info):
        self.started = False
        await self.task_group.__aexit__(*exc_info)
        await self.client.__aexit__(*exc_info)
        self.task_group = None
        self.client = None
        self.submitting_flow_run_ids = set()

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *exc_info):
        await self.shutdown(*exc_info)

OrionAgent.get_and_submit_flow_runs async

The principle method on agents. Queries for scheduled flow runs and submits them for execution in parallel.

Source code in prefect/agent.py
async def get_and_submit_flow_runs(self) -> List[FlowRun]:
    """
    The principle method on agents. Queries for scheduled flow runs and submits them for execution in parallel.
    """
    if not self.started:
        raise RuntimeError("Agent is not started. Use `async with OrionAgent()...`")

    self.logger.debug("Checking for flow runs...")

    before = pendulum.now("utc").add(
        seconds=self.prefetch_seconds or PREFECT_AGENT_PREFETCH_SECONDS.value()
    )

    # Use the work queue id or load one from the name
    work_queue_id = self.work_queue_id or await self.work_queue_id_from_name()
    if not work_queue_id:
        return []

    try:
        submittable_runs = await self.client.get_runs_in_work_queue(
            id=work_queue_id, limit=10, scheduled_before=before
        )
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == status.HTTP_404_NOT_FOUND:
            raise ValueError(
                f"No work queue found with id '{work_queue_id}'"
            ) from None
        else:
            raise

    for flow_run in submittable_runs:
        self.logger.info(f"Submitting flow run '{flow_run.id}'")

        # don't resubmit a run
        if flow_run.id in self.submitting_flow_run_ids:
            continue

        self.submitting_flow_run_ids.add(flow_run.id)
        self.task_group.start_soon(
            self.submit_run,
            flow_run,
        )
    return submittable_runs

OrionAgent.submit_run async

Submit a flow run to the flow runner

Source code in prefect/agent.py
async def submit_run(self, flow_run: FlowRun) -> None:
    """
    Submit a flow run to the flow runner
    """
    ready_to_submit = await self._propose_pending_state(flow_run)

    if ready_to_submit:
        # Successfully entered a pending state; submit to flow runner
        flow_runner = self.get_flow_runner(flow_run)

        try:
            # Wait for submission to be completed. Note that the submission function
            # may continue to run in the background after this exits.
            await self.task_group.start(flow_runner.submit_flow_run, flow_run)
            self.logger.info(f"Completed submission of flow run '{flow_run.id}'")
        except Exception as exc:
            self.logger.error(
                f"Flow runner failed to submit flow run '{flow_run.id}'",
                exc_info=True,
            )
            await self._propose_failed_state(flow_run, exc)

    self.submitting_flow_run_ids.remove(flow_run.id)

OrionAgent.work_queue_id_from_name async

For agents that were provided a work_queue_name, rather than a work_queue_id, this function will retrieve the work queue ID corresponding to that name. If no matching queue is found, a warning is logged and None is returned.

Source code in prefect/agent.py
async def work_queue_id_from_name(self) -> Optional[UUID]:
    """
    For agents that were provided a work_queue_name, rather than a work_queue_id,
    this function will retrieve the work queue ID corresponding to that name.
    If no matching queue is found, a warning is logged and `None` is returned.
    """
    if not self.work_queue_name:
        raise ValueError("No work queue name provided.")
    try:
        work_queue = await self.client.read_work_queue_by_name(self.work_queue_name)
        return work_queue.id
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == status.HTTP_404_NOT_FOUND:
            self.logger.warning(
                f"No work queue found named {self.work_queue_name!r}"
            )
            return None
        else:
            raise