Skip to content

prefect.orion.models.deployments

Functions for interacting with deployment ORM objects. Intended for internal use by the Orion API.

count_deployments async

Count deployments.

Parameters:

Name Description Default
session

A database session

Session
required
flow_filter

only count deployments whose flows match these criteria

FlowFilter
None
flow_run_filter

only count deployments whose flow runs match these criteria

FlowRunFilter
None
task_run_filter

only count deployments whose task runs match these criteria

TaskRunFilter
None
deployment_filter

only count deployments that match these filters

DeploymentFilter
None

Returns:

Type Description
int

the number of deployments matching filters

Source code in prefect/orion/models/deployments.py
@inject_db
async def count_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Return how many deployments satisfy the supplied filters.

    Args:
        session: A database session
        flow_filter: restrict the count to deployments whose flows match
            these criteria
        flow_run_filter: restrict the count to deployments whose flow runs
            match these criteria
        task_run_filter: restrict the count to deployments whose task runs
            match these criteria
        deployment_filter: restrict the count to deployments that match
            these criteria

    Returns:
        int: the number of deployments matching the filters
    """

    # Unfiltered `count(*)` over the deployment model; the shared helper
    # below narrows it according to whichever filters were provided.
    count_stmt = select(sa.func.count(sa.text("*"))).select_from(db.Deployment)

    filtered_stmt = await _apply_deployment_filters(
        query=count_stmt,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    return (await session.execute(filtered_stmt)).scalar()

create_deployment async

Upserts a deployment.

Parameters:

Name Description Default
session

a database session

Session
required
deployment

a deployment model

Deployment
required

Returns:

Type Description
db.Deployment

the newly-created or updated deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def create_deployment(
    session: sa.orm.Session, deployment: schemas.core.Deployment, db: OrionDBInterface
):
    """Insert a deployment, or update the existing row on conflict.

    The conflict target is `db.deployment_unique_upsert_columns`. When a
    conflicting row exists, only `schedule`, `is_schedule_active`, `tags`,
    `parameters`, `flow_data`, `updated` and (if a flow runner was given) the
    flow runner columns are overwritten.

    Note that `deployment.updated` is set on the passed-in model as a side
    effect of this call.

    Args:
        session: a database session
        deployment: a deployment model

    Returns:
        db.Deployment: the newly-created or updated deployment

    """

    # `on_conflict_do_update` is known not to honor `Column.onupdate`, so the
    # `updated` timestamp has to be stamped by hand before building the upsert.
    # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
    deployment.updated = pendulum.now("UTC")

    values_to_insert = deployment.dict(shallow=True, exclude_unset=True)

    # The flow runner composite is written as two separate columns, so it is
    # pulled out of the plain values and unpacked when present.
    runner = values_to_insert.pop("flow_runner", None)
    runner_columns = (
        {"flow_runner_type": runner.type, "flow_runner_config": runner.config}
        if runner
        else {}
    )

    insert_with_values = (await db.insert(db.Deployment)).values(
        **values_to_insert, **runner_columns
    )

    # Columns that are allowed to change when the deployment already exists.
    values_on_conflict = deployment.dict(
        shallow=True,
        include={
            "schedule",
            "is_schedule_active",
            "tags",
            "parameters",
            "flow_data",
            "updated",
        },
    )
    values_on_conflict.update(runner_columns)

    upsert_stmt = insert_with_values.on_conflict_do_update(
        index_elements=db.deployment_unique_upsert_columns,
        set_=values_on_conflict,
    )
    await session.execute(upsert_stmt)

    # Read the row back by (flow_id, name). `populate_existing` is requested so
    # that an instance already loaded in this session reflects the upsert.
    same_flow_and_name = sa.and_(
        db.Deployment.flow_id == deployment.flow_id,
        db.Deployment.name == deployment.name,
    )
    lookup_stmt = (
        sa.select(db.Deployment)
        .where(same_flow_and_name)
        .execution_options(populate_existing=True)
    )

    return (await session.execute(lookup_stmt)).scalar()

delete_deployment async

Delete a deployment by id.

Parameters:

Name Description Default
session

A database session

Session
required
deployment_id

a deployment id

UUID
required

Returns:

Type Description
bool

whether or not the deployment was deleted

Source code in prefect/orion/models/deployments.py
@inject_db
async def delete_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Remove the deployment with the given id, along with its scheduled runs.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether or not the deployment was deleted
    """
    # First clear out flow runs of this deployment that are still in the
    # SCHEDULED state; this statement does not touch runs in any other state.
    belongs_to_deployment = db.FlowRun.deployment_id == deployment_id
    still_scheduled = (
        db.FlowRun.state_type == schemas.states.StateType.SCHEDULED.value
    )
    await session.execute(
        sa.delete(db.FlowRun).where(belongs_to_deployment, still_scheduled)
    )

    # Then delete the deployment row itself; a non-zero rowcount means a row
    # with this id existed and was removed.
    deployment_delete = delete(db.Deployment).where(
        db.Deployment.id == deployment_id
    )
    outcome = await session.execute(deployment_delete)
    return outcome.rowcount > 0

