Skip to content

prefect.context

Async and thread safe models for passing runtime context data.

These contexts should never be directly mutated by the user.

ContextModel pydantic-model

A base model for context data that forbids mutation and extra data while providing a context manager

Source code in prefect/context.py
class ContextModel(BaseModel):
    """
    A base model for context data that forbids mutation and extra data while providing
    a context manager
    """

    # The context variable for storing data must be defined by the child class
    __var__: ContextVar
    _token: Token = PrivateAttr(None)

    class Config:
        allow_mutation = False
        arbitrary_types_allowed = True
        extra = "forbid"

    def __enter__(self):
        if self._token is not None:
            raise RuntimeError(
                "Context already entered. Context enter calls cannot be nested."
            )
        self._token = self.__var__.set(self)
        return self

    def __exit__(self, *_):
        if not self._token:
            raise RuntimeError(
                "Asymmetric use of context. Context exit called without an enter."
            )
        self.__var__.reset(self._token)
        self._token = None

    @classmethod
    def get(cls: Type[T]) -> Optional[T]:
        return cls.__var__.get(None)

    def copy(self, **kwargs):
        # Remove the token on copy to avoid re-entrance errors
        new = super().copy(**kwargs)
        new._token = None
        return new

ContextModel.copy

Duplicate a model, optionally choose which fields to include, exclude and change.

:param include: fields to include in new model :param exclude: fields to exclude from new model, as with values this takes precedence over include :param update: values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data :param deep: set to True to make a deep copy of the model :return: new model instance

Source code in prefect/context.py
def copy(self, **kwargs):
    # Remove the token on copy to avoid re-entrance errors
    new = super().copy(**kwargs)
    new._token = None
    return new

FlowRunContext pydantic-model

The context for a flow run. Data in this context is only available from within a flow run function.

Attributes:

Name Description
flow

The flow instance associated with the run

flow_run

The API metadata for the flow run

task_runner

The task runner instance being used for the flow run

result_filesystem

A block to used to persist run state data

task_run_futures

A list of futures for task runs submitted within this flow run

task_run_states

A list of states for task runs created within this flow run

task_run_results

A mapping of result ids to task run states for this flow run

flow_run_states

A list of states for flow runs created within this flow run

sync_portal

A blocking portal for sync task/flow runs in an async flow

timeout_scope

The cancellation scope for flow level timeouts

Source code in prefect/context.py
class FlowRunContext(RunContext):
    """
    The context for a flow run. Data in this context is only available from within a
    flow run function.

    Attributes:
        flow: The flow instance associated with the run
        flow_run: The API metadata for the flow run
        task_runner: The task runner instance being used for the flow run
        result_filesystem: A block to used to persist run state data
        task_run_futures: A list of futures for task runs submitted within this flow run
        task_run_states: A list of states for task runs created within this flow run
        task_run_results: A mapping of result ids to task run states for this flow run
        flow_run_states: A list of states for flow runs created within this flow run
        sync_portal: A blocking portal for sync task/flow runs in an async flow
        timeout_scope: The cancellation scope for flow level timeouts
    """

    flow: "Flow"
    flow_run: FlowRun
    task_runner: BaseTaskRunner
    result_filesystem: WritableFileSystem

    # Counter for task calls allowing unique
    task_run_dynamic_keys: Dict[str, int] = Field(default_factory=dict)

    # Tracking for objects created by this flow run
    task_run_futures: List[PrefectFuture] = Field(default_factory=list)
    task_run_states: List[State] = Field(default_factory=list)
    task_run_results: Dict[int, State] = Field(default_factory=dict)
    flow_run_states: List[State] = Field(default_factory=list)

    # The synchronous portal is only created for async flows for creating engine calls
    # from synchronous task and subflow calls
    sync_portal: Optional[anyio.abc.BlockingPortal] = None
    timeout_scope: Optional[anyio.abc.CancelScope] = None

    # Task group that can be used for background tasks during the flow run
    background_tasks: anyio.abc.TaskGroup

    __var__ = ContextVar("flow_run")

