Skip to content

prefect.deployments.steps.utility

Utility project steps that are useful for managing a project's deployment lifecycle.

Steps within this module can be used within a build, push, or pull deployment action.

Example

Use the run_shell_script setp to retrieve the short Git commit hash of the current repository and use it as a Docker image tag:

build:
    - prefect.deployments.steps.run_shell_script:
        id: get-commit-hash
        script: git rev-parse --short HEAD
        stream_output: false
    - prefect_docker.deployments.steps.build_docker_image:
        requires: prefect-docker
        image_name: my-image
        image_tag: "{{ get-commit-hash.stdout }}"
        dockerfile: auto

RunShellScriptResult

Bases: TypedDict

The result of a run_shell_script step.

Attributes:

Name Type Description
stdout str

The captured standard output of the script.

stderr str

The captured standard error of the script.

Source code in prefect/deployments/steps/utility.py
62
63
64
65
66
67
68
69
70
71
72
class RunShellScriptResult(TypedDict):
    """
    The result of a `run_shell_script` step.

    Attributes:
        stdout: The captured standard output of the script.
        stderr: The captured standard error of the script.
    """

    stdout: str
    stderr: str

pip_install_requirements async

Installs dependencies from a requirements.txt file.

Parameters:

Name Type Description Default
requirements_file str

The requirements.txt to use for installation.

'requirements.txt'
directory Optional[str]

The directory the requirements.txt file is in. Defaults to the current working directory.

None
stream_output bool

Whether to stream the output from pip install should be streamed to the console

True

Returns:

Type Description

A dictionary with the keys stdout and stderr containing the output the pip install command

Raises:

Type Description
CalledProcessError

if the pip install command fails for any reason

Example
pull:
    - prefect.deployments.steps.git_clone:
        id: clone-step
        repository: https://github.com/org/repo.git
    - prefect.deployments.steps.pip_install_requirements:
        directory: {{ clone-step.directory }}
        requirements_file: requirements.txt
        stream_output: False
Source code in prefect/deployments/steps/utility.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
async def pip_install_requirements(
    directory: Optional[str] = None,
    requirements_file: str = "requirements.txt",
    stream_output: bool = True,
):
    """
    Installs dependencies from a requirements.txt file.

    Args:
        requirements_file: The requirements.txt to use for installation.
        directory: The directory the requirements.txt file is in. Defaults to
            the current working directory.
        stream_output: Whether to stream the output from pip install should be
            streamed to the console

    Returns:
        A dictionary with the keys `stdout` and `stderr` containing the output
            the `pip install` command

    Raises:
        subprocess.CalledProcessError: if the pip install command fails for any reason

    Example:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                id: clone-step
                repository: https://github.com/org/repo.git
            - prefect.deployments.steps.pip_install_requirements:
                directory: {{ clone-step.directory }}
                requirements_file: requirements.txt
                stream_output: False
        ```
    """
    stdout_sink = io.StringIO()
    stderr_sink = io.StringIO()

    async with open_process(
        [get_sys_executable(), "-m", "pip", "install", "-r", requirements_file],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=directory,
    ) as process:
        await _stream_capture_process_output(
            process,
            stdout_sink=stdout_sink,
            stderr_sink=stderr_sink,
            stream_output=stream_output,
        )
        await process.wait()

        if process.returncode != 0:
            raise RuntimeError(
                f"pip_install_requirements failed with error code {process.returncode}:"
                f" {stderr_sink.getvalue()}"
            )

    return {
        "stdout": stdout_sink.getvalue().strip(),
        "stderr": stderr_sink.getvalue().strip(),
    }

run_shell_script async

Runs one or more shell commands in a subprocess. Returns the standard output and standard error of the script.

Parameters:

Name Type Description Default
script str

The script to run

required
directory Optional[str]

The directory to run the script in. Defaults to the current working directory.

None
env Optional[Dict[str, str]]

A dictionary of environment variables to set for the script

None
stream_output bool

Whether to stream the output of the script to stdout/stderr

True
expand_env_vars bool

Whether to expand environment variables in the script before running it

False

Returns:

