Skip to content

prefect.orion.api.flows

Routes for interacting with flow objects.

count_flows async

Count flows.

Source code in prefect/orion/api/flows.py
@router.post("/count")
async def count_flows(
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> int:
    """
    Count flows.
    """
    return await models.flows.count_flows(
        session=session,
        flow_filter=flows,
        flow_run_filter=flow_runs,
        task_run_filter=task_runs,
        deployment_filter=deployments,
    )

create_flow async

Gracefully creates a new flow from the provided schema. If a flow with the same name already exists, the existing flow is returned.

Source code in prefect/orion/api/flows.py
@router.post("/")
async def create_flow(
    flow: schemas.actions.FlowCreate,
    response: Response,
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> schemas.core.Flow:
    """Gracefully creates a new flow from the provided schema. If a flow with the
    same name already exists, the existing flow is returned.
    """
    # hydrate the input model into a full flow model
    flow = schemas.core.Flow(**flow.dict())

    now = pendulum.now("UTC")
    model = await models.flows.create_flow(session=session, flow=flow)
    if model.created >= now:
        response.status_code = status.HTTP_201_CREATED
    return model

delete_flow async

Delete a flow by id.

Source code in prefect/orion/api/flows.py
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
):
    """
    Delete a flow by id.
    """
    result = await models.flows.delete_flow(session=session, flow_id=flow_id)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )

read_flow async

Get a flow by id.

Source code in prefect/orion/api/flows.py
@router.get("/{id}")
async def read_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> schemas.core.Flow:
    """
    Get a flow by id.
    """
    flow = await models.flows.read_flow(session=session, flow_id=flow_id)
    if not flow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    return flow

read_flow_by_name async

Get a flow by name.

Source code in prefect/orion/api/flows.py
@router.get("/name/{name}")
async def read_flow_by_name(
    name: str = Path(..., description="The name of the flow"),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> schemas.core.Flow:
    """
    Get a flow by name.
    """
    flow = await models.flows.read_flow_by_name(session=session, name=name)
    if not flow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    return flow

read_flows async

Query for flows.

Source code in prefect/orion/api/flows.py
@router.post("/filter")
async def read_flows(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
    session: sa.orm.Session = Depends(dependencies.get_session),
) -> List[schemas.core.Flow]:
    """
    Query for flows.
    """
    return await models.flows.read_flows(
        session=session,
        flow_filter=flows,
        flow_run_filter=flow_runs,
        task_run_filter=task_runs,
        deployment_filter=deployments,
        sort=sort,
        offset=offset,
        limit=limit,
    )

update_flow async

Updates a flow.

Source code in prefect/orion/api/flows.py
@router.patch("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow(
    flow: schemas.actions.FlowUpdate,
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    session: sa.orm.Session = Depends(dependencies.get_session),
):
    """
    Updates a flow.
    """
    result = await models.flows.update_flow(session=session, flow=flow, flow_id=flow_id)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )