prefect.orion.models.flows
Functions for interacting with flow ORM objects. Intended for internal use by the Orion API.
count_flows
async
Count flows.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | A database session (`Session`) | required |
| `flow_filter` | only count flows that match these filters (`FlowFilter`) | `None` |
| `flow_run_filter` | only count flows whose flow runs match these filters (`FlowRunFilter`) | `None` |
| `task_run_filter` | only count flows whose task runs match these filters (`TaskRunFilter`) | `None` |
| `deployment_filter` | only count flows whose deployments match these filters (`DeploymentFilter`) | `None` |

Returns:

| Type | Description |
|---|---|
| `int` | count of flows |
Source code in prefect/orion/models/flows.py
@inject_db
async def count_flows(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Return the number of flows that satisfy the supplied filters.

    Args:
        session: A database session
        db: the database interface providing the `Flow` ORM model
            (presumably supplied by the `inject_db` decorator -- confirm)
        flow_filter: restrict the count to flows matching these filters
        flow_run_filter: restrict the count to flows whose flow runs match
            these filters
        task_run_filter: restrict the count to flows whose task runs match
            these filters
        deployment_filter: restrict the count to flows whose deployments
            match these filters

    Returns:
        int: count of flows
    """
    # Begin with an unfiltered COUNT(*) over the flow table; the shared
    # filter helper then narrows it down.
    row_count = sa.func.count(sa.text("*"))
    count_query = select(row_count).select_from(db.Flow)

    filters = dict(
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
    )
    count_query = await _apply_flow_filters(count_query, db=db, **filters)

    executed = await session.execute(count_query)
    return executed.scalar()
create_flow
async
Creates a new flow.
If a flow with the same name already exists, the existing flow is returned.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | a database session (`Session`) | required |
| `flow` | a flow model (`Flow`) | required |

Returns:

| Type | Description |
|---|---|
| `db.Flow` | the newly-created or existing flow |
Source code in prefect/orion/models/flows.py
@inject_db
async def create_flow(
    session: sa.orm.Session, flow: schemas.core.Flow, db: OrionDBInterface
):
    """
    Insert a flow, or fetch the one that already carries the same name.

    The insert is issued with "on conflict do nothing" against the flow
    upsert columns, and the row is then read back by name, so the caller
    receives a flow whether or not the insert actually wrote anything.

    Args:
        session: a database session
        flow: a flow model
        db: the database interface providing the `Flow` ORM model and the
            dialect-specific insert construct

    Returns:
        db.Flow: the newly-created or existing flow
    """
    insert_builder = await db.insert(db.Flow)
    # Only fields explicitly set on the schema are written.
    flow_values = flow.dict(shallow=True, exclude_unset=True)
    upsert_stmt = insert_builder.values(**flow_values).on_conflict_do_nothing(
        index_elements=db.flow_unique_upsert_columns,
    )
    await session.execute(upsert_stmt)

    # Read the row back by name; `populate_existing` refreshes any copy of
    # this flow the session may already be holding.
    lookup = sa.select(db.Flow).where(db.Flow.name == flow.name).limit(1)
    lookup = lookup.execution_options(populate_existing=True)

    fetched = await session.execute(lookup)
    return fetched.scalar()
delete_flow
async
Delete a flow by id.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | A database session (`Session`) | required |
| `flow_id` | a flow id (`UUID`) | required |

Returns:

| Type | Description |
|---|---|
| `bool` | whether or not the flow was deleted |
Source code in prefect/orion/models/flows.py
@inject_db
async def delete_flow(
    session: sa.orm.Session, flow_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Remove the flow with the given id.

    Args:
        session: A database session
        flow_id: a flow id
        db: the database interface providing the `Flow` ORM model

    Returns:
        bool: whether or not the flow was deleted
    """
    removal = delete(db.Flow).where(db.Flow.id == flow_id)
    outcome = await session.execute(removal)
    # A positive row count means a matching flow existed and was removed.
    return outcome.rowcount > 0
read_flow
async
Reads a flow by id.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | A database session (`Session`) | required |
| `flow_id` | a flow id (`UUID`) | required |

Returns:

| Type | Description |
|---|---|
| `db.Flow` | the flow |
Source code in prefect/orion/models/flows.py
@inject_db
async def read_flow(session: sa.orm.Session, flow_id: UUID, db: OrionDBInterface):
    """
    Look up a single flow by its id.

    Args:
        session: A database session
        flow_id: a flow id
        db: the database interface providing the `Flow` ORM model

    Returns:
        db.Flow: the flow, as returned by the session's `get` for this id
    """
    flow = await session.get(db.Flow, flow_id)
    return flow
read_flow_by_name
async
Reads a flow by name.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | A database session (`Session`) | required |
| `name` | a flow name (`str`) | required |

Returns:

| Type | Description |
|---|---|
| `db.Flow` | the flow |
Source code in prefect/orion/models/flows.py
@inject_db
async def read_flow_by_name(session: sa.orm.Session, name: str, db: OrionDBInterface):
    """
    Look up a single flow by its name.

    Args:
        session: A database session
        name: a flow name
        db: the database interface providing the `Flow` ORM model

    Returns:
        db.Flow: the flow
    """
    by_name = select(db.Flow).where(db.Flow.name == name)
    matches = await session.execute(by_name)
    return matches.scalar()
read_flows
async
Read multiple flows.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | A database session (`Session`) | required |
| `flow_filter` | only select flows that match these filters (`FlowFilter`) | `None` |
| `flow_run_filter` | only select flows whose flow runs match these filters (`FlowRunFilter`) | `None` |
| `task_run_filter` | only select flows whose task runs match these filters (`TaskRunFilter`) | `None` |
| `deployment_filter` | only select flows whose deployments match these filters (`DeploymentFilter`) | `None` |
| `sort` | ordering applied to the selected flows (`FlowSort`) | `FlowSort.NAME_ASC` |
| `offset` | Query offset (`int`) | `None` |
| `limit` | Query limit (`int`) | `None` |

Returns:

| Type | Description |
|---|---|
| `List[db.Flow]` | flows |
Source code in prefect/orion/models/flows.py
@inject_db
async def read_flows(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
    offset: int = None,
    limit: int = None,
):
    """
    Fetch the flows that satisfy the supplied filters.

    Args:
        session: A database session
        db: the database interface providing the `Flow` ORM model
        flow_filter: only select flows that match these filters
        flow_run_filter: only select flows whose flow runs match these filters
        task_run_filter: only select flows whose task runs match these filters
        deployment_filter: only select flows whose deployments match these
            filters
        sort: ordering applied to the query; defaults to
            `FlowSort.NAME_ASC`
        offset: Query offset; no offset is applied when `None`
        limit: Query limit; no limit is applied when `None`

    Returns:
        List[db.Flow]: flows
    """
    ordering = sort.as_sql_sort(db=db)
    flows_query = await _apply_flow_filters(
        select(db.Flow).order_by(ordering),
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    # Pagination is optional: `None` means "leave the query unbounded",
    # whereas 0 is a legitimate explicit value and must still be applied.
    if offset is not None:
        flows_query = flows_query.offset(offset)
    if limit is not None:
        flows_query = flows_query.limit(limit)

    fetched = await session.execute(flows_query)
    unique_flows = fetched.scalars().unique()
    return unique_flows.all()
update_flow
async
Updates a flow.
Parameters:

| Name | Description | Default |
|---|---|---|
| `session` | a database session (`Session`) | required |
| `flow_id` | the flow id to update (`UUID`) | required |
| `flow` | a flow update model (`FlowUpdate`) | required |

Returns:

| Type | Description |
|---|---|
| `bool` | whether or not matching rows were found to update |
Source code in prefect/orion/models/flows.py
@inject_db
async def update_flow(
    session: sa.orm.Session,
    flow_id: UUID,
    flow: schemas.actions.FlowUpdate,
    db: OrionDBInterface,
):
    """
    Apply the fields set on a flow update model to the flow with this id.

    Args:
        session: a database session
        flow_id: the flow id to update
        flow: a flow update model
        db: the database interface providing the `Flow` ORM model

    Returns:
        bool: whether or not matching rows were found to update

    Raises:
        ValueError: if `flow` is not a `schemas.actions.FlowUpdate`
    """
    if not isinstance(flow, schemas.actions.FlowUpdate):
        raise ValueError(
            f"Expected parameter flow to have type schemas.actions.FlowUpdate, got {type(flow)!r} instead"
        )

    target_rows = sa.update(db.Flow).where(db.Flow.id == flow_id)
    # With exclude_unset=True only the values the caller actually provided
    # are written; model defaults are left out of the UPDATE.
    provided_values = flow.dict(shallow=True, exclude_unset=True)

    outcome = await session.execute(target_rows.values(**provided_values))
    return outcome.rowcount > 0