Skip to content

prefect.orion.api.flow_runs

Routes for interacting with flow run objects.

count_flow_runs async

Count the flow runs that match the given filters.

Source code in prefect/orion/api/flow_runs.py
@router.post("/count")
async def count_flow_runs(
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> int:
    """
    Count the flow runs that match the given filters.
    """
    # Each optional request filter maps onto the model layer's `*_filter`
    # keyword; filters that were not supplied are passed through as None.
    filters = dict(
        flow_filter=flows,
        flow_run_filter=flow_runs,
        task_run_filter=task_runs,
        deployment_filter=deployments,
    )
    return await models.flow_runs.count_flow_runs(session=session, **filters)

create_flow_run async

Create a flow run. If a flow run with the same flow_id and idempotency key already exists, the existing flow run will be returned.

If no state is provided, the flow run will be created in a PENDING state.

Source code in prefect/orion/api/flow_runs.py
@router.post("/")
async def create_flow_run(
    flow_run: schemas.actions.FlowRunCreate,
    session: sa.orm.Session = Depends(dependencies.get_session),
    response: Response = None,
) -> schemas.core.FlowRun:
    """
    Create a flow run. When a flow run with the same flow_id and idempotency
    key already exists, that existing flow run is returned instead.

    A flow run submitted without a state is created in a PENDING state.
    """
    # Expand the create-action payload into the full core FlowRun schema.
    hydrated = schemas.core.FlowRun(**flow_run.dict())

    if not hydrated.state:
        hydrated.state = schemas.states.Pending()

    # Take the timestamp immediately before the write: a `created` value at or
    # after it is treated as "created by this request" and reported as 201.
    request_time = pendulum.now("UTC")
    stored_run = await models.flow_runs.create_flow_run(
        session=session, flow_run=hydrated
    )

    created_by_this_request = stored_run.created >= request_time
    if created_by_this_request:
        response.status_code = status.HTTP_201_CREATED

    return stored_run

delete_flow_run async

Delete a flow run by id.

Source code in prefect/orion/api/flow_runs.py
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow_run(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
):
    """
    Delete the flow run with the given id.

    Responds with 404 when the model layer reports nothing was deleted.
    """
    was_deleted = await models.flow_runs.delete_flow_run(
        flow_run_id=flow_run_id, session=session
    )
    if was_deleted:
        # Nothing to return; the route is declared with a 204 status code.
        return

    raise HTTPException(
        detail="Flow run not found", status_code=status.HTTP_404_NOT_FOUND
    )

flow_run_history async

Query for flow run history data across a given range and interval.

Source code in prefect/orion/api/flow_runs.py
@router.post("/history")
async def flow_run_history(
    history_start: datetime.datetime = Body(
        ..., description="The history's start time."
    ),
    history_end: datetime.datetime = Body(..., description="The history's end time."),
    history_interval: datetime.timedelta = Body(
        ...,
        description="The size of each history interval, in seconds. Must be at least 1 second.",
        alias="history_interval_seconds",
    ),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[schemas.responses.HistoryResponse]:
    """
    Query flow run history over a time range, bucketed by a fixed interval.

    Responds with 422 when the requested interval is shorter than one second.
    """
    # Reject sub-second intervals before doing any work.
    shortest_allowed = datetime.timedelta(seconds=1)
    if history_interval < shortest_allowed:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="History interval must not be less than 1 second.",
        )

    # The optional filters are forwarded unchanged to the shared history helper.
    filters = dict(
        flows=flows,
        flow_runs=flow_runs,
        task_runs=task_runs,
        deployments=deployments,
    )
    history = await run_history(
        session=session,
        run_type="flow_run",
        history_start=history_start,
        history_end=history_end,
        history_interval=history_interval,
        **filters,
    )
    return history

read_flow_run async

Get a flow run by id.

Source code in prefect/orion/api/flow_runs.py
@router.get("/{id}")
async def read_flow_run(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> schemas.core.FlowRun:
    """
    Fetch a single flow run by its id.

    Responds with 404 when the model layer returns nothing for that id.
    """
    found = await models.flow_runs.read_flow_run(
        flow_run_id=flow_run_id, session=session
    )
    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Flow run not found"
    )

read_flow_run_graph async

Get a task run dependency map for a given flow run.

Source code in prefect/orion/api/flow_runs.py
@router.get("/{id}/graph")
async def read_flow_run_graph(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[DependencyResult]:
    """
    Return the task run dependency map of the given flow run.
    """
    # The model layer's result is returned as-is.
    dependency_map = await models.flow_runs.read_task_run_dependencies(
        flow_run_id=flow_run_id, session=session
    )
    return dependency_map

read_flow_runs async

Query for flow runs.

Source code in prefect/orion/api/flow_runs.py
@router.post("/filter")
async def read_flow_runs(
    sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[schemas.core.FlowRun]:
    """
    Query for flow runs, with optional filters, sorting, limit and offset.
    """
    # Each optional request filter maps onto the model layer's `*_filter`
    # keyword; filters that were not supplied are passed through as None.
    filters = dict(
        flow_filter=flows,
        flow_run_filter=flow_runs,
        task_run_filter=task_runs,
        deployment_filter=deployments,
    )
    return await models.flow_runs.read_flow_runs(
        session=session,
        sort=sort,
        limit=limit,
        offset=offset,
        **filters,
    )

set_flow_run_state async

Set a flow run state, invoking any orchestration rules.

Source code in prefect/orion/api/flow_runs.py
@router.post("/{id}/set_state")
async def set_flow_run_state(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    state: schemas.actions.StateCreate = Body(..., description="The intended state."),
    force: bool = Body(
        False,
        description=(
            "If false, orchestration rules will be applied that may alter "
            "or prevent the state transition. If True, orchestration rules are not applied."
        ),
    ),
    session: sa.orm.Session = Depends(dependencies.get_session),
    response: Response = None,
    flow_policy: BaseOrchestrationPolicy = Depends(
        orchestration_dependencies.provide_flow_policy
    ),
) -> OrchestrationResult:
    """Set the state of a flow run, applying orchestration rules along the way."""

    # The request carries a StateCreate action; the model layer is handed a
    # full State object built from it.
    full_state = schemas.states.State.parse_obj(state)

    result = await models.flow_runs.set_flow_run_state(
        session=session,
        flow_run_id=flow_run_id,
        state=full_state,
        force=force,
        flow_policy=flow_policy,
    )

    # WAIT and ABORT results are reported with 200; every other orchestration
    # status is reported with 201.
    set_state_status = schemas.responses.SetStateStatus
    waiting_or_aborted = (
        result.status == set_state_status.WAIT
        or result.status == set_state_status.ABORT
    )
    response.status_code = (
        status.HTTP_200_OK if waiting_or_aborted else status.HTTP_201_CREATED
    )

    return result

update_flow_run async

Updates a flow run.

Source code in prefect/orion/api/flow_runs.py
@router.patch("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow_run(
    flow_run: schemas.actions.FlowRunUpdate,
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
):
    """
    Update the flow run with the given id.

    Responds with 404 when the model layer reports nothing was updated.
    """
    was_updated = await models.flow_runs.update_flow_run(
        flow_run_id=flow_run_id, flow_run=flow_run, session=session
    )
    if was_updated:
        # Nothing to return; the route is declared with a 204 status code.
        return

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Flow run not found"
    )