Skip to content

prefect.utilities.processutils

forward_signal_handler

Forward subsequent signum events (e.g. interrupts) to respective signums.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/processutils.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def forward_signal_handler(
    pid: int, signum: int, *signums: int, process_name: str, print_fn: Callable
):
    """Forward subsequent signum events (e.g. interrupts) to respective signums."""
    current_signal, future_signals = signums[0], signums[1:]

    # avoid RecursionError when setting up a direct signal forward to the same signal for the main pid
    avoid_infinite_recursion = signum == current_signal and pid == os.getpid()
    if avoid_infinite_recursion:
        # store the vanilla handler so it can be temporarily restored below
        original_handler = signal.getsignal(current_signal)

    def handler(*args):
        print_fn(
            f"Received {getattr(signum, 'name', signum)}. "
            f"Sending {getattr(current_signal, 'name', current_signal)} to"
            f" {process_name} (PID {pid})..."
        )
        if avoid_infinite_recursion:
            signal.signal(current_signal, original_handler)
        os.kill(pid, current_signal)
        if future_signals:
            forward_signal_handler(
                pid,
                signum,
                *future_signals,
                process_name=process_name,
                print_fn=print_fn,
            )

    # register current and future signal handlers
    signal.signal(signum, handler)

open_process async

Like anyio.open_process but with: - Support for Windows command joining - Termination of the process on exception during yield - Forced cleanup of process resources during cancellation

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/processutils.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@asynccontextmanager
async def open_process(command: List[str], **kwargs):
    """
    Like `anyio.open_process` but with:
    - Support for Windows command joining
    - Termination of the process on exception during yield
    - Forced cleanup of process resources during cancellation
    """
    # Passing a string to open_process is equivalent to shell=True which is
    # generally necessary for Unix-like commands on Windows but otherwise should
    # be avoided
    if not isinstance(command, list):
        raise TypeError(
            "The command passed to open process must be a list. You passed the command"
            f"'{command}', which is type '{type(command)}'."
        )

    if sys.platform == "win32":
        command = " ".join(command)
        process = await _open_anyio_process(command, **kwargs)
    else:
        process = await anyio.open_process(command, **kwargs)

    # if there's a creationflags kwarg and it contains CREATE_NEW_PROCESS_GROUP,
    # use SetConsoleCtrlHandler to handle CTRL-C
    win32_process_group = False
    if (
        sys.platform == "win32"
        and "creationflags" in kwargs
        and kwargs["creationflags"] & subprocess.CREATE_NEW_PROCESS_GROUP
    ):
        win32_process_group = True
        _windows_process_group_pids.add(process.pid)
        # Add a handler for CTRL-C. Re-adding the handler is safe as Windows
        # will not add a duplicate handler if _win32_ctrl_handler is
        # already registered.
        windll.kernel32.SetConsoleCtrlHandler(_win32_ctrl_handler, 1)

    try:
        async with process:
            yield process
    finally:
        try:
            process.terminate()
            if win32_process_group:
                _windows_process_group_pids.remove(process.pid)

        except OSError:
            # Occurs if the process is already terminated
            pass

        # Ensure the process resource is closed. If not shielded from cancellation,
        # this resource can be left open and the subprocess output can appear after
        # the parent process has exited.
        with anyio.CancelScope(shield=True):
            await process.aclose()

run_process async

Like anyio.run_process but with:

  • Use of our open_process utility to ensure resources are cleaned up
  • Simple stream_output support to connect the subprocess to the parent stdout/err
  • Support for submission with TaskGroup.start marking as 'started' after the process has been created. When used, the PID is returned to the task status.
Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/processutils.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
async def run_process(
    command: List[str],
    stream_output: Union[bool, Tuple[Optional[TextSink], Optional[TextSink]]] = False,
    task_status: Optional[anyio.abc.TaskStatus] = None,
    task_status_handler: Optional[Callable[[anyio.abc.Process], Any]] = None,
    **kwargs,
):
    """
    Like `anyio.run_process` but with:

    - Use of our `open_process` utility to ensure resources are cleaned up
    - Simple `stream_output` support to connect the subprocess to the parent stdout/err
    - Support for submission with `TaskGroup.start` marking as 'started' after the
        process has been created. When used, the PID is returned to the task status.

    """
    if stream_output is True:
        stream_output = (sys.stdout, sys.stderr)

    async with open_process(
        command,
        stdout=subprocess.PIPE if stream_output else subprocess.DEVNULL,
        stderr=subprocess.PIPE if stream_output else subprocess.DEVNULL,
        **kwargs,
    ) as process:
        if task_status is not None:
            if not task_status_handler:

                def task_status_handler(process):
                    return process.pid

            task_status.started(task_status_handler(process))

        if stream_output:
            await consume_process_output(
                process, stdout_sink=stream_output[0], stderr_sink=stream_output[1]
            )

        await process.wait()

    return process

setup_signal_handlers_agent

Handle interrupts of the agent gracefully.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/processutils.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def setup_signal_handlers_agent(pid: int, process_name: str, print_fn: Callable):
    """Handle interrupts of the agent gracefully."""
    setup_handler = partial(
        forward_signal_handler, pid, process_name=process_name, print_fn=print_fn
    )
    # when agent receives SIGINT, it stops dequeueing new FlowRuns, and runs until the subprocesses finish
    # the signal is not forwarded to subprocesses, so they can continue to run and hopefully still complete
    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT)
    else:
        # forward first SIGINT directly, send SIGKILL on subsequent interrupt
        setup_handler(signal.SIGINT, signal.SIGINT, signal.SIGKILL)
        # first SIGTERM: send SIGINT, send SIGKILL on subsequent SIGTERM
        setup_handler(signal.SIGTERM, signal.SIGINT, signal.SIGKILL)

setup_signal_handlers_server

Handle interrupts of the server gracefully.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/processutils.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def setup_signal_handlers_server(pid: int, process_name: str, print_fn: Callable):
    """Handle interrupts of the server gracefully."""
    setup_handler = partial(
        forward_signal_handler, pid, process_name=process_name, print_fn=print_fn
    )
    # when server receives a signal, it needs to be propagated to the uvicorn subprocess
    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT)
    else:
        # first interrupt: SIGTERM, second interrupt: SIGKILL
        setup_handler(signal.SIGINT, signal.SIGTERM, signal.SIGKILL)
        # forward first SIGTERM directly, send SIGKILL on subsequent SIGTERM
        setup_handler(signal.SIGTERM, signal.SIGTERM, signal.SIGKILL)

setup_signal_handlers_worker

Handle interrupts of workers gracefully.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/processutils.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def setup_signal_handlers_worker(pid: int, process_name: str, print_fn: Callable):
    """Handle interrupts of workers gracefully."""
    setup_handler = partial(
        forward_signal_handler, pid, process_name=process_name, print_fn=print_fn
    )
    # when agent receives SIGINT, it stops dequeueing new FlowRuns, and runs until the subprocesses finish
    # the signal is not forwarded to subprocesses, so they can continue to run and hopefully still complete
    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT)
    else:
        # forward first SIGINT directly, send SIGKILL on subsequent interrupt
        setup_handler(signal.SIGINT, signal.SIGINT, signal.SIGKILL)
        # first SIGTERM: send SIGINT, send SIGKILL on subsequent SIGTERM
        setup_handler(signal.SIGTERM, signal.SIGINT, signal.SIGKILL)