prefect.orion.models.deployments
Functions for interacting with deployment ORM objects. Intended for internal use by the Orion API.
count_deployments
async
Count deployments.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
flow_filter |
only count deployments whose flows match these criteria FlowFilter |
None |
flow_run_filter |
only count deployments whose flow runs match these criteria FlowRunFilter |
None |
task_run_filter |
only count deployments whose task runs match these criteria TaskRunFilter |
None |
deployment_filter |
only count deployments that match these filters DeploymentFilter |
None |
Returns:
Type | Description |
---|---|
int |
the number of deployments matching filters |
Source code in prefect/orion/models/deployments.py
@inject_db
async def count_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Count the deployments that satisfy the supplied filters.

    Args:
        session: A database session
        flow_filter: only count deployments whose flows match these criteria
        flow_run_filter: only count deployments whose flow runs match these criteria
        task_run_filter: only count deployments whose task runs match these criteria
        deployment_filter: only count deployments that match these filters

    Returns:
        int: the number of deployments matching filters
    """
    # Begin with an unfiltered `COUNT(*)` over the deployment table.
    row_count = sa.func.count(sa.text("*"))
    count_stmt = select(row_count).select_from(db.Deployment)

    # All filter handling is delegated to the shared helper so counting and
    # reading go through the same code path.
    count_stmt = await _apply_deployment_filters(
        query=count_stmt,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    outcome = await session.execute(count_stmt)
    return outcome.scalar()
create_deployment
async
Upserts a deployment.
Parameters:
Name | Description | Default |
---|---|---|
session |
a database session Session |
required |
deployment |
a deployment model Deployment |
required |
Returns:
Type | Description |
---|---|
db.Deployment |
the newly-created or updated deployment |
Source code in prefect/orion/models/deployments.py
@inject_db
async def create_deployment(
    session: sa.orm.Session, deployment: schemas.core.Deployment, db: OrionDBInterface
):
    """Insert a deployment, or update the existing one on conflict.

    The `updated` attribute of the passed-in `deployment` is overwritten with
    the current UTC time as part of this call.

    Args:
        session: a database session
        deployment: a deployment model

    Returns:
        db.Deployment: the newly-created or updated deployment
    """
    # `on_conflict_do_update` does not honor `Column.onupdate`, so the
    # `updated` timestamp has to be stamped by hand before building the
    # statement.
    # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
    deployment.updated = pendulum.now("UTC")

    insert_values = deployment.dict(shallow=True, exclude_unset=True)

    # The flow runner is stored as two separate columns; split it out of the
    # insert payload when one was provided.
    flow_runner = insert_values.pop("flow_runner", None)
    flow_runner_values = (
        {
            "flow_runner_type": flow_runner.type,
            "flow_runner_config": flow_runner.config,
        }
        if flow_runner
        else {}
    )

    base_insert = await db.insert(db.Deployment)
    insert_with_values = base_insert.values(**insert_values, **flow_runner_values)

    # Only these fields are refreshed when the row already exists.
    fields_updated_on_conflict = {
        "schedule",
        "is_schedule_active",
        "tags",
        "parameters",
        "flow_data",
        "updated",
    }
    conflict_values = {
        **deployment.dict(shallow=True, include=fields_updated_on_conflict),
        **flow_runner_values,
    }
    upsert_stmt = insert_with_values.on_conflict_do_update(
        index_elements=db.deployment_unique_upsert_columns,
        set_=conflict_values,
    )
    await session.execute(upsert_stmt)

    # Re-read the row by (flow_id, name). `populate_existing` makes the
    # session overwrite any already-loaded copy with the fetched values.
    same_deployment = sa.and_(
        db.Deployment.flow_id == deployment.flow_id,
        db.Deployment.name == deployment.name,
    )
    refetch_stmt = (
        sa.select(db.Deployment)
        .where(same_deployment)
        .execution_options(populate_existing=True)
    )
    fetched = await session.execute(refetch_stmt)
    return fetched.scalar()
delete_deployment
async
Delete a deployment by id.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
deployment_id |
a deployment id UUID |
required |
Returns:
Type | Description |
---|---|
bool |
whether or not the deployment was deleted |
Source code in prefect/orion/models/deployments.py
@inject_db
async def delete_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Remove a deployment, along with its scheduled flow runs, by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether or not the deployment was deleted
    """
    # First clear out flow runs of this deployment that are still in the
    # SCHEDULED state.
    scheduled_state = schemas.states.StateType.SCHEDULED.value
    purge_scheduled_runs = sa.delete(db.FlowRun).where(
        db.FlowRun.deployment_id == deployment_id,
        db.FlowRun.state_type == scheduled_state,
    )
    await session.execute(purge_scheduled_runs)

    # Then delete the deployment row itself; the affected row count tells
    # the caller whether anything was actually removed.
    remove_deployment = delete(db.Deployment).where(
        db.Deployment.id == deployment_id
    )
    outcome = await session.execute(remove_deployment)
    return outcome.rowcount > 0
