Skip to content

prefect.orion.schemas.filters

Schemas that define Orion filtering operations.

Each filter schema includes logic for transforming itself into a SQL where clause.

DeploymentFilter pydantic-model

Filter for deployments. Only deployments matching all criteria will be returned.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilter(PrefectFilterBaseModel):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    # Each attribute is an optional sub-filter; an attribute left as None
    # contributes no clause to the generated filter list.
    id: Optional[DeploymentFilterId] = Field(
        default=None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        default=None, description="Filter criteria for `Deployment.name`"
    )
    is_schedule_active: Optional[DeploymentFilterIsScheduleActive] = Field(
        default=None,
        description="Filter criteria for `Deployment.is_schedule_active`",
    )
    tags: Optional[DeploymentFilterTags] = Field(
        default=None, description="Filter criteria for `Deployment.tags`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Collect the SQL filter of every sub-filter that has been set.

        Clauses are produced in the order: id, name, is_schedule_active, tags.
        """
        criteria = (self.id, self.name, self.is_schedule_active, self.tags)
        return [
            criterion.as_sql_filter(db)
            for criterion in criteria
            if criterion is not None
        ]

id pydantic-field

Type: DeploymentFilterId

Filter criteria for Deployment.id

is_schedule_active pydantic-field

Type: DeploymentFilterIsScheduleActive

Filter criteria for Deployment.is_schedule_active

name pydantic-field

Type: DeploymentFilterName

Filter criteria for Deployment.name

tags pydantic-field

Type: DeploymentFilterTags

Filter criteria for Deployment.tags

DeploymentFilterId pydantic-model

Filter by Deployment.id.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterId(PrefectFilterBaseModel):
    """Filter by `Deployment.id`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[UUID]] = Field(
        None, description="A list of deployment ids to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Deployment.id` when
        `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Deployment.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of deployment ids to include

DeploymentFilterIsScheduleActive pydantic-model

Filter by Deployment.is_schedule_active.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterIsScheduleActive(PrefectFilterBaseModel):
    """Filter by `Deployment.is_schedule_active`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; True and False both produce a clause.
    eq_: Optional[bool] = Field(
        None,
        description="Only returns where deployment schedule is/is not active",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `is_` comparison of `db.Deployment.is_schedule_active`
        against `eq_` when `eq_` is set, otherwise an empty list.
        """
        filters = []
        if self.eq_ is not None:
            filters.append(db.Deployment.is_schedule_active.is_(self.eq_))
        return filters

eq_ pydantic-field

Type: bool

Only returns where deployment schedule is/is not active

DeploymentFilterName pydantic-model

Filter by Deployment.name.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterName(PrefectFilterBaseModel):
    """Filter by `Deployment.name`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None,
        description="A list of deployment names to include",
        example=["my-deployment-1", "my-deployment-2"],
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Deployment.name` when
        `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Deployment.name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of deployment names to include

DeploymentFilterTags pydantic-model

Filter by Deployment.tags.

Source code in prefect/orion/schemas/filters.py
class DeploymentFilterTags(PrefectFilterBaseModel):
    """Filter by `Deployment.tags`."""

    # Both criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    all_: Optional[List[str]] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Deployments will be returned only if their tags are a superset of the list",
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only include deployments without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `all_` is delegated to `json_has_all_keys`; `is_null_` compares
        `db.Deployment.tags` with the empty list (equal when true, not equal
        when false). Unset criteria add nothing.
        """
        # Imported inside the method, as in the original implementation.
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.Deployment.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(
                db.Deployment.tags == [] if self.is_null_ else db.Deployment.tags != []
            )
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Deployments will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include deployments without tags

FilterSet pydantic-model

A collection of filters for common objects

Source code in prefect/orion/schemas/filters.py
class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    # Every field falls back to a freshly constructed filter of its own type
    # when no value is supplied.
    flows: FlowFilter = Field(
        description="Filters that apply to flows",
        default_factory=FlowFilter,
    )
    flow_runs: FlowRunFilter = Field(
        description="Filters that apply to flow runs",
        default_factory=FlowRunFilter,
    )
    task_runs: TaskRunFilter = Field(
        description="Filters that apply to task runs",
        default_factory=TaskRunFilter,
    )
    deployments: DeploymentFilter = Field(
        description="Filters that apply to deployments",
        default_factory=DeploymentFilter,
    )

deployments pydantic-field

Type: DeploymentFilter

Filters that apply to deployments

flow_runs pydantic-field

Type: FlowRunFilter

Filters that apply to flow runs

flows pydantic-field

Type: FlowFilter

Filters that apply to flows

task_runs pydantic-field

Type: TaskRunFilter

Filters that apply to task runs

FlowFilter pydantic-model

Filter for flows. Only flows matching all criteria will be returned.

Source code in prefect/orion/schemas/filters.py
class FlowFilter(PrefectFilterBaseModel):
    """Filter for flows. Only flows matching all criteria will be returned."""

    # Each attribute is an optional sub-filter; an attribute left as None
    # contributes no clause to the generated filter list.
    id: Optional[FlowFilterId] = Field(
        default=None, description="Filter criteria for `Flow.id`"
    )
    name: Optional[FlowFilterName] = Field(
        default=None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        default=None, description="Filter criteria for `Flow.tags`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Collect the SQL filter of every sub-filter that has been set.

        Clauses are produced in the order: id, name, tags.
        """
        criteria = (self.id, self.name, self.tags)
        return [
            criterion.as_sql_filter(db)
            for criterion in criteria
            if criterion is not None
        ]

id pydantic-field

Type: FlowFilterId

Filter criteria for Flow.id

name pydantic-field

Type: FlowFilterName

Filter criteria for Flow.name

tags pydantic-field

Type: FlowFilterTags

Filter criteria for Flow.tags

FlowFilterId pydantic-model

Filter by Flow.id.

Source code in prefect/orion/schemas/filters.py
class FlowFilterId(PrefectFilterBaseModel):
    """Filter by `Flow.id`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[UUID]] = Field(
        None, description="A list of flow ids to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Flow.id` when `any_` is
        set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Flow.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow ids to include

FlowFilterName pydantic-model

Filter by Flow.name.

Source code in prefect/orion/schemas/filters.py
class FlowFilterName(PrefectFilterBaseModel):
    """Filter by `Flow.name`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None,
        description="A list of flow names to include",
        example=["my-flow-1", "my-flow-2"],
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Flow.name` when `any_` is
        set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Flow.name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow names to include

FlowFilterTags pydantic-model

Filter by Flow.tags.

Source code in prefect/orion/schemas/filters.py
class FlowFilterTags(PrefectFilterBaseModel):
    """Filter by `Flow.tags`."""

    # Both criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    all_: Optional[List[str]] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Flows will be returned only if their tags are a superset of the list",
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only include flows without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `all_` is delegated to `json_has_all_keys`; `is_null_` compares
        `db.Flow.tags` with the empty list (equal when true, not equal when
        false). Unset criteria add nothing.
        """
        # Imported inside the method, as in the original implementation.
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.Flow.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(db.Flow.tags == [] if self.is_null_ else db.Flow.tags != [])
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Flows will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include flows without tags

FlowRunFilter pydantic-model

Filter flow runs. Only flow runs matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class FlowRunFilter(PrefectFilterBaseModel):
    """Filter flow runs. Only flow runs matching all criteria will be returned"""

    # Each attribute is an optional sub-filter; an attribute left as None
    # contributes no clause to the generated filter list.
    id: Optional[FlowRunFilterId] = Field(
        default=None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        default=None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        default=None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        default=None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    state: Optional[FlowRunFilterState] = Field(
        default=None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        default=None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.start_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.expected_start_time`",
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.next_scheduled_start_time`",
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        default=None,
        description="Filter criteria for `FlowRun.parent_task_run_id`",
    )

    flow_runner_type: Optional[FlowRunFilterFlowRunnerType] = Field(
        default=None,
        description="Filter criteria for FlowRun.flow_runner_type.",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Collect the SQL filter of every sub-filter that has been set.

        The clause order follows the tuple below; note that `flow_version`
        comes before `state`, unlike the field declaration order.
        """
        criteria = (
            self.id,
            self.name,
            self.tags,
            self.deployment_id,
            self.flow_version,
            self.state,
            self.start_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_task_run_id,
            self.flow_runner_type,
        )
        return [
            criterion.as_sql_filter(db)
            for criterion in criteria
            if criterion is not None
        ]

deployment_id pydantic-field

Type: FlowRunFilterDeploymentId

Filter criteria for FlowRun.deployment_id

expected_start_time pydantic-field

Type: FlowRunFilterExpectedStartTime

Filter criteria for FlowRun.expected_start_time

flow_runner_type pydantic-field

Type: FlowRunFilterFlowRunnerType

Filter criteria for FlowRun.flow_runner_type.

flow_version pydantic-field

Type: FlowRunFilterFlowVersion

Filter criteria for FlowRun.flow_version

id pydantic-field

Type: FlowRunFilterId

Filter criteria for FlowRun.id

name pydantic-field

Type: FlowRunFilterName

Filter criteria for FlowRun.name

next_scheduled_start_time pydantic-field

Type: FlowRunFilterNextScheduledStartTime

Filter criteria for FlowRun.next_scheduled_start_time

parent_task_run_id pydantic-field

Type: FlowRunFilterParentTaskRunId

Filter criteria for FlowRun.parent_task_run_id

start_time pydantic-field

Type: FlowRunFilterStartTime

Filter criteria for FlowRun.start_time

state pydantic-field

Type: FlowRunFilterState

Filter criteria for FlowRun.state

tags pydantic-field

Type: FlowRunFilterTags

Filter criteria for FlowRun.tags

FlowRunFilterDeploymentId pydantic-model

Filter by FlowRun.deployment_id.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterDeploymentId(PrefectFilterBaseModel):
    """Filter by `FlowRun.deployment_id`."""

    # Both criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    any_: Optional[List[UUID]] = Field(
        None, description="A list of flow run deployment ids to include"
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only include flow runs without deployment ids"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `any_` adds an `in_` membership clause on `db.FlowRun.deployment_id`;
        `is_null_` adds a NULL test (or its negation when false). Unset
        criteria add nothing.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.deployment_id.in_(self.any_))
        if self.is_null_ is not None:
            # Explicit NULL operators instead of `== None` / `!= None`, which
            # linters flag and which are easy to "fix" into a plain Python
            # identity test by mistake.
            filters.append(
                db.FlowRun.deployment_id.is_(None)
                if self.is_null_
                else db.FlowRun.deployment_id.is_not(None)
            )
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run deployment ids to include

is_null_ pydantic-field

Type: bool

If true, only include flow runs without deployment ids

FlowRunFilterExpectedStartTime pydantic-model

Filter by FlowRun.expected_start_time.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    # Both bounds are annotated Optional because they default to None,
    # meaning "bound not set".
    before_: Optional[datetime.datetime] = Field(
        None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: Optional[datetime.datetime] = Field(
        None,
        description="Only include flow runs scheduled to start at or after this time",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `before_` adds an inclusive upper bound (`<=`) and `after_` an
        inclusive lower bound (`>=`) on `db.FlowRun.expected_start_time`.
        """
        filters = []
        if self.before_ is not None:
            filters.append(db.FlowRun.expected_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.expected_start_time >= self.after_)
        return filters

after_ pydantic-field

Type: datetime

Only include flow runs scheduled to start at or after this time

before_ pydantic-field

Type: datetime

Only include flow runs scheduled to start at or before this time

FlowRunFilterFlowRunnerType pydantic-model

Filter by FlowRun.flow_runner_type.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterFlowRunnerType(PrefectFilterBaseModel):
    """Filter by `FlowRun.flow_runner_type`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None, description="A list of flow run flow runner types to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.FlowRun.flow_runner_type`
        when `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.flow_runner_type.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run flow runner types to include

FlowRunFilterFlowVersion pydantic-model

Filter by FlowRun.flow_version.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterFlowVersion(PrefectFilterBaseModel):
    """Filter by `FlowRun.flow_version`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None, description="A list of flow run flow_versions to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.FlowRun.flow_version`
        when `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.flow_version.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run flow_versions to include

FlowRunFilterId pydantic-model

Filter by FlowRun.id.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterId(PrefectFilterBaseModel):
    """Filter by FlowRun.id."""

    # Both criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    any_: Optional[List[UUID]] = Field(
        None, description="A list of flow run ids to include"
    )
    not_any_: Optional[List[UUID]] = Field(
        None, description="A list of flow run ids to exclude"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `any_` adds an `in_` clause and `not_any_` a `not_in` clause on
        `db.FlowRun.id`; when both are set, both clauses are returned.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.id.in_(self.any_))
        if self.not_any_ is not None:
            filters.append(db.FlowRun.id.not_in(self.not_any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run ids to include

not_any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run ids to exclude

FlowRunFilterName pydantic-model

Filter by FlowRun.name.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterName(PrefectFilterBaseModel):
    """Filter by `FlowRun.name`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None,
        description="A list of flow run names to include",
        example=["my-flow-run-1", "my-flow-run-2"],
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.FlowRun.name` when `any_`
        is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run names to include

FlowRunFilterNextScheduledStartTime pydantic-model

Filter by FlowRun.next_scheduled_start_time.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterNextScheduledStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    # Both bounds are annotated Optional because they default to None,
    # meaning "bound not set".
    before_: Optional[datetime.datetime] = Field(
        None,
        # Fixed wording: the original description was missing the word "at".
        description="Only include flow runs with a next_scheduled_start_time at or before this time",
    )
    after_: Optional[datetime.datetime] = Field(
        None,
        description="Only include flow runs with a next_scheduled_start_time at or after this time",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `before_` adds an inclusive upper bound (`<=`) and `after_` an
        inclusive lower bound (`>=`) on `db.FlowRun.next_scheduled_start_time`.
        """
        filters = []
        if self.before_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time >= self.after_)
        return filters

after_ pydantic-field

Type: datetime

Only include flow runs with a next_scheduled_start_time at or after this time

before_ pydantic-field

Type: datetime

Only include flow runs with a next_scheduled_start_time at or before this time

FlowRunFilterParentTaskRunId pydantic-model

Filter by FlowRun.parent_task_run_id.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterParentTaskRunId(PrefectFilterBaseModel):
    """Filter by `FlowRun.parent_task_run_id`."""

    # Both criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    any_: Optional[List[UUID]] = Field(
        None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only include flow runs without parent_task_run_id"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `any_` adds an `in_` membership clause on
        `db.FlowRun.parent_task_run_id`; `is_null_` adds a NULL test (or its
        negation when false). Unset criteria add nothing.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.parent_task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            # Explicit NULL operators instead of `== None` / `!= None`, which
            # linters flag and which are easy to "fix" into a plain Python
            # identity test by mistake.
            filters.append(
                db.FlowRun.parent_task_run_id.is_(None)
                if self.is_null_
                else db.FlowRun.parent_task_run_id.is_not(None)
            )
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run parent_task_run_ids to include

is_null_ pydantic-field

Type: bool

If true, only include flow runs without parent_task_run_id

FlowRunFilterStartTime pydantic-model

Filter by FlowRun.start_time.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.start_time`."""

    # All criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    before_: Optional[datetime.datetime] = Field(
        None, description="Only include flow runs starting at or before this time"
    )
    after_: Optional[datetime.datetime] = Field(
        None, description="Only include flow runs starting at or after this time"
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only return flow runs without a start time"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `before_` / `after_` add inclusive upper / lower bounds on
        `db.FlowRun.start_time`; `is_null_` adds a NULL test (or its negation
        when false). Unset criteria add nothing.
        """
        filters = []
        if self.before_ is not None:
            filters.append(db.FlowRun.start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.start_time >= self.after_)
        if self.is_null_ is not None:
            # Explicit NULL operators instead of `== None` / `!= None`, which
            # linters flag and which are easy to "fix" into a plain Python
            # identity test by mistake.
            filters.append(
                db.FlowRun.start_time.is_(None)
                if self.is_null_
                else db.FlowRun.start_time.is_not(None)
            )
        return filters

after_ pydantic-field

Type: datetime

Only include flow runs starting at or after this time

before_ pydantic-field

Type: datetime

Only include flow runs starting at or before this time

is_null_ pydantic-field

Type: bool

If true, only return flow runs without a start time

FlowRunFilterStateName pydantic-model

Filter by FlowRun.state_name.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_name`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None, description="A list of flow run state names to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.FlowRun.state_name` when
        `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.state_name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of flow run state names to include

FlowRunFilterStateType pydantic-model

Filter by FlowRun.state_type.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_type`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[schemas.states.StateType]] = Field(
        None, description="A list of flow run state types to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.FlowRun.state_type` when
        `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.FlowRun.state_type.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[prefect.orion.schemas.states.StateType]

A list of flow run state types to include

FlowRunFilterTags pydantic-model

Filter by FlowRun.tags.

Source code in prefect/orion/schemas/filters.py
class FlowRunFilterTags(PrefectFilterBaseModel):
    """Filter by `FlowRun.tags`."""

    # Both criteria are annotated Optional because they default to None,
    # meaning "criterion not set".
    all_: Optional[List[str]] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Flow runs will be returned only if their tags are a superset of the list",
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only include flow runs without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `all_` is delegated to `json_has_all_keys`; `is_null_` compares
        `db.FlowRun.tags` with the empty list (equal when true, not equal
        when false). Unset criteria add nothing.
        """
        # Imported inside the method, as in the original implementation.
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.FlowRun.tags, self.all_))
        if self.is_null_ is not None:
            filters.append(
                db.FlowRun.tags == [] if self.is_null_ else db.FlowRun.tags != []
            )
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Flow runs will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include flow runs without tags

LogFilter pydantic-model

Filter logs. Only logs matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class LogFilter(PrefectFilterBaseModel):
    """Filter logs. Only logs matching all criteria will be returned"""

    # Each attribute is an optional sub-filter; an attribute left as None
    # contributes no clause to the generated filter list.
    level: Optional[LogFilterLevel] = Field(
        default=None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        default=None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Log.task_run_id`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Collect the SQL filter of every sub-filter that has been set.

        Clauses are produced in the order: level, timestamp, flow_run_id,
        task_run_id.
        """
        criteria = (self.level, self.timestamp, self.flow_run_id, self.task_run_id)
        return [
            criterion.as_sql_filter(db)
            for criterion in criteria
            if criterion is not None
        ]

flow_run_id pydantic-field

Type: LogFilterFlowRunId

Filter criteria for Log.flow_run_id

level pydantic-field

Type: LogFilterLevel

Filter criteria for Log.level

task_run_id pydantic-field

Type: LogFilterTaskRunId

Filter criteria for Log.task_run_id

timestamp pydantic-field

Type: LogFilterTimestamp

Filter criteria for Log.timestamp

LogFilterFlowRunId pydantic-model

Filter by Log.flow_run_id.

Source code in prefect/orion/schemas/filters.py
class LogFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Log.flow_run_id`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[UUID]] = Field(
        None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Log.flow_run_id` when
        `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Log.flow_run_id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of flow run IDs to include

LogFilterLevel pydantic-model

Filter by Log.level.

Source code in prefect/orion/schemas/filters.py
class LogFilterLevel(PrefectFilterBaseModel):
    """Filter by `Log.level`."""

    # Both bounds are annotated Optional because they default to None,
    # meaning "bound not set".
    ge_: Optional[int] = Field(
        None,
        description="Include logs with a level greater than or equal to this level",
        example=20,
    )

    le_: Optional[int] = Field(
        None,
        description="Include logs with a level less than or equal to this level",
        example=50,
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `ge_` adds an inclusive lower bound (`>=`) and `le_` an inclusive
        upper bound (`<=`) on `db.Log.level`.
        """
        filters = []
        if self.ge_ is not None:
            filters.append(db.Log.level >= self.ge_)
        if self.le_ is not None:
            filters.append(db.Log.level <= self.le_)
        return filters

ge_ pydantic-field

Type: int

Include logs with a level greater than or equal to this level

le_ pydantic-field

Type: int

Include logs with a level less than or equal to this level

LogFilterName pydantic-model

Filter by Log.name.

Source code in prefect/orion/schemas/filters.py
class LogFilterName(PrefectFilterBaseModel):
    """Filter by `Log.name`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[str]] = Field(
        None,
        description="A list of log names to include",
        example=["prefect.logger.flow_runs", "prefect.logger.task_runs"],
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Log.name` when `any_` is
        set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Log.name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of log names to include

LogFilterTaskRunId pydantic-model

Filter by Log.task_run_id.

Source code in prefect/orion/schemas/filters.py
class LogFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Log.task_run_id`."""

    # Annotated Optional because the default is None, meaning "criterion not
    # set"; `_get_filter_list` skips the criterion in that case.
    any_: Optional[List[UUID]] = Field(
        None, description="A list of task run IDs to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        Produces one `in_` membership clause on `db.Log.task_run_id` when
        `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.Log.task_run_id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of task run IDs to include

LogFilterTimestamp pydantic-model

Filter by Log.timestamp.

Source code in prefect/orion/schemas/filters.py
class LogFilterTimestamp(PrefectFilterBaseModel):
    """Filter by `Log.timestamp`."""

    # Both bounds are annotated Optional because they default to None,
    # meaning "bound not set".
    before_: Optional[datetime.datetime] = Field(
        None, description="Only include logs with a timestamp at or before this time"
    )
    after_: Optional[datetime.datetime] = Field(
        None, description="Only include logs with a timestamp at or after this time"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the clauses for this filter.

        `before_` adds an inclusive upper bound (`<=`) and `after_` an
        inclusive lower bound (`>=`) on `db.Log.timestamp`.
        """
        filters = []
        if self.before_ is not None:
            filters.append(db.Log.timestamp <= self.before_)
        if self.after_ is not None:
            filters.append(db.Log.timestamp >= self.after_)
        return filters

after_ pydantic-field

Type: datetime

Only include logs with a timestamp at or after this time

before_ pydantic-field

Type: datetime

Only include logs with a timestamp at or before this time

PrefectFilterBaseModel pydantic-model

Base model for Prefect filters

Source code in prefect/orion/schemas/filters.py
class PrefectFilterBaseModel(PrefectBaseModel):
    """Base model for Prefect filters"""

    def as_sql_filter(self, db: "OrionDBInterface") -> BooleanClauseList:
        """Combine this filter's clauses into a single SQL expression.

        The clauses returned by `_get_filter_list` are joined with AND. When
        there are none, `sa.and_(True)` is returned as a TRUE filter.
        """
        clauses = self._get_filter_list(db)
        if not clauses:
            return sa.and_(True)
        return sa.and_(*clauses)

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Build the list of boolean clauses for this filter.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        raise NotImplementedError("_get_filter_list must be implemented")

TaskRunFilter pydantic-model

Filter task runs. Only task runs matching all criteria will be returned

Source code in prefect/orion/schemas/filters.py
class TaskRunFilter(PrefectFilterBaseModel):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        None, description="Filter criteria for `TaskRun.start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        None, description="Filter criteria for `TaskRun.subflow_run`"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Collect the SQL filter of every sub-filter that has been provided.

        Sub-filters left as `None` contribute nothing; the remaining ones are
        converted with their own `as_sql_filter` in field declaration order.
        """
        criteria = (
            self.id,
            self.name,
            self.tags,
            self.state,
            self.start_time,
            self.subflow_runs,
        )
        return [
            criterion.as_sql_filter(db)
            for criterion in criteria
            if criterion is not None
        ]

id pydantic-field

Type: TaskRunFilterId

Filter criteria for TaskRun.id

name pydantic-field

Type: TaskRunFilterName

Filter criteria for TaskRun.name

start_time pydantic-field

Type: TaskRunFilterStartTime

Filter criteria for TaskRun.start_time

state pydantic-field

Type: TaskRunFilterState

Filter criteria for TaskRun.state

subflow_runs pydantic-field

Type: TaskRunFilterSubFlowRuns

Filter criteria for TaskRun.subflow_run

tags pydantic-field

Type: TaskRunFilterTags

Filter criteria for TaskRun.tags

TaskRunFilterId pydantic-model

Filter by TaskRun.id.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterId(PrefectFilterBaseModel):
    """Filter by `TaskRun.id`."""

    # Defaults to None (no restriction), hence Optional rather than a bare List.
    any_: Optional[List[UUID]] = Field(
        None, description="A list of task run ids to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return an `IN` clause on `TaskRun.id` when `any_` is set.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A single-element list with the membership clause, or an empty
            list when `any_` is None.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.id.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[uuid.UUID]

A list of task run ids to include

TaskRunFilterName pydantic-model

Filter by TaskRun.name.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterName(PrefectFilterBaseModel):
    """Filter by `TaskRun.name`."""

    # Defaults to None (no restriction), hence Optional rather than a bare List.
    any_: Optional[List[str]] = Field(
        None,
        description="A list of task run names to include",
        example=["my-task-run-1", "my-task-run-2"],
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return an `IN` clause on `TaskRun.name` when `any_` is set.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A single-element list with the membership clause, or an empty
            list when `any_` is None.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of task run names to include

TaskRunFilterStartTime pydantic-model

Filter by TaskRun.start_time.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.start_time`."""

    # Every criterion defaults to None ("not specified"), so each one is
    # declared Optional to match its default and the `is not None` checks.
    before_: Optional[datetime.datetime] = Field(
        None, description="Only include task runs starting at or before this time"
    )
    after_: Optional[datetime.datetime] = Field(
        None, description="Only include task runs starting at or after this time"
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only return task runs without a start time"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the `TaskRun.start_time` clauses for every criterion that is set.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A list of clauses: inclusive bounds for `before_` / `after_`, and a
            null check for `is_null_` (IS NULL when true, IS NOT NULL when
            false). Empty when no criterion is provided.
        """
        filters = []
        if self.before_ is not None:
            filters.append(db.TaskRun.start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.TaskRun.start_time >= self.after_)
        if self.is_null_ is not None:
            # Use the explicit null operators rather than `== None` / `!= None`;
            # they produce the same IS NULL / IS NOT NULL SQL.
            filters.append(
                db.TaskRun.start_time.is_(None)
                if self.is_null_
                else db.TaskRun.start_time.isnot(None)
            )
        return filters

after_ pydantic-field

Type: datetime

Only include task runs starting at or after this time

before_ pydantic-field

Type: datetime

Only include task runs starting at or before this time

is_null_ pydantic-field

Type: bool

If true, only return task runs without a start time

TaskRunFilterStateName pydantic-model

Filter by TaskRun.state_name.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_name`."""

    # Defaults to None (no restriction), hence Optional rather than a bare List.
    any_: Optional[List[str]] = Field(
        None, description="A list of task run state names to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return an `IN` clause on `TaskRun.state_name` when `any_` is set.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A single-element list with the membership clause, or an empty
            list when `any_` is None.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.state_name.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[str]

A list of task run state names to include

TaskRunFilterStateType pydantic-model

Filter by TaskRun.state_type.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_type`."""

    # Defaults to None (no restriction), hence Optional rather than a bare List.
    any_: Optional[List[schemas.states.StateType]] = Field(
        None, description="A list of task run state types to include"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return an `IN` clause on `TaskRun.state_type` when `any_` is set.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A single-element list with the membership clause, or an empty
            list when `any_` is None.
        """
        filters = []
        if self.any_ is not None:
            filters.append(db.TaskRun.state_type.in_(self.any_))
        return filters

any_ pydantic-field

Type: List[prefect.orion.schemas.states.StateType]

A list of task run state types to include

TaskRunFilterSubFlowRuns pydantic-model

Filter by TaskRun.subflow_run.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterSubFlowRuns(PrefectFilterBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    # Tri-state: True / False select a branch below, None (the default) adds
    # no clause at all, so the field is declared Optional.
    exists_: Optional[bool] = Field(
        None,
        description="If true, only include task runs that are subflow run parents; if false, exclude parent task runs",
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return a clause on the presence of `TaskRun.subflow_run`.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A single-element list holding `subflow_run.has()` when `exists_`
            is True, its negation when `exists_` is False, or an empty list
            when `exists_` is None.
        """
        filters = []
        if self.exists_ is True:
            filters.append(db.TaskRun.subflow_run.has())
        elif self.exists_ is False:
            filters.append(sa.not_(db.TaskRun.subflow_run.has()))
        return filters

exists_ pydantic-field

Type: bool

If true, only include task runs that are subflow run parents; if false, exclude parent task runs

TaskRunFilterTags pydantic-model

Filter by TaskRun.tags.

Source code in prefect/orion/schemas/filters.py
class TaskRunFilterTags(PrefectFilterBaseModel):
    """Filter by `TaskRun.tags`."""

    # Both criteria default to None ("not specified"), so they are declared
    # Optional to match the default and the `is not None` checks below.
    all_: Optional[List[str]] = Field(
        None,
        example=["tag-1", "tag-2"],
        description="A list of tags. Task runs will be returned only if their tags are a superset of the list",
    )
    is_null_: Optional[bool] = Field(
        None, description="If true, only include task runs without tags"
    )

    def _get_filter_list(self, db: "OrionDBInterface") -> List:
        """Return the `TaskRun.tags` clauses for every criterion that is set.

        Args:
            db: database interface exposing the `TaskRun` ORM model.

        Returns:
            A list of clauses: a `json_has_all_keys` test for `all_`, and an
            empty-list comparison for `is_null_`. Empty when no criterion is
            provided.
        """
        # Imported inside the method, as in the original (reason not visible here).
        from prefect.orion.utilities.database import json_has_all_keys

        filters = []
        if self.all_ is not None:
            filters.append(json_has_all_keys(db.TaskRun.tags, self.all_))
        if self.is_null_ is not None:
            # Despite the name, "null" here is compared against an empty list,
            # not SQL NULL: true -> tags == [], false -> tags != [].
            filters.append(
                db.TaskRun.tags == [] if self.is_null_ else db.TaskRun.tags != []
            )
        return filters

all_ pydantic-field

Type: List[str]

A list of tags. Task runs will be returned only if their tags are a superset of the list

is_null_ pydantic-field

Type: bool

If true, only include task runs without tags