PrefectObjectRegistry pydantic-model

A context that acts as a registry for all Prefect objects that are registered during load and execution.

Attributes:

Name Description
start_time

The time the object registry was created.

DateTimeTZ
block_code_execution

If set, flow calls will be ignored.

bool
capture_failures

If set, failures during init will be silenced and tracked.

bool
Source code in prefect/context.py
class PrefectObjectRegistry(ContextModel):
    """
    A context that acts as a registry for all Prefect objects that are
    registered during load and execution.

    Attributes:
        start_time: The time the object registry was created.
        block_code_execution: If set, flow calls will be ignored.
        capture_failures: If set, failures during __init__ will be silenced and tracked.
    """

    start_time: DateTimeTZ = Field(default_factory=lambda: pendulum.now("UTC"))

    _instance_registry: Dict[Type[T], List[T]] = PrivateAttr(
        default_factory=lambda: defaultdict(list)
    )

    # Failures will be a tuple of (exception, instance, args, kwargs)
    _instance_init_failures: Dict[
        Type[T], List[Tuple[Exception, T, Tuple, Dict]]
    ] = PrivateAttr(default_factory=lambda: defaultdict(list))

    block_code_execution: bool = False
    capture_failures: bool = False

    __var__ = ContextVar("object_registry")

    def get_instances(self, type_: Type[T]) -> List[T]:
        instances = []
        for registered_type, type_instances in self._instance_registry.items():
            if type_ in registered_type.mro():
                instances.extend(type_instances)
        return instances

    def get_instance_failures(
        self, type_: Type[T]
    ) -> List[Tuple[Exception, T, Tuple, Dict]]:
        failures = []
        for type__ in type_.mro():
            failures.extend(self._instance_init_failures[type__])
        return failures

    def register_instance(self, object):
        # TODO: Consider using a 'Set' to avoid duplicate entries
        self._instance_registry[type(object)].append(object)

    def register_init_failure(
        self, exc: Exception, object: Any, init_args: Tuple, init_kwargs: Dict
    ):
        self._instance_init_failures[type(object)].append(
            (exc, object, init_args, init_kwargs)
        )

    @classmethod
    def register_instances(cls, type_: Type):
        """
        Decorator for a class that adds registration to the `PrefectObjectRegistry`
        on initialization of instances.
        """
        __init__ = type_.__init__

        def __register_init__(__self__, *args, **kwargs):
            registry = cls.get()
            try:
                __init__(__self__, *args, **kwargs)
            except Exception as exc:
                if not registry or not registry.capture_failures:
                    raise
                else:
                    registry.register_init_failure(exc, __self__, args, kwargs)
            else:
                if registry:
                    registry.register_instance(__self__)

        type_.__init__ = __register_init__
        return type_

PrefectObjectRegistry.register_instances classmethod

Decorator for a class that adds registration to the PrefectObjectRegistry on initialization of instances.

Source code in prefect/context.py
@classmethod
def register_instances(cls, type_: Type):
    """
    Decorator for a class that adds registration to the `PrefectObjectRegistry`
    on initialization of instances.
    """
    __init__ = type_.__init__

    def __register_init__(__self__, *args, **kwargs):
        registry = cls.get()
        try:
            __init__(__self__, *args, **kwargs)
        except Exception as exc:
            if not registry or not registry.capture_failures:
                raise
            else:
                registry.register_init_failure(exc, __self__, args, kwargs)
        else:
            if registry:
                registry.register_instance(__self__)

    type_.__init__ = __register_init__
    return type_

RunContext pydantic-model

The base context for a flow or task run. Data in this context will always be available when get_run_context is called.

Attributes:

Name Description
start_time

The time the run context was entered

DateTimeTZ
client

The Orion client instance being used for API communication

