Skip to content

prefect.cli.agent

Command line interface for working with agent services

start async

Start an agent process to poll one or more work queues for flow runs.

Source code in prefect/cli/agent.py
@agent_app.command()
async def start(
    # deprecated main argument
    work_queue: str = typer.Argument(
        None,
        show_default=False,
        help="DEPRECATED: A work queue name or ID",
    ),
    work_queues: List[str] = typer.Option(
        None,
        "-q",
        "--work-queue",
        help="One or more work queue names for the agent to pull from.",
    ),
    hide_welcome: bool = typer.Option(False, "--hide-welcome"),
    api: str = SettingsOption(PREFECT_API_URL),
    # deprecated tags
    tags: List[str] = typer.Option(
        None,
        "-t",
        "--tag",
        help="DEPRECATED: One or more optional tags that will be used to create a work queue",
    ),
):
    """
    Start an agent process to poll one or more work queues for flow runs.
    """
    # NOTE: the docstring above is intentionally left short and unchanged;
    # Typer displays a command's docstring as its `--help` text.
    #
    # Overview of what this command does, in order:
    #   1. Folds the deprecated positional `work_queue` (a name or a UUID string)
    #      into the `work_queues` list.
    #   2. Rejects the invocation when neither queues nor tags were given, or
    #      when both were given.
    #   3. For deprecated `tags`, finds or creates a work queue named after the
    #      sorted tags and polls that single queue.
    #   4. Prints the welcome banner (unless `hide_welcome`) and runs the agent
    #      polling loop until it stops.

    # Typer passes None when no `-q` option is supplied; normalize to a list
    # so it can be appended to and joined below.
    work_queues = work_queues or []

    if work_queue is not None:
        # try to treat the work_queue as a UUID
        # NOTE(review): only TypeError/ValueError are handled here, so any other
        # error raised by the lookup (e.g. for a well-formed UUID that matches
        # no queue) propagates to the caller — confirm that is intended.
        try:
            async with get_client() as client:
                q = await client.read_work_queue(UUID(work_queue))
                work_queue = q.name
        # otherwise treat it as a string name
        except (TypeError, ValueError):
            pass
        work_queues.append(work_queue)
        app.console.print(
            "Agents now support multiple work queues. Instead of passing a single argument, provide work queue names "
            f"with the `-q` or `--work-queue` flag: `prefect agent start -q {work_queue}`\n",
            style="blue",
        )

    # Exactly one of "work queues" or "tags" must be provided.
    if not work_queues and not tags:
        exit_with_error("No work queues provided!", style="red")
    elif work_queues and tags:
        exit_with_error(
            "Either `work_queues` or `tags` can be provided, but not both.", style="red"
        )

    if tags:
        # The queue name is derived from the sorted tags so the same tag set
        # always maps to the same queue regardless of argument order.
        work_queue_name = f"Agent queue {'-'.join(sorted(tags))}"
        app.console.print(
            "`tags` are deprecated. For backwards-compatibility with old "
            f"versions of Prefect, this agent will create a work queue named `{work_queue_name}` "
            "that uses legacy tag-based matching.",
            style="red",
        )

        async with get_client() as client:
            try:
                # Use a dedicated local name rather than rebinding the
                # `work_queue` CLI parameter (a string) to a queue object.
                existing_queue = await client.read_work_queue_by_name(work_queue_name)
                if existing_queue.filter is None:
                    # ensure the work queue has legacy (deprecated) tag-based behavior.
                    # The update must identify which queue to modify, so pass
                    # the ID of the queue that was just read.
                    await client.update_work_queue(
                        existing_queue.id, filter=dict(tags=tags)
                    )
            except ObjectNotFound:
                # if the work queue doesn't already exist, we create it with tags
                # to enable legacy (deprecated) tag-matching behavior
                await client.create_work_queue(name=work_queue_name, tags=tags)

        # In tag mode the agent polls only the tag-derived queue.
        work_queues = [work_queue_name]

    if not hide_welcome:
        # A falsy `api` value means no API URL is configured, which this
        # command reports as running against an ephemeral API.
        if api:
            app.console.print(
                f"Starting v{prefect.__version__} agent connected to {api}..."
            )
        else:
            app.console.print(
                f"Starting v{prefect.__version__} agent with ephemeral API..."
            )

    async with OrionAgent(work_queues=work_queues) as agent:
        if not hide_welcome:
            app.console.print(ascii_name)
            app.console.print(
                "Agent started! Looking for work from "
                f"queue(s): {', '.join(work_queues)}..."
            )

        # Repeatedly invoke the agent's polling coroutine at the configured
        # query interval; loop messages are routed to the CLI console.
        await critical_service_loop(
            agent.get_and_submit_flow_runs,
            PREFECT_AGENT_QUERY_INTERVAL.value(),
            printer=app.console.print,
        )

    app.console.print("Agent stopped!")