prefect.orion.api.flow_runs
Routes for interacting with flow run objects.
count_flow_runs
async
Count the flow runs that match the given filters.
Source code in prefect/orion/api/flow_runs.py
@router.post("/count")
async def count_flow_runs(
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> int:
    """
    Count flow runs.

    The optional flow, flow run, task run, and deployment filters are handed
    to `models.flow_runs.count_flow_runs` together with the session, and the
    value it produces is returned unchanged.
    """
    # Map the request-level names onto the keyword names the model layer uses.
    filter_kwargs = dict(
        flow_filter=flows,
        flow_run_filter=flow_runs,
        task_run_filter=task_runs,
        deployment_filter=deployments,
    )
    total = await models.flow_runs.count_flow_runs(session=session, **filter_kwargs)
    return total
create_flow_run
async
Create a flow run. If a flow run with the same flow_id and idempotency key already exists, the existing flow run will be returned.
If no state is provided, the flow run will be created in a PENDING state.
Source code in prefect/orion/api/flow_runs.py
@router.post("/")
async def create_flow_run(
    flow_run: schemas.actions.FlowRunCreate,
    session: sa.orm.Session = Depends(dependencies.get_session),
    response: Response = None,
) -> schemas.core.FlowRun:
    """
    Create a flow run.

    If a flow run with the same flow_id and idempotency key already exists,
    the existing flow run will be returned. If no state is provided, the flow
    run will be created in a PENDING state.

    The response status is switched to 201 only when the returned model's
    `created` timestamp is not earlier than the time captured just before the
    create call.
    """
    # Expand the create-action payload into the full core schema.
    flow_run = schemas.core.FlowRun(**flow_run.dict())
    if not flow_run.state:
        flow_run.state = schemas.states.Pending()

    # Captured before the model call so it can be compared with `created`.
    request_time = pendulum.now("UTC")
    stored_run = await models.flow_runs.create_flow_run(
        session=session, flow_run=flow_run
    )

    # A `created` timestamp at or after the captured time is treated as a
    # freshly created run; otherwise the default status code is left alone.
    if stored_run.created >= request_time:
        response.status_code = status.HTTP_201_CREATED
    return stored_run
delete_flow_run
async
Delete a flow run by id.
Source code in prefect/orion/api/flow_runs.py
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow_run(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
):
    """
    Delete a flow run by id.

    Raises:
        HTTPException: with status 404 when the model layer reports a falsy
            result for the delete.
    """
    was_deleted = await models.flow_runs.delete_flow_run(
        session=session, flow_run_id=flow_run_id
    )
    if was_deleted:
        # Nothing to return; the route is declared with a 204 status code.
        return

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Flow run not found"
    )
