Skip to content

prefect.client.schemas.actions

StateCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a new state.

Source code in prefect/client/schemas/actions.py
62
63
64
65
66
67
68
69
70
71
class StateCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a new state."""

    type: StateType
    name: Optional[str] = Field(default=None)
    message: Optional[str] = Field(default=None, examples=["Run started"])
    state_details: StateDetails = Field(default_factory=StateDetails)
    data: Union["BaseResult[R]", "DataDocument[R]", Any] = Field(
        default=None,
    )

FlowCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow.

Source code in prefect/client/schemas/actions.py
74
75
76
77
78
79
80
81
82
83
84
class FlowCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow."""

    name: str = Field(
        default=..., description="The name of the flow", examples=["my-flow"]
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
    )

FlowUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a flow.

Source code in prefect/client/schemas/actions.py
87
88
89
90
91
92
93
94
class FlowUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow."""

    tags: List[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
    )

DeploymentCreate

Bases: DeprecatedInfraOverridesField, ActionBaseModel

Data used by the Prefect REST API to create a deployment.

Source code in prefect/client/schemas/actions.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class DeploymentCreate(DeprecatedInfraOverridesField, ActionBaseModel):
    """Data used by the Prefect REST API to create a deployment."""

    @root_validator(pre=True)
    def remove_old_fields(cls, values):
        return remove_old_deployment_fields(values)

    name: str = Field(..., description="The name of the deployment.")
    flow_id: UUID = Field(..., description="The ID of the flow to deploy.")
    is_schedule_active: Optional[bool] = Field(None)
    paused: Optional[bool] = Field(None)
    schedules: List[DeploymentScheduleCreate] = Field(
        default_factory=list,
        description="A list of schedules for the deployment.",
    )
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )
    parameter_openapi_schema: Optional[Dict[str, Any]] = Field(None)
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    tags: List[str] = Field(default_factory=list)
    pull_steps: Optional[List[dict]] = Field(None)

    manifest_path: Optional[str] = Field(None)
    work_queue_name: Optional[str] = Field(None)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        examples=["my-work-pool"],
    )
    storage_document_id: Optional[UUID] = Field(None)
    infrastructure_document_id: Optional[UUID] = Field(None)
    schedule: Optional[SCHEDULE_TYPES] = Field(None)
    description: Optional[str] = Field(None)
    path: Optional[str] = Field(None)
    version: Optional[str] = Field(None)
    entrypoint: Optional[str] = Field(None)
    job_variables: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Overrides to apply to flow run infrastructure at runtime.",
    )

    def check_valid_configuration(self, base_job_template: dict):
        """Check that the combination of base_job_template defaults
        and job_variables conforms to the specified schema.
        """
        variables_schema = deepcopy(base_job_template.get("variables"))

        if variables_schema is not None:
            # jsonschema considers required fields, even if that field has a default,
            # to still be required. To get around this we remove the fields from
            # required if there is a default present.
            required = variables_schema.get("required")
            properties = variables_schema.get("properties")
            if required is not None and properties is not None:
                for k, v in properties.items():
                    if "default" in v and k in required:
                        required.remove(k)

            jsonschema.validate(self.job_variables, variables_schema)

check_valid_configuration

Check that the combination of base_job_template defaults and job_variables conforms to the specified schema.

Source code in prefect/client/schemas/actions.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def check_valid_configuration(self, base_job_template: dict):
    """Check that the combination of base_job_template defaults
    and job_variables conforms to the specified schema.
    """
    variables_schema = deepcopy(base_job_template.get("variables"))

    if variables_schema is not None:
        # jsonschema considers required fields, even if that field has a default,
        # to still be required. To get around this we remove the fields from
        # required if there is a default present.
        required = variables_schema.get("required")
        properties = variables_schema.get("properties")
        if required is not None and properties is not None:
            for k, v in properties.items():
                if "default" in v and k in required:
                    required.remove(k)

        jsonschema.validate(self.job_variables, variables_schema)

DeploymentUpdate

Bases: DeprecatedInfraOverridesField, ActionBaseModel

Data used by the Prefect REST API to update a deployment.

Source code in prefect/client/schemas/actions.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class DeploymentUpdate(DeprecatedInfraOverridesField, ActionBaseModel):
    """Data used by the Prefect REST API to update a deployment."""

    @root_validator(pre=True)
    def remove_old_fields(cls, values):
        return remove_old_deployment_fields(values)

    @validator("schedule")
    def validate_none_schedule(cls, v):
        return return_none_schedule(v)

    version: Optional[str] = Field(None)
    schedule: Optional[SCHEDULE_TYPES] = Field(None)
    description: Optional[str] = Field(None)
    is_schedule_active: bool = Field(None)
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    tags: List[str] = Field(default_factory=list)
    work_queue_name: Optional[str] = Field(None)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        examples=["my-work-pool"],
    )
    path: Optional[str] = Field(None)
    job_variables: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Overrides to apply to flow run infrastructure at runtime.",
    )
    entrypoint: Optional[str] = Field(None)
    manifest_path: Optional[str] = Field(None)
    storage_document_id: Optional[UUID] = Field(None)
    infrastructure_document_id: Optional[UUID] = Field(None)
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )

    def check_valid_configuration(self, base_job_template: dict):
        """Check that the combination of base_job_template defaults
        and job_variables conforms to the specified schema.
        """
        variables_schema = deepcopy(base_job_template.get("variables"))

        if variables_schema is not None:
            # jsonschema considers required fields, even if that field has a default,
            # to still be required. To get around this we remove the fields from
            # required if there is a default present.
            required = variables_schema.get("required")
            properties = variables_schema.get("properties")
            if required is not None and properties is not None:
                for k, v in properties.items():
                    if "default" in v and k in required:
                        required.remove(k)

        if variables_schema is not None:
            jsonschema.validate(self.job_variables, variables_schema)

check_valid_configuration

Check that the combination of base_job_template defaults and job_variables conforms to the specified schema.

Source code in prefect/client/schemas/actions.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def check_valid_configuration(self, base_job_template: dict):
    """Check that the combination of base_job_template defaults
    and job_variables conforms to the specified schema.
    """
    variables_schema = deepcopy(base_job_template.get("variables"))

    if variables_schema is not None:
        # jsonschema considers required fields, even if that field has a default,
        # to still be required. To get around this we remove the fields from
        # required if there is a default present.
        required = variables_schema.get("required")
        properties = variables_schema.get("properties")
        if required is not None and properties is not None:
            for k, v in properties.items():
                if "default" in v and k in required:
                    required.remove(k)

    if variables_schema is not None:
        jsonschema.validate(self.job_variables, variables_schema)

FlowRunUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a flow run.

Source code in prefect/client/schemas/actions.py
246
247
248
249
250
251
252
253
254
255
256
257
class FlowRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow run."""

    name: Optional[str] = Field(None)
    flow_version: Optional[str] = Field(None)
    parameters: Optional[Dict[str, Any]] = Field(None)
    empirical_policy: objects.FlowRunPolicy = Field(
        default_factory=objects.FlowRunPolicy
    )
    tags: List[str] = Field(default_factory=list)
    infrastructure_pid: Optional[str] = Field(None)
    job_variables: Optional[Dict[str, Any]] = Field(None)

TaskRunCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a task run

Source code in prefect/client/schemas/actions.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
class TaskRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a task run"""

    # TaskRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the task run to create"
    )

    name: Optional[str] = Field(
        default=None,
        description="The name of the task run",
    )
    flow_run_id: Optional[UUID] = Field(None)
    task_key: str = Field(
        default=..., description="A unique identifier for the task being run."
    )
    dynamic_key: str = Field(
        default=...,
        description=(
            "A dynamic key used to differentiate between multiple runs of the same task"
            " within the same flow run."
        ),
    )
    cache_key: Optional[str] = Field(None)
    cache_expiration: Optional[objects.DateTimeTZ] = Field(None)
    task_version: Optional[str] = Field(None)
    empirical_policy: objects.TaskRunPolicy = Field(
        default_factory=objects.TaskRunPolicy,
    )
    tags: List[str] = Field(default_factory=list)
    task_inputs: Dict[
        str,
        List[
            Union[
                objects.TaskRunResult,
                objects.Parameter,
                objects.Constant,
            ]
        ],
    ] = Field(default_factory=dict)

TaskRunUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a task run

Source code in prefect/client/schemas/actions.py
302
303
304
305
class TaskRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a task run"""

    name: Optional[str] = Field(None)

FlowRunCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow run.

Source code in prefect/client/schemas/actions.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class FlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run."""

    # FlowRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the flow run to create"
    )

    name: Optional[str] = Field(default=None, description="The name of the flow run.")
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    deployment_id: Optional[UUID] = Field(None)
    flow_version: Optional[str] = Field(None)
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="The parameters for the flow run."
    )
    context: Dict[str, Any] = Field(
        default_factory=dict, description="The context for the flow run."
    )
    parent_task_run_id: Optional[UUID] = Field(None)
    infrastructure_document_id: Optional[UUID] = Field(None)
    empirical_policy: objects.FlowRunPolicy = Field(
        default_factory=objects.FlowRunPolicy
    )
    tags: List[str] = Field(default_factory=list)
    idempotency_key: Optional[str] = Field(None)

    class Config(ActionBaseModel.Config):
        json_dumps = orjson_dumps_extra_compatible

DeploymentFlowRunCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow run from a deployment.

Source code in prefect/client/schemas/actions.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
class DeploymentFlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run from a deployment."""

    # FlowRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the flow run to create"
    )

    name: Optional[str] = Field(default=None, description="The name of the flow run.")
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="The parameters for the flow run."
    )
    context: Dict[str, Any] = Field(
        default_factory=dict, description="The context for the flow run."
    )
    infrastructure_document_id: Optional[UUID] = Field(None)
    empirical_policy: objects.FlowRunPolicy = Field(
        default_factory=objects.FlowRunPolicy
    )
    tags: List[str] = Field(default_factory=list)
    idempotency_key: Optional[str] = Field(None)
    parent_task_run_id: Optional[UUID] = Field(None)
    work_queue_name: Optional[str] = Field(None)
    job_variables: Optional[dict] = Field(None)

SavedSearchCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a saved search.

Source code in prefect/client/schemas/actions.py
364
365
366
367
368
369
370
class SavedSearchCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a saved search."""

    name: str = Field(default=..., description="The name of the saved search.")
    filters: List[objects.SavedSearchFilter] = Field(
        default_factory=list, description="The filter set for the saved search."
    )

ConcurrencyLimitCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a concurrency limit.

Source code in prefect/client/schemas/actions.py
373
374
375
376
377
378
379
class ConcurrencyLimitCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a concurrency limit."""

    tag: str = Field(
        default=..., description="A tag the concurrency limit is applied to."
    )
    concurrency_limit: int = Field(default=..., description="The concurrency limit.")

BlockTypeCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a block type.

Source code in prefect/client/schemas/actions.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
class BlockTypeCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block type."""

    name: str = Field(default=..., description="A block type's name")
    slug: str = Field(default=..., description="A block type's slug")
    logo_url: Optional[objects.HttpUrl] = Field(
        default=None, description="Web URL for the block type's logo"
    )
    documentation_url: Optional[objects.HttpUrl] = Field(
        default=None, description="Web URL for the block type's documentation"
    )
    description: Optional[str] = Field(
        default=None,
        description="A short blurb about the corresponding block's intended use",
    )
    code_example: Optional[str] = Field(
        default=None,
        description="A code snippet demonstrating use of the corresponding block",
    )

    # validators
    _validate_slug_format = validator("slug", allow_reuse=True)(
        validate_block_type_slug
    )

BlockTypeUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a block type.

Source code in prefect/client/schemas/actions.py
408
409
410
411
412
413
414
415
416
417
418
class BlockTypeUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block type."""

    logo_url: Optional[objects.HttpUrl] = Field(None)
    documentation_url: Optional[objects.HttpUrl] = Field(None)
    description: Optional[str] = Field(None)
    code_example: Optional[str] = Field(None)

    @classmethod
    def updatable_fields(cls) -> set:
        return get_class_fields_only(cls)

BlockSchemaCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a block schema.

Source code in prefect/client/schemas/actions.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
class BlockSchemaCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block schema."""

    fields: Dict[str, Any] = Field(
        default_factory=dict, description="The block schema's field schema"
    )
    block_type_id: Optional[UUID] = Field(None)
    capabilities: List[str] = Field(
        default_factory=list,
        description="A list of Block capabilities",
    )
    version: str = Field(
        default=objects.DEFAULT_BLOCK_SCHEMA_VERSION,
        description="Human readable identifier for the block schema",
    )

BlockDocumentCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a block document.

Source code in prefect/client/schemas/actions.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
class BlockDocumentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block document."""

    name: Optional[str] = Field(
        default=None, description="The name of the block document"
    )
    data: Dict[str, Any] = Field(
        default_factory=dict, description="The block document's data"
    )
    block_schema_id: UUID = Field(
        default=..., description="The block schema ID for the block document"
    )
    block_type_id: UUID = Field(
        default=..., description="The block type ID for the block document"
    )
    is_anonymous: bool = Field(
        default=False,
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
    )

    _validate_name_format = validator("name", allow_reuse=True)(
        validate_block_document_name
    )

    @root_validator
    def validate_name_is_present_if_not_anonymous(cls, values):
        return validate_name_present_on_nonanonymous_blocks(values)

BlockDocumentUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a block document.

Source code in prefect/client/schemas/actions.py
470
471
472
473
474
475
476
477
478
479
480
481
482
class BlockDocumentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block document."""

    block_schema_id: Optional[UUID] = Field(
        default=None, description="A block schema ID"
    )
    data: Dict[str, Any] = Field(
        default_factory=dict, description="The block document's data"
    )
    merge_existing_data: bool = Field(
        default=True,
        description="Whether to merge the existing data with the new data or replace it",
    )

BlockDocumentReferenceCreate

Bases: ActionBaseModel

Data used to create block document reference.

Source code in prefect/client/schemas/actions.py
485
486
487
488
489
490
491
492
493
494
495
496
497
class BlockDocumentReferenceCreate(ActionBaseModel):
    """Data used to create block document reference."""

    id: UUID = Field(default_factory=uuid4)
    parent_block_document_id: UUID = Field(
        default=..., description="ID of block document the reference is nested within"
    )
    reference_block_document_id: UUID = Field(
        default=..., description="ID of the nested block document"
    )
    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

LogCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a log.

Source code in prefect/client/schemas/actions.py
500
501
502
503
504
505
506
507
508
class LogCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a log."""

    name: str = Field(default=..., description="The logger name.")
    level: int = Field(default=..., description="The log level.")
    message: str = Field(default=..., description="The log message.")
    timestamp: DateTimeTZ = Field(default=..., description="The log timestamp.")
    flow_run_id: Optional[UUID] = Field(None)
    task_run_id: Optional[UUID] = Field(None)

WorkPoolCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a work pool.

Source code in prefect/client/schemas/actions.py
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
class WorkPoolCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work pool."""

    name: str = Field(
        description="The name of the work pool.",
    )
    description: Optional[str] = Field(None)
    type: str = Field(
        description="The work pool type.", default="prefect-agent"
    )  # TODO: change default
    base_job_template: Dict[str, Any] = Field(
        default_factory=dict,
        description="The base job template for the work pool.",
    )
    is_paused: bool = Field(
        default=False,
        description="Whether the work pool is paused.",
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        default=None, description="A concurrency limit for the work pool."
    )

WorkPoolUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a work pool.

Source code in prefect/client/schemas/actions.py
534
535
536
537
538
539
540
class WorkPoolUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work pool."""

    description: Optional[str] = Field(None)
    is_paused: Optional[bool] = Field(None)
    base_job_template: Optional[Dict[str, Any]] = Field(None)
    concurrency_limit: Optional[int] = Field(None)

WorkQueueCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a work queue.

Source code in prefect/client/schemas/actions.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
class WorkQueueCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work queue."""

    name: str = Field(default=..., description="The name of the work queue.")
    description: Optional[str] = Field(None)
    is_paused: bool = Field(
        default=False,
        description="Whether the work queue is paused.",
    )
    concurrency_limit: Optional[int] = Field(
        default=None,
        description="A concurrency limit for the work queue.",
    )
    priority: Optional[int] = Field(
        default=None,
        description=(
            "The queue's priority. Lower values are higher priority (1 is the highest)."
        ),
    )

    # DEPRECATED

    filter: Optional[objects.QueueFilter] = Field(
        None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )

WorkQueueUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a work queue.

Source code in prefect/client/schemas/actions.py
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
class WorkQueueUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work queue."""

    name: Optional[str] = Field(None)
    description: Optional[str] = Field(None)
    is_paused: bool = Field(
        default=False, description="Whether or not the work queue is paused."
    )
    concurrency_limit: Optional[int] = Field(None)
    priority: Optional[int] = Field(None)
    last_polled: Optional[DateTimeTZ] = Field(None)

    # DEPRECATED

    filter: Optional[objects.QueueFilter] = Field(
        None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )

FlowRunNotificationPolicyCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a flow run notification policy.

Source code in prefect/client/schemas/actions.py
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
class FlowRunNotificationPolicyCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run notification policy."""

    is_active: bool = Field(
        default=True, description="Whether the policy is currently active"
    )
    state_names: List[str] = Field(
        default=..., description="The flow run states that trigger notifications"
    )
    tags: List[str] = Field(
        default=...,
        description="The flow run tags that trigger notifications (set [] to disable)",
    )
    block_document_id: UUID = Field(
        default=..., description="The block document ID used for sending notifications"
    )
    message_template: Optional[str] = Field(
        default=None,
        description=(
            "A templatable notification message. Use {braces} to add variables."
            " Valid variables include:"
            f" {listrepr(sorted(objects.FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS), sep=', ')}"
        ),
        examples=[
            "Flow run {flow_run_name} with id {flow_run_id} entered state"
            " {flow_run_state_name}."
        ],
    )

    @validator("message_template")
    def validate_message_template_variables(cls, v):
        return validate_message_template_variables(v)

FlowRunNotificationPolicyUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a flow run notification policy.

Source code in prefect/client/schemas/actions.py
627
628
629
630
631
632
633
634
class FlowRunNotificationPolicyUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow run notification policy."""

    is_active: Optional[bool] = Field(None)
    state_names: Optional[List[str]] = Field(None)
    tags: Optional[List[str]] = Field(None)
    block_document_id: Optional[UUID] = Field(None)
    message_template: Optional[str] = Field(None)

ArtifactCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create an artifact.

Source code in prefect/client/schemas/actions.py
637
638
639
640
641
642
643
644
645
646
647
648
649
650
class ArtifactCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create an artifact."""

    key: Optional[str] = Field(None)
    type: Optional[str] = Field(None)
    description: Optional[str] = Field(None)
    data: Optional[Union[Dict[str, Any], Any]] = Field(None)
    metadata_: Optional[Dict[str, str]] = Field(None)
    flow_run_id: Optional[UUID] = Field(None)
    task_run_id: Optional[UUID] = Field(None)

    _validate_artifact_format = validator("key", allow_reuse=True)(
        validate_artifact_key
    )

ArtifactUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update an artifact.

Source code in prefect/client/schemas/actions.py
653
654
655
656
657
658
class ArtifactUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update an artifact."""

    data: Optional[Union[Dict[str, Any], Any]] = Field(None)
    description: Optional[str] = Field(None)
    metadata_: Optional[Dict[str, str]] = Field(None)

VariableCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a Variable.

Source code in prefect/client/schemas/actions.py
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
class VariableCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a Variable."""

    name: str = Field(
        default=...,
        description="The name of the variable",
        examples=["my_variable"],
        max_length=objects.MAX_VARIABLE_NAME_LENGTH,
    )
    value: str = Field(
        default=...,
        description="The value of the variable",
        examples=["my-value"],
        max_length=objects.MAX_VARIABLE_VALUE_LENGTH,
    )
    tags: Optional[List[str]] = Field(default=None)

    # validators
    _validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)

VariableUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a Variable.

Source code in prefect/client/schemas/actions.py
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
class VariableUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a Variable."""

    name: Optional[str] = Field(
        default=None,
        description="The name of the variable",
        examples=["my_variable"],
        max_length=objects.MAX_VARIABLE_NAME_LENGTH,
    )
    value: Optional[str] = Field(
        default=None,
        description="The value of the variable",
        examples=["my-value"],
        max_length=objects.MAX_VARIABLE_NAME_LENGTH,
    )
    tags: Optional[List[str]] = Field(default=None)

    # validators
    _validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)

GlobalConcurrencyLimitCreate

Bases: ActionBaseModel

Data used by the Prefect REST API to create a global concurrency limit.

Source code in prefect/client/schemas/actions.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
class GlobalConcurrencyLimitCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a global concurrency limit."""

    name: str = Field(description="The name of the global concurrency limit.")
    limit: int = Field(
        description=(
            "The maximum number of slots that can be occupied on this concurrency"
            " limit."
        )
    )
    active: Optional[bool] = Field(
        default=True,
        description="Whether or not the concurrency limit is in an active state.",
    )
    active_slots: Optional[int] = Field(
        default=0,
        description="Number of tasks currently using a concurrency slot.",
    )
    slot_decay_per_second: Optional[float] = Field(
        default=0.0,
        description=(
            "Controls the rate at which slots are released when the concurrency limit"
            " is used as a rate limit."
        ),
    )

GlobalConcurrencyLimitUpdate

Bases: ActionBaseModel

Data used by the Prefect REST API to update a global concurrency limit.

Source code in prefect/client/schemas/actions.py
730
731
732
733
734
735
736
737
class GlobalConcurrencyLimitUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a global concurrency limit."""

    name: Optional[str] = Field(None)
    limit: Optional[int] = Field(None)
    active: Optional[bool] = Field(None)
    active_slots: Optional[int] = Field(None)
    slot_decay_per_second: Optional[float] = Field(None)

prefect.client.schemas.filters

Schemas that define Prefect REST API filtering operations.

Operator

Bases: AutoEnum

Operators for combining filter criteria.

Source code in prefect/client/schemas/filters.py
21
22
23
24
25
class Operator(AutoEnum):
    """Operators for combining filter criteria."""

    and_ = AutoEnum.auto()
    or_ = AutoEnum.auto()

OperatorMixin

Base model for Prefect filters that combines criteria with a user-provided operator

Source code in prefect/client/schemas/filters.py
28
29
30
31
32
33
34
class OperatorMixin:
    """Base model for Prefect filters that combines criteria with a user-provided operator"""

    operator: Operator = Field(
        default=Operator.and_,
        description="Operator for combining filter criteria. Defaults to 'and_'.",
    )

FlowFilterId

Bases: PrefectBaseModel

Filter by Flow.id.

Source code in prefect/client/schemas/filters.py
37
38
39
40
41
42
class FlowFilterId(PrefectBaseModel):
    """Filter by `Flow.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow ids to include"
    )

FlowFilterName

Bases: PrefectBaseModel

Filter by Flow.name.

Source code in prefect/client/schemas/filters.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class FlowFilterName(PrefectBaseModel):
    """Filter by `Flow.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of flow names to include",
        examples=[["my-flow-1", "my-flow-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

FlowFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by Flow.tags.

Source code in prefect/client/schemas/filters.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class FlowFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `Flow.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flows will be returned only if their tags are a superset"
            " of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flows without tags"
    )

FlowFilter

Bases: PrefectBaseModel, OperatorMixin

Filter for flows. Only flows matching all criteria will be returned.

Source code in prefect/client/schemas/filters.py
81
82
83
84
85
86
87
88
89
90
91
92
class FlowFilter(PrefectBaseModel, OperatorMixin):
    """Filter for flows. Only flows matching all criteria will be returned."""

    id: Optional[FlowFilterId] = Field(
        default=None, description="Filter criteria for `Flow.id`"
    )
    name: Optional[FlowFilterName] = Field(
        default=None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        default=None, description="Filter criteria for `Flow.tags`"
    )

FlowRunFilterId

Bases: PrefectBaseModel

Filter by FlowRun.id.

Source code in prefect/client/schemas/filters.py
 95
 96
 97
 98
 99
100
101
102
103
class FlowRunFilterId(PrefectBaseModel):
    """Filter by FlowRun.id."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run ids to include"
    )
    not_any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run ids to exclude"
    )

FlowRunFilterName

Bases: PrefectBaseModel

Filter by FlowRun.name.

Source code in prefect/client/schemas/filters.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class FlowRunFilterName(PrefectBaseModel):
    """Filter by `FlowRun.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of flow run names to include",
        examples=[["my-flow-run-1", "my-flow-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

FlowRunFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.tags.

Source code in prefect/client/schemas/filters.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class FlowRunFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flow runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flow runs without tags"
    )

FlowRunFilterDeploymentId

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.deployment_id.

Source code in prefect/client/schemas/filters.py
142
143
144
145
146
147
148
149
150
151
class FlowRunFilterDeploymentId(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.deployment_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run deployment ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without deployment ids",
    )

FlowRunFilterWorkQueueName

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.work_queue_name.

Source code in prefect/client/schemas/filters.py
154
155
156
157
158
159
160
161
162
163
164
165
class FlowRunFilterWorkQueueName(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.work_queue_name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without work queue names",
    )

FlowRunFilterStateType

Bases: PrefectBaseModel

Filter by FlowRun.state_type.

Source code in prefect/client/schemas/filters.py
168
169
170
171
172
173
class FlowRunFilterStateType(PrefectBaseModel):
    """Filter by `FlowRun.state_type`."""

    any_: Optional[List[StateType]] = Field(
        default=None, description="A list of flow run state types to include"
    )

FlowRunFilterFlowVersion

Bases: PrefectBaseModel

Filter by FlowRun.flow_version.

Source code in prefect/client/schemas/filters.py
187
188
189
190
191
192
class FlowRunFilterFlowVersion(PrefectBaseModel):
    """Filter by `FlowRun.flow_version`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of flow run flow_versions to include"
    )

FlowRunFilterStartTime

Bases: PrefectBaseModel

Filter by FlowRun.start_time.

Source code in prefect/client/schemas/filters.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class FlowRunFilterStartTime(PrefectBaseModel):
    """Filter by `FlowRun.start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs starting at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without a start time"
    )

FlowRunFilterExpectedStartTime

Bases: PrefectBaseModel

Filter by FlowRun.expected_start_time.

Source code in prefect/client/schemas/filters.py
211
212
213
214
215
216
217
218
219
220
221
class FlowRunFilterExpectedStartTime(PrefectBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or after this time",
    )

FlowRunFilterNextScheduledStartTime

Bases: PrefectBaseModel

Filter by FlowRun.next_scheduled_start_time.

Source code in prefect/client/schemas/filters.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class FlowRunFilterNextScheduledStartTime(PrefectBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time or before this"
            " time"
        ),
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or after this"
            " time"
        ),
    )

FlowRunFilterParentFlowRunId

Bases: PrefectBaseModel, OperatorMixin

Filter for subflows of the given flow runs

Source code in prefect/client/schemas/filters.py
243
244
245
246
247
248
class FlowRunFilterParentFlowRunId(PrefectBaseModel, OperatorMixin):
    """Filter for subflows of the given flow runs"""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run parents to include"
    )

FlowRunFilterParentTaskRunId

Bases: PrefectBaseModel, OperatorMixin

Filter by FlowRun.parent_task_run_id.

Source code in prefect/client/schemas/filters.py
251
252
253
254
255
256
257
258
259
260
class FlowRunFilterParentTaskRunId(PrefectBaseModel, OperatorMixin):
    """Filter by `FlowRun.parent_task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without parent_task_run_id",
    )

FlowRunFilterIdempotencyKey

Bases: PrefectBaseModel

Filter by FlowRun.idempotency_key.

Source code in prefect/client/schemas/filters.py
263
264
265
266
267
268
269
270
271
class FlowRunFilterIdempotencyKey(PrefectBaseModel):
    """Filter by FlowRun.idempotency_key."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of flow run idempotency keys to include"
    )
    not_any_: Optional[List[str]] = Field(
        default=None, description="A list of flow run idempotency keys to exclude"
    )

FlowRunFilter

Bases: PrefectBaseModel, OperatorMixin