Type Description
RunShellScriptResult

A dictionary with the keys stdout and stderr containing the output of the script

Examples:

Retrieve the short Git commit hash of the current repository to use as a Docker image tag:

build:
    - prefect.deployments.steps.run_shell_script:
        id: get-commit-hash
        script: git rev-parse --short HEAD
        stream_output: false
    - prefect_docker.deployments.steps.build_docker_image:
        requires: prefect-docker
        image_name: my-image
        image_tag: "{{ get-commit-hash.stdout }}"
        dockerfile: auto

Run a multi-line shell script:

build:
    - prefect.deployments.steps.run_shell_script:
        script: |
            echo "Hello"
            echo "World"

Run a shell script with environment variables:

build:
    - prefect.deployments.steps.run_shell_script:
        script: echo "Hello $NAME"
        env:
            NAME: World

Run a shell script with environment variables expanded from the current environment:

pull:
    - prefect.deployments.steps.run_shell_script:
        script: |
            echo "User: $USER"
            echo "Home Directory: $HOME"
        stream_output: true
        expand_env_vars: true

Run a shell script in a specific directory:

build:
    - prefect.deployments.steps.run_shell_script:
        script: echo "Hello"
        directory: /path/to/directory

Run a script stored in a file:

build:
    - prefect.deployments.steps.run_shell_script:
        script: "bash path/to/script.sh"

Source code in prefect/deployments/steps/utility.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
async def run_shell_script(
    script: str,
    directory: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    stream_output: bool = True,
    expand_env_vars: bool = False,
) -> RunShellScriptResult:
    """
    Runs one or more shell commands in a subprocess. Returns the standard
    output and standard error of the script.

    Args:
        script: The script to run
        directory: The directory to run the script in. Defaults to the current
            working directory.
        env: A dictionary of environment variables to set for the script
        stream_output: Whether to stream the output of the script to
            stdout/stderr
        expand_env_vars: Whether to expand environment variables in the script
            before running it

    Returns:
        A dictionary with the keys `stdout` and `stderr` containing the output
            of the script

    Examples:
        Retrieve the short Git commit hash of the current repository to use as
            a Docker image tag:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                id: get-commit-hash
                script: git rev-parse --short HEAD
                stream_output: false
            - prefect_docker.deployments.steps.build_docker_image:
                requires: prefect-docker
                image_name: my-image
                image_tag: "{{ get-commit-hash.stdout }}"
                dockerfile: auto
        ```

        Run a multi-line shell script:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: |
                    echo "Hello"
                    echo "World"
        ```

        Run a shell script with environment variables:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: echo "Hello $NAME"
                env:
                    NAME: World
        ```

        Run a shell script with environment variables expanded
            from the current environment:
        ```yaml
        pull:
            - prefect.deployments.steps.run_shell_script:
                script: |
                    echo "User: $USER"
                    echo "Home Directory: $HOME"
                stream_output: true
                expand_env_vars: true
        ```

        Run a shell script in a specific directory:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: echo "Hello"
                directory: /path/to/directory
        ```

        Run a script stored in a file:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: "bash path/to/script.sh"
        ```
    """
    current_env = os.environ.copy()
    current_env.update(env or {})

    commands = script.splitlines()
    stdout_sink = io.StringIO()
    stderr_sink = io.StringIO()

    for command in commands:
        if expand_env_vars:
            # Expand environment variables in command and provided environment
            command = string.Template(command).safe_substitute(current_env)
        split_command = shlex.split(command, posix=sys.platform != "win32")
        if not split_command:
            continue
        async with open_process(
            split_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=directory,
            env=current_env,
        ) as process:
            await _stream_capture_process_output(
                process,
                stdout_sink=stdout_sink,
                stderr_sink=stderr_sink,
                stream_output=stream_output,
            )

            await process.wait()

            if process.returncode != 0:
                raise RuntimeError(
                    f"`run_shell_script` failed with error code {process.returncode}:"
                    f" {stderr_sink.getvalue()}"
                )

    return {
        "stdout": stdout_sink.getvalue().strip(),
        "stderr": stderr_sink.getvalue().strip(),
    }