
First steps

Before we stand up our own Orion webserver, database, and UI, let's explore the building blocks of a Prefect workflow via an interactive Python session. All of the code below can be copied and pasted into your favorite async-compatible Python REPL.

Define a basic flow

The simplest way to begin with Prefect is to annotate your favorite Python function using the @flow decorator:

from prefect import flow

@flow
def my_favorite_function():
    print("This function doesn't do much")
    return 42

Any function will work, including those that accept arguments:

import requests
from prefect import flow

@flow
def send_post(url):
    return requests.post(url).json()

The positional and keyword arguments defined on your flow function are called parameters.
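
For example, parameters can be passed positionally or by keyword when calling the flow; the URL below is only a placeholder for illustration:

send_post("https://example.com/api")       # positional
send_post(url="https://example.com/api")   # keyword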

Asynchronous functions

Even asynchronous functions work with Prefect! We can alter the above example to be fully asynchronous using the httpx library:

import httpx
from prefect import flow

@flow
async def send_post(url):
    async with httpx.AsyncClient() as client:
        response = await client.post(url)
        return response.json()

This is a more advanced use case and will be covered in future tutorials.
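
In an async-compatible REPL, you would run this version of the flow by awaiting it; a minimal sketch (the URL is only a placeholder for illustration):

>>> state = await send_post("https://example.com/api")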

Run a basic flow

Running a Prefect workflow manually is as easy as calling the annotated function:

>>> state = my_favorite_function()
This function doesn't do much
>>> print(state)
Completed(message=None, type=COMPLETED)

Flows return states

You may notice that this call did not return the number 42 but rather a Prefect State object. States are the basic currency of communication between Prefect clients and the Prefect API, and can be used to define the conditions for orchestration rules as well as an interface for client-side logic. Data can be accessed via the .result() method on the State object.
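
For example, continuing the run above, the flow's return value can be retrieved from the state:

>>> state.result()
42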

Error handling

We can see what happens when a flow does not complete successfully by running the simple send_post flow above with an invalid value for the URL:

>>> state = send_post("foo")
>>> print(state)
Failed(message='Flow run encountered an exception.', type=FAILED)

We see that in this situation the call still returns without raising an exception; however, in contrast to the 'Completed' state, we now encounter a 'Failed' state signaling that something unexpected happened during execution.

As we will see, this behavior is consistent across flow runs and task runs and allows users to respond to failure in a first-class way; whether by configuring orchestration rules in the Orion backend (e.g., retry logic) or by directly responding to failed states in client code.
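
For example, client code can branch on the returned state; a minimal sketch, assuming the State object exposes a helper such as is_failed():

state = send_post("foo")
if state.is_failed():
    # respond to the failure in client code, e.g. alert or retry manually
    print("The flow run failed!")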

Run a basic flow with tasks

Let's now add some tasks to our flow so that we can orchestrate and monitor at a more granular level. Creating tasks follows the same pattern as flows: we decorate our favorite functions with the @task decorator:

from prefect import task, flow

import requests

@task
def extract_url_content(url, params=None):
    return requests.get(url, params=params).content


@task
def is_trending(trending_page, repo="prefect"):
    is_trending = repo.encode() in trending_page
    is_phrase = 'not ' if not is_trending else ''
    print(f"{repo} is {is_phrase}trending.", "\n")
    return is_trending


@flow
def repo_trending_check(url="https://github.com/trending/python", 
                        window="daily"):
    content = extract_url_content(url, params={"since": window})
    return is_trending(content)

As you can see, we still call these tasks as normal functions and can pass their return values to other tasks. We can then call our flow function just as before and see the printed output - Prefect will manage all the relevant intermediate state.
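
For example, you can override the default parameters when calling the flow; the printed output depends on the current contents of the trending page:

>>> state = repo_trending_check(window="weekly")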

Combining task code with arbitrary Python code

Notice in the above example that all of our Python logic is encapsulated within task functions. While there are many benefits to using Prefect in this way, it is not a strict requirement. Interacting with the results of your Prefect tasks requires an understanding of Prefect futures, which will be covered in a later section.
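
As an illustration only, here is a hypothetical mixed_flow that reuses the extract_url_content and is_trending tasks from above and places ordinary Python statements alongside the task calls:

@flow
def mixed_flow(url="https://github.com/trending/python", window="daily"):
    # ordinary Python code runs inline and is not tracked as a task
    params = {"since": window}
    print(f"Checking {url} with params {params} ...")
    # task calls can be mixed in freely
    content = extract_url_content(url, params=params)
    return is_trending(content)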

Run a flow within a flow

Not only can you call task functions within a flow, but you can also call other flow functions! Flows run within other flows are called subflows and allow you to efficiently manage, track and version common multi-task logic. Consider the following simple example:

from prefect import flow

@flow
def common_flow(config: dict):
    print("I am a subgraph that shows up in lots of places!")
    intermediate_result = 42
    return intermediate_result

@flow
def main_flow():
    # do some things
    # then call another flow function
    data = common_flow(config={})
    # do more things

# run the flow
flow_state = main_flow()

Whenever we run the main flow as above, a new run will be generated for common_flow as well. Not only is this run tracked as a subflow run of main_flow, but it is also independently inspectable in the UI! You can confirm this for yourself by spinning up the UI using the prefect orion start CLI command from your terminal:

$ prefect orion start

and navigating to the displayed UI URL; you should see all of the runs executed throughout this tutorial, including one for common_flow.

Additional Reading

To learn more about the concepts presented here, check out the relevant concept documentation in the Prefect docs.