OrionClient
Source code in prefect/context.py
class RunContext(ContextModel):
    """
    The base context for a flow or task run. Data in this context will always be
    available when `get_run_context` is called.

    Attributes:
        start_time: The time the run context was entered
        client: The Orion client instance being used for API communication
    """

    start_time: DateTimeTZ = Field(default_factory=lambda: pendulum.now("UTC"))
    client: OrionClient

SettingsContext pydantic-model

The context for a Prefect settings.

This allows for safe concurrent access and modification of settings.

Attributes:

Name Description
profile

The profile that is in use.

Profile
settings

The complete settings model.

Settings
Source code in prefect/context.py
class SettingsContext(ContextModel):
    """
    The context for a Prefect settings.

    This allows for safe concurrent access and modification of settings.

    Attributes:
        profile: The profile that is in use.
        settings: The complete settings model.
    """

    profile: Profile
    settings: Settings

    __var__ = ContextVar("settings")

    def __hash__(self) -> int:
        return hash(self.settings)

    def __enter__(self):
        """
        Upon entrance, we ensure the home directory for the profile exists.
        """
        return_value = super().__enter__()

        try:
            os.makedirs(self.settings.value_of(PREFECT_HOME), exist_ok=True)
        except OSError:
            warnings.warn(
                "Failed to create the Prefect home directory at "
                f"{self.settings.value_of(PREFECT_HOME)}",
                stacklevel=2,
            )

        return return_value

    @classmethod
    def get(cls) -> "SettingsContext":
        # Return the global context instead of `None` if no context exists
        return super().get() or GLOBAL_SETTINGS_CONTEXT

SettingsContext.__enter__ special

Upon entrance, we ensure the home directory for the profile exists.

Source code in prefect/context.py
def __enter__(self):
    """
    Upon entrance, we ensure the home directory for the profile exists.
    """
    return_value = super().__enter__()

    try:
        os.makedirs(self.settings.value_of(PREFECT_HOME), exist_ok=True)
    except OSError:
        warnings.warn(
            "Failed to create the Prefect home directory at "
            f"{self.settings.value_of(PREFECT_HOME)}",
            stacklevel=2,
        )

    return return_value

SettingsContext.__hash__ special

Return hash(self).

Source code in prefect/context.py
def __hash__(self) -> int:
    return hash(self.settings)

TagsContext pydantic-model

The context for prefect.tags management.

Attributes:

Name Description
current_tags

A set of current tags in the context

Set[str]
Source code in prefect/context.py
class TagsContext(ContextModel):
    """
    The context for `prefect.tags` management.

    Attributes:
        current_tags: A set of current tags in the context
    """

    current_tags: Set[str] = Field(default_factory=set)

    @classmethod
    def get(cls) -> "TagsContext":
        # Return an empty `TagsContext` instead of `None` if no context exists
        return cls.__var__.get(TagsContext())

    __var__ = ContextVar("tags")

TaskRunContext pydantic-model

The context for a task run. Data in this context is only available from within a task run function.

Attributes:

Name Description
task

The task instance associated with the task run

task_run

The API metadata for this task run

result_filesystem

A block to used to persist run state data

Source code in prefect/context.py
class TaskRunContext(RunContext):
    """
    The context for a task run. Data in this context is only available from within a
    task run function.

    Attributes:
        task: The task instance associated with the task run
        task_run: The API metadata for this task run
        result_filesystem: A block to used to persist run state data
    """

    task: "Task"
    task_run: TaskRun
    result_filesystem: WritableFileSystem

    __var__ = ContextVar("task_run")

get_run_context

Get the current run context from within a task or flow function.

Returns:

Type Description
Union[prefect.context.FlowRunContext, prefect.context.TaskRunContext]

A FlowRunContext or TaskRunContext depending on the function type.

Raises RuntimeError: If called outside of a flow or task run.

