Skip to content

prefect.cli.concurrency_limit

Command line interface for working with concurrency limits.

create async

Create a concurrency limit against a tag.

This limit controls how many task runs with that tag may simultaneously be in a Running state.

Source code in prefect/cli/concurrency_limit.py
@concurrency_limit_app.command()
async def create(tag: str, concurrency_limit: int):
    """
    Create a concurrency limit against a tag.

    This limit controls how many task runs with that tag may simultaneously be in a
    Running state.
    """

    async with get_client() as client:
        await client.create_concurrency_limit(
            tag=tag, concurrency_limit=concurrency_limit
        )
        result = await client.read_concurrency_limit_by_tag(tag)

    app.console.print(Pretty(result))

delete async

Delete the concurrency limit set on the specified tag.

Source code in prefect/cli/concurrency_limit.py
@concurrency_limit_app.command()
async def delete(tag: str):
    """
    Delete the concurrency limit set on the specified tag.
    """

    async with get_client() as client:
        try:
            await client.delete_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    exit_with_success(f"Deleted concurrency limit set on the tag: {tag}")

inspect async

View details about a concurrency limit. active_slots shows a list of TaskRun IDs which are currently using a concurrency slot.

Source code in prefect/cli/concurrency_limit.py
@concurrency_limit_app.command()
async def inspect(tag: str):
    """
    View details about a concurrency limit. `active_slots` shows a list of TaskRun IDs
    which are currently using a concurrency slot.
    """

    async with get_client() as client:
        try:
            result = await client.read_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    trid_table = Table()
    trid_table.add_column("Active Task Run IDs", style="cyan", no_wrap=True)

    cl_table = Table(title=f"Concurrency Limit ID: [red]{str(result.id)}")
    cl_table.add_column("Tag", style="green", no_wrap=True)
    cl_table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    cl_table.add_column("Created", style="magenta", no_wrap=True)
    cl_table.add_column("Updated", style="magenta", no_wrap=True)

    for trid in result.active_slots:
        trid_table.add_row(str(trid))

    cl_table.add_row(
        str(result.tag),
        str(result.concurrency_limit),
        Pretty(pendulum.instance(result.created).diff_for_humans()),
        Pretty(pendulum.instance(result.updated).diff_for_humans()),
    )

    group = Group(
        cl_table,
        trid_table,
    )
    app.console.print(Panel(group, expand=False))

ls async

View all concurrency limits.

Source code in prefect/cli/concurrency_limit.py
@concurrency_limit_app.command()
async def ls(limit: int = 15, offset: int = 0):
    """
    View all concurrency limits.
    """
    table = Table(
        title="Concurrency Limits",
        caption="inspect a concurrency limit to show active task run IDs",
    )
    table.add_column("Tag", style="green", no_wrap=True)
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    table.add_column("Active Task Runs", style="magenta", no_wrap=True)

    async with get_client() as client:
        concurrency_limits = await client.read_concurrency_limits(
            limit=limit, offset=offset
        )

    for cl in sorted(concurrency_limits, key=lambda c: c.updated, reverse=True):
        table.add_row(
            str(cl.tag),
            str(cl.id),
            str(cl.concurrency_limit),
            str(len(cl.active_slots)),
        )

    app.console.print(table)