Filter flow runs. Only flow runs matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class FlowRunFilter(PrefectBaseModel, OperatorMixin):
    """Filter flow runs. Only flow runs matching all criteria will be returned"""

    id: Optional[FlowRunFilterId] = Field(
        default=None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        default=None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        default=None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        default=None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    work_queue_name: Optional[FlowRunFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `FlowRun.work_queue_name"
    )
    state: Optional[FlowRunFilterState] = Field(
        default=None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        default=None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.start_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.expected_start_time`"
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.next_scheduled_start_time`",
    )
    parent_flow_run_id: Optional[FlowRunFilterParentFlowRunId] = Field(
        default=None, description="Filter criteria for subflows of the given flow runs"
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        default=None, description="Filter criteria for `FlowRun.parent_task_run_id`"
    )
    idempotency_key: Optional[FlowRunFilterIdempotencyKey] = Field(
        default=None, description="Filter criteria for `FlowRun.idempotency_key`"
    )

TaskRunFilterFlowRunId

Bases: PrefectBaseModel

Filter by TaskRun.flow_run_id.

Source code in prefect/client/schemas/filters.py
319
320
321
322
323
324
325
326
327
328
329
class TaskRunFilterFlowRunId(PrefectBaseModel):
    """Filter by `TaskRun.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run ids to include"
    )

    is_null_: bool = Field(
        default=False,
        description="If true, only include task runs without a flow run id",
    )

TaskRunFilterId

Bases: PrefectBaseModel

Filter by TaskRun.id.

Source code in prefect/client/schemas/filters.py
332
333
334
335
336
337
class TaskRunFilterId(PrefectBaseModel):
    """Filter by `TaskRun.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run ids to include"
    )

TaskRunFilterName

Bases: PrefectBaseModel

Filter by TaskRun.name.

Source code in prefect/client/schemas/filters.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class TaskRunFilterName(PrefectBaseModel):
    """Filter by `TaskRun.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of task run names to include",
        examples=[["my-task-run-1", "my-task-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

TaskRunFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by TaskRun.tags.

Source code in prefect/client/schemas/filters.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
class TaskRunFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `TaskRun.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Task runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include task runs without tags"
    )

TaskRunFilterStateType

Bases: PrefectBaseModel

Filter by TaskRun.state_type.

Source code in prefect/client/schemas/filters.py
376
377
378
379
380
381
class TaskRunFilterStateType(PrefectBaseModel):
    """Filter by `TaskRun.state_type`."""

    any_: Optional[List[StateType]] = Field(
        default=None, description="A list of task run state types to include"
    )

TaskRunFilterSubFlowRuns

Bases: PrefectBaseModel

Filter by TaskRun.subflow_run.

Source code in prefect/client/schemas/filters.py
395
396
397
398
399
400
401
402
403
404
class TaskRunFilterSubFlowRuns(PrefectBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If true, only include task runs that are subflow run parents; if false,"
            " exclude parent task runs"
        ),
    )

TaskRunFilterStartTime

Bases: PrefectBaseModel

Filter by TaskRun.start_time.

Source code in prefect/client/schemas/filters.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
class TaskRunFilterStartTime(PrefectBaseModel):
    """Filter by `TaskRun.start_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include task runs starting at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include task runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return task runs without a start time"
    )

TaskRunFilter

Bases: PrefectBaseModel, OperatorMixin

Filter task runs. Only task runs matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
class TaskRunFilter(PrefectBaseModel, OperatorMixin):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        default=None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        default=None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        default=None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        default=None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        default=None, description="Filter criteria for `TaskRun.subflow_run`"
    )
    flow_run_id: Optional[TaskRunFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `TaskRun.flow_run_id`"
    )

DeploymentFilterId

Bases: PrefectBaseModel

Filter by Deployment.id.

Source code in prefect/client/schemas/filters.py
449
450
451
452
453
454
class DeploymentFilterId(PrefectBaseModel):
    """Filter by `Deployment.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of deployment ids to include"
    )

DeploymentFilterName

Bases: PrefectBaseModel

Filter by Deployment.name.

Source code in prefect/client/schemas/filters.py
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
class DeploymentFilterName(PrefectBaseModel):
    """Filter by `Deployment.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of deployment names to include",
        examples=[["my-deployment-1", "my-deployment-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

DeploymentFilterWorkQueueName

Bases: PrefectBaseModel

Filter by Deployment.work_queue_name.

Source code in prefect/client/schemas/filters.py
477
478
479
480
481
482
483
484
class DeploymentFilterWorkQueueName(PrefectBaseModel):
    """Filter by `Deployment.work_queue_name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )

DeploymentFilterIsScheduleActive

Bases: PrefectBaseModel

Filter by Deployment.is_schedule_active.

Source code in prefect/client/schemas/filters.py
487
488
489
490
491
492
493
class DeploymentFilterIsScheduleActive(PrefectBaseModel):
    """Filter by `Deployment.is_schedule_active`."""

    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment schedule is/is not active",
    )

DeploymentFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by Deployment.tags.

Source code in prefect/client/schemas/filters.py
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class DeploymentFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `Deployment.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Deployments will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include deployments without tags"
    )

DeploymentFilter

Bases: PrefectBaseModel, OperatorMixin

Filter for deployments. Only deployments matching all criteria will be returned.

Source code in prefect/client/schemas/filters.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
class DeploymentFilter(PrefectBaseModel, OperatorMixin):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    id: Optional[DeploymentFilterId] = Field(
        default=None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        default=None, description="Filter criteria for `Deployment.name`"
    )
    is_schedule_active: Optional[DeploymentFilterIsScheduleActive] = Field(
        default=None, description="Filter criteria for `Deployment.is_schedule_active`"
    )
    tags: Optional[DeploymentFilterTags] = Field(
        default=None, description="Filter criteria for `Deployment.tags`"
    )
    work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `Deployment.work_queue_name`"
    )

LogFilterName

Bases: PrefectBaseModel

Filter by Log.name.

Source code in prefect/client/schemas/filters.py
532
533
534
535
536
537
538
539
class LogFilterName(PrefectBaseModel):
    """Filter by `Log.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of log names to include",
        examples=[["prefect.logger.flow_runs", "prefect.logger.task_runs"]],
    )

LogFilterLevel

Bases: PrefectBaseModel

Filter by Log.level.

Source code in prefect/client/schemas/filters.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
class LogFilterLevel(PrefectBaseModel):
    """Filter by `Log.level`."""

    ge_: Optional[int] = Field(
        default=None,
        description="Include logs with a level greater than or equal to this level",
        examples=[20],
    )

    le_: Optional[int] = Field(
        default=None,
        description="Include logs with a level less than or equal to this level",
        examples=[50],
    )

LogFilterTimestamp

Bases: PrefectBaseModel

Filter by Log.timestamp.

Source code in prefect/client/schemas/filters.py
558
559
560
561
562
563
564
565
566
567
568
class LogFilterTimestamp(PrefectBaseModel):
    """Filter by `Log.timestamp`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include logs with a timestamp at or before this time",
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description="Only include logs with a timestamp at or after this time",
    )

LogFilterFlowRunId

Bases: PrefectBaseModel

Filter by Log.flow_run_id.

Source code in prefect/client/schemas/filters.py
571
572
573
574
575
576
class LogFilterFlowRunId(PrefectBaseModel):
    """Filter by `Log.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

LogFilterTaskRunId

Bases: PrefectBaseModel

Filter by Log.task_run_id.

Source code in prefect/client/schemas/filters.py
579
580
581
582
583
584
class LogFilterTaskRunId(PrefectBaseModel):
    """Filter by `Log.task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

LogFilter

Bases: PrefectBaseModel, OperatorMixin

Filter logs. Only logs matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
class LogFilter(PrefectBaseModel, OperatorMixin):
    """Filter logs. Only logs matching all criteria will be returned"""

    level: Optional[LogFilterLevel] = Field(
        default=None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        default=None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Log.task_run_id`"
    )

FilterSet

Bases: PrefectBaseModel

A collection of filters for common objects

Source code in prefect/client/schemas/filters.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    flows: FlowFilter = Field(
        default_factory=FlowFilter, description="Filters that apply to flows"
    )
    flow_runs: FlowRunFilter = Field(
        default_factory=FlowRunFilter, description="Filters that apply to flow runs"
    )
    task_runs: TaskRunFilter = Field(
        default_factory=TaskRunFilter, description="Filters that apply to task runs"
    )
    deployments: DeploymentFilter = Field(
        default_factory=DeploymentFilter,
        description="Filters that apply to deployments",
    )

BlockTypeFilterName

Bases: PrefectBaseModel

Filter by BlockType.name

Source code in prefect/client/schemas/filters.py
622
623
624
625
626
627
628
629
630
631
632
633
class BlockTypeFilterName(PrefectBaseModel):
    """Filter by `BlockType.name`"""

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

BlockTypeFilterSlug

Bases: PrefectBaseModel

Filter by BlockType.slug

Source code in prefect/client/schemas/filters.py
636
637
638
639
640
641
class BlockTypeFilterSlug(PrefectBaseModel):
    """Filter by `BlockType.slug`"""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of slugs to match"
    )

BlockTypeFilter

Bases: PrefectBaseModel

Filter BlockTypes

Source code in prefect/client/schemas/filters.py
644
645
646
647
648
649
650
651
652
653
class BlockTypeFilter(PrefectBaseModel):
    """Filter BlockTypes"""

    name: Optional[BlockTypeFilterName] = Field(
        default=None, description="Filter criteria for `BlockType.name`"
    )

    slug: Optional[BlockTypeFilterSlug] = Field(
        default=None, description="Filter criteria for `BlockType.slug`"
    )

BlockSchemaFilterBlockTypeId

Bases: PrefectBaseModel

Filter by BlockSchema.block_type_id.

Source code in prefect/client/schemas/filters.py
656
657
658
659
660
661
class BlockSchemaFilterBlockTypeId(PrefectBaseModel):
    """Filter by `BlockSchema.block_type_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

BlockSchemaFilterId

Bases: PrefectBaseModel

Filter by BlockSchema.id

Source code in prefect/client/schemas/filters.py
664
665
666
667
668
669
class BlockSchemaFilterId(PrefectBaseModel):
    """Filter by BlockSchema.id"""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of IDs to include"
    )

BlockSchemaFilterCapabilities

Bases: PrefectBaseModel

Filter by BlockSchema.capabilities

Source code in prefect/client/schemas/filters.py
672
673
674
675
676
677
678
679
680
681
682
class BlockSchemaFilterCapabilities(PrefectBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    all_: Optional[List[str]] = Field(
        default=None,
        examples=[["write-storage", "read-storage"]],
        description=(
            "A list of block capabilities. Block entities will be returned only if an"
            " associated block schema has a superset of the defined capabilities."
        ),
    )

BlockSchemaFilterVersion

Bases: PrefectBaseModel

Filter by BlockSchema.capabilities

Source code in prefect/client/schemas/filters.py
685
686
687
688
689
690
691
692
class BlockSchemaFilterVersion(PrefectBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    any_: Optional[List[str]] = Field(
        default=None,
        examples=[["2.0.0", "2.1.0"]],
        description="A list of block schema versions.",
    )

BlockSchemaFilter

Bases: PrefectBaseModel, OperatorMixin

Filter BlockSchemas

Source code in prefect/client/schemas/filters.py
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
class BlockSchemaFilter(PrefectBaseModel, OperatorMixin):
    """Filter BlockSchemas"""

    block_type_id: Optional[BlockSchemaFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockSchema.block_type_id`"
    )
    block_capabilities: Optional[BlockSchemaFilterCapabilities] = Field(
        default=None, description="Filter criteria for `BlockSchema.capabilities`"
    )
    id: Optional[BlockSchemaFilterId] = Field(
        default=None, description="Filter criteria for `BlockSchema.id`"
    )
    version: Optional[BlockSchemaFilterVersion] = Field(
        default=None, description="Filter criteria for `BlockSchema.version`"
    )

BlockDocumentFilterIsAnonymous

Bases: PrefectBaseModel

Filter by BlockDocument.is_anonymous.

Source code in prefect/client/schemas/filters.py
712
713
714
715
716
717
718
719
720
class BlockDocumentFilterIsAnonymous(PrefectBaseModel):
    """Filter by `BlockDocument.is_anonymous`."""

    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter block documents for only those that are or are not anonymous."
        ),
    )

BlockDocumentFilterBlockTypeId

Bases: PrefectBaseModel

Filter by BlockDocument.block_type_id.

Source code in prefect/client/schemas/filters.py
723
724
725
726
727
728
class BlockDocumentFilterBlockTypeId(PrefectBaseModel):
    """Filter by `BlockDocument.block_type_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

BlockDocumentFilterId

Bases: PrefectBaseModel

Filter by BlockDocument.id.

Source code in prefect/client/schemas/filters.py
731
732
733
734
735
736
class BlockDocumentFilterId(PrefectBaseModel):
    """Filter by `BlockDocument.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of block ids to include"
    )

BlockDocumentFilterName

Bases: PrefectBaseModel

Filter by BlockDocument.name.

Source code in prefect/client/schemas/filters.py
739
740
741
742
743
744
745
746
747
748
749
750
751
752
class BlockDocumentFilterName(PrefectBaseModel):
    """Filter by `BlockDocument.name`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of block names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match block names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-block%"],
    )

BlockDocumentFilter

Bases: PrefectBaseModel, OperatorMixin

Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
class BlockDocumentFilter(PrefectBaseModel, OperatorMixin):
    """Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned"""

    id: Optional[BlockDocumentFilterId] = Field(
        default=None, description="Filter criteria for `BlockDocument.id`"
    )
    is_anonymous: Optional[BlockDocumentFilterIsAnonymous] = Field(
        # default is to exclude anonymous blocks
        BlockDocumentFilterIsAnonymous(eq_=False),
        description=(
            "Filter criteria for `BlockDocument.is_anonymous`. "
            "Defaults to excluding anonymous blocks."
        ),
    )
    block_type_id: Optional[BlockDocumentFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockDocument.block_type_id`"
    )
    name: Optional[BlockDocumentFilterName] = Field(
        default=None, description="Filter criteria for `BlockDocument.name`"
    )

FlowRunNotificationPolicyFilterIsActive

Bases: PrefectBaseModel

Filter by FlowRunNotificationPolicy.is_active.

Source code in prefect/client/schemas/filters.py
777
778
779
780
781
782
783
784
785
class FlowRunNotificationPolicyFilterIsActive(PrefectBaseModel):
    """Filter by `FlowRunNotificationPolicy.is_active`."""

    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter notification policies for only those that are or are not active."
        ),
    )

FlowRunNotificationPolicyFilter

Bases: PrefectBaseModel

Filter FlowRunNotificationPolicies.

Source code in prefect/client/schemas/filters.py
788
789
790
791
792
793
794
class FlowRunNotificationPolicyFilter(PrefectBaseModel):
    """Filter FlowRunNotificationPolicies."""

    is_active: Optional[FlowRunNotificationPolicyFilterIsActive] = Field(
        default=FlowRunNotificationPolicyFilterIsActive(eq_=False),
        description="Filter criteria for `FlowRunNotificationPolicy.is_active`. ",
    )

WorkQueueFilterId

Bases: PrefectBaseModel

Filter by WorkQueue.id.

Source code in prefect/client/schemas/filters.py
797
798
799
800
801
802
803
class WorkQueueFilterId(PrefectBaseModel):
    """Filter by `WorkQueue.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None,
        description="A list of work queue ids to include",
    )

WorkQueueFilterName

Bases: PrefectBaseModel

Filter by WorkQueue.name.

Source code in prefect/client/schemas/filters.py
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
class WorkQueueFilterName(PrefectBaseModel):
    """Filter by `WorkQueue.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["wq-1", "wq-2"]],
    )

    startswith_: Optional[List[str]] = Field(
        default=None,
        description=(
            "A list of case-insensitive starts-with matches. For example, "
            " passing 'marvin' will match "
            "'marvin', and 'Marvin-robot', but not 'sad-marvin'."
        ),
        examples=[["marvin", "Marvin-robot"]],
    )

WorkQueueFilter

Bases: PrefectBaseModel, OperatorMixin

Filter work queues. Only work queues matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
826
827
828
829
830
831
832
833
834
835
836
class WorkQueueFilter(PrefectBaseModel, OperatorMixin):
    """Filter work queues. Only work queues matching all criteria will be
    returned"""

    id: Optional[WorkQueueFilterId] = Field(
        default=None, description="Filter criteria for `WorkQueue.id`"
    )

    name: Optional[WorkQueueFilterName] = Field(
        default=None, description="Filter criteria for `WorkQueue.name`"
    )

WorkPoolFilterId

Bases: PrefectBaseModel

Filter by WorkPool.id.

Source code in prefect/client/schemas/filters.py
839
840
841
842
843
844
class WorkPoolFilterId(PrefectBaseModel):
    """Filter by `WorkPool.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

WorkPoolFilterName

Bases: PrefectBaseModel

Filter by WorkPool.name.

Source code in prefect/client/schemas/filters.py
847
848
849
850
851
852
class WorkPoolFilterName(PrefectBaseModel):
    """Filter by `WorkPool.name`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of work pool names to include"
    )

WorkPoolFilterType

Bases: PrefectBaseModel

Filter by WorkPool.type.

Source code in prefect/client/schemas/filters.py
855
856
857
858
859
860
class WorkPoolFilterType(PrefectBaseModel):
    """Filter by `WorkPool.type`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of work pool types to include"
    )

WorkerFilterWorkPoolId

Bases: PrefectBaseModel

Filter by Worker.worker_config_id.

Source code in prefect/client/schemas/filters.py
875
876
877
878
879
880
class WorkerFilterWorkPoolId(PrefectBaseModel):
    """Filter by `Worker.worker_config_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

WorkerFilterLastHeartbeatTime

Bases: PrefectBaseModel

Filter by Worker.last_heartbeat_time.

Source code in prefect/client/schemas/filters.py
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
class WorkerFilterLastHeartbeatTime(PrefectBaseModel):
    """Filter by `Worker.last_heartbeat_time`."""

    before_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or before this time"
        ),
    )
    after_: Optional[DateTimeTZ] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or after this time"
        ),
    )

ArtifactFilterId

Bases: PrefectBaseModel

Filter by Artifact.id.

Source code in prefect/client/schemas/filters.py
911
912
913
914
915
916
class ArtifactFilterId(PrefectBaseModel):
    """Filter by `Artifact.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

ArtifactFilterKey

Bases: PrefectBaseModel

Filter by Artifact.key.

Source code in prefect/client/schemas/filters.py
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
class ArtifactFilterKey(PrefectBaseModel):
    """Filter by `Artifact.key`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key."
        ),
    )

ArtifactFilterFlowRunId

Bases: PrefectBaseModel

Filter by Artifact.flow_run_id.

Source code in prefect/client/schemas/filters.py
944
945
946
947
948
949
class ArtifactFilterFlowRunId(PrefectBaseModel):
    """Filter by `Artifact.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

ArtifactFilterTaskRunId

Bases: PrefectBaseModel

Filter by Artifact.task_run_id.

Source code in prefect/client/schemas/filters.py
952
953
954
955
956
957
class ArtifactFilterTaskRunId(PrefectBaseModel):
    """Filter by `Artifact.task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

ArtifactFilterType

Bases: PrefectBaseModel

Filter by Artifact.type.

Source code in prefect/client/schemas/filters.py
960
961
962
963
964
965
966
967
968
class ArtifactFilterType(PrefectBaseModel):
    """Filter by `Artifact.type`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

ArtifactFilter

Bases: PrefectBaseModel, OperatorMixin

Filter artifacts. Only artifacts matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
class ArtifactFilter(PrefectBaseModel, OperatorMixin):
    """Filter artifacts. Only artifacts matching all criteria will be returned"""

    id: Optional[ArtifactFilterId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

ArtifactCollectionFilterLatestId

Bases: PrefectBaseModel

Filter by ArtifactCollection.latest_id.

Source code in prefect/client/schemas/filters.py
991
992
993
994
995
996
class ArtifactCollectionFilterLatestId(PrefectBaseModel):
    """Filter by `ArtifactCollection.latest_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

ArtifactCollectionFilterKey

Bases: PrefectBaseModel

Filter by ArtifactCollection.key.

Source code in prefect/client/schemas/filters.py
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
class ArtifactCollectionFilterKey(PrefectBaseModel):
    """Filter by `ArtifactCollection.key`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key. Should return all rows in "
            "the ArtifactCollection table if specified."
        ),
    )

ArtifactCollectionFilterFlowRunId

Bases: PrefectBaseModel

Filter by ArtifactCollection.flow_run_id.

Source code in prefect/client/schemas/filters.py
1025
1026
1027
1028
1029
1030
class ArtifactCollectionFilterFlowRunId(PrefectBaseModel):
    """Filter by `ArtifactCollection.flow_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

ArtifactCollectionFilterTaskRunId

Bases: PrefectBaseModel

Filter by ArtifactCollection.task_run_id.

Source code in prefect/client/schemas/filters.py
1033
1034
1035
1036
1037
1038
class ArtifactCollectionFilterTaskRunId(PrefectBaseModel):
    """Filter by `ArtifactCollection.task_run_id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

ArtifactCollectionFilterType

Bases: PrefectBaseModel

Filter by ArtifactCollection.type.

Source code in prefect/client/schemas/filters.py
1041
1042
1043
1044
1045
1046
1047
1048
1049
class ArtifactCollectionFilterType(PrefectBaseModel):
    """Filter by `ArtifactCollection.type`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[List[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

ArtifactCollectionFilter

Bases: PrefectBaseModel, OperatorMixin

Filter artifact collections. Only artifact collections matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
class ArtifactCollectionFilter(PrefectBaseModel, OperatorMixin):
    """Filter artifact collections. Only artifact collections matching all criteria will be returned"""

    latest_id: Optional[ArtifactCollectionFilterLatestId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactCollectionFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactCollectionFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactCollectionFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactCollectionFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

VariableFilterId

Bases: PrefectBaseModel

Filter by Variable.id.

Source code in prefect/client/schemas/filters.py
1072
1073
1074
1075
1076
1077
class VariableFilterId(PrefectBaseModel):
    """Filter by `Variable.id`."""

    any_: Optional[List[UUID]] = Field(
        default=None, description="A list of variable ids to include"
    )

VariableFilterName

Bases: PrefectBaseModel

Filter by Variable.name.

Source code in prefect/client/schemas/filters.py
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
class VariableFilterName(PrefectBaseModel):
    """Filter by `Variable.name`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of variables names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my_variable_%"],
    )

VariableFilterValue

Bases: PrefectBaseModel

Filter by Variable.value.

Source code in prefect/client/schemas/filters.py
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
class VariableFilterValue(PrefectBaseModel):
    """Filter by `Variable.value`."""

    any_: Optional[List[str]] = Field(
        default=None, description="A list of variables value to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable value against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-value-%"],
    )

VariableFilterTags

Bases: PrefectBaseModel, OperatorMixin

Filter by Variable.tags.

Source code in prefect/client/schemas/filters.py
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
class VariableFilterTags(PrefectBaseModel, OperatorMixin):
    """Filter by `Variable.tags`."""

    all_: Optional[List[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Variables will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include Variables without tags"
    )

VariableFilter

Bases: PrefectBaseModel, OperatorMixin

Filter variables. Only variables matching all criteria will be returned

Source code in prefect/client/schemas/filters.py
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
class VariableFilter(PrefectBaseModel, OperatorMixin):
    """Filter variables. Only variables matching all criteria will be returned"""

    id: Optional[VariableFilterId] = Field(
        default=None, description="Filter criteria for `Variable.id`"
    )
    name: Optional[VariableFilterName] = Field(
        default=None, description="Filter criteria for `Variable.name`"
    )
    value: Optional[VariableFilterValue] = Field(
        default=None, description="Filter criteria for `Variable.value`"
    )
    tags: Optional[VariableFilterTags] = Field(
        default=None, description="Filter criteria for `Variable.tags`"
    )

prefect.client.schemas.objects

StateType

Bases: AutoEnum

Enumeration of state types.

Source code in prefect/client/schemas/objects.py
78
79
80
81
82
83
84
85
86
87
88
89
class StateType(AutoEnum):
    """Enumeration of state types."""

    SCHEDULED = AutoEnum.auto()
    PENDING = AutoEnum.auto()
    RUNNING = AutoEnum.auto()
    COMPLETED = AutoEnum.auto()
    FAILED = AutoEnum.auto()
    CANCELLED = AutoEnum.auto()
    CRASHED = AutoEnum.auto()
    PAUSED = AutoEnum.auto()
    CANCELLING = AutoEnum.auto()

WorkPoolStatus

Bases: AutoEnum

Enumeration of work pool statuses.

Source code in prefect/client/schemas/objects.py
92
93
94
95
96
97
class WorkPoolStatus(AutoEnum):
    """Enumeration of work pool statuses."""

    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()
    PAUSED = AutoEnum.auto()

WorkerStatus

Bases: AutoEnum

Enumeration of worker statuses.

Source code in prefect/client/schemas/objects.py
100
101
102
103
104
class WorkerStatus(AutoEnum):
    """Enumeration of worker statuses."""

    ONLINE = AutoEnum.auto()
    OFFLINE = AutoEnum.auto()

DeploymentStatus

Bases: AutoEnum

Enumeration of deployment statuses.

Source code in prefect/client/schemas/objects.py
107
108
109
110
111
class DeploymentStatus(AutoEnum):
    """Enumeration of deployment statuses."""

    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()

WorkQueueStatus

Bases: AutoEnum

Enumeration of work queue statuses.

Source code in prefect/client/schemas/objects.py
114
115
116
117
118
119
class WorkQueueStatus(AutoEnum):
    """Enumeration of work queue statuses."""

    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()
    PAUSED = AutoEnum.auto()

State

Bases: ObjectBaseModel, Generic[R]

The state of a run.

Source code in prefect/client/schemas/objects.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
class State(ObjectBaseModel, Generic[R]):
    """
    The state of a run.
    """

    type: StateType
    name: Optional[str] = Field(default=None)
    timestamp: DateTimeTZ = Field(default_factory=lambda: pendulum.now("UTC"))
    message: Optional[str] = Field(default=None, examples=["Run started"])
    state_details: StateDetails = Field(default_factory=StateDetails)
    data: Union["BaseResult[R]", "DataDocument[R]", Any] = Field(
        default=None,
    )

    @overload
    def result(self: "State[R]", raise_on_failure: bool = True) -> R:
        ...

    @overload
    def result(self: "State[R]", raise_on_failure: bool = False) -> Union[R, Exception]:
        ...

    def result(self, raise_on_failure: bool = True, fetch: Optional[bool] = None):
        """
        Retrieve the result attached to this state.

        Args:
            raise_on_failure: a boolean specifying whether to raise an exception
                if the state is of type `FAILED` and the underlying data is an exception
            fetch: a boolean specifying whether to resolve references to persisted
                results into data. For synchronous users, this defaults to `True`.
                For asynchronous users, this defaults to `False` for backwards
                compatibility.

        Raises:
            TypeError: If the state is failed but the result is not an exception.

        Returns:
            The result of the run

        Examples:
            >>> from prefect import flow, task
            >>> @task
            >>> def my_task(x):
            >>>     return x

            Get the result from a task future in a flow

            >>> @flow
            >>> def my_flow():
            >>>     future = my_task("hello")
            >>>     state = future.wait()
            >>>     result = state.result()
            >>>     print(result)
            >>> my_flow()
            hello

            Get the result from a flow state

            >>> @flow
            >>> def my_flow():
            >>>     return "hello"
            >>> my_flow(return_state=True).result()
            hello

            Get the result from a failed state

            >>> @flow
            >>> def my_flow():
            >>>     raise ValueError("oh no!")
            >>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
            >>> state.result()  # Raises `ValueError`

            Get the result from a failed state without erroring

            >>> @flow
            >>> def my_flow():
            >>>     raise ValueError("oh no!")
            >>> state = my_flow(return_state=True)
            >>> result = state.result(raise_on_failure=False)
            >>> print(result)
            ValueError("oh no!")


            Get the result from a flow state in an async context

            >>> @flow
            >>> async def my_flow():
            >>>     return "hello"
            >>> state = await my_flow(return_state=True)
            >>> await state.result()
            hello
        """
        from prefect.states import get_state_result

        return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

    def to_state_create(self):
        """
        Convert this state to a `StateCreate` type which can be used to set the state of
        a run in the API.

        This method will drop this state's `data` if it is not a result type. Only
        results should be sent to the API. Other data is only available locally.
        """
        from prefect.client.schemas.actions import StateCreate
        from prefect.results import BaseResult

        return StateCreate(
            type=self.type,
            name=self.name,
            message=self.message,
            data=self.data if isinstance(self.data, BaseResult) else None,
            state_details=self.state_details,
        )

    @validator("name", always=True)
    def default_name_from_type(cls, v, *, values, **kwargs):
        return get_or_create_state_name(v, values)

    @root_validator
    def default_scheduled_start_time(cls, values):
        """
        TODO: This should throw an error instead of setting a default but is out of
              scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled
              into work refactoring state initialization
        """
        if values.get("type") == StateType.SCHEDULED:
            state_details = values.setdefault(
                "state_details", cls.__fields__["state_details"].get_default()
            )
            if not state_details.scheduled_time:
                state_details.scheduled_time = pendulum.now("utc")
        return values

    def is_scheduled(self) -> bool:
        return self.type == StateType.SCHEDULED

    def is_pending(self) -> bool:
        return self.type == StateType.PENDING

    def is_running(self) -> bool:
        return self.type == StateType.RUNNING

    def is_completed(self) -> bool:
        return self.type == StateType.COMPLETED

    def is_failed(self) -> bool:
        return self.type == StateType.FAILED

    def is_crashed(self) -> bool:
        return self.type == StateType.CRASHED

    def is_cancelled(self) -> bool:
        return self.type == StateType.CANCELLED

    def is_cancelling(self) -> bool:
        return self.type == StateType.CANCELLING

    def is_final(self) -> bool:
        return self.type in {
            StateType.CANCELLED,
            StateType.FAILED,
            StateType.COMPLETED,
            StateType.CRASHED,
        }

    def is_paused(self) -> bool:
        return self.type == StateType.PAUSED

    def copy(
        self,
        *,
        update: Optional[Dict[str, Any]] = None,
        reset_fields: bool = False,
        **kwargs,
    ):
        """
        Copying API models should return an object that could be inserted into the
        database again. The 'timestamp' is reset using the default factory.
        """
        update = update or {}
        update.setdefault("timestamp", self.__fields__["timestamp"].get_default())
        return super().copy(reset_fields=reset_fields, update=update, **kwargs)

    def __repr__(self) -> str:
        """
        Generates a complete state representation appropriate for introspection
        and debugging, including the result:

        `MyCompletedState(message="my message", type=COMPLETED, result=...)`
        """
        from prefect.deprecated.data_documents import DataDocument

        if isinstance(self.data, DataDocument):
            result = self.data.decode()
        else:
            result = self.data

        display = dict(
            message=repr(self.message),
            type=str(self.type.value),
            result=repr(result),
        )

        return f"{self.name}({', '.join(f'{k}={v}' for k, v in display.items())})"

    def __str__(self) -> str:
        """
        Generates a simple state representation appropriate for logging:

        `MyCompletedState("my message", type=COMPLETED)`
        """

        display = []

        if self.message:
            display.append(repr(self.message))

        if self.type.value.lower() != self.name.lower():
            display.append(f"type={self.type.value}")

        return f"{self.name}({', '.join(display)})"

    def __hash__(self) -> int:
        return hash(
            (
                getattr(self.state_details, "flow_run_id", None),
                getattr(self.state_details, "task_run_id", None),
                self.timestamp,
                self.type,
            )
        )

result

Retrieve the result attached to this state.

Parameters:

Name Type Description Default
raise_on_failure bool

a boolean specifying whether to raise an exception if the state is of type FAILED and the underlying data is an exception

True
fetch Optional[bool]

a boolean specifying whether to resolve references to persisted results into data. For synchronous users, this defaults to True. For asynchronous users, this defaults to False for backwards compatibility.

None

Raises:

Type Description
TypeError

If the state is failed but the result is not an exception.

Returns:

Type Description

The result of the run

Examples:

>>> from prefect import flow, task
>>> @task
>>> def my_task(x):
>>>     return x

Get the result from a task future in a flow

>>> @flow
>>> def my_flow():
>>>     future = my_task("hello")
>>>     state = future.wait()
>>>     result = state.result()
>>>     print(result)
>>> my_flow()
hello

Get the result from a flow state

>>> @flow
>>> def my_flow():
>>>     return "hello"
>>> my_flow(return_state=True).result()
hello

Get the result from a failed state

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
>>> state.result()  # Raises `ValueError`

Get the result from a failed state without erroring

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow(return_state=True)
>>> result = state.result(raise_on_failure=False)
>>> print(result)
ValueError("oh no!")

Get the result from a flow state in an async context

>>> @flow
>>> async def my_flow():
>>>     return "hello"
>>> state = await my_flow(return_state=True)
>>> await state.result()
hello
Source code in prefect/client/schemas/objects.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def result(self, raise_on_failure: bool = True, fetch: Optional[bool] = None):
    """
    Retrieve the result attached to this state.

    Args:
        raise_on_failure: a boolean specifying whether to raise an exception
            if the state is of type `FAILED` and the underlying data is an exception
        fetch: a boolean specifying whether to resolve references to persisted
            results into data. For synchronous users, this defaults to `True`.
            For asynchronous users, this defaults to `False` for backwards
            compatibility.

    Raises:
        TypeError: If the state is failed but the result is not an exception.

    Returns:
        The result of the run

    Examples:
        >>> from prefect import flow, task
        >>> @task
        >>> def my_task(x):
        >>>     return x

        Get the result from a task future in a flow

        >>> @flow
        >>> def my_flow():
        >>>     future = my_task("hello")
        >>>     state = future.wait()
        >>>     result = state.result()
        >>>     print(result)
        >>> my_flow()
        hello

        Get the result from a flow state

        >>> @flow
        >>> def my_flow():
        >>>     return "hello"
        >>> my_flow(return_state=True).result()
        hello

        Get the result from a failed state

        >>> @flow
        >>> def my_flow():
        >>>     raise ValueError("oh no!")
        >>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
        >>> state.result()  # Raises `ValueError`

        Get the result from a failed state without erroring

        >>> @flow
        >>> def my_flow():
        >>>     raise ValueError("oh no!")
        >>> state = my_flow(return_state=True)
        >>> result = state.result(raise_on_failure=False)
        >>> print(result)
        ValueError("oh no!")


        Get the result from a flow state in an async context

        >>> @flow
        >>> async def my_flow():
        >>>     return "hello"
        >>> state = await my_flow(return_state=True)
        >>> await state.result()
        hello
    """
    from prefect.states import get_state_result

    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

to_state_create

Convert this state to a StateCreate type which can be used to set the state of a run in the API.

This method will drop this state's data if it is not a result type. Only results should be sent to the API. Other data is only available locally.

Source code in prefect/client/schemas/objects.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def to_state_create(self):
    """
    Convert this state to a `StateCreate` type which can be used to set the state of
    a run in the API.

    This method will drop this state's `data` if it is not a result type. Only
    results should be sent to the API. Other data is only available locally.
    """
    from prefect.client.schemas.actions import StateCreate
    from prefect.results import BaseResult

    return StateCreate(
        type=self.type,
        name=self.name,
        message=self.message,
        data=self.data if isinstance(self.data, BaseResult) else None,
        state_details=self.state_details,
    )

default_scheduled_start_time

This should throw an error instead of setting a default but is out of

scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled into work refactoring state initialization

Source code in prefect/client/schemas/objects.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
@root_validator
def default_scheduled_start_time(cls, values):
    """
    TODO: This should throw an error instead of setting a default but is out of
          scope for https://github.com/PrefectHQ/orion/pull/174/ and can be rolled
          into work refactoring state initialization
    """
    if values.get("type") == StateType.SCHEDULED:
        state_details = values.setdefault(
            "state_details", cls.__fields__["state_details"].get_default()
        )
        if not state_details.scheduled_time:
            state_details.scheduled_time = pendulum.now("utc")
    return values

FlowRunPolicy

Bases: PrefectBaseModel

Defines of how a flow run should be orchestrated.

Source code in prefect/client/schemas/objects.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
class FlowRunPolicy(PrefectBaseModel):
    """Defines of how a flow run should be orchestrated."""

    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Optional[int] = Field(
        default=None, description="The delay time between retries, in seconds."
    )
    pause_keys: Optional[set] = Field(
        default_factory=set, description="Tracks pauses this run has observed."
    )
    resuming: Optional[bool] = Field(
        default=False, description="Indicates if this run is resuming from a pause."
    )

    @root_validator
    def populate_deprecated_fields(cls, values):
        return set_run_policy_deprecated_fields(values)

FlowRun

Bases: ObjectBaseModel

Source code in prefect/client/schemas/objects.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
class FlowRun(ObjectBaseModel):
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: FlowRunPolicy = Field(
        default_factory=FlowRunPolicy,
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTimeTZ] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTimeTZ] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTimeTZ] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTimeTZ] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )

    work_pool_id: Optional[UUID] = Field(
        description="The work pool with which the queue is associated."
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        examples=["my-work-pool"],
    )
    state: Optional[State] = Field(
        default=None,
        description="The state of the flow run.",
        examples=[State(type=StateType.COMPLETED)],
    )
    job_variables: Optional[dict] = Field(
        default=None, description="Job variables for the flow run."
    )

    # These are server-side optimizations and should not be present on client models
    # TODO: Deprecate these fields

    state_type: Optional[StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema

        Estimates times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        if isinstance(other, FlowRun):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.dict(exclude=exclude_fields) == other.dict(
                exclude=exclude_fields
            )
        return super().__eq__(other)

    @validator("name", pre=True)
    def set_default_name(cls, name):
        return get_or_create_run_name(name)

TaskRunPolicy

Bases: PrefectBaseModel

Defines of how a task run should retry.

Source code in prefect/client/schemas/objects.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
class TaskRunPolicy(PrefectBaseModel):
    """Defines of how a task run should retry."""

    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Union[None, int, List[int]] = Field(
        default=None,
        description="A delay time or list of delay times between retries, in seconds.",
    )
    retry_jitter_factor: Optional[float] = Field(
        default=None, description="Determines the amount a retry should jitter"
    )

    @root_validator
    def populate_deprecated_fields(cls, values):
        return set_run_policy_deprecated_fields(values)

    @validator("retry_delay")
    def validate_configured_retry_delays(cls, v):
        return list_length_50_or_less(v)

    @validator("retry_jitter_factor")
    def validate_jitter_factor(cls, v):
        return validate_not_negative(v)

TaskRunInput

Bases: PrefectBaseModel

Base class for classes that represent inputs to task runs, which could include, constants, parameters, or other task runs.

Source code in prefect/client/schemas/objects.py
611
612
613
614
615
616
617
618
619
620
621
class TaskRunInput(PrefectBaseModel):
    """
    Base class for classes that represent inputs to task runs, which
    could include, constants, parameters, or other task runs.
    """

    # freeze TaskRunInputs to allow them to be placed in sets
    class Config:
        frozen = True

    input_type: str

TaskRunResult

Bases: TaskRunInput

Represents a task run result input to another task run.

Source code in prefect/client/schemas/objects.py
624
625
626
627
628
class TaskRunResult(TaskRunInput):
    """Represents a task run result input to another task run."""

    input_type: Literal["task_run"] = "task_run"
    id: UUID

Parameter

Bases: TaskRunInput

Represents a parameter input to a task run.

Source code in prefect/client/schemas/objects.py
631
632
633
634
635
class Parameter(TaskRunInput):
    """Represents a parameter input to a task run."""

    input_type: Literal["parameter"] = "parameter"
    name: str

Constant

Bases: TaskRunInput

Represents constant input value to a task run.

Source code in prefect/client/schemas/objects.py
638
639
640
641
642
class Constant(TaskRunInput):
    """Represents constant input value to a task run."""

    input_type: Literal["constant"] = "constant"
    type: str

Workspace

Bases: PrefectBaseModel

A Prefect Cloud workspace.

Expected payload for each workspace returned by the me/workspaces route.

Source code in prefect/client/schemas/objects.py
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
class Workspace(PrefectBaseModel):
    """
    A Prefect Cloud workspace.

    Expected payload for each workspace returned by the `me/workspaces` route.
    """

    account_id: UUID = Field(..., description="The account id of the workspace.")
    account_name: str = Field(..., description="The account name.")
    account_handle: str = Field(..., description="The account's unique handle.")
    workspace_id: UUID = Field(..., description="The workspace id.")
    workspace_name: str = Field(..., description="The workspace name.")
    workspace_description: str = Field(..., description="Description of the workspace.")
    workspace_handle: str = Field(..., description="The workspace's unique handle.")

    class Config:
        extra = "ignore"

    @property
    def handle(self) -> str:
        """
        The full handle of the workspace as `account_handle` / `workspace_handle`
        """
        return self.account_handle + "/" + self.workspace_handle

    def api_url(self) -> str:
        """
        Generate the API URL for accessing this workspace
        """
        return (
            f"{PREFECT_CLOUD_API_URL.value()}"
            f"/accounts/{self.account_id}"
            f"/workspaces/{self.workspace_id}"
        )

    def ui_url(self) -> str:
        """
        Generate the UI URL for accessing this workspace
        """
        return (
            f"{PREFECT_CLOUD_UI_URL.value()}"
            f"/account/{self.account_id}"
            f"/workspace/{self.workspace_id}"
        )

    def __hash__(self):
        return hash(self.handle)

handle: str property

The full handle of the workspace as account_handle / workspace_handle

api_url

Generate the API URL for accessing this workspace

Source code in prefect/client/schemas/objects.py
778
779
780
781
782
783
784
785
786
def api_url(self) -> str:
    """
    Generate the API URL for accessing this workspace
    """
    return (
        f"{PREFECT_CLOUD_API_URL.value()}"
        f"/accounts/{self.account_id}"
        f"/workspaces/{self.workspace_id}"
    )

ui_url

Generate the UI URL for accessing this workspace

Source code in prefect/client/schemas/objects.py
788
789
790
791
792
793
794
795
796
def ui_url(self) -> str:
    """
    Generate the UI URL for accessing this workspace
    """
    return (
        f"{PREFECT_CLOUD_UI_URL.value()}"
        f"/account/{self.account_id}"
        f"/workspace/{self.workspace_id}"
    )

BlockType

Bases: ObjectBaseModel

An ORM representation of a block type

Source code in prefect/client/schemas/objects.py
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
class BlockType(ObjectBaseModel):
    """An ORM representation of a block type"""

    name: str = Field(default=..., description="A block type's name")
    slug: str = Field(default=..., description="A block type's slug")
    logo_url: Optional[HttpUrl] = Field(
        default=None, description="Web URL for the block type's logo"
    )
    documentation_url: Optional[HttpUrl] = Field(
        default=None, description="Web URL for the block type's documentation"
    )
    description: Optional[str] = Field(
        default=None,
        description="A short blurb about the corresponding block's intended use",
    )
    code_example: Optional[str] = Field(
        default=None,
        description="A code snippet demonstrating use of the corresponding block",
    )
    is_protected: bool = Field(
        default=False, description="Protected block types cannot be modified via API."
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        return raise_on_name_with_banned_characters(v)

BlockDocument

Bases: ObjectBaseModel

An ORM representation of a block document.

Source code in prefect/client/schemas/objects.py
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
class BlockDocument(ObjectBaseModel):
    """An ORM representation of a block document."""

    name: Optional[str] = Field(
        default=None,
        description=(
            "The block document's name. Not required for anonymous block documents."
        ),
    )
    data: Dict[str, Any] = Field(
        default_factory=dict, description="The block document's data"
    )
    block_schema_id: UUID = Field(default=..., description="A block schema ID")
    block_schema: Optional[BlockSchema] = Field(
        default=None, description="The associated block schema"
    )
    block_type_id: UUID = Field(default=..., description="A block type ID")
    block_type_name: Optional[str] = Field(None, description="A block type name")
    block_type: Optional[BlockType] = Field(
        default=None, description="The associated block type"
    )
    block_document_references: Dict[str, Dict[str, Any]] = Field(
        default_factory=dict, description="Record of the block document's references"
    )
    is_anonymous: bool = Field(
        default=False,
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        # the BlockDocumentCreate subclass allows name=None
        # and will inherit this validator
        return raise_on_name_with_banned_characters(v)

    @root_validator
    def validate_name_is_present_if_not_anonymous(cls, values):
        return validate_name_present_on_nonanonymous_blocks(values)

Flow

Bases: ObjectBaseModel

An ORM representation of flow data.

Source code in prefect/client/schemas/objects.py
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
class Flow(ObjectBaseModel):
    """An ORM representation of flow data."""

    name: str = Field(
        default=..., description="The name of the flow", examples=["my-flow"]
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        return raise_on_name_with_banned_characters(v)

Deployment

Bases: DeprecatedInfraOverridesField, ObjectBaseModel

An ORM representation of deployment data.

Source code in prefect/client/schemas/objects.py
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
class Deployment(DeprecatedInfraOverridesField, ObjectBaseModel):
    """An ORM representation of deployment data."""

    name: str = Field(default=..., description="The name of the deployment.")
    version: Optional[str] = Field(
        default=None, description="An optional version for the deployment."
    )
    description: Optional[str] = Field(
        default=None, description="A description for the deployment."
    )
    flow_id: UUID = Field(
        default=..., description="The flow id associated with the deployment."
    )
    schedule: Optional[SCHEDULE_TYPES] = Field(
        default=None, description="A schedule for the deployment."
    )
    is_schedule_active: bool = Field(
        default=True, description="Whether or not the deployment schedule is active."
    )
    paused: bool = Field(
        default=False, description="Whether or not the deployment is paused."
    )
    schedules: List[DeploymentSchedule] = Field(
        default_factory=list, description="A list of schedules for the deployment."
    )
    job_variables: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to flow run infrastructure at runtime.",
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    pull_steps: Optional[List[dict]] = Field(
        default=None,
        description="Pull steps for cloning and running this deployment.",
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags for the deployment",
        examples=[["tag-1", "tag-2"]],
    )
    work_queue_name: Optional[str] = Field(
        default=None,
        description=(
            "The work queue for the deployment. If no work queue is set, work will not"
            " be scheduled."
        ),
    )
    last_polled: Optional[DateTimeTZ] = Field(
        default=None,
        description="The last time the deployment was polled for status updates.",
    )
    parameter_openapi_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="The parameter schema of the flow, including defaults.",
    )
    path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the working directory for the workflow, relative to remote"
            " storage or an absolute path."
        ),
    )
    entrypoint: Optional[str] = Field(
        default=None,
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
    )
    manifest_path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the flow's manifest file, relative to the chosen storage."
        ),
    )
    storage_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining storage used for this flow.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use for flow runs.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this deployment.",
    )
    updated_by: Optional[UpdatedBy] = Field(
        default=None,
        description="Optional information about the updater of this deployment.",
    )
    work_queue_id: UUID = Field(
        default=None,
        description=(
            "The id of the work pool queue to which this deployment is assigned."
        ),
    )
    enforce_parameter_schema: bool = Field(
        default=False,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        return raise_on_name_with_banned_characters(v)

ConcurrencyLimit

Bases: ObjectBaseModel

An ORM representation of a concurrency limit.

Source code in prefect/client/schemas/objects.py
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
class ConcurrencyLimit(ObjectBaseModel):
    """An ORM representation of a concurrency limit."""

    tag: str = Field(
        default=..., description="A tag the concurrency limit is applied to."
    )
    concurrency_limit: int = Field(default=..., description="The concurrency limit.")
    active_slots: List[UUID] = Field(
        default_factory=list,
        description="A list of active run ids using a concurrency slot",
    )

BlockSchema

Bases: ObjectBaseModel

An ORM representation of a block schema.

Source code in prefect/client/schemas/objects.py
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
class BlockSchema(ObjectBaseModel):
    """An ORM representation of a block schema."""

    checksum: str = Field(default=..., description="The block schema's unique checksum")
    fields: Dict[str, Any] = Field(
        default_factory=dict, description="The block schema's field schema"
    )
    block_type_id: Optional[UUID] = Field(default=..., description="A block type ID")
    block_type: Optional[BlockType] = Field(
        default=None, description="The associated block type"
    )
    capabilities: List[str] = Field(
        default_factory=list,
        description="A list of Block capabilities",
    )
    version: str = Field(
        default=DEFAULT_BLOCK_SCHEMA_VERSION,
        description="Human readable identifier for the block schema",
    )

BlockSchemaReference

Bases: ObjectBaseModel

An ORM representation of a block schema reference.

Source code in prefect/client/schemas/objects.py
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
class BlockSchemaReference(ObjectBaseModel):
    """An ORM representation of a block schema reference."""

    parent_block_schema_id: UUID = Field(
        default=..., description="ID of block schema the reference is nested within"
    )
    parent_block_schema: Optional[BlockSchema] = Field(
        default=None, description="The block schema the reference is nested within"
    )
    reference_block_schema_id: UUID = Field(
        default=..., description="ID of the nested block schema"
    )
    reference_block_schema: Optional[BlockSchema] = Field(
        default=None, description="The nested block schema"
    )
    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

BlockDocumentReference

Bases: ObjectBaseModel

An ORM representation of a block document reference.

Source code in prefect/client/schemas/objects.py
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
class BlockDocumentReference(ObjectBaseModel):
    """An ORM representation of a block document reference."""

    parent_block_document_id: UUID = Field(
        default=..., description="ID of block document the reference is nested within"
    )
    parent_block_document: Optional[BlockDocument] = Field(
        default=None, description="The block document the reference is nested within"
    )
    reference_block_document_id: UUID = Field(
        default=..., description="ID of the nested block document"
    )
    reference_block_document: Optional[BlockDocument] = Field(
        default=None, description="The nested block document"
    )
    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

    @root_validator
    def validate_parent_and_ref_are_different(cls, values):
        return validate_parent_and_ref_diff(values)

SavedSearchFilter

Bases: PrefectBaseModel

A filter for a saved search model. Intended for use by the Prefect UI.

Source code in prefect/client/schemas/objects.py
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
class SavedSearchFilter(PrefectBaseModel):
    """A filter for a saved search model. Intended for use by the Prefect UI."""

    object: str = Field(default=..., description="The object over which to filter.")
    property: str = Field(
        default=..., description="The property of the object on which to filter."
    )
    type: str = Field(default=..., description="The type of the property.")
    operation: str = Field(
        default=...,
        description="The operator to apply to the object. For example, `equals`.",
    )
    value: Any = Field(
        default=..., description="A JSON-compatible value for the filter."
    )

SavedSearch

Bases: ObjectBaseModel

An ORM representation of saved search data. Represents a set of filter criteria.

Source code in prefect/client/schemas/objects.py
1145
1146
1147
1148
1149
1150
1151
class SavedSearch(ObjectBaseModel):
    """An ORM representation of saved search data. Represents a set of filter criteria."""

    name: str = Field(default=..., description="The name of the saved search.")
    filters: List[SavedSearchFilter] = Field(
        default_factory=list, description="The filter set for the saved search."
    )

Log

Bases: ObjectBaseModel

An ORM representation of log data.

Source code in prefect/client/schemas/objects.py
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
class Log(ObjectBaseModel):
    """An ORM representation of log data."""

    name: str = Field(default=..., description="The logger name.")
    level: int = Field(default=..., description="The log level.")
    message: str = Field(default=..., description="The log message.")
    timestamp: DateTimeTZ = Field(default=..., description="The log timestamp.")
    flow_run_id: Optional[UUID] = Field(
        default=None, description="The flow run ID associated with the log."
    )
    task_run_id: Optional[UUID] = Field(
        default=None, description="The task run ID associated with the log."
    )

QueueFilter

Bases: PrefectBaseModel

Filter criteria definition for a work queue.

Source code in prefect/client/schemas/objects.py
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
class QueueFilter(PrefectBaseModel):
    """Filter criteria definition for a work queue."""

    tags: Optional[List[str]] = Field(
        default=None,
        description="Only include flow runs with these tags in the work queue.",
    )
    deployment_ids: Optional[List[UUID]] = Field(
        default=None,
        description="Only include flow runs from these deployments in the work queue.",
    )

WorkQueue

Bases: ObjectBaseModel

An ORM representation of a work queue

Source code in prefect/client/schemas/objects.py
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
class WorkQueue(ObjectBaseModel):
    """An ORM representation of a work queue"""

    name: str = Field(default=..., description="The name of the work queue.")
    description: Optional[str] = Field(
        default="", description="An optional description for the work queue."
    )
    is_paused: bool = Field(
        default=False, description="Whether or not the work queue is paused."
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        default=None, description="An optional concurrency limit for the work queue."
    )
    priority: PositiveInteger = Field(
        default=1,
        description=(
            "The queue's priority. Lower values are higher priority (1 is the highest)."
        ),
    )
    work_pool_name: Optional[str] = Field(default=None)
    # Will be required after a future migration
    work_pool_id: Optional[UUID] = Field(
        description="The work pool with which the queue is associated."
    )
    filter: Optional[QueueFilter] = Field(
        default=None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )
    last_polled: Optional[DateTimeTZ] = Field(
        default=None, description="The last time an agent polled this queue for work."
    )
    status: Optional[WorkQueueStatus] = Field(
        default=None, description="The queue status."
    )

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        return raise_on_name_with_banned_characters(v)

WorkQueueHealthPolicy

Bases: PrefectBaseModel

Source code in prefect/client/schemas/objects.py
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
class WorkQueueHealthPolicy(PrefectBaseModel):
    maximum_late_runs: Optional[int] = Field(
        default=0,
        description=(
            "The maximum number of late runs in the work queue before it is deemed"
            " unhealthy. Defaults to `0`."
        ),
    )
    maximum_seconds_since_last_polled: Optional[int] = Field(
        default=60,
        description=(
            "The maximum number of time in seconds elapsed since work queue has been"
            " polled before it is deemed unhealthy. Defaults to `60`."
        ),
    )

    def evaluate_health_status(
        self, late_runs_count: int, last_polled: Optional[DateTimeTZ] = None
    ) -> bool:
        """
        Given empirical information about the state of the work queue, evaluate its health status.

        Args:
            late_runs: the count of late runs for the work queue.
            last_polled: the last time the work queue was polled, if available.

        Returns:
            bool: whether or not the work queue is healthy.
        """
        healthy = True
        if (
            self.maximum_late_runs is not None
            and late_runs_count > self.maximum_late_runs
        ):
            healthy = False

        if self.maximum_seconds_since_last_polled is not None:
            if (
                last_polled is None
                or pendulum.now("UTC").diff(last_polled).in_seconds()
                > self.maximum_seconds_since_last_polled
            ):
                healthy = False

        return healthy

evaluate_health_status

Given empirical information about the state of the work queue, evaluate its health status.

Parameters:

Name Type Description Default
late_runs

the count of late runs for the work queue.

required
last_polled Optional[DateTimeTZ]

the last time the work queue was polled, if available.

None

Returns:

Name Type Description
bool bool

whether or not the work queue is healthy.

Source code in prefect/client/schemas/objects.py
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
def evaluate_health_status(
    self, late_runs_count: int, last_polled: Optional[DateTimeTZ] = None
) -> bool:
    """
    Given empirical information about the state of the work queue, evaluate its health status.

    Args:
        late_runs: the count of late runs for the work queue.
        last_polled: the last time the work queue was polled, if available.

    Returns:
        bool: whether or not the work queue is healthy.
    """
    healthy = True
    if (
        self.maximum_late_runs is not None
        and late_runs_count > self.maximum_late_runs
    ):
        healthy = False

    if self.maximum_seconds_since_last_polled is not None:
        if (
            last_polled is None
            or pendulum.now("UTC").diff(last_polled).in_seconds()
            > self.maximum_seconds_since_last_polled
        ):
            healthy = False

    return healthy

FlowRunNotificationPolicy

Bases: ObjectBaseModel

An ORM representation of a flow run notification.

Source code in prefect/client/schemas/objects.py
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
class FlowRunNotificationPolicy(ObjectBaseModel):
    """An ORM representation of a flow run notification."""

    is_active: bool = Field(
        default=True, description="Whether the policy is currently active"
    )
    state_names: List[str] = Field(
        default=..., description="The flow run states that trigger notifications"
    )
    tags: List[str] = Field(
        default=...,
        description="The flow run tags that trigger notifications (set [] to disable)",
    )
    block_document_id: UUID = Field(
        default=..., description="The block document ID used for sending notifications"
    )
    message_template: Optional[str] = Field(
        default=None,
        description=(
            "A templatable notification message. Use {braces} to add variables."
            " Valid variables include:"
            f" {listrepr(sorted(FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS), sep=', ')}"
        ),
        examples=[
            "Flow run {flow_run_name} with id {flow_run_id} entered state"
            " {flow_run_state_name}."
        ],
    )

    @validator("message_template")
    def validate_message_template_variables(cls, v):
        return validate_message_template_variables(v)

Agent

Bases: ObjectBaseModel

An ORM representation of an agent

Source code in prefect/client/schemas/objects.py
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
class Agent(ObjectBaseModel):
    """An ORM representation of an agent"""

    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the agent. If a name is not provided, it will be"
            " auto-generated."
        ),
    )
    work_queue_id: UUID = Field(
        default=..., description="The work queue with which the agent is associated."
    )
    last_activity_time: Optional[DateTimeTZ] = Field(
        default=None, description="The last time this agent polled for work."
    )

WorkPool

Bases: ObjectBaseModel

An ORM representation of a work pool

Source code in prefect/client/schemas/objects.py
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
class WorkPool(ObjectBaseModel):
    """An ORM representation of a work pool"""

    name: str = Field(
        description="The name of the work pool.",
    )
    description: Optional[str] = Field(
        default=None, description="A description of the work pool."
    )
    type: str = Field(description="The work pool type.")
    base_job_template: Dict[str, Any] = Field(
        default_factory=dict, description="The work pool's base job template."
    )
    is_paused: bool = Field(
        default=False,
        description="Pausing the work pool stops the delivery of all work.",
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        default=None, description="A concurrency limit for the work pool."
    )
    status: Optional[WorkPoolStatus] = Field(
        default=None, description="The current status of the work pool."
    )

    # this required field has a default of None so that the custom validator
    # below will be called and produce a more helpful error message
    default_queue_id: UUID = Field(
        None, description="The id of the pool's default queue."
    )

    @property
    def is_push_pool(self) -> bool:
        return self.type.endswith(":push")

    @property
    def is_managed_pool(self) -> bool:
        return self.type.endswith(":managed")

    @validator("name", check_fields=False)
    def validate_name_characters(cls, v):
        return raise_on_name_with_banned_characters(v)

    @validator("default_queue_id", always=True)
    def helpful_error_for_missing_default_queue_id(cls, v):
        return validate_default_queue_id_not_none(v)

Worker

Bases: ObjectBaseModel

An ORM representation of a worker

Source code in prefect/client/schemas/objects.py
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
class Worker(ObjectBaseModel):
    """An ORM representation of a worker"""

    name: str = Field(description="The name of the worker.")
    work_pool_id: UUID = Field(
        description="The work pool with which the queue is associated."
    )
    last_heartbeat_time: datetime.datetime = Field(
        None, description="The last time the worker process sent a heartbeat."
    )
    heartbeat_interval_seconds: Optional[int] = Field(
        default=None,
        description=(
            "The number of seconds to expect between heartbeats sent by the worker."
        ),
    )
    status: WorkerStatus = Field(
        WorkerStatus.OFFLINE,
        description="Current status of the worker.",
    )

FlowRunInput

Bases: ObjectBaseModel

Source code in prefect/client/schemas/objects.py
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
class FlowRunInput(ObjectBaseModel):
    flow_run_id: UUID = Field(description="The flow run ID associated with the input.")
    key: str = Field(description="The key of the input.")
    value: str = Field(description="The value of the input.")
    sender: Optional[str] = Field(description="The sender of the input.")

    @property
    def decoded_value(self) -> Any:
        """
        Decode the value of the input.

        Returns:
            Any: the decoded value
        """
        return orjson.loads(self.value)

    @validator("key", check_fields=False)
    def validate_name_characters(cls, v):
        raise_on_name_alphanumeric_dashes_only(v)
        return v

decoded_value: Any property

Decode the value of the input.

Returns:

Name Type Description
Any Any

the decoded value

GlobalConcurrencyLimit

Bases: ObjectBaseModel

An ORM representation of a global concurrency limit

Source code in prefect/client/schemas/objects.py
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
class GlobalConcurrencyLimit(ObjectBaseModel):
    """An ORM representation of a global concurrency limit"""

    name: str = Field(description="The name of the global concurrency limit.")
    limit: int = Field(
        description=(
            "The maximum number of slots that can be occupied on this concurrency"
            " limit."
        )
    )
    active: Optional[bool] = Field(
        default=True,
        description="Whether or not the concurrency limit is in an active state.",
    )
    active_slots: Optional[int] = Field(
        default=0,
        description="Number of tasks currently using a concurrency slot.",
    )
    slot_decay_per_second: Optional[float] = Field(
        default=0.0,
        description=(
            "Controls the rate at which slots are released when the concurrency limit"
            " is used as a rate limit."
        ),
    )

prefect.client.schemas.responses

SetStateStatus

Bases: AutoEnum

Enumerates return statuses for setting run states.

Source code in prefect/client/schemas/responses.py
25
26
27
28
29
30
31
class SetStateStatus(AutoEnum):
    """Enumerates return statuses for setting run states."""

    ACCEPT = AutoEnum.auto()
    REJECT = AutoEnum.auto()
    ABORT = AutoEnum.auto()
    WAIT = AutoEnum.auto()

StateAcceptDetails

Bases: PrefectBaseModel

Details associated with an ACCEPT state transition.

Source code in prefect/client/schemas/responses.py
34
35
36
37
38
39
40
41
42
43
class StateAcceptDetails(PrefectBaseModel):
    """Details associated with an ACCEPT state transition."""

    type: Literal["accept_details"] = Field(
        default="accept_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )

StateRejectDetails

Bases: PrefectBaseModel

Details associated with a REJECT state transition.

Source code in prefect/client/schemas/responses.py
46
47
48
49
50
51
52
53
54
55
56
57
58
class StateRejectDetails(PrefectBaseModel):
    """Details associated with a REJECT state transition."""

    type: Literal["reject_details"] = Field(
        default="reject_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )
    reason: Optional[str] = Field(
        default=None, description="The reason why the state transition was rejected."
    )

StateAbortDetails

Bases: PrefectBaseModel

Details associated with an ABORT state transition.

Source code in prefect/client/schemas/responses.py
61
62
63
64
65
66
67
68
69
70
71
72
73
class StateAbortDetails(PrefectBaseModel):
    """Details associated with an ABORT state transition."""

    type: Literal["abort_details"] = Field(
        default="abort_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )
    reason: Optional[str] = Field(
        default=None, description="The reason why the state transition was aborted."
    )

StateWaitDetails

Bases: PrefectBaseModel

Details associated with a WAIT state transition.

Source code in prefect/client/schemas/responses.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class StateWaitDetails(PrefectBaseModel):
    """Details associated with a WAIT state transition."""

    type: Literal["wait_details"] = Field(
        default="wait_details",
        description=(
            "The type of state transition detail. Used to ensure pydantic does not"
            " coerce into a different type."
        ),
    )
    delay_seconds: int = Field(
        default=...,
        description=(
            "The length of time in seconds the client should wait before transitioning"
            " states."
        ),
    )
    reason: Optional[str] = Field(
        default=None, description="The reason why the state transition should wait."
    )

HistoryResponseState

Bases: PrefectBaseModel

Represents a single state's history over an interval.

Source code in prefect/client/schemas/responses.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class HistoryResponseState(PrefectBaseModel):
    """Represents a single state's history over an interval."""

    state_type: objects.StateType = Field(default=..., description="The state type.")
    state_name: str = Field(default=..., description="The state name.")
    count_runs: int = Field(
        default=...,
        description="The number of runs in the specified state during the interval.",
    )
    sum_estimated_run_time: datetime.timedelta = Field(
        default=...,
        description="The total estimated run time of all runs during the interval.",
    )
    sum_estimated_lateness: datetime.timedelta = Field(
        default=...,
        description=(
            "The sum of differences between actual and expected start time during the"
            " interval."
        ),
    )

HistoryResponse

Bases: PrefectBaseModel

Represents a history of aggregation states over an interval

Source code in prefect/client/schemas/responses.py
120
121
122
123
124
125
126
127
128
129
130
131
class HistoryResponse(PrefectBaseModel):
    """Represents a history of aggregation states over an interval"""

    interval_start: DateTimeTZ = Field(
        default=..., description="The start date of the interval."
    )
    interval_end: DateTimeTZ = Field(
        default=..., description="The end date of the interval."
    )
    states: List[HistoryResponseState] = Field(
        default=..., description="A list of state histories during the interval."
    )

OrchestrationResult

Bases: PrefectBaseModel

A container for the output of state orchestration.

Source code in prefect/client/schemas/responses.py
139
140
141
142
143
144
145
146
class OrchestrationResult(PrefectBaseModel):
    """
    A container for the output of state orchestration.
    """

    state: Optional[objects.State]
    status: SetStateStatus
    details: StateResponseDetails

FlowRunResponse

Bases: ObjectBaseModel

Source code in prefect/client/schemas/responses.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
class FlowRunResponse(ObjectBaseModel):
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: objects.FlowRunPolicy = Field(
        default_factory=objects.FlowRunPolicy,
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTimeTZ] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTimeTZ] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTimeTZ] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTimeTZ] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )

    work_pool_id: Optional[UUID] = Field(
        description="The work pool with which the queue is associated."
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        examples=["my-work-pool"],
    )
    state: Optional[objects.State] = Field(
        default=None,
        description="The state of the flow run.",
        examples=[objects.State(type=objects.StateType.COMPLETED)],
    )
    job_variables: Optional[dict] = Field(
        default=None, description="Job variables for the flow run."
    )

    # These are server-side optimizations and should not be present on client models
    # TODO: Deprecate these fields

    state_type: Optional[objects.StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema

        Estimates times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        if isinstance(other, objects.FlowRun):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.dict(exclude=exclude_fields) == other.dict(
                exclude=exclude_fields
            )
        return super().__eq__(other)

prefect.client.schemas.schedules

Schedule schemas

IntervalSchedule

Bases: PrefectBaseModel

A schedule formed by adding interval increments to an anchor_date. If no anchor_date is supplied, the current UTC time is used. If a timezone-naive datetime is provided for anchor_date, it is assumed to be in the schedule's timezone (or UTC). Even if supplied with an IANA timezone, anchor dates are always stored as UTC offsets, so a timezone can be provided to determine localization behaviors like DST boundary handling. If none is provided it will be inferred from the anchor date.

NOTE: If the IntervalSchedule anchor_date or timezone is provided in a DST-observing timezone, then the schedule will adjust itself appropriately. Intervals greater than 24 hours will follow DST conventions, while intervals of less than 24 hours will follow UTC intervals. For example, an hourly schedule will fire every UTC hour, even across DST boundaries. When clocks are set back, this will result in two runs that appear to both be scheduled for 1am local time, even though they are an hour apart in UTC time. For longer intervals, like a daily schedule, the interval schedule will adjust for DST boundaries so that the clock-hour remains constant. This means that a daily schedule that always fires at 9am will observe DST and continue to fire at 9am in the local time zone.

Parameters:

Name Type Description Default
interval timedelta

an interval to schedule on

required
anchor_date DateTimeTZ

an anchor date to schedule increments against; if not provided, the current timestamp will be used

required
timezone str

a valid timezone string

required
Source code in prefect/client/schemas/schedules.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class IntervalSchedule(PrefectBaseModel):
    """
    A schedule formed by adding `interval` increments to an `anchor_date`. If no
    `anchor_date` is supplied, the current UTC time is used.  If a
    timezone-naive datetime is provided for `anchor_date`, it is assumed to be
    in the schedule's timezone (or UTC). Even if supplied with an IANA timezone,
    anchor dates are always stored as UTC offsets, so a `timezone` can be
    provided to determine localization behaviors like DST boundary handling. If
    none is provided it will be inferred from the anchor date.

    NOTE: If the `IntervalSchedule` `anchor_date` or `timezone` is provided in a
    DST-observing timezone, then the schedule will adjust itself appropriately.
    Intervals greater than 24 hours will follow DST conventions, while intervals
    of less than 24 hours will follow UTC intervals. For example, an hourly
    schedule will fire every UTC hour, even across DST boundaries. When clocks
    are set back, this will result in two runs that *appear* to both be
    scheduled for 1am local time, even though they are an hour apart in UTC
    time. For longer intervals, like a daily schedule, the interval schedule
    will adjust for DST boundaries so that the clock-hour remains constant. This
    means that a daily schedule that always fires at 9am will observe DST and
    continue to fire at 9am in the local time zone.

    Args:
        interval (datetime.timedelta): an interval to schedule on
        anchor_date (DateTimeTZ, optional): an anchor date to schedule increments against;
            if not provided, the current timestamp will be used
        timezone (str, optional): a valid timezone string
    """

    class Config:
        extra = "forbid"
        exclude_none = True

    interval: PositiveDuration
    anchor_date: DateTimeTZ = None
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])

    @validator("anchor_date", always=True)
    def validate_anchor_date(cls, v):
        return default_anchor_date(v)

    @validator("timezone", always=True)
    def validate_default_timezone(cls, v, values):
        return default_timezone(v, values=values)

CronSchedule

Bases: PrefectBaseModel

Cron schedule

NOTE: If the timezone is a DST-observing one, then the schedule will adjust itself appropriately. Cron's rules for DST are based on schedule times, not intervals. This means that an hourly cron schedule will fire on every new schedule hour, not every elapsed hour; for example, when clocks are set back this will result in a two-hour pause as the schedule will fire the first time 1am is reached and the first time 2am is reached, 120 minutes later. Longer schedules, such as one that fires at 9am every morning, will automatically adjust for DST.

Parameters:

Name Type Description Default
cron str

a valid cron string

required
timezone str

a valid timezone string in IANA tzdata format (for example, America/New_York).

required
day_or bool

Control how croniter handles day and day_of_week entries. Defaults to True, matching cron which connects those values using OR. If the switch is set to False, the values are connected using AND. This behaves like fcron and enables you to e.g. define a job that executes each 2nd friday of a month by setting the days of month and the weekday.

required
Source code in prefect/client/schemas/schedules.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class CronSchedule(PrefectBaseModel):
    """
    Cron schedule

    NOTE: If the timezone is a DST-observing one, then the schedule will adjust
    itself appropriately. Cron's rules for DST are based on schedule times, not
    intervals. This means that an hourly cron schedule will fire on every new
    schedule hour, not every elapsed hour; for example, when clocks are set back
    this will result in a two-hour pause as the schedule will fire *the first
    time* 1am is reached and *the first time* 2am is reached, 120 minutes later.
    Longer schedules, such as one that fires at 9am every morning, will
    automatically adjust for DST.

    Args:
        cron (str): a valid cron string
        timezone (str): a valid timezone string in IANA tzdata format (for example,
            America/New_York).
        day_or (bool, optional): Control how croniter handles `day` and `day_of_week`
            entries. Defaults to True, matching cron which connects those values using
            OR. If the switch is set to False, the values are connected using AND. This
            behaves like fcron and enables you to e.g. define a job that executes each
            2nd friday of a month by setting the days of month and the weekday.

    """

    class Config:
        extra = "forbid"

    cron: str = Field(default=..., examples=["0 0 * * *"])
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])
    day_or: bool = Field(
        default=True,
        description=(
            "Control croniter behavior for handling day and day_of_week entries."
        ),
    )

    @validator("timezone")
    def valid_timezone(cls, v):
        return default_timezone(v)

    @validator("cron")
    def valid_cron_string(cls, v):
        return validate_cron_string(v)

RRuleSchedule

Bases: PrefectBaseModel

RRule schedule, based on the iCalendar standard (RFC 5545) as implemented in dateutils.rrule.

RRules are appropriate for any kind of calendar-date manipulation, including irregular intervals, repetition, exclusions, week day or day-of-month adjustments, and more.

Note that as a calendar-oriented standard, RRuleSchedules are sensitive to to the initial timezone provided. A 9am daily schedule with a daylight saving time-aware start date will maintain a local 9am time through DST boundaries; a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

Parameters:

Name Type Description Default
rrule str

a valid RRule string

required
timezone str

a valid timezone string

required
Source code in prefect/client/schemas/schedules.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
class RRuleSchedule(PrefectBaseModel):
    """
    RRule schedule, based on the iCalendar standard
    ([RFC 5545](https://datatracker.ietf.org/doc/html/rfc5545)) as
    implemented in `dateutils.rrule`.

    RRules are appropriate for any kind of calendar-date manipulation, including
    irregular intervals, repetition, exclusions, week day or day-of-month
    adjustments, and more.

    Note that as a calendar-oriented standard, `RRuleSchedules` are sensitive to
    to the initial timezone provided. A 9am daily schedule with a daylight saving
    time-aware start date will maintain a local 9am time through DST boundaries;
    a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

    Args:
        rrule (str): a valid RRule string
        timezone (str, optional): a valid timezone string
    """

    class Config:
        extra = "forbid"

    rrule: str
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])

    @validator("rrule")
    def validate_rrule_str(cls, v):
        return validate_rrule_string(v)

    @classmethod
    def from_rrule(cls, rrule: dateutil.rrule.rrule):
        if isinstance(rrule, dateutil.rrule.rrule):
            if rrule._dtstart.tzinfo is not None:
                timezone = rrule._dtstart.tzinfo.name
            else:
                timezone = "UTC"
            return RRuleSchedule(rrule=str(rrule), timezone=timezone)
        elif isinstance(rrule, dateutil.rrule.rruleset):
            dtstarts = [rr._dtstart for rr in rrule._rrule if rr._dtstart is not None]
            unique_dstarts = set(pendulum.instance(d).in_tz("UTC") for d in dtstarts)
            unique_timezones = set(d.tzinfo for d in dtstarts if d.tzinfo is not None)

            if len(unique_timezones) > 1:
                raise ValueError(
                    f"rruleset has too many dtstart timezones: {unique_timezones}"
                )

            if len(unique_dstarts) > 1:
                raise ValueError(f"rruleset has too many dtstarts: {unique_dstarts}")

            if unique_dstarts and unique_timezones:
                timezone = dtstarts[0].tzinfo.name
            else:
                timezone = "UTC"

            rruleset_string = ""
            if rrule._rrule:
                rruleset_string += "\n".join(str(r) for r in rrule._rrule)
            if rrule._exrule:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "\n".join(str(r) for r in rrule._exrule).replace(
                    "RRULE", "EXRULE"
                )
            if rrule._rdate:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "RDATE:" + ",".join(
                    rd.strftime("%Y%m%dT%H%M%SZ") for rd in rrule._rdate
                )
            if rrule._exdate:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "EXDATE:" + ",".join(
                    exd.strftime("%Y%m%dT%H%M%SZ") for exd in rrule._exdate
                )
            return RRuleSchedule(rrule=rruleset_string, timezone=timezone)
        else:
            raise ValueError(f"Invalid RRule object: {rrule}")

    def to_rrule(self) -> dateutil.rrule.rrule:
        """
        Since rrule doesn't properly serialize/deserialize timezones, we localize dates
        here
        """
        rrule = dateutil.rrule.rrulestr(
            self.rrule,
            dtstart=DEFAULT_ANCHOR_DATE,
            cache=True,
        )
        timezone = dateutil.tz.gettz(self.timezone)
        if isinstance(rrule, dateutil.rrule.rrule):
            kwargs = dict(dtstart=rrule._dtstart.replace(tzinfo=timezone))
            if rrule._until:
                kwargs.update(
                    until=rrule._until.replace(tzinfo=timezone),
                )
            return rrule.replace(**kwargs)
        elif isinstance(rrule, dateutil.rrule.rruleset):
            # update rrules
            localized_rrules = []
            for rr in rrule._rrule:
                kwargs = dict(dtstart=rr._dtstart.replace(tzinfo=timezone))
                if rr._until:
                    kwargs.update(
                        until=rr._until.replace(tzinfo=timezone),
                    )
                localized_rrules.append(rr.replace(**kwargs))
            rrule._rrule = localized_rrules

            # update exrules
            localized_exrules = []
            for exr in rrule._exrule:
                kwargs = dict(dtstart=exr._dtstart.replace(tzinfo=timezone))
                if exr._until:
                    kwargs.update(
                        until=exr._until.replace(tzinfo=timezone),
                    )
                localized_exrules.append(exr.replace(**kwargs))
            rrule._exrule = localized_exrules

            # update rdates
            localized_rdates = []
            for rd in rrule._rdate:
                localized_rdates.append(rd.replace(tzinfo=timezone))
            rrule._rdate = localized_rdates

            # update exdates
            localized_exdates = []
            for exd in rrule._exdate:
                localized_exdates.append(exd.replace(tzinfo=timezone))
            rrule._exdate = localized_exdates

            return rrule

    @validator("timezone", always=True)
    def valid_timezone(cls, v):
        return validate_rrule_timezone(v)

to_rrule

Since rrule doesn't properly serialize/deserialize timezones, we localize dates here

Source code in prefect/client/schemas/schedules.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def to_rrule(self) -> dateutil.rrule.rrule:
    """
    Since rrule doesn't properly serialize/deserialize timezones, we localize dates
    here
    """
    rrule = dateutil.rrule.rrulestr(
        self.rrule,
        dtstart=DEFAULT_ANCHOR_DATE,
        cache=True,
    )
    timezone = dateutil.tz.gettz(self.timezone)
    if isinstance(rrule, dateutil.rrule.rrule):
        kwargs = dict(dtstart=rrule._dtstart.replace(tzinfo=timezone))
        if rrule._until:
            kwargs.update(
                until=rrule._until.replace(tzinfo=timezone),
            )
        return rrule.replace(**kwargs)
    elif isinstance(rrule, dateutil.rrule.rruleset):
        # update rrules
        localized_rrules = []
        for rr in rrule._rrule:
            kwargs = dict(dtstart=rr._dtstart.replace(tzinfo=timezone))
            if rr._until:
                kwargs.update(
                    until=rr._until.replace(tzinfo=timezone),
                )
            localized_rrules.append(rr.replace(**kwargs))
        rrule._rrule = localized_rrules

        # update exrules
        localized_exrules = []
        for exr in rrule._exrule:
            kwargs = dict(dtstart=exr._dtstart.replace(tzinfo=timezone))
            if exr._until:
                kwargs.update(
                    until=exr._until.replace(tzinfo=timezone),
                )
            localized_exrules.append(exr.replace(**kwargs))
        rrule._exrule = localized_exrules

        # update rdates
        localized_rdates = []
        for rd in rrule._rdate:
            localized_rdates.append(rd.replace(tzinfo=timezone))
        rrule._rdate = localized_rdates

        # update exdates
        localized_exdates = []
        for exd in rrule._exdate:
            localized_exdates.append(exd.replace(tzinfo=timezone))
        rrule._exdate = localized_exdates

        return rrule

construct_schedule

Construct a schedule from the provided arguments.

Parameters:

Name Type Description Default
interval Optional[Union[int, float, timedelta]]

An interval on which to schedule runs. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.

None
anchor_date Optional[Union[datetime, str]]

The start date for an interval schedule.

None
cron Optional[str]

A cron schedule for runs.

None
rrule Optional[str]

An rrule schedule of when to execute runs of this flow.

None
timezone Optional[str]

A timezone to use for the schedule. Defaults to UTC.

None
Source code in prefect/client/schemas/schedules.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def construct_schedule(
    interval: Optional[Union[int, float, datetime.timedelta]] = None,
    anchor_date: Optional[Union[datetime.datetime, str]] = None,
    cron: Optional[str] = None,
    rrule: Optional[str] = None,
    timezone: Optional[str] = None,
) -> SCHEDULE_TYPES:
    """
    Construct a schedule from the provided arguments.

    Args:
        interval: An interval on which to schedule runs. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        anchor_date: The start date for an interval schedule.
        cron: A cron schedule for runs.
        rrule: An rrule schedule of when to execute runs of this flow.
        timezone: A timezone to use for the schedule. Defaults to UTC.
    """
    num_schedules = sum(1 for entry in (interval, cron, rrule) if entry is not None)
    if num_schedules > 1:
        raise ValueError("Only one of interval, cron, or rrule can be provided.")

    if anchor_date and not interval:
        raise ValueError(
            "An anchor date can only be provided with an interval schedule"
        )

    if timezone and not (interval or cron or rrule):
        raise ValueError(
            "A timezone can only be provided with interval, cron, or rrule"
        )

    schedule = None
    if interval:
        if isinstance(interval, (int, float)):
            interval = datetime.timedelta(seconds=interval)
        schedule = IntervalSchedule(
            interval=interval, anchor_date=anchor_date, timezone=timezone
        )
    elif cron:
        schedule = CronSchedule(cron=cron, timezone=timezone)
    elif rrule:
        schedule = RRuleSchedule(rrule=rrule, timezone=timezone)

    if schedule is None:
        raise ValueError("Either interval, cron, or rrule must be provided")

    return schedule

prefect.client.schemas.sorting

FlowRunSort

Bases: AutoEnum

Defines flow run sorting options.

Source code in prefect/client/schemas/sorting.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class FlowRunSort(AutoEnum):
    """Defines flow run sorting options."""

    ID_DESC = AutoEnum.auto()
    START_TIME_ASC = AutoEnum.auto()
    START_TIME_DESC = AutoEnum.auto()
    EXPECTED_START_TIME_ASC = AutoEnum.auto()
    EXPECTED_START_TIME_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()
    NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto()
    END_TIME_DESC = AutoEnum.auto()

TaskRunSort

Bases: AutoEnum

Defines task run sorting options.

Source code in prefect/client/schemas/sorting.py
18
19
20
21
22
23
24
25
26
27
class TaskRunSort(AutoEnum):
    """Defines task run sorting options."""

    ID_DESC = AutoEnum.auto()
    EXPECTED_START_TIME_ASC = AutoEnum.auto()
    EXPECTED_START_TIME_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()
    NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto()
    END_TIME_DESC = AutoEnum.auto()

LogSort

Bases: AutoEnum

Defines log sorting options.

Source code in prefect/client/schemas/sorting.py
30
31
32
33
34
class LogSort(AutoEnum):
    """Defines log sorting options."""

    TIMESTAMP_ASC = AutoEnum.auto()
    TIMESTAMP_DESC = AutoEnum.auto()

FlowSort

Bases: AutoEnum

Defines flow sorting options.

Source code in prefect/client/schemas/sorting.py
37
38
39
40
41
42
43
class FlowSort(AutoEnum):
    """Defines flow sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()

DeploymentSort

Bases: AutoEnum

Defines deployment sorting options.

Source code in prefect/client/schemas/sorting.py
46
47
48
49
50
51
52
class DeploymentSort(AutoEnum):
    """Defines deployment sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    NAME_DESC = AutoEnum.auto()

ArtifactSort

Bases: AutoEnum

Defines artifact sorting options.

Source code in prefect/client/schemas/sorting.py
55
56
57
58
59
60
61
62
class ArtifactSort(AutoEnum):
    """Defines artifact sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    ID_DESC = AutoEnum.auto()
    KEY_DESC = AutoEnum.auto()
    KEY_ASC = AutoEnum.auto()

ArtifactCollectionSort

Bases: AutoEnum

Defines artifact collection sorting options.

Source code in prefect/client/schemas/sorting.py
65
66
67
68
69
70
71
72
class ArtifactCollectionSort(AutoEnum):
    """Defines artifact collection sorting options."""

    CREATED_DESC = AutoEnum.auto()
    UPDATED_DESC = AutoEnum.auto()
    ID_DESC = AutoEnum.auto()
    KEY_DESC = AutoEnum.auto()
    KEY_ASC = AutoEnum.auto()

VariableSort

Bases: AutoEnum

Defines variables sorting options.

Source code in prefect/client/schemas/sorting.py
75
76
77
78
79
80
81
class VariableSort(AutoEnum):
    """Defines variables sorting options."""

    CREATED_DESC = "CREATED_DESC"
    UPDATED_DESC = "UPDATED_DESC"
    NAME_DESC = "NAME_DESC"
    NAME_ASC = "NAME_ASC"

BlockDocumentSort

Bases: AutoEnum

Defines block document sorting options.

Source code in prefect/client/schemas/sorting.py
84
85
86
87
88
89
class BlockDocumentSort(AutoEnum):
    """Defines block document sorting options."""

    NAME_DESC = AutoEnum.auto()
    NAME_ASC = AutoEnum.auto()
    BLOCK_TYPE_AND_NAME_ASC = AutoEnum.auto()