flow_run_history
async
Query for flow run history data across a given range and interval.
Source code in prefect/orion/api/flow_runs.py
@router.post("/history")
async def flow_run_history(
    history_start: datetime.datetime = Body(
        ..., description="The history's start time."
    ),
    history_end: datetime.datetime = Body(..., description="The history's end time."),
    history_interval: datetime.timedelta = Body(
        ...,
        description="The size of each history interval, in seconds. Must be at least 1 second.",
        alias="history_interval_seconds",
    ),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[schemas.responses.HistoryResponse]:
    """
    Query for flow run history data across a given range and interval.

    The interval is validated here; everything else is delegated to
    `run_history` with `run_type="flow_run"`.

    Raises:
        HTTPException: with status 422 when `history_interval` is shorter
            than one second.
    """
    smallest_allowed = datetime.timedelta(seconds=1)
    if history_interval < smallest_allowed:
        raise HTTPException(
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="History interval must not be less than 1 second.",
        )

    history = await run_history(
        session=session,
        run_type="flow_run",
        history_start=history_start,
        history_end=history_end,
        history_interval=history_interval,
        flows=flows,
        flow_runs=flow_runs,
        task_runs=task_runs,
        deployments=deployments,
    )
    return history
read_flow_run
async
Get a flow run by id.
Source code in prefect/orion/api/flow_runs.py
@router.get("/{id}")
async def read_flow_run(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> schemas.core.FlowRun:
    """
    Get a flow run by id.

    Raises:
        HTTPException: with status 404 when the model layer returns a falsy
            value for the requested id.
    """
    found = await models.flow_runs.read_flow_run(
        session=session, flow_run_id=flow_run_id
    )
    if found:
        return found

    raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow run not found")
read_flow_run_graph
async
Get a task run dependency map for a given flow run.
Source code in prefect/orion/api/flow_runs.py
@router.get("/{id}/graph")
async def read_flow_run_graph(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[DependencyResult]:
    """
    Get a task run dependency map for a given flow run.

    Delegates to `models.flow_runs.read_task_run_dependencies` and returns
    its result unchanged.
    """
    dependency_map = await models.flow_runs.read_task_run_dependencies(
        session=session, flow_run_id=flow_run_id
    )
    return dependency_map
read_flow_runs
async
Query for flow runs.
Source code in prefect/orion/api/flow_runs.py
@router.post("/filter")
async def read_flow_runs(
    sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[schemas.core.FlowRun]:
    """
    Query for flow runs.

    The filters, sort order, limit, and offset from the request body are
    forwarded to `models.flow_runs.read_flow_runs`; its result is returned
    unchanged.
    """
    # Request-level filter names mapped onto the model layer's keyword names.
    filter_kwargs = dict(
        flow_filter=flows,
        flow_run_filter=flow_runs,
        task_run_filter=task_runs,
        deployment_filter=deployments,
    )
    paging_kwargs = dict(offset=offset, limit=limit, sort=sort)

    matching_runs = await models.flow_runs.read_flow_runs(
        session=session, **filter_kwargs, **paging_kwargs
    )
    return matching_runs
set_flow_run_state
async
Set a flow run state, invoking any orchestration rules.
Source code in prefect/orion/api/flow_runs.py
@router.post("/{id}/set_state")
async def set_flow_run_state(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    state: schemas.actions.StateCreate = Body(..., description="The intended state."),
    force: bool = Body(
        False,
        description=(
            "If false, orchestration rules will be applied that may alter "
            "or prevent the state transition. If True, orchestration rules are not applied."
        ),
    ),
    session: sa.orm.Session = Depends(dependencies.get_session),
    response: Response = None,
    flow_policy: BaseOrchestrationPolicy = Depends(
        orchestration_dependencies.provide_flow_policy
    ),
) -> OrchestrationResult:
    """
    Set a flow run state, invoking any orchestration rules.

    The response status code is 200 when the orchestration result's status is
    WAIT or ABORT, and 201 for any other status.
    """
    # The request carries a StateCreate action; the model layer is given a
    # full State object parsed from it.
    full_state = schemas.states.State.parse_obj(state)

    orchestration_result = await models.flow_runs.set_flow_run_state(
        session=session,
        flow_run_id=flow_run_id,
        state=full_state,
        force=force,
        flow_policy=flow_policy,
    )

    outcome = orchestration_result.status
    waited_or_aborted = (
        outcome == schemas.responses.SetStateStatus.WAIT
        or outcome == schemas.responses.SetStateStatus.ABORT
    )
    # WAIT and ABORT answer with 200; every other outcome answers with 201.
    if waited_or_aborted:
        response.status_code = status.HTTP_200_OK
    else:
        response.status_code = status.HTTP_201_CREATED

    return orchestration_result
update_flow_run
async
Update a flow run.
Source code in prefect/orion/api/flow_runs.py
@router.patch("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow_run(
    flow_run: schemas.actions.FlowRunUpdate,
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
):
    """
    Update a flow run.

    Raises:
        HTTPException: with status 404 when the model layer reports a falsy
            result for the update.
    """
    was_updated = await models.flow_runs.update_flow_run(
        session=session, flow_run=flow_run, flow_run_id=flow_run_id
    )
    if was_updated:
        # Nothing to return; the route is declared with a 204 status code.
        return

    raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow run not found")