Source code in prefect/context.py
def get_run_context() -> Union[FlowRunContext, TaskRunContext]:
    """
    Get the current run context from within a task or flow function.

    Returns:
        A `FlowRunContext` or `TaskRunContext` depending on the function type.

    Raises
        RuntimeError: If called outside of a flow or task run.
    """
    task_run_ctx = TaskRunContext.get()
    if task_run_ctx:
        return task_run_ctx

    flow_run_ctx = FlowRunContext.get()
    if flow_run_ctx:
        return flow_run_ctx

    raise MissingContextError(
        "No run context available. You are not in a flow or task run context."
    )

get_settings_context

Get the current settings context which contains profile information and the settings that are being used.

Generally, the settings that are being used are a combination of values from the profile and environment. See prefect.context.use_profile for more details.

Source code in prefect/context.py
def get_settings_context() -> SettingsContext:
    """
    Get the current settings context which contains profile information and the
    settings that are being used.

    Generally, the settings that are being used are a combination of values from the
    profile and environment. See `prefect.context.use_profile` for more details.
    """
    settings_ctx = SettingsContext.get()

    if not settings_ctx:
        raise MissingContextError("No settings context found.")

    return settings_ctx

registry_from_script

Return a fresh registry with instances populated from execution of a script.

Source code in prefect/context.py
def registry_from_script(
    path: str,
    block_code_execution: bool = True,
    capture_failures: bool = True,
) -> PrefectObjectRegistry:
    """
    Return a fresh registry with instances populated from execution of a script.
    """
    with PrefectObjectRegistry(
        block_code_execution=block_code_execution,
        capture_failures=capture_failures,
    ) as registry:
        load_script_as_module(path)

    return registry

root_settings_context

Return the settings context that will exist as the root context for the module.

The profile to use is determined with the following precedence - Command line via 'prefect --profile ' - Environment variable via 'PREFECT_PROFILE' - Profiles file via the 'active' key

Source code in prefect/context.py
def root_settings_context():
    """
    Return the settings context that will exist as the root context for the module.

    The profile to use is determined with the following precedence
    - Command line via 'prefect --profile <name>'
    - Environment variable via 'PREFECT_PROFILE'
    - Profiles file via the 'active' key
    """
    profiles = prefect.settings.load_profiles()
    active_name = profiles.active_name
    profile_source = "in the profiles file"

    if "PREFECT_PROFILE" in os.environ:
        active_name = os.environ["PREFECT_PROFILE"]
        profile_source = "by environment variable"

    if (
        sys.argv[0].endswith("/prefect")
        and len(sys.argv) >= 3
        and sys.argv[1] == "--profile"
    ):
        active_name = sys.argv[2]
        profile_source = "by command line argument"

    if active_name not in profiles.names:
        print(
            f"WARNING: Active profile {active_name!r} set {profile_source} not "
            "found. The default profile will be used instead. ",
            file=sys.stderr,
        )
        active_name = "default"

    with use_profile(
        profiles[active_name],
        # Override environment variables if the profile was set by the CLI
        override_environment_variables=profile_source == "by command line argument",
    ) as settings_context:
        return settings_context

    # Note the above context is exited and the global settings context is used by
    # an override in the `SettingsContext.get` method.

tags

Context manager to add tags to flow and task run calls.

Tags are always combined with any existing tags.

Yields:

Type Description
Set[str]

The current set of tags

Examples:

>>> from prefect import tags, task, flow
>>> @task
>>> def my_task():
>>>     pass

Run a task with tags

>>> @flow
>>> def my_flow():
>>>     with tags("a", "b"):
>>>         my_task()  # has tags: a, b

Run a flow with tags