read_deployment async

Reads a deployment by id.

Parameters:

Name Description Default
session

A database session

Session
required
deployment_id

a deployment id

UUID
required

Returns:

Type Description
db.Deployment

the deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
):
    """Look up a single deployment by its id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        db.Deployment: the deployment, exactly as returned by `session.get`
    """

    found = await session.get(db.Deployment, deployment_id)
    return found

read_deployment_by_name async

Reads a deployment by name.

Parameters:

Name Description Default
session

A database session

Session
required
name

a deployment name

str
required
flow_name

the name of the flow the deployment belongs to

str
required

Returns:

Type Description
db.Deployment

the deployment

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment_by_name(
    session: sa.orm.Session, name: str, flow_name: str, db: OrionDBInterface
):
    """Look up a deployment by its own name and the name of its flow.

    Args:
        session: A database session
        name: a deployment name
        flow_name: the name of the flow the deployment belongs to

    Returns:
        db.Deployment: the deployment
    """

    # Both names must match: the flow's name and the deployment's name.
    names_match = sa.and_(
        db.Flow.name == flow_name,
        db.Deployment.name == name,
    )

    # Deployments are joined to their flow so the flow name can be filtered
    # on; at most one row is requested.
    lookup_stmt = (
        select(db.Deployment)
        .join(db.Flow, db.Deployment.flow_id == db.Flow.id)
        .where(names_match)
        .limit(1)
    )

    rows = await session.execute(lookup_stmt)
    return rows.scalar()

read_deployments async

Read deployments.

Parameters:

Name Description Default
session

A database session

Session
required
offset

Query offset

int
None
limit

Query limit

int
None
flow_filter

only select deployments whose flows match these criteria

FlowFilter
None
flow_run_filter

only select deployments whose flow runs match these criteria

FlowRunFilter
None
task_run_filter

only select deployments whose task runs match these criteria

TaskRunFilter
None
deployment_filter

only select deployments that match these filters

DeploymentFilter
None

Returns:

Type Description
List[db.Deployment]

deployments

Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    offset: int = None,
    limit: int = None,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
):
    """
    Fetch deployments ordered by name, optionally filtered and paginated.

    Args:
        session: A database session
        offset: Query offset
        limit: Query limit
        flow_filter: only select deployments whose flows match these criteria
        flow_run_filter: only select deployments whose flow runs match these
            criteria
        task_run_filter: only select deployments whose task runs match these
            criteria
        deployment_filter: only select deployments that match these criteria

    Returns:
        List[db.Deployment]: deployments
    """

    base_stmt = select(db.Deployment).order_by(db.Deployment.name)

    stmt = await _apply_deployment_filters(
        query=base_stmt,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    # Pagination is applied only for the bounds the caller actually supplied;
    # `0` is a real value here, hence the explicit `None` checks.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

schedule_runs async

Schedule flow runs for a deployment

Parameters:

Name Description Default
session

a database session

Session
required
deployment_id

the id of the deployment to schedule

UUID
required
start_time

the time from which to start scheduling runs

datetime
None
end_time

a limit on how far in the future runs will be scheduled

datetime
None
max_runs

a maximum amount of runs to schedule

int
None

Returns:

Type Description
List[uuid.UUID]

a list of flow run ids scheduled for the deployment

Source code in prefect/orion/models/deployments.py
async def schedule_runs(
    session: sa.orm.Session,
    deployment_id: UUID,
    start_time: datetime.datetime = None,
    end_time: datetime.datetime = None,
    max_runs: int = None,
) -> List[UUID]:
    """
    Generate and insert scheduled flow runs for a deployment.

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs; defaults to
            the current UTC time
        end_time: a limit on how far in the future runs will be scheduled;
            defaults to `start_time` plus
            `PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME`
        max_runs: a maximum amount of runs to schedule; defaults to
            `PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS`

    Returns:
        a list of flow run ids scheduled for the deployment
    """
    run_cap = (
        PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS.value()
        if max_runs is None
        else max_runs
    )

    # The start of the window is resolved first because the default end of
    # the window is derived from it.
    window_start = pendulum.instance(
        pendulum.now("UTC") if start_time is None else start_time
    )
    window_end = pendulum.instance(
        window_start + PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        if end_time is None
        else end_time
    )

    pending_runs = await _generate_scheduled_flow_runs(
        session=session,
        deployment_id=deployment_id,
        start_time=window_start,
        end_time=window_end,
        max_runs=run_cap,
    )
    return await _insert_scheduled_flow_runs(session=session, runs=pending_runs)