Skip to content

Flow and task execution

So far you've seen that flows are the fundamental component of Prefect workflows, and tasks are components that enable you to encapsulate discrete, repeatable units of work within flows. You've also seen some of the configuration options for flows and tasks.

One of the configuration options demonstrated in the Flow and task configuration tutorial was setting a task runner that enables different execution capabilities for tasks within a single flow run.

Task runners

Task runners are responsible for running Prefect tasks within a flow. Each flow has a task runner associated with it. Depending on the task runner you use, the tasks within your flow can run sequentially, concurrently, or in parallel. You can even configure task runners to use distributed execution infrastructure such as a Dask cluster.

The default task runner is the ConcurrentTaskRunner, which will run submitted tasks concurrently. If you don't specify a task runner, Prefect uses the ConcurrentTaskRunner.

All Prefect task runners support asynchronous task execution.

Result

By default, the result of a task is a Python object, and execution of the task blocks the execution of the next task in a flow. To make sure that the tasks within your flow can run concurrently or in parallel, add .submit() to your task run. This method will return a PrefectFuture instead of a Python object.

A PrefectFuture is an object that provides access to a computation happening in a task runner. Here's an example of using .submit in a flow.

import time
from prefect import task, flow

@task
def my_task():
    return 1

@flow
def my_flow():
    result = my_task.submit()

if __name__ == "__main__":
    my_flow()

Concurrent execution

As mentioned, by default Prefect flows use the ConcurrentTaskRunner for non-blocking, concurrent execution of tasks.

Here's a basic flow and task using the default task runner.

import time
from prefect import task, flow

@task
def print_values(values):
    for value in values:
        time.sleep(0.5)
        print(value, end="\r")

@flow
def my_flow():
    print_values.submit(["AAAA"] * 15)
    print_values.submit(["BBBB"] * 10)

if __name__ == "__main__":
    my_flow()

When you run this flow you should see the terminal output randomly switching between AAAA and BBBB showing that these two tasks are indeed not blocking.

Also notice that Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently... indicates Prefect is, in fact, using concurrent execution by default for the tasks in this flow.

15:07:10.015 | INFO    | prefect.engine - Created flow run 'mindful-tortoise' for flow 'parallel-flow'
15:07:10.015 | INFO    | Flow run 'mindful-tortoise' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
15:07:10.255 | INFO    | Flow run 'mindful-tortoise' - Created task run 'print_values-0bb9a2c3-0' for task 'print_values'
15:07:10.255 | INFO    | Flow run 'mindful-tortoise' - Submitted task run 'print_values-0bb9a2c3-0' for execution.
15:07:10.291 | INFO    | Flow run 'mindful-tortoise' - Created task run 'print_values-0bb9a2c3-1' for task 'print_values'
15:07:10.292 | INFO    | Flow run 'mindful-tortoise' - Submitted task run 'print_values-0bb9a2c3-1' for execution.
15:07:15.364 | INFO    | Task run 'print_values-0bb9a2c3-1' - Finished in state Completed()
15:07:17.849 | INFO    | Task run 'print_values-0bb9a2c3-0' - Finished in state Completed()
15:07:17.876 | INFO    | Flow run 'mindful-tortoise' - Finished in state Completed('All states completed.')

Sequential execution

Sometimes you may want to intentionally run tasks sequentially. The built-in Prefect SequentialTaskRunner lets you do this.

When using non-default task runner, you must import the task runner into your flow script.

import time
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner

@task
def print_values(values):
    for value in values:
        time.sleep(0.5)
        print(value, end="\r")

@flow(task_runner=SequentialTaskRunner())
def my_flow():
    print_values.submit(["AAAA"] * 15)
    print_values.submit(["BBBB"] * 10)

if __name__ == "__main__":
    my_flow()

When you run this flow you should see the terminal output first display AAAA, then BBBB showing that these two task runs execute sequentially, one completing before the second starts.

Also notice that Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially... indicates Prefect is, in fact, using sequential execution.