read_deployment
async
Reads a deployment by id.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
deployment_id |
a deployment id UUID |
required |
Returns:
Type | Description |
---|---|
db.Deployment |
the deployment |
Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment(
    session: sa.orm.Session, deployment_id: UUID, db: OrionDBInterface
):
    """Look up a single deployment by its id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        db.Deployment: the deployment, as returned by `session.get`
    """
    # Primary-key lookup through the session.
    deployment = await session.get(db.Deployment, deployment_id)
    return deployment
read_deployment_by_name
async
Reads a deployment by name.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
name |
a deployment name str |
required |
flow_name |
the name of the flow the deployment belongs to str |
required |
Returns:
Type | Description |
---|---|
db.Deployment |
the deployment |
Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployment_by_name(
    session: sa.orm.Session, name: str, flow_name: str, db: OrionDBInterface
):
    """Look up a deployment by its own name and the name of its flow.

    Args:
        session: A database session
        name: a deployment name
        flow_name: the name of the flow the deployment belongs to

    Returns:
        db.Deployment: the deployment
    """
    # Join each deployment to its flow so the flow name can be filtered on.
    deployment_to_flow = db.Deployment.flow_id == db.Flow.id
    names_match = sa.and_(
        db.Flow.name == flow_name,
        db.Deployment.name == name,
    )

    lookup_stmt = (
        select(db.Deployment)
        .join(db.Flow, deployment_to_flow)
        .where(names_match)
        .limit(1)
    )
    outcome = await session.execute(lookup_stmt)
    return outcome.scalar()
read_deployments
async
Read deployments.
Parameters:
Name | Description | Default |
---|---|---|
session |
A database session Session |
required |
offset |
Query offset int |
None |
limit |
Query limit int |
None |
flow_filter |
only select deployments whose flows match these criteria FlowFilter |
None |
flow_run_filter |
only select deployments whose flow runs match these criteria FlowRunFilter |
None |
task_run_filter |
only select deployments whose task runs match these criteria TaskRunFilter |
None |
deployment_filter |
only select deployments that match these filters DeploymentFilter |
None |
Returns:
Type | Description |
---|---|
List[db.Deployment] |
deployments |
Source code in prefect/orion/models/deployments.py
@inject_db
async def read_deployments(
    session: sa.orm.Session,
    db: OrionDBInterface,
    offset: int = None,
    limit: int = None,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
):
    """
    Fetch deployments, ordered by name, optionally filtered and paginated.

    Args:
        session: A database session
        offset: Query offset
        limit: Query limit
        flow_filter: only select deployments whose flows match these criteria
        flow_run_filter: only select deployments whose flow runs match these criteria
        task_run_filter: only select deployments whose task runs match these criteria
        deployment_filter: only select deployments that match these filters

    Returns:
        List[db.Deployment]: deployments
    """
    name_ordered = select(db.Deployment).order_by(db.Deployment.name)

    # Filter handling lives in the shared helper also used for counting.
    stmt = await _apply_deployment_filters(
        query=name_ordered,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    # Pagination is applied only when explicitly requested; an offset or
    # limit of 0 is still honored because the check is against `None`.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    outcome = await session.execute(stmt)
    deployments = outcome.scalars().unique()
    return deployments.all()
schedule_runs
async
Schedule flow runs for a deployment
Parameters:
Name | Description | Default |
---|---|---|
session |
a database session Session |
required |
deployment_id |
the id of the deployment to schedule UUID |
required |
start_time |
the time from which to start scheduling runs datetime |
None |
end_time |
a limit on how far in the future runs will be scheduled datetime |
None |
max_runs |
a maximum number of runs to schedule int |
None |
Returns:
Type | Description |
---|---|
List[uuid.UUID] |
a list of flow run ids scheduled for the deployment |
Source code in prefect/orion/models/deployments.py
async def schedule_runs(
    session: sa.orm.Session,
    deployment_id: UUID,
    start_time: datetime.datetime = None,
    end_time: datetime.datetime = None,
    max_runs: int = None,
) -> List[UUID]:
    """
    Generate and insert scheduled flow runs for a deployment.

    Any argument left as `None` falls back to a default: `max_runs` and the
    scheduling horizon come from the scheduler settings, and `start_time`
    defaults to the current UTC time.

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: a limit on how far in the future runs will be scheduled
        max_runs: a maximum number of runs to schedule

    Returns:
        a list of flow run ids scheduled for the deployment
    """
    if max_runs is None:
        max_runs = PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS.value()

    # Normalize the window bounds to pendulum datetimes, filling in defaults.
    start_time = pendulum.instance(
        pendulum.now("UTC") if start_time is None else start_time
    )

    if end_time is None:
        # The default window extends from the start by the configured maximum.
        horizon = PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        end_time = start_time + horizon
    end_time = pendulum.instance(end_time)

    pending_runs = await _generate_scheduled_flow_runs(
        session=session,
        deployment_id=deployment_id,
        start_time=start_time,
        end_time=end_time,
        max_runs=max_runs,
    )
    inserted_ids = await _insert_scheduled_flow_runs(
        session=session, runs=pending_runs
    )
    return inserted_ids