Skip to content

prefect.cli.concurrency_limit

Command line interface for working with concurrency limits.

create async

Create a concurrency limit against a tag.

This limit controls how many task runs with that tag may simultaneously be in a Running state.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/cli/concurrency_limit.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@concurrency_limit_app.command()
async def create(tag: str, concurrency_limit: int):
    """
    Create a concurrency limit against a tag.

    This limit controls how many task runs with that tag may simultaneously be in a
    Running state.
    """

    async with get_client() as client:
        await client.create_concurrency_limit(
            tag=tag, concurrency_limit=concurrency_limit
        )
        await client.read_concurrency_limit_by_tag(tag)

    app.console.print(
        textwrap.dedent(
            f"""
            Created concurrency limit with properties:
                tag - {tag!r}
                concurrency_limit - {concurrency_limit}

            Delete the concurrency limit:
                prefect concurrency-limit delete {tag!r}

            Inspect the concurrency limit:
                prefect concurrency-limit inspect {tag!r}
        """
        )
    )

delete async

Delete the concurrency limit set on the specified tag.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/cli/concurrency_limit.py
147
148
149
150
151
152
153
154
155
156
157
158
159
@concurrency_limit_app.command()
async def delete(tag: str):
    """
    Delete the concurrency limit set on the specified tag.
    """

    async with get_client() as client:
        try:
            await client.delete_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    exit_with_success(f"Deleted concurrency limit set on the tag: {tag}")

inspect async

View details about a concurrency limit. active_slots shows a list of TaskRun IDs which are currently using a concurrency slot.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/cli/concurrency_limit.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@concurrency_limit_app.command()
async def inspect(tag: str):
    """
    View details about a concurrency limit. `active_slots` shows a list of TaskRun IDs
    which are currently using a concurrency slot.
    """

    async with get_client() as client:
        try:
            result = await client.read_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    trid_table = Table()
    trid_table.add_column("Active Task Run IDs", style="cyan", no_wrap=True)

    cl_table = Table(title=f"Concurrency Limit ID: [red]{str(result.id)}")
    cl_table.add_column("Tag", style="green", no_wrap=True)
    cl_table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    cl_table.add_column("Created", style="magenta", no_wrap=True)
    cl_table.add_column("Updated", style="magenta", no_wrap=True)

    for trid in sorted(result.active_slots):
        trid_table.add_row(str(trid))

    cl_table.add_row(
        str(result.tag),
        str(result.concurrency_limit),
        Pretty(pendulum.instance(result.created).diff_for_humans()),
        Pretty(pendulum.instance(result.updated).diff_for_humans()),
    )

    group = Group(
        cl_table,
        trid_table,
    )
    app.console.print(Panel(group, expand=False))

ls async

View all concurrency limits.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/cli/concurrency_limit.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@concurrency_limit_app.command()
async def ls(limit: int = 15, offset: int = 0):
    """
    View all concurrency limits.
    """
    table = Table(
        title="Concurrency Limits",
        caption="inspect a concurrency limit to show active task run IDs",
    )
    table.add_column("Tag", style="green", no_wrap=True)
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    table.add_column("Active Task Runs", style="magenta", no_wrap=True)

    async with get_client() as client:
        concurrency_limits = await client.read_concurrency_limits(
            limit=limit, offset=offset
        )

    for cl in sorted(concurrency_limits, key=lambda c: c.updated, reverse=True):
        table.add_row(
            str(cl.tag),
            str(cl.id),
            str(cl.concurrency_limit),
            str(len(cl.active_slots)),
        )

    app.console.print(table)

reset async

Resets the concurrency limit slots set on the specified tag.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/cli/concurrency_limit.py
132
133
134
135
136
137
138
139
140
141
142
143
144
@concurrency_limit_app.command()
async def reset(tag: str):
    """
    Resets the concurrency limit slots set on the specified tag.
    """

    async with get_client() as client:
        try:
            await client.reset_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    exit_with_success(f"Reset concurrency limit set on the tag: {tag}")