
prefect.projects.steps.core

Core primitives for running Prefect project steps.

Project steps are YAML representations of Python functions along with their inputs.

Whenever a step is run, the following actions are taken:

  • The step's inputs and block / variable references are resolved (see the projects concepts documentation for more details)
  • The step's function is imported; if it cannot be found, the requires keyword is used to install the necessary packages
  • The step's function is called with the resolved inputs
  • The step's output is returned and used to resolve inputs for subsequent steps
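The import-and-call part of this sequence can be illustrated with a minimal, synchronous sketch. It is not Prefect's implementation: the helper names `import_function` and `run_step_sketch` are hypothetical, and the sketch skips block / variable resolution and package installation. It only shows how a single-key mapping of a function name to keyword arguments can be turned into a function call.

```python
import importlib
from typing import Any, Callable, Dict


def import_function(fully_qualified_name: str) -> Callable[..., Any]:
    """Import `package.module.func` and return the function object (hypothetical helper)."""
    module_name, _, func_name = fully_qualified_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


def run_step_sketch(step: Dict[str, Dict[str, Any]]) -> Any:
    """Simplified stand-in for running one step; not the Prefect implementation."""
    # A step is a single-key mapping: {"importable.func.name": {"kwarg": value, ...}}
    fqn, inputs = next(iter(step.items()))
    func = import_function(fqn)
    # Call the imported function with the step's inputs as keyword arguments.
    return func(**inputs)


# The YAML equivalent of this step would be:
#   - json.dumps:
#       obj: {b: 1, a: 2}
#       sort_keys: true
step = {"json.dumps": {"obj": {"b": 1, "a": 2}, "sort_keys": True}}
output = run_step_sketch(step)
print(output)  # prints {"a": 2, "b": 1}
```

The standard-library function `json.dumps` is used here only because it accepts keyword arguments and needs no installation; any importable function would do.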

StepExecutionError

Bases: Exception

Raised when a step fails to execute.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/steps/core.py
class StepExecutionError(Exception):
    """
    Raised when a step fails to execute.
    """

run_step async

Runs a step, returns the step's output.

Steps are assumed to be in the format {"importable.func.name": {"kwarg1": "value1", ...}}.

The 'id' and 'requires' keywords are reserved for specific purposes and are removed from the inputs before they are passed to the step function.

The 'requires' keyword specifies packages that should be installed before running the step.
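As a rough illustration of how reserved keywords can be separated from a step's inputs, the sketch below splits one step into its reserved keywords and the keyword arguments left for the step function. The `RESERVED_KEYWORDS` value is assumed from the description above, and `split_reserved`, the function name `my_package.steps.build_image`, and its inputs are hypothetical.

```python
from typing import Any, Dict, Tuple

# Assumption: only the two keywords named in the description are reserved.
RESERVED_KEYWORDS = {"id", "requires"}


def split_reserved(inputs: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (reserved keywords, remaining inputs) without mutating `inputs`."""
    remaining = dict(inputs)  # shallow copy so the caller's dict is untouched
    keywords = {
        keyword: remaining.pop(keyword)
        for keyword in RESERVED_KEYWORDS
        if keyword in remaining
    }
    return keywords, remaining


# A step in the documented format, with a hypothetical function and inputs.
step = {
    "my_package.steps.build_image": {
        "id": "build",
        "requires": "my-package>=1.0",
        "tag": "latest",
    }
}

fqn, inputs = next(iter(step.items()))
keywords, kwargs = split_reserved(inputs)
# `keywords` now holds "id" and "requires"; `kwargs` holds only "tag",
# which is what the step function would receive.
```

Copying the inputs before popping is a choice made for this sketch so the original step mapping can be reused or logged afterwards.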

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/steps/core.py
async def run_step(step: Dict, upstream_outputs: Optional[Dict] = None) -> Dict:
    """
    Runs a step, returns the step's output.

    Steps are assumed to be in the format `{"importable.func.name": {"kwarg1": "value1", ...}}`.

    The 'id' and 'requires' keywords are reserved for specific purposes and will be removed from the
    inputs before passing to the step function. The 'requires' keyword is used to specify packages
    that should be installed before running the step.
    """
    fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
    upstream_outputs = upstream_outputs or {}

    if len(step.keys()) > 1:
        raise ValueError(
            f"Step has unexpected additional keys: {', '.join(list(step.keys())[1:])}"
        )

    keywords = {
        keyword: inputs.pop(keyword)
        for keyword in RESERVED_KEYWORDS
        if keyword in inputs
    }

    inputs = apply_values(inputs, upstream_outputs)
    inputs = await resolve_block_document_references(inputs)
    inputs = await resolve_variables(inputs)

    step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
    result = await from_async.call_soon_in_new_thread(
        Call.new(step_func, **inputs)
    ).aresult()
    return result