prefect.tasks
Module containing the base workflow task class and decorator - for most use cases, using the @task
decorator is preferred.
Task
A Prefect task definition.
Note
We recommend using the @task
decorator for most use-cases.
Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function creates a new task run.
To preserve the input and output types, we use the generic type variables P and R for "Parameters" and "Returns" respectively.
Parameters:
Name | Description | Default |
---|---|---|
name |
An optional name for the task; if not provided, the name will be inferred from the given function. str |
None |
description |
An optional string description for the task. str |
None |
tags |
An optional set of tags to be associated with runs of this task. These
tags are combined with any tags defined by a `prefect.tags` context at task runtime. Iterable[str] |
None |
cache_key_fn |
An optional callable that, given the task run context and call parameters, generates a string key; if the key matches a previous completed state, that state result will be restored instead of running the task again. Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] |
None |
cache_expiration |
An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire. timedelta |
None |
retries |
An optional number of times to retry on task run failure. int |
0 |
retry_delay_seconds |
An optional number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero. Union[float, int] |
0 |
Source code in prefect/tasks.py
class Task(Generic[P, R]):
    """
    A Prefect task definition.

    !!! note
        We recommend using [the `@task` decorator][prefect.tasks.task] for most use-cases.

    Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function
    creates a new task run.

    To preserve the input and output types, we use the generic type variables P and R for "Parameters" and
    "Returns" respectively.

    Args:
        fn: The function to wrap; must be callable and must not declare a parameter
            named `wait_for`.
        name: An optional name for the task; if not provided, the name will be inferred
            from the given function.
        description: An optional string description for the task. If not provided,
            the wrapped function's docstring is used.
        tags: An optional set of tags to be associated with runs of this task. These
            tags are combined with any tags defined by a `prefect.tags` context at
            task runtime.
        cache_key_fn: An optional callable that, given the task run context and call
            parameters, generates a string key; if the key matches a previous completed
            state, that state result will be restored instead of running the task again.
        cache_expiration: An optional amount of time indicating how long cached states
            for this task should be restorable; if not provided, cached states will
            never expire.
        retries: An optional number of times to retry on task run failure.
        retry_delay_seconds: An optional number of seconds to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero.

    Raises:
        TypeError: If `fn` is not callable.
        ReservedArgumentError: If `fn` declares a parameter named `wait_for`, which is
            reserved for `Task.__call__`.
    """

    # NOTE: These parameters (types, defaults, and docstrings) should be duplicated
    # exactly in the @task decorator
    def __init__(
        self,
        fn: Callable[P, R],
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        cache_key_fn: Callable[
            ["TaskRunContext", Dict[str, Any]], Optional[str]
        ] = None,
        cache_expiration: datetime.timedelta = None,
        retries: int = 0,
        retry_delay_seconds: Union[float, int] = 0,
    ):
        if not callable(fn):
            raise TypeError("'fn' must be callable")

        self.name = name or fn.__name__
        self.description = description or inspect.getdoc(fn)
        update_wrapper(self, fn)
        self.fn = fn
        self.isasync = inspect.iscoroutinefunction(self.fn)

        # `wait_for` is consumed by `Task.__call__` itself, so the wrapped function
        # cannot also accept it.
        if "wait_for" in inspect.signature(self.fn).parameters:
            raise ReservedArgumentError(
                "'wait_for' is a reserved argument name and cannot be used in task functions."
            )

        self.tags = set(tags if tags else [])

        # the task key is a hash of (name, fn, tags)
        # which is a stable representation of this unit of work.
        # note runtime tags are not part of the task key; they will be
        # recorded as metadata only.
        self.task_key = stable_hash(
            self.name,
            to_qualified_name(self.fn),
            str(sorted(self.tags or [])),
        )

        # Per-instance call counter; see `get_and_update_dynamic_key`.
        self._dynamic_key = 0

        self.cache_key_fn = cache_key_fn
        self.cache_expiration = cache_expiration

        # TaskRunPolicy settings
        # TODO: We can instantiate a `TaskRunPolicy` and add Pydantic bound checks to
        #       validate that the user passes positive numbers here
        self.retries = retries
        self.retry_delay_seconds = retry_delay_seconds

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        cache_key_fn: Callable[
            ["TaskRunContext", Dict[str, Any]], Optional[str]
        ] = None,
        cache_expiration: datetime.timedelta = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[Union[float, int]] = None,
    ):
        """
        Create a new task from the current object, updating provided options.

        Any option left as `None` is inherited from this task. Explicit falsy values
        such as `retries=0`, `retry_delay_seconds=0`, `tags=[]` or
        `cache_expiration=timedelta(0)` are honored rather than silently replaced by
        the current values. An empty `name` or `description` still falls back to the
        current value.

        Args:
            name: A new name for the task.
            description: A new description for the task.
            tags: A new set of tags for the task. If given, existing tags are ignored,
                not merged.
            cache_key_fn: A new cache key function for the task.
            cache_expiration: A new cache expiration time for the task.
            retries: A new number of times to retry on task run failure. Pass `0` to
                disable retries configured on this task.
            retry_delay_seconds: A new number of seconds to wait before retrying the
                task after failure. This is only applicable if `retries` is nonzero.

        Returns:
            A new `Task` instance.

        Examples:

            Create a new task from an existing task and update the name

            >>> @task(name="My task")
            >>> def my_task():
            >>>     return 1
            >>>
            >>> new_task = my_task.with_options(name="My new task")

            Create a new task from an existing task and update the retry settings

            >>> from random import randint
            >>>
            >>> @task(retries=1, retry_delay_seconds=5)
            >>> def my_task():
            >>>     x = randint(0, 5)
            >>>     if x >= 3:  # Make a task that fails sometimes
            >>>         raise ValueError("Retry me please!")
            >>>     return x
            >>>
            >>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)

            Use a task with updated options within a flow

            >>> @task(name="My task")
            >>> def my_task():
            >>>     return 1
            >>>
            >>> @flow
            >>> def my_flow():
            >>>     new_task = my_task.with_options(name="My new task")
            >>>     new_task()
        """
        return Task(
            fn=self.fn,
            name=name or self.name,
            description=description or self.description,
            # `is not None` (not truthiness) so that e.g. `tags=[]` clears tags and
            # `retries=0` disables retries, as documented above.
            tags=tags if tags is not None else copy(self.tags),
            cache_key_fn=(
                cache_key_fn if cache_key_fn is not None else self.cache_key_fn
            ),
            cache_expiration=(
                cache_expiration
                if cache_expiration is not None
                else self.cache_expiration
            ),
            retries=retries if retries is not None else self.retries,
            retry_delay_seconds=(
                retry_delay_seconds
                if retry_delay_seconds is not None
                else self.retry_delay_seconds
            ),
        )

    @overload
    def __call__(
        self: "Task[P, NoReturn]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[None, Sync]:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    @overload
    def __call__(
        self: "Task[P, Coroutine[Any, Any, T]]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Awaitable[PrefectFuture[T, Async]]:
        ...

    @overload
    def __call__(
        self: "Task[P, T]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[T, Sync]:
        ...

    def __call__(
        self,
        *args: Any,
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        **kwargs: Any,
    ) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
        """
        Run the task - must be called within a flow function.

        If writing an async task, this call must be awaited.

        Will create a new task run in the backing API and submit the task to the flow's
        task runner. This call only blocks execution while the task is being submitted,
        once it is submitted, the flow function will continue executing. However, note
        that the `SequentialTaskRunner` does not implement parallel execution for sync tasks
        and they are fully resolved on submission.

        Args:
            *args: Arguments to run the task with
            wait_for: Upstream task futures to wait for before starting the task
            **kwargs: Keyword arguments to run the task with

        Returns:
            A future allowing asynchronous access to the state of the task

        Examples:

            Define a task

            >>> from prefect import task
            >>> @task
            >>> def my_task():
            >>>     return "hello"

            Run a task in a flow

            >>> from prefect import flow
            >>> @flow
            >>> def my_flow():
            >>>     my_task()

            Wait for a task to finish

            >>> @flow
            >>> def my_flow():
            >>>     my_task().wait()

            Use the result from a task in a flow

            >>> @flow
            >>> def my_flow():
            >>>     print(my_task().wait().result)
            >>>
            >>> my_flow()
            hello

            Run an async task in an async flow

            >>> @task
            >>> async def my_async_task():
            >>>     pass
            >>>
            >>> @flow
            >>> async def my_flow():
            >>>     await my_async_task()

            Run a sync task in an async flow

            >>> @flow
            >>> async def my_flow():
            >>>     my_task()

            Enforce ordering between tasks that do not exchange data

            >>> @task
            >>> def task_1():
            >>>     pass
            >>>
            >>> @task
            >>> def task_2():
            >>>     pass
            >>>
            >>> @flow
            >>> def my_flow():
            >>>     x = task_1()
            >>>
            >>>     # task 2 will wait for task_1 to complete
            >>>     y = task_2(wait_for=[x])
        """
        # NOTE(review): imported at call time rather than module level, presumably to
        # avoid a circular import with `prefect.engine` — confirm.
        from prefect.engine import enter_task_run_engine

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)

        # Get the dynamic key for this call; this also advances the counter so that
        # future task calls are distinguishable from this one
        dynamic_key = self.get_and_update_dynamic_key()

        return enter_task_run_engine(
            self,
            parameters=parameters,
            dynamic_key=dynamic_key,
            wait_for=wait_for,
        )

    def get_and_update_dynamic_key(self) -> str:
        """
        When tasks are called, they call this method to get a key unique to that task
        call; this allows the backend to distinguish repeated task calls.

        Returns:
            The current counter value as a string; the counter is then incremented.
        """
        current_key = str(self._dynamic_key)

        # Increment the key
        self._dynamic_key += 1

        return current_key
Task.__call__
special
Run the task - must be called within a flow function.
If writing an async task, this call must be awaited.
Will create a new task run in the backing API and submit the task to the flow's
task runner. This call only blocks execution while the task is being submitted,
once it is submitted, the flow function will continue executing. However, note
that the `SequentialTaskRunner` does not implement parallel execution for sync tasks
and they are fully resolved on submission.
Parameters:
Name | Description | Default |
---|---|---|
*args |
Arguments to run the task with Any |
() |
wait_for |
Upstream task futures to wait for before starting the task Optional[Iterable[prefect.futures.PrefectFuture]] |
None |
**kwargs |
Keyword arguments to run the task with Any |
{} |
Returns:
Type | Description |
---|---|
Union[prefect.futures.PrefectFuture, Awaitable[prefect.futures.PrefectFuture]] |
A future allowing asynchronous access to the state of the task |
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task():
>>> return "hello"
Run a task in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task()
Wait for a task to finish
>>> @flow
>>> def my_flow():
>>> my_task().wait()
Use the result from a task in a flow
>>> @flow
>>> def my_flow():
>>> print(my_task().wait().result)
>>>
>>> my_flow()
hello
Run an async task in an async flow
>>> @task
>>> async def my_async_task():
>>> pass
>>>
>>> @flow
>>> async def my_flow():
>>> await my_async_task()
Run a sync task in an async flow
>>> @flow
>>> async def my_flow():
>>> my_task()
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2(wait_for=[x])
Source code in prefect/tasks.py
def __call__(
    self,
    *args: Any,
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    **kwargs: Any,
) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
    """
    Submit a run of this task; this must happen inside a flow function.

    Async tasks return an awaitable, so the call has to be awaited.

    A new task run is created in the backing API and handed to the flow's task
    runner. Execution only blocks while the run is being submitted; after that the
    flow function carries on. Keep in mind that the `SequentialTaskRunner` does not
    run sync tasks in parallel, so those are fully resolved at submission time.

    Args:
        *args: Positional arguments forwarded to the task function
        wait_for: Upstream task futures that must finish before this task starts
        **kwargs: Keyword arguments forwarded to the task function

    Returns:
        A future giving asynchronous access to the task's state

    Examples:

        Define a task

        >>> from prefect import task
        >>> @task
        >>> def my_task():
        >>>     return "hello"

        Run a task in a flow

        >>> from prefect import flow
        >>> @flow
        >>> def my_flow():
        >>>     my_task()

        Wait for a task to finish

        >>> @flow
        >>> def my_flow():
        >>>     my_task().wait()

        Use the result from a task in a flow

        >>> @flow
        >>> def my_flow():
        >>>     print(my_task().wait().result)
        >>>
        >>> my_flow()
        hello

        Run an async task in an async flow

        >>> @task
        >>> async def my_async_task():
        >>>     pass
        >>>
        >>> @flow
        >>> async def my_flow():
        >>>     await my_async_task()

        Run a sync task in an async flow

        >>> @flow
        >>> async def my_flow():
        >>>     my_task()

        Enforce ordering between tasks that do not exchange data

        >>> @task
        >>> def task_1():
        >>>     pass
        >>>
        >>> @task
        >>> def task_2():
        >>>     pass
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     x = task_1()
        >>>
        >>>     # task 2 will wait for task_1 to complete
        >>>     y = task_2(wait_for=[x])
    """
    # NOTE(review): deferred import, presumably to sidestep a circular import with
    # `prefect.engine` — confirm.
    from prefect.engine import enter_task_run_engine

    # Bind the positional/keyword arguments to the wrapped function's signature.
    call_parameters = get_call_parameters(self.fn, args, kwargs)

    # Claim this call's key; the helper also bumps the counter so the next call of
    # the same task gets a different one.
    call_key = self.get_and_update_dynamic_key()

    return enter_task_run_engine(
        self, parameters=call_parameters, dynamic_key=call_key, wait_for=wait_for
    )
Task.get_and_update_dynamic_key
When tasks are called, they call this method to get a key unique to that task call; this allows the backend to distinguish repeated task calls.
Source code in prefect/tasks.py
def get_and_update_dynamic_key(self) -> str:
    """
    Hand out a key that identifies this particular call of the task, then advance
    the per-task counter so the next call receives a different key. The backend
    relies on this to tell repeated calls of the same task apart.

    Returns:
        The counter value before incrementing, as a string.
    """
    issued_key, self._dynamic_key = str(self._dynamic_key), self._dynamic_key + 1
    return issued_key
Task.with_options
Create a new task from the current object, updating provided options.
Parameters:
Name | Description | Default |
---|---|---|
name |
A new name for the task. str |
None |
description |
A new description for the task. str |
None |
tags |
A new set of tags for the task. If given, existing tags are ignored, not merged. Iterable[str] |
None |
cache_key_fn |
A new cache key function for the task. Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] |
None |
cache_expiration |
A new cache expiration time for the task. timedelta |
None |
retries |
A new number of times to retry on task run failure. int |
0 |
retry_delay_seconds |
A new number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero. Union[float, int] |
0 |
Returns:
Type | Description |
---|---|
A new `Task` instance. |
Examples:
Create a new task from an existing task and update the name
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> new_task = my_task.with_options(name="My new task")
Create a new task from an existing task and update the retry settings
>>> from random import randint
>>>
>>> @task(retries=1, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
>>>
>>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)
Use a task with updated options within a flow
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> @flow
>>> def my_flow():
>>> new_task = my_task.with_options(name="My new task")
>>> new_task()
Source code in prefect/tasks.py
def with_options(
    self,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    cache_key_fn: Callable[
        ["TaskRunContext", Dict[str, Any]], Optional[str]
    ] = None,
    cache_expiration: datetime.timedelta = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[float, int]] = None,
):
    """
    Create a new task from the current object, updating provided options.

    Any option left as `None` is inherited from this task. Explicit falsy values
    such as `retries=0`, `retry_delay_seconds=0`, `tags=[]` or
    `cache_expiration=timedelta(0)` are honored rather than silently replaced by
    the current values. An empty `name` or `description` still falls back to the
    current value.

    Args:
        name: A new name for the task.
        description: A new description for the task.
        tags: A new set of tags for the task. If given, existing tags are ignored,
            not merged.
        cache_key_fn: A new cache key function for the task.
        cache_expiration: A new cache expiration time for the task.
        retries: A new number of times to retry on task run failure. Pass `0` to
            disable retries configured on this task.
        retry_delay_seconds: A new number of seconds to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero.

    Returns:
        A new `Task` instance.

    Examples:

        Create a new task from an existing task and update the name

        >>> @task(name="My task")
        >>> def my_task():
        >>>     return 1
        >>>
        >>> new_task = my_task.with_options(name="My new task")

        Create a new task from an existing task and update the retry settings

        >>> from random import randint
        >>>
        >>> @task(retries=1, retry_delay_seconds=5)
        >>> def my_task():
        >>>     x = randint(0, 5)
        >>>     if x >= 3:  # Make a task that fails sometimes
        >>>         raise ValueError("Retry me please!")
        >>>     return x
        >>>
        >>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)

        Use a task with updated options within a flow

        >>> @task(name="My task")
        >>> def my_task():
        >>>     return 1
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     new_task = my_task.with_options(name="My new task")
        >>>     new_task()
    """
    return Task(
        fn=self.fn,
        name=name or self.name,
        description=description or self.description,
        # `is not None` (not truthiness) so that e.g. `tags=[]` clears tags and
        # `retries=0` disables retries, as documented above.
        tags=tags if tags is not None else copy(self.tags),
        cache_key_fn=(
            cache_key_fn if cache_key_fn is not None else self.cache_key_fn
        ),
        cache_expiration=(
            cache_expiration
            if cache_expiration is not None
            else self.cache_expiration
        ),
        retries=retries if retries is not None else self.retries,
        retry_delay_seconds=(
            retry_delay_seconds
            if retry_delay_seconds is not None
            else self.retry_delay_seconds
        ),
    )
task
Decorator to designate a function as a task in a Prefect workflow.
This decorator may be used for asynchronous or synchronous functions.
Parameters:
Name | Description | Default |
---|---|---|
name |
An optional name for the task; if not provided, the name will be inferred from the given function. str |
None |
description |
An optional string description for the task. str |
None |
tags |
An optional set of tags to be associated with runs of this task. These
tags are combined with any tags defined by a `prefect.tags` context at task runtime. Iterable[str] |
None |
cache_key_fn |
An optional callable that, given the task run context and call parameters, generates a string key; if the key matches a previous completed state, that state result will be restored instead of running the task again. Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] |
None |
cache_expiration |
An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire. timedelta |
None |
retries |
An optional number of times to retry on task run failure. int |
0 |
retry_delay_seconds |
An optional number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero. Union[float, int] |
0 |
Returns:
Type | Description |
---|---|
A callable `Task` object which, when called, will submit the task for execution. |
Examples:
Define a simple task
>>> @task
>>> def add(x, y):
>>> return x + y
Define an async task
>>> @task
>>> async def add(x, y):
>>> return x + y
Define a task with tags and a description
>>> @task(tags={"a", "b"}, description="This task is empty but it's my first!")
>>> def my_task():
>>> pass
Define a task with a custom name
>>> @task(name="The Ultimate Task")
>>> def my_task():
>>> pass
Define a task that retries 3 times with a 5 second delay between attempts
>>> from random import randint
>>>
>>> @task(retries=3, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
Define a task that is cached for a day based on its inputs
>>> from prefect.tasks import task_input_hash
>>> from datetime import timedelta
>>>
>>> @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
>>> def my_task():
>>> return "hello"
Source code in prefect/tasks.py
def task(
    __fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]] = None,
    cache_expiration: datetime.timedelta = None,
    retries: int = 0,
    retry_delay_seconds: Union[float, int] = 0,
):
    """
    Decorator to designate a function as a task in a Prefect workflow.

    This decorator may be used for asynchronous or synchronous functions.

    It supports two forms. Used bare (`@task`), the decorated function arrives as
    `__fn` and a `Task` is returned immediately. Used with options
    (`@task(name=...)`), `__fn` is `None` and a decorator is returned that builds
    the `Task` once it receives the function.

    Args:
        __fn: The function to wrap when used as a bare decorator; `None` when the
            decorator is called with options.
        name: An optional name for the task; if not provided, the name will be inferred
            from the given function.
        description: An optional string description for the task.
        tags: An optional set of tags to be associated with runs of this task. These
            tags are combined with any tags defined by a `prefect.tags` context at
            task runtime.
        cache_key_fn: An optional callable that, given the task run context and call
            parameters, generates a string key; if the key matches a previous completed
            state, that state result will be restored instead of running the task again.
        cache_expiration: An optional amount of time indicating how long cached states
            for this task should be restorable; if not provided, cached states will
            never expire.
        retries: An optional number of times to retry on task run failure.
        retry_delay_seconds: An optional number of seconds to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero.

    Returns:
        A callable `Task` object which, when called, will submit the task for execution.

    Examples:
        Define a simple task

        >>> @task
        >>> def add(x, y):
        >>>     return x + y

        Define an async task

        >>> @task
        >>> async def add(x, y):
        >>>     return x + y

        Define a task with tags and a description

        >>> @task(tags={"a", "b"}, description="This task is empty but it's my first!")
        >>> def my_task():
        >>>     pass

        Define a task with a custom name

        >>> @task(name="The Ultimate Task")
        >>> def my_task():
        >>>     pass

        Define a task that retries 3 times with a 5 second delay between attempts

        >>> from random import randint
        >>>
        >>> @task(retries=3, retry_delay_seconds=5)
        >>> def my_task():
        >>>     x = randint(0, 5)
        >>>     if x >= 3:  # Make a task that fails sometimes
        >>>         raise ValueError("Retry me please!")
        >>>     return x

        Define a task that is cached for a day based on its inputs

        >>> from prefect.tasks import task_input_hash
        >>> from datetime import timedelta
        >>>
        >>> @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
        >>> def my_task():
        >>>     return "hello"
    """
    # Compare against None rather than relying on truthiness: a callable object
    # that defines `__bool__`/`__len__` can be falsy and must still be wrapped.
    if __fn is not None:
        return cast(
            Task[P, R],
            Task(
                fn=__fn,
                name=name,
                description=description,
                tags=tags,
                cache_key_fn=cache_key_fn,
                cache_expiration=cache_expiration,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
            ),
        )
    else:
        # Called with options only: return a decorator that will receive the function.
        return cast(
            Callable[[Callable[P, R]], Task[P, R]],
            partial(
                task,
                name=name,
                description=description,
                tags=tags,
                cache_key_fn=cache_key_fn,
                cache_expiration=cache_expiration,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
            ),
        )