>>> @flow
>>> def my_flow():
>>>     pass
>>> with tags("a", b"):
>>>     my_flow()  # has tags: a, b

Run a task with nested tag contexts

>>> @flow
>>> def my_flow():
>>>     with tags("a", "b"):
>>>         with tags("c", "d"):
>>>             my_task()  # has tags: a, b, c, d
>>>         my_task()  # has tags: a, b

Inspect the current tags

>>> @flow
>>> def my_flow():
>>>     with tags("c", "d"):
>>>         with tags("e", "f") as current_tags:
>>>              print(current_tags)
>>> with tags("a", "b"):
>>>     my_flow()
{"a", "b", "c", "d", "e", "f"}
Source code in prefect/context.py
@contextmanager
def tags(*new_tags: str) -> Set[str]:
    """
    Context manager to add tags to flow and task run calls.

    Tags are always combined with any existing tags.

    Yields:
        The current set of tags

    Examples:
        >>> from prefect import tags, task, flow
        >>> @task
        >>> def my_task():
        >>>     pass

        Run a task with tags

        >>> @flow
        >>> def my_flow():
        >>>     with tags("a", "b"):
        >>>         my_task()  # has tags: a, b

        Run a flow with tags

        >>> @flow
        >>> def my_flow():
        >>>     pass
        >>> with tags("a", b"):
        >>>     my_flow()  # has tags: a, b

        Run a task with nested tag contexts

        >>> @flow
        >>> def my_flow():
        >>>     with tags("a", "b"):
        >>>         with tags("c", "d"):
        >>>             my_task()  # has tags: a, b, c, d
        >>>         my_task()  # has tags: a, b

        Inspect the current tags

        >>> @flow
        >>> def my_flow():
        >>>     with tags("c", "d"):
        >>>         with tags("e", "f") as current_tags:
        >>>              print(current_tags)
        >>> with tags("a", "b"):
        >>>     my_flow()
        {"a", "b", "c", "d", "e", "f"}
    """
    current_tags = TagsContext.get().current_tags
    new_tags = current_tags.union(new_tags)
    with TagsContext(current_tags=new_tags):
        yield new_tags

use_profile

Switch to a profile for the duration of this context.

Profile contexts are confined to an async context in a single thread.

Parameters:

Name Description Default
profile

The name of the profile to load or an instance of a Profile.

Union[prefect.settings.Profile, str]
required
override_environment_variable

If set, variables in the profile will take precedence over current environment variables. By default, environment variables will override profile settings.

required
include_current_context

If set, the new settings will be constructed with the current settings context as a base. If not set, the use_base settings will be loaded from the environment and defaults.

bool
True

Yields:

Type Description

The created SettingsContext object

Source code in prefect/context.py
@contextmanager
def use_profile(
    profile: Union[Profile, str],
    override_environment_variables: bool = False,
    include_current_context: bool = True,
):
    """
    Switch to a profile for the duration of this context.

    Profile contexts are confined to an async context in a single thread.

    Args:
        profile: The name of the profile to load or an instance of a Profile.
        override_environment_variable: If set, variables in the profile will take
            precedence over current environment variables. By default, environment
            variables will override profile settings.
        include_current_context: If set, the new settings will be constructed
            with the current settings context as a base. If not set, the use_base settings
            will be loaded from the environment and defaults.

    Yields:
        The created `SettingsContext` object
    """
    if isinstance(profile, str):
        profiles = prefect.settings.load_profiles()
        profile = profiles[profile]

    if not isinstance(profile, Profile):
        raise TypeError(
            f"Unexpected type {type(profile).__name__!r} for `profile`. "
            "Expected 'str' or 'Profile'."
        )

    # Create a copy of the profiles settings as we will mutate it
    profile_settings = profile.settings.copy()

    existing_context = SettingsContext.get()
    if existing_context and include_current_context:
        settings = existing_context.settings
    else:
        settings = prefect.settings.get_settings_from_env()

    if not override_environment_variables:
        for key in os.environ:
            if key in prefect.settings.SETTING_VARIABLES:
                profile_settings.pop(prefect.settings.SETTING_VARIABLES[key], None)

    new_settings = settings.copy_with_update(updates=profile_settings)

    with SettingsContext(profile=profile, settings=new_settings) as ctx:
        yield ctx