Skip to content

Flows

Flows are the most basic Prefect object. They are containers for workflow logic and allow users to interact with and reason about the state of their workflows. They are the only abstraction that can be interacted with, displayed and run without needing to reference any other aspect of the Prefect engine.

Flows are uniquely identified by name. Each run of a flow can optionally publish a version to distinguish different variations of a flow from each other. Additionally, flow runs generated by deployments are identified as such by the presence of a deployment_id field on the run.

Parameters

Arguments and keyword arguments to flow functions are called parameters. Type hints provide an easy way to enforce typing on your flow parameters via pydantic. This means that any pydantic model used as a type hint within a flow will be automatically coerced into the relevant object type:

from pydantic import BaseModel

class Model(BaseModel):
    a: int
    b: float
    c: str

@flow
def model_validator(model: Model):
    printer(model)

Type hints unify API types with Python

Note that parameter values can be provided to flow via API using the concept of a deployment. In general the API only knows how to parse JSON compatible entities, but type hints on your flow functions provide you a way of automatically coercing JSON provided values to their appropriate Python representation. For example, to automatically convert something to a datetime:

from prefect import flow
from datetime import datetime

@flow
def what_day_is_it(date: datetime = None):
    if date is None:
        date = datetime.utcnow()
    print(f"It was {date.strftime('%A')} on {date.isoformat()}")

what_day_is_it("2021-01-01T02:00:19.180906")
# It was Friday on 2021-01-01T02:00:19.180906

Final state determination

The final state of the flow is determined by its return value. The following rules apply:

  • if an exception is raised directly in the flow function, the flow run will be marked as failed
  • if the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it - in particular, if any task run or subflow run failed then the final flow run state is marked as failed
  • if a flow returns one or more task run futures, these runs will be used as the reference tasks for determininig the final state of the run - if any returned task runs fail, the flow run will be marked as failed
  • if a flow returns a manually created state, it will be used as the state of the final flow run; this allows for manual determination of final state
  • lastly, if the flow run returns any other object then it will be considered successfully 'Completed'

The following examples illustrate each of these cases:

If an exception is raised within the flow function, the flow will immediately be marked as failed.

from prefect import flow

@flow
def always_fail_flow():
    raise ValueError("This flow immediately fails")

A flow with no return statement is determined by the state of all of its task runs.

from prefect import flow, task

@task
def always_fails_task():
    raise ValueError("I am bad task")

@task
def always_succeeds_task():
    return "foo"

@flow
def always_fails_flow():
    always_fails_task()
    always_succeeds_task()

If a flow returns one or more futures, the final state will be determined based on the underlying states.

from prefect import task, flow

@task
def always_fails_task():
    raise ValueError("I am bad task")

@task
def always_succeeds_task():
    return "foo"

@flow
def always_succeeds_flow():
    x = always_fails_task()
    y = always_succeeds_task()
    return y

If a flow returns one or more states, the final state will be determined based on the return value.

from prefect import task, flow
from prefect.orion.schemas.states import Completed, Failed

@task
def always_fails_task():
    raise ValueError("I am bad task")

@task
def always_succeeds_task():
    return "foo"

@flow
def always_succeeds_flow():
    x = always_fails_task()
    y = always_succeeds_task()
    if y.result() == "foo"
        return Completed(message="I am happy with this result")
    else:
        return Failed(message="How did this happen!?")

If a flow returns any other Python object, the final state will always be 'Completed'.

from prefect import task, flow

@task
def always_fails_task():
    raise ValueError("I am bad task")

@flow
def always_succeeds_flow():
    always_fails_task()
    return "foo"

Subflows

A subflow run takes place when a flow function is called inside the execution of another flow. We refer to the primary flow as the "parent" and the second flow as the "child" or "subflow."

Regardless of what execution they perform, subflows are treated as task runs of the parent flow: a task run is created and properly hooked up to its upstream inputs. This task run inherits and mirrors the state of the subflow it represents. This allows subflows to participate in all dataflow operations that the parent might implement, since they receive and return data like any task. A task that represents a subflow will be annotated as such in its state_details via the presence of a child_flow_run_id field. Conversely, a flow run that was run as a subflow can be identified via the presence of a parent_task_run_id on state_details.