task_input_hash
A task cache key implementation which hashes all inputs to the task using a JSON or cloudpickle serializer. If any arguments are not JSON serializable, the pickle serializer is used as a fallback. If cloudpickle fails, this will return a null key indicating that a cache key could not be generated for the given inputs.
Parameters:
Name | Description | Default |
---|---|---|
context |
the active TaskRunContext |
required |
arguments |
a dictionary of arguments to be passed to the underlying task Dict[str, Any] |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
a string hash if hashing succeeded, else `None` |
Source code in prefect/tasks.py
def task_input_hash(
    context: "TaskRunContext", arguments: Dict[str, Any]
) -> Optional[str]:
    """
    Cache-key function that hashes every input of a task call.

    Inputs are serialized with JSON when possible. If an argument cannot be
    JSON-serialized, cloudpickle is used instead. If cloudpickle also fails, a null
    key is returned, meaning no cache key could be produced for these inputs.

    Arguments:
        context: the active `TaskRunContext`
        arguments: a dictionary of arguments to be passed to the underlying task

    Returns:
        a string hash if hashing succeeded, else `None`
    """
    current_task = context.task

    # The task key folds in the task name, the function's qualified name and its
    # tags. Adding the function's bytecode means that editing the function body
    # stops earlier cache entries from matching.
    # NOTE(review): `co_code` holds only the bytecode; literal constants live in
    # `co_consts`, so changing e.g. `return 1` to `return 2` may leave the key
    # unchanged — confirm whether that is acceptable.
    bytecode = current_task.fn.__code__.co_code

    return hash_objects(current_task.task_key, bytecode, arguments)