Skip to content

prefect.orion.models.flows

Functions for interacting with flow ORM objects. Intended for internal use by the Orion API.

count_flows async

Count flows.

Parameters:

Name Description Type Default
session

A database session

Session
required
flow_filter

only count flows that match these filters

FlowFilter
None
flow_run_filter

only count flows whose flow runs match these filters

FlowRunFilter
None
task_run_filter

only count flows whose task runs match these filters

TaskRunFilter
None
deployment_filter

only count flows whose deployments match these filters

DeploymentFilter
None

Returns:

Type Description
int

count of flows

Source code in prefect/orion/models/flows.py
@inject_db
async def count_flows(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Count the flows that satisfy the given filters.

    Args:
        session: A database session
        db: the Orion database interface that provides the `Flow` ORM model
        flow_filter: only count flows that match these filters
        flow_run_filter: only count flows whose flow runs match these filters
        task_run_filter: only count flows whose task runs match these filters
        deployment_filter: only count flows whose deployments match these filters

    Returns:
        int: count of flows
    """

    # COUNT(*) over the flow table; the filters are layered on afterwards.
    total_rows = sa.func.count(sa.text("*"))
    count_query = select(total_rows).select_from(db.Flow)

    filter_kwargs = dict(
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )
    count_query = await _apply_flow_filters(count_query, **filter_kwargs)

    # The query yields a single row with a single column: the count itself.
    return (await session.execute(count_query)).scalar()

create_flow async

Creates a new flow.

If a flow with the same name already exists, the existing flow is returned.

Parameters:

Name Description Type Default
session

a database session

Session
required
flow

a flow model

Flow
required

Returns:

Type Description
db.Flow

the newly-created or existing flow

Source code in prefect/orion/models/flows.py
@inject_db
async def create_flow(
    session: sa.orm.Session, flow: schemas.core.Flow, db: OrionDBInterface
):
    """
    Insert a flow, or fetch the existing one when the insert conflicts.

    The insert is issued with "on conflict do nothing" against the database
    interface's unique upsert columns, after which the flow is read back by
    name. The caller therefore receives a flow row either way.

    Args:
        session: a database session
        flow: a flow model
        db: the Orion database interface that provides the `Flow` ORM model

    Returns:
        db.Flow: the newly-created or existing flow
    """

    base_insert = await db.insert(db.Flow)
    # exclude_unset=True: only insert the fields that were explicitly set on
    # the model.
    provided_fields = flow.dict(shallow=True, exclude_unset=True)
    upsert = base_insert.values(**provided_fields).on_conflict_do_nothing(
        index_elements=db.flow_unique_upsert_columns,
    )
    await session.execute(upsert)

    # Read the row back by name, whether it was just inserted or already
    # existed.
    lookup = sa.select(db.Flow).where(db.Flow.name == flow.name).limit(1)
    # populate_existing makes the ORM overwrite any copy of this flow already
    # loaded in the session with what the database returns.
    lookup = lookup.execution_options(populate_existing=True)

    fetched = await session.execute(lookup)
    return fetched.scalar()

delete_flow async

Delete a flow by id.

Parameters:

Name Description Type Default
session

A database session

Session
required
flow_id

a flow id

UUID
required

Returns:

Type Description
bool

whether or not the flow was deleted

Source code in prefect/orion/models/flows.py
@inject_db
async def delete_flow(
    session: sa.orm.Session, flow_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Remove the flow with the given id.

    Args:
        session: A database session
        flow_id: a flow id
        db: the Orion database interface that provides the `Flow` ORM model

    Returns:
        bool: whether or not the flow was deleted
    """

    removal = delete(db.Flow).where(db.Flow.id == flow_id)
    outcome = await session.execute(removal)
    # A positive rowcount means the DELETE matched a row.
    return outcome.rowcount > 0

read_flow async

Reads a flow by id.

Parameters:

Name Description Type Default
session

A database session

Session
required
flow_id

a flow id

UUID
required

Returns:

Type Description
db.Flow

the flow

Source code in prefect/orion/models/flows.py
@inject_db
async def read_flow(session: sa.orm.Session, flow_id: UUID, db: OrionDBInterface):
    """
    Look up a single flow by its primary-key id.

    Args:
        session: A database session
        flow_id: a flow id
        db: the Orion database interface that provides the `Flow` ORM model

    Returns:
        db.Flow: the flow (per SQLAlchemy's `Session.get`, `None` when no
            row has that id)
    """
    flow_record = await session.get(db.Flow, flow_id)
    return flow_record

read_flow_by_name async

Reads a flow by name.

Parameters:

Name Description Type Default
session

A database session

Session
required
name

a flow name

str
required

Returns:

Type Description
db.Flow

the flow

Source code in prefect/orion/models/flows.py
@inject_db
async def read_flow_by_name(session: sa.orm.Session, name: str, db: OrionDBInterface):
    """
    Look up a single flow by its name.

    Args:
        session: A database session
        name: a flow name
        db: the Orion database interface that provides the `Flow` ORM model

    Returns:
        db.Flow: the flow (`None` when the query returns no rows)
    """

    by_name = select(db.Flow).where(db.Flow.name == name)
    rows = await session.execute(by_name)
    # scalar() yields the first column of the first row.
    return rows.scalar()

read_flows async

Read multiple flows.

Parameters:

Name Description Type Default
session

A database session

Session
required
flow_filter

only select flows that match these filters

FlowFilter
None
flow_run_filter

only select flows whose flow runs match these filters

FlowRunFilter
None
task_run_filter

only select flows whose task runs match these filters

TaskRunFilter
None
deployment_filter

only select flows whose deployments match these filters

DeploymentFilter
None
sort

how to order the returned flows

FlowSort
FlowSort.NAME_ASC
offset

Query offset

int
None
limit

Query limit

int
None

Returns:

Type Description
List[db.Flow]

flows

Source code in prefect/orion/models/flows.py
@inject_db
async def read_flows(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
    offset: int = None,
    limit: int = None,
):
    """
    Fetch the flows that satisfy the given filters, ordered and paginated.

    Args:
        session: A database session
        db: the Orion database interface that provides the `Flow` ORM model
        flow_filter: only select flows that match these filters
        flow_run_filter: only select flows whose flow runs match these filters
        task_run_filter: only select flows whose task runs match these filters
        deployment_filter: only select flows whose deployments match these filters
        sort: ordering applied to the query; defaults to `FlowSort.NAME_ASC`
        offset: Query offset
        limit: Query limit

    Returns:
        List[db.Flow]: flows
    """

    all_flows = select(db.Flow)
    ordered_flows = all_flows.order_by(sort.as_sql_sort(db=db))

    flows_query = await _apply_flow_filters(
        ordered_flows,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    # Pagination is optional: an offset/limit of None leaves the query
    # unbounded on that side, while 0 is still applied.
    if offset is not None:
        flows_query = flows_query.offset(offset)
    if limit is not None:
        flows_query = flows_query.limit(limit)

    rows = await session.execute(flows_query)
    # unique() de-duplicates the returned ORM objects before they are listed.
    return rows.scalars().unique().all()

update_flow async

Updates a flow.

Parameters:

Name Description Type Default
session

a database session

Session
required
flow_id

the flow id to update

UUID
required
flow

a flow update model

FlowUpdate
required

Returns:

Type Description
bool

whether or not matching rows were found to update

Source code in prefect/orion/models/flows.py
@inject_db
async def update_flow(
    session: sa.orm.Session,
    flow_id: UUID,
    flow: schemas.actions.FlowUpdate,
    db: OrionDBInterface,
):
    """
    Apply the fields set on a flow update model to the flow with the given id.

    Args:
        session: a database session
        flow_id: the flow id to update
        flow: a flow update model
        db: the Orion database interface that provides the `Flow` ORM model

    Returns:
        bool: whether or not matching rows were found to update

    Raises:
        ValueError: if `flow` is not a `schemas.actions.FlowUpdate` instance
    """

    # Reject anything other than an update model before touching the database.
    if not isinstance(flow, schemas.actions.FlowUpdate):
        raise ValueError(
            f"Expected parameter flow to have type schemas.actions.FlowUpdate, got {type(flow)!r} instead"
        )

    target_row = sa.update(db.Flow).where(db.Flow.id == flow_id)
    # exclude_unset=True restricts the update to the values the user actually
    # supplied, so model defaults never overwrite stored data.
    supplied_values = flow.dict(shallow=True, exclude_unset=True)

    outcome = await session.execute(target_row.values(**supplied_values))
    return outcome.rowcount > 0