15:15:28.226 | INFO    | prefect.engine - Created flow run 'thundering-camel' for flow 'my-flow'
15:15:28.227 | INFO    | Flow run 'thundering-camel' - Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially...
15:15:28.460 | INFO    | Flow run 'thundering-camel' - Created task run 'print_values-0bb9a2c3-0' for task 'print_values'
15:15:28.461 | INFO    | Flow run 'thundering-camel' - Executing 'print_values-0bb9a2c3-0' immediately...
15:15:36.087 | INFO    | Task run 'print_values-0bb9a2c3-0' - Finished in state Completed()
15:15:36.110 | INFO    | Flow run 'thundering-camel' - Created task run 'print_values-0bb9a2c3-1' for task 'print_values'
15:15:36.111 | INFO    | Flow run 'thundering-camel' - Executing 'print_values-0bb9a2c3-1' immediately...
15:15:41.207 | INFO    | Task run 'print_values-0bb9a2c3-1' - Finished in state Completed()
15:15:41.237 | INFO    | Flow run 'thundering-camel' - Finished in state Completed('All states completed.')

Parallel execution

You can also run tasks using parallel or distributed execution by using the Dask or Ray task runners available through Prefect Integrations.

For example, you can achieve parallel task execution, even on in a local execution environment, but using the DaskTaskRunner.

  1. Install the prefect-dask collection with pip install prefect-dask.
  2. Switch your task runner to the DaskTaskRunner.
  3. Call .submit on the task instead of calling the task directly. This submits the task to the task runner rather than running the task in-process.
import time
from prefect import task, flow
from prefect_dask.task_runners import DaskTaskRunner

@task
def print_values(values):
    for value in values:
        time.sleep(0.5)
        print(value, end="\r")

@flow(task_runner=DaskTaskRunner())
def my_flow():
    print_values.submit(["AAAA"] * 15)
    print_values.submit(["BBBB"] * 10)

if __name__ == "__main__":
    my_flow()

Multiprocessing task runners

Because the DaskTaskRunner uses multiprocessing, it must be protected by an if __name__ == "__main__": guard when used in a script.

When you run this flow you should see the terminal output randomly switching between AAAA and BBBB showing that these two tasks are indeed running in parallel.

If you have the bokeh Python package installed you can follow the link to the Dask dashaboard in the terminal output and watch the Dask workers in action!

22:49:06.969 | INFO    | prefect.engine - Created flow run 'bulky-unicorn' for flow 'parallel-flow'
22:49:06.969 | INFO    | Flow run 'bulky-unicorn' - Using task runner 'DaskTaskRunner'
22:49:06.970 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
22:49:09.182 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
...

The Dask scheduler can be hard to predict

When using the DaskTaskRunner, Prefect is submitting each task run to a Dask cluster object. The Dask scheduler then determines when and how each individual run should be executed (with the constraint that the order matches the execution graph that Prefect provided).

This means the only way to force Dask to walk the task graph in a particular order is to configure Prefect dependencies between your tasks.

Read more about using Dask in the Dask task runner tutorial.

Asynchronous execution

Prefect also supports asynchronous task and flow definitions by default. All of the standard rules of async apply:

import asyncio

from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1) # yield
        print(value, end=" ")

@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)

asyncio.run(async_flow())

When you run this flow, the coroutines that were gathered yield control to one another and are run concurrently:

1 2 a 6 b 7 c 8 d 9

The example above is equivalent to below:

import asyncio

from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1) # yield
        print(value, end=" ")

@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    await print_values.submit("abcd")
    await print_values.submit("6789")

asyncio.run(async_flow())

Note, if you are not using asyncio.gather, calling submit is required for asynchronous execution on the ConcurrentTaskRunner.

Flow execution

Task runners only manage task runs within a flow run. But what about flows?

Any given flow run — meaning in this case your workflow including any subflows and tasks — executes in its own environment using the infrastructure configured for that environment. In these examples, that infrastructure is probably your local computing environment. But for flow runs based on deployments, that infrastructure might be a server in a datacenter, a VM, a Docker container, or a Kubernetes cluster.

The ability to execute flow runs in a non-blocking or parallel manner is subject to execution infrastructure and the configuration of agents and work pools — advanced topics that are covered in other tutorials.

Within a flow, subflow runs behave like normal flow runs, except subflows will block execution of the parent flow until completion. However, asynchronous subflows are supported using AnyIO task groups or asyncio.gather.

Next steps: Flow orchestration with Prefect

The next step is learning about the components of Prefect that enable coordination and orchestration of your flow and task runs.