prefect.orion.schemas.core
Full schemas of Orion API objects.
Agent
pydantic-model
An ORM representation of an agent
Source code in prefect/orion/schemas/core.py
class Agent(ORMBaseModel):
"""An ORM representation of an agent"""
name: str = Field(
default_factory=lambda: coolname.generate_slug(2),
description="The name of the agent. If a name is not provided, it will be auto-generated.",
)
work_queue_id: UUID = Field(
..., description="The work queue with which the agent is associated."
)
last_activity_time: Optional[datetime.datetime] = Field(
None, description="The last time this agent polled for work."
)
last_activity_time
pydantic-field
Type: datetime
The last time this agent polled for work.
name
pydantic-field
Type: str
The name of the agent. If a name is not provided, it will be auto-generated.
work_queue_id
pydantic-field
required
Type: UUID
The work queue with which the agent is associated.
Block
pydantic-model
An ORM representation of a block.
Source code in prefect/orion/schemas/core.py
class Block(ORMBaseModel):
"""An ORM representation of a block."""
name: str = Field(..., description="The block's name'")
data: dict = Field(default_factory=dict, description="The block's data")
block_spec_id: UUID = Field(..., description="A block spec ID")
block_spec: Optional[BlockSpec] = Field(
None, description="The associated block spec"
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
@classmethod
async def from_orm_model(
cls,
session,
orm_block: "prefect.orion.database.orm_models.ORMBlock",
):
return cls(
id=orm_block.id,
name=orm_block.name,
data=await orm_block.decrypt_data(session=session),
block_spec_id=orm_block.block_spec_id,
block_spec=orm_block.block_spec,
)
block_spec
pydantic-field
Type: BlockSpec
The associated block spec
block_spec_id
pydantic-field
required
Type: UUID
A block spec ID
data
pydantic-field
Type: dict
The block's data
name
pydantic-field
required
Type: str
The block's name'
BlockSpec
pydantic-model
An ORM representation of a block spec.
Source code in prefect/orion/schemas/core.py
class BlockSpec(ORMBaseModel):
"""An ORM representation of a block spec."""
name: str = Field(..., description="The block spec's name")
version: str = Field(..., description="The block spec's version")
type: str = Field(None, description="The block spec's type")
fields: dict = Field(
default_factory=dict, description="The block spec's field schema"
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
@validator("version", check_fields=False)
def validate_version_characters(cls, v):
if any(c in v for c in INVALID_CHARACTERS):
raise ValueError(
f"Version contains an invalid character {INVALID_CHARACTERS}."
)
return v
fields
pydantic-field
Type: dict
The block spec's field schema
name
pydantic-field
required
Type: str
The block spec's name
type
pydantic-field
Type: str
The block spec's type
version
pydantic-field
required
Type: str
The block spec's version
ConcurrencyLimit
pydantic-model
An ORM representation of a concurrency limit.
Source code in prefect/orion/schemas/core.py
class ConcurrencyLimit(ORMBaseModel):
"""An ORM representation of a concurrency limit."""
tag: str = Field(..., description="A tag the concurrency limit is applied to.")
concurrency_limit: int = Field(..., description="The concurrency limit.")
active_slots: List[UUID] = Field(
default_factory=list,
description="A list of active run ids using a concurrency slot",
)
active_slots
pydantic-field
Type: List[uuid.UUID]
A list of active run ids using a concurrency slot
concurrency_limit
pydantic-field
required
Type: int
The concurrency limit.
tag
pydantic-field
required
Type: str
A tag the concurrency limit is applied to.
Configuration
pydantic-model
An ORM representation of account info.
Source code in prefect/orion/schemas/core.py
class Configuration(ORMBaseModel):
"""An ORM representation of account info."""
key: str = Field(..., description="Account info key")
value: dict = Field(..., description="Account info")
key
pydantic-field
required
Type: str
Account info key
value
pydantic-field
required
Type: dict
Account info
Deployment
pydantic-model
An ORM representation of deployment data.
Source code in prefect/orion/schemas/core.py
class Deployment(ORMBaseModel):
"""An ORM representation of deployment data."""
name: str = Field(..., description="The name of the deployment.")
flow_id: UUID = Field(
..., description="The flow id associated with the deployment."
)
flow_data: schemas.data.DataDocument = Field(
..., description="A data document representing the flow code to execute."
)
schedule: schemas.schedules.SCHEDULE_TYPES = Field(
None, description="A schedule for the deployment."
)
is_schedule_active: bool = Field(
True, description="Whether or not the deployment schedule is active."
)
parameters: Dict[str, Any] = Field(
default_factory=dict,
description="Parameters for flow runs scheduled by the deployment.",
)
tags: List[str] = Field(
default_factory=list,
description="A list of tags for the deployment",
example=["tag-1", "tag-2"],
)
flow_runner: FlowRunnerSettings = Field(
None,
description="The flow runner to assign to flow runs associated with this deployment.",
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
flow_data
pydantic-field
required
Type: DataDocument
A data document representing the flow code to execute.
flow_id
pydantic-field
required
Type: UUID
The flow id associated with the deployment.
flow_runner
pydantic-field
Type: FlowRunnerSettings
The flow runner to assign to flow runs associated with this deployment.
is_schedule_active
pydantic-field
Type: bool
Whether or not the deployment schedule is active.
name
pydantic-field
required
Type: str
The name of the deployment.
parameters
pydantic-field
Type: Dict[str, Any]
Parameters for flow runs scheduled by the deployment.
schedule
pydantic-field
Type: Union[prefect.orion.schemas.schedules.IntervalSchedule, prefect.orion.schemas.schedules.CronSchedule, prefect.orion.schemas.schedules.RRuleSchedule]
A schedule for the deployment.
tags
pydantic-field
Type: List[str]
A list of tags for the deployment
Flow
pydantic-model
An ORM representation of flow data.
Source code in prefect/orion/schemas/core.py
class Flow(ORMBaseModel):
"""An ORM representation of flow data."""
name: str = Field(..., description="The name of the flow", example="my-flow")
tags: List[str] = Field(
default_factory=list,
description="A list of flow tags",
example=["tag-1", "tag-2"],
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
name
pydantic-field
required
Type: str
The name of the flow
tags
pydantic-field
Type: List[str]
A list of flow tags
FlowRun
pydantic-model
An ORM representation of flow run data.
Source code in prefect/orion/schemas/core.py
class FlowRun(ORMBaseModel):
"""An ORM representation of flow run data."""
name: str = Field(
default_factory=lambda: coolname.generate_slug(2),
description="The name of the flow run. Defaults to a random slug if not specified.",
example="my-flow-run",
)
flow_id: UUID = Field(..., description="The id of the flow being run.")
state_id: UUID = Field(None, description="The id of the flow run's current state.")
deployment_id: UUID = Field(
None,
description="The id of the deployment associated with this flow run, if available.",
)
flow_version: str = Field(
None,
description="The version of the flow executed in this flow run.",
example="1.0",
)
parameters: dict = Field(
default_factory=dict, description="Parameters for the flow run."
)
idempotency_key: str = Field(
None,
description="An optional idempotency key for the flow run. Used to ensure the same flow run is not created multiple times.",
)
context: dict = Field(
default_factory=dict,
description="Additional context for the flow run.",
example={"my_var": "my_val"},
)
empirical_policy: dict = Field(default_factory=dict)
empirical_config: dict = Field(default_factory=dict)
tags: List[str] = Field(
default_factory=list,
description="A list of tags on the flow run",
example=["tag-1", "tag-2"],
)
parent_task_run_id: UUID = Field(
None,
description="If the flow run is a subflow, the id of the 'dummy' task in the parent flow used to track subflow state.",
)
state_type: schemas.states.StateType = Field(
None, description="The type of the current flow run state."
)
run_count: int = Field(
0, description="The number of times the flow run was executed."
)
expected_start_time: datetime.datetime = Field(
None,
description="The flow run's expected start time.",
)
next_scheduled_start_time: datetime.datetime = Field(
None,
description="The next time the flow run is scheduled to start.",
)
start_time: datetime.datetime = Field(None, description="The actual start time.")
end_time: datetime.datetime = Field(None, description="The actual end time.")
total_run_time: datetime.timedelta = Field(
datetime.timedelta(0),
description="Total run time. If the flow run was executed multiple times, the time of each run will be summed.",
)
estimated_run_time: datetime.timedelta = Field(
datetime.timedelta(0), description="A real-time estimate of the total run time."
)
estimated_start_time_delta: datetime.timedelta = Field(
datetime.timedelta(0),
description="The difference between actual and expected start time.",
)
auto_scheduled: bool = Field(
False, description="Whether or not the flow run was automatically scheduled."
)
flow_runner: FlowRunnerSettings = Field(
None,
description="The flow runner to use to create infrastructure to execute this flow run",
)
# relationships
# flow: Flow = None
# task_runs: List["TaskRun"] = Field(default_factory=list)
state: schemas.states.State = Field(
None, description="The current state of the flow run."
)
# parent_task_run: "TaskRun" = None
@validator("name", pre=True)
def set_name(cls, name):
return name or coolname.generate_slug(2)
def __eq__(self, other: Any) -> bool:
"""
Check for "equality" to another flow run schema
Estimates times are rolling and will always change with repeated queries for
a flow run so we ignore them during equality checks.
"""
if isinstance(other, FlowRun):
exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
return self.dict(exclude=exclude_fields) == other.dict(
exclude=exclude_fields
)
return super().__eq__(other)
auto_scheduled
pydantic-field
Type: bool
Whether or not the flow run was automatically scheduled.
context
pydantic-field
Type: dict
Additional context for the flow run.
deployment_id
pydantic-field
Type: UUID
The id of the deployment associated with this flow run, if available.
end_time
pydantic-field
Type: datetime
The actual end time.
estimated_run_time
pydantic-field
Type: timedelta
A real-time estimate of the total run time.
estimated_start_time_delta
pydantic-field
Type: timedelta
The difference between actual and expected start time.
expected_start_time
pydantic-field
Type: datetime
The flow run's expected start time.
flow_id
pydantic-field
required
Type: UUID
The id of the flow being run.
flow_runner
pydantic-field
Type: FlowRunnerSettings
The flow runner to use to create infrastructure to execute this flow run
flow_version
pydantic-field
Type: str
The version of the flow executed in this flow run.
idempotency_key
pydantic-field
Type: str
An optional idempotency key for the flow run. Used to ensure the same flow run is not created multiple times.
name
pydantic-field
Type: str
The name of the flow run. Defaults to a random slug if not specified.
next_scheduled_start_time
pydantic-field
Type: datetime
The next time the flow run is scheduled to start.
parameters
pydantic-field
Type: dict
Parameters for the flow run.
parent_task_run_id
pydantic-field
Type: UUID
If the flow run is a subflow, the id of the 'dummy' task in the parent flow used to track subflow state.
run_count
pydantic-field
Type: int
The number of times the flow run was executed.
start_time
pydantic-field
Type: datetime
The actual start time.
state
pydantic-field
Type: State
The current state of the flow run.
state_id
pydantic-field
Type: UUID
The id of the flow run's current state.
state_type
pydantic-field
Type: StateType
The type of the current flow run state.
tags
pydantic-field
Type: List[str]
A list of tags on the flow run
total_run_time
pydantic-field
Type: timedelta
Total run time. If the flow run was executed multiple times, the time of each run will be summed.
FlowRunnerSettings
pydantic-model
An API schema for passing details about the flow runner.
This schema is agnostic to the types and configuration provided by clients
Source code in prefect/orion/schemas/core.py
class FlowRunnerSettings(PrefectBaseModel):
"""
An API schema for passing details about the flow runner.
This schema is agnostic to the types and configuration provided by clients
"""
type: str = Field(
None,
description="The type of the flow runner which can be used by the client for dispatching.",
)
config: dict = Field(
None, description="The configuration for the given flow runner type."
)
# The following is required for composite compatibility in the ORM
def __init__(self, type: str = None, config: dict = None, **kwargs) -> None:
# Pydantic does not support positional arguments so they must be converted to
# keyword arguments
super().__init__(type=type, config=config, **kwargs)
def __composite_values__(self):
return self.type, self.config
config
pydantic-field
Type: dict
The configuration for the given flow runner type.
type
pydantic-field
Type: str
The type of the flow runner which can be used by the client for dispatching.
Log
pydantic-model
An ORM representation of log data.
Source code in prefect/orion/schemas/core.py
class Log(ORMBaseModel):
"""An ORM representation of log data."""
name: str = Field(..., description="The logger name.")
level: int = Field(..., description="The log level.")
message: str = Field(..., description="The log message.")
timestamp: datetime.datetime = Field(..., description="The log timestamp.")
flow_run_id: UUID = Field(
..., description="The flow run ID associated with the log."
)
task_run_id: Optional[UUID] = Field(
None, description="The task run ID associated with the log."
)
flow_run_id
pydantic-field
required
Type: UUID
The flow run ID associated with the log.
level
pydantic-field
required
Type: int
The log level.
message
pydantic-field
required
Type: str
The log message.
name
pydantic-field
required
Type: str
The logger name.
task_run_id
pydantic-field
Type: UUID
The task run ID associated with the log.
timestamp
pydantic-field
required
Type: datetime
The log timestamp.
QueueFilter
pydantic-model
Filter criteria definition for a work queue.
Source code in prefect/orion/schemas/core.py
class QueueFilter(PrefectBaseModel):
"""Filter criteria definition for a work queue."""
tags: Optional[List[str]] = Field(
None,
description="Only include flow runs with these tags in the work queue.",
)
deployment_ids: Optional[List[UUID]] = Field(
None,
description="Only include flow runs from these deployments in the work queue.",
)
flow_runner_types: Optional[List[str]] = Field(
None,
description="Only include flow runs with these flow runner types in the work queue.",
)
def get_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's flow runs.
"""
return schemas.filters.FlowRunFilter(
tags=schemas.filters.FlowRunFilterTags(all_=self.tags),
deployment_id=schemas.filters.FlowRunFilterDeploymentId(
any_=self.deployment_ids,
is_null_=False,
),
flow_runner_type=schemas.filters.FlowRunFilterFlowRunnerType(
any_=self.flow_runner_types,
),
)
def get_scheduled_flow_run_filter(
self, scheduled_before: datetime.datetime
) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's SCHEDULED flow runs.
Args:
scheduled_before: Create a FlowRunFilter that excludes runs scheduled before this date.
Returns:
Flow run filter that can be used to query the work queue for scheduled runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.SCHEDULED,
]
)
),
"next_scheduled_start_time": schemas.filters.FlowRunFilterNextScheduledStartTime(
before_=scheduled_before
),
}
)
def get_executing_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's PENDING or RUNNING flow runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.PENDING,
schemas.states.StateType.RUNNING,
]
)
)
}
)
deployment_ids
pydantic-field
Type: List[uuid.UUID]
Only include flow runs from these deployments in the work queue.
flow_runner_types
pydantic-field
Type: List[str]
Only include flow runs with these flow runner types in the work queue.
tags
pydantic-field
Type: List[str]
Only include flow runs with these tags in the work queue.
QueueFilter.get_executing_flow_run_filter
Construct a flow run filter for the work queue's PENDING or RUNNING flow runs.
Source code in prefect/orion/schemas/core.py
def get_executing_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's PENDING or RUNNING flow runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.PENDING,
schemas.states.StateType.RUNNING,
]
)
)
}
)
QueueFilter.get_flow_run_filter
Construct a flow run filter for the work queue's flow runs.
Source code in prefect/orion/schemas/core.py
def get_flow_run_filter(self) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's flow runs.
"""
return schemas.filters.FlowRunFilter(
tags=schemas.filters.FlowRunFilterTags(all_=self.tags),
deployment_id=schemas.filters.FlowRunFilterDeploymentId(
any_=self.deployment_ids,
is_null_=False,
),
flow_runner_type=schemas.filters.FlowRunFilterFlowRunnerType(
any_=self.flow_runner_types,
),
)
QueueFilter.get_scheduled_flow_run_filter
Construct a flow run filter for the work queue's SCHEDULED flow runs.
Parameters:
Name | Description | Default |
---|---|---|
scheduled_before |
Create a FlowRunFilter that excludes runs scheduled before this date. datetime |
required |
Returns:
Type | Description |
---|---|
schemas.filters.FlowRunFilter |
Flow run filter that can be used to query the work queue for scheduled runs. |
Source code in prefect/orion/schemas/core.py
def get_scheduled_flow_run_filter(
self, scheduled_before: datetime.datetime
) -> "schemas.filters.FlowRunFilter":
"""
Construct a flow run filter for the work queue's SCHEDULED flow runs.
Args:
scheduled_before: Create a FlowRunFilter that excludes runs scheduled before this date.
Returns:
Flow run filter that can be used to query the work queue for scheduled runs.
"""
return self.get_flow_run_filter().copy(
update={
"state": schemas.filters.FlowRunFilterState(
type=schemas.filters.FlowRunFilterStateType(
any_=[
schemas.states.StateType.SCHEDULED,
]
)
),
"next_scheduled_start_time": schemas.filters.FlowRunFilterNextScheduledStartTime(
before_=scheduled_before
),
}
)
SavedSearch
pydantic-model
An ORM representation of saved search data. Represents a set of filter criteria.
Source code in prefect/orion/schemas/core.py
class SavedSearch(ORMBaseModel):
"""An ORM representation of saved search data. Represents a set of filter criteria."""
name: str = Field(..., description="The name of the saved search.")
filters: List[SavedSearchFilter] = Field(
default_factory=list, description="The filter set for the saved search."
)
filters
pydantic-field
Type: List[prefect.orion.schemas.core.SavedSearchFilter]
The filter set for the saved search.
name
pydantic-field
required
Type: str
The name of the saved search.
SavedSearchFilter
pydantic-model
A filter for a saved search model. Intended for use by the Prefect UI.
Source code in prefect/orion/schemas/core.py
class SavedSearchFilter(PrefectBaseModel):
"""A filter for a saved search model. Intended for use by the Prefect UI."""
object: str = Field(..., description="The object over which to filter.")
property: str = Field(
..., description="The property of the object on which to filter."
)
type: str = Field(..., description="The type of the property.")
operation: str = Field(
..., description="The operator to apply to the object. For example, `equals`."
)
value: Any = Field(..., description="A JSON-compatible value for the filter.")
object
pydantic-field
required
Type: str
The object over which to filter.
operation
pydantic-field
required
Type: str
The operator to apply to the object. For example, equals
.
property
pydantic-field
required
Type: str
The property of the object on which to filter.
type
pydantic-field
required
Type: str
The type of the property.
value
pydantic-field
required
Type: Any
A JSON-compatible value for the filter.
TaskRun
pydantic-model
An ORM representation of task run data.
Source code in prefect/orion/schemas/core.py
class TaskRun(ORMBaseModel):
"""An ORM representation of task run data."""
name: str = Field(
default_factory=lambda: coolname.generate_slug(2), example="my-task-run"
)
flow_run_id: UUID = Field(..., description="The flow run id of the task run.")
task_key: str = Field(
..., description="A unique identifier for the task being run."
)
dynamic_key: str = Field(
...,
description="A dynamic key used to differentiate between multiple runs of the same task within the same flow run.",
)
cache_key: str = Field(
None,
description="An optional cache key. If a COMPLETED state associated with this cache key is found, the cached COMPLETED state will be used instead of executing the task run.",
)
cache_expiration: datetime.datetime = Field(
None, description="Specifies when the cached state should expire."
)
task_version: str = Field(None, description="The version of the task being run.")
empirical_policy: TaskRunPolicy = Field(
default_factory=TaskRunPolicy,
)
tags: List[str] = Field(
default_factory=list,
description="A list of tags for the task run.",
example=["tag-1", "tag-2"],
)
state_id: UUID = Field(None, description="The id of the current task run state.")
task_inputs: Dict[str, List[Union[TaskRunResult, Parameter, Constant]]] = Field(
default_factory=dict,
description="Tracks the source of inputs to a task run. Used for internal bookkeeping.",
)
state_type: schemas.states.StateType = Field(
None, description="The type of the current task run state."
)
run_count: int = Field(
0, description="The number of times the task run has been executed."
)
expected_start_time: datetime.datetime = Field(
None,
description="The task run's expected start time.",
)
# the next scheduled start time will be populated
# whenever the run is in a scheduled state
next_scheduled_start_time: datetime.datetime = Field(
None,
description="The next time the task run is scheduled to start.",
)
start_time: datetime.datetime = Field(None, description="The actual start time.")
end_time: datetime.datetime = Field(None, description="The actual end time.")
total_run_time: datetime.timedelta = Field(
datetime.timedelta(0),
description="Total run time. If the task run was executed multiple times, the time of each run will be summed.",
)
estimated_run_time: datetime.timedelta = Field(
datetime.timedelta(0), description="A real-time estimate of total run time."
)
estimated_start_time_delta: datetime.timedelta = Field(
datetime.timedelta(0),
description="The difference between actual and expected start time.",
)
# relationships
# flow_run: FlowRun = None
# subflow_runs: List[FlowRun] = Field(default_factory=list)
state: schemas.states.State = Field(None, description="The current task run state.")
@validator("name", pre=True)
def set_name(cls, name):
return name or coolname.generate_slug(2)
cache_expiration
pydantic-field
Type: datetime
Specifies when the cached state should expire.
cache_key
pydantic-field
Type: str
An optional cache key. If a COMPLETED state associated with this cache key is found, the cached COMPLETED state will be used instead of executing the task run.
dynamic_key
pydantic-field
required
Type: str
A dynamic key used to differentiate between multiple runs of the same task within the same flow run.
end_time
pydantic-field
Type: datetime
The actual end time.
estimated_run_time
pydantic-field
Type: timedelta
A real-time estimate of total run time.
estimated_start_time_delta
pydantic-field
Type: timedelta
The difference between actual and expected start time.
expected_start_time
pydantic-field
Type: datetime
The task run's expected start time.
flow_run_id
pydantic-field
required
Type: UUID
The flow run id of the task run.
next_scheduled_start_time
pydantic-field
Type: datetime
The next time the task run is scheduled to start.
run_count
pydantic-field
Type: int
The number of times the task run has been executed.
start_time
pydantic-field
Type: datetime
The actual start time.
state
pydantic-field
Type: State
The current task run state.
state_id
pydantic-field
Type: UUID
The id of the current task run state.
state_type
pydantic-field
Type: StateType
The type of the current task run state.
tags
pydantic-field
Type: List[str]
A list of tags for the task run.
task_inputs
pydantic-field
Type: Dict[str, List[Union[prefect.orion.schemas.core.TaskRunResult, prefect.orion.schemas.core.Parameter, prefect.orion.schemas.core.Constant]]]
Tracks the source of inputs to a task run. Used for internal bookkeeping.
task_key
pydantic-field
required
Type: str
A unique identifier for the task being run.
task_version
pydantic-field
Type: str
The version of the task being run.
total_run_time
pydantic-field
Type: timedelta
Total run time. If the task run was executed multiple times, the time of each run will be summed.
WorkQueue
pydantic-model
An ORM representaiton of a work queue
Source code in prefect/orion/schemas/core.py
class WorkQueue(ORMBaseModel):
"""An ORM representaiton of a work queue"""
filter: QueueFilter = Field(
default_factory=QueueFilter, description="Filter criteria for the work queue."
)
name: str = Field(..., description="The name of the work queue.")
description: Optional[str] = Field(
"", description="An optional description for the work queue."
)
is_paused: bool = Field(
False, description="Whether or not the work queue is paused."
)
concurrency_limit: Optional[int] = Field(
None, description="An optional concurrency limit for the work queue."
)
@validator("name", check_fields=False)
def validate_name_characters(cls, v):
raise_on_invalid_name(v)
return v
concurrency_limit
pydantic-field
Type: int
An optional concurrency limit for the work queue.
description
pydantic-field
Type: str
An optional description for the work queue.
filter
pydantic-field
Type: QueueFilter
Filter criteria for the work queue.
is_paused
pydantic-field
Type: bool
Whether or not the work queue is paused.
name
pydantic-field
required
Type: str
The name of the work queue.
raise_on_invalid_name
Raise an InvalidNameError if the given name contains any invalid characters.
Source code in prefect/orion/schemas/core.py
def raise_on_invalid_name(name: str) -> None:
"""
Raise an InvalidNameError if the given name contains any invalid
characters.
"""
if any(c in name for c in INVALID_CHARACTERS):
raise InvalidNameError(
f"Name {name!r} contains an invalid character. "
f"Must not contain any of: {INVALID_CHARACTERS}."
)