Skip to content

prefect.utilities.processutils

forward_signal_handler

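Register a handler that forwards each subsequent signum event (e.g. an interrupt) to the process as the next signal in signums; once the last one is reached, it is re-sent on every further event.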

Source code in prefect/utilities/processutils.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def forward_signal_handler(
    pid: int, signum: int, *signums: int, process_name: str, print_fn: Callable
):
    """Install a handler for `signum` that relays it to the process `pid`.

    The first time `signum` is received, the first entry of `signums` is sent
    to `pid`. The handler then re-registers itself with the remaining entries,
    so each later receipt of `signum` sends the next entry. When only one entry
    is left, it is sent again on every further receipt.

    Args:
        pid: ID of the process that receives the forwarded signals.
        signum: The signal to listen for in the current process.
        *signums: Signals to send to `pid`, in order, one per receipt of
            `signum`. At least one is required.
        process_name: Name of the target process, used in the printed message.
        print_fn: Called with a single message string before each forward.

    Raises:
        IndexError: If no `signums` are given.
    """
    current_signal = signums[0]
    future_signals = signums[1:]

    def handler(*args):
        # Signal enums expose `.name`; plain integers are shown as-is.
        received = getattr(signum, "name", signum)
        outgoing = getattr(current_signal, "name", current_signal)
        print_fn(
            f"Received {received}. "
            f"Sending {outgoing} to"
            f" {process_name} (PID {pid})..."
        )
        os.kill(pid, current_signal)

        if not future_signals:
            # Nothing left to escalate to: this handler stays installed and
            # keeps sending `current_signal`.
            return

        # Replace this handler with one that sends the next signal in line.
        forward_signal_handler(
            pid,
            signum,
            *future_signals,
            process_name=process_name,
            print_fn=print_fn,
        )

    # register current and future signal handlers
    signal.signal(signum, handler)

open_process async

Like anyio.open_process but with:

  • Support for Windows command joining
  • Termination of the process on exception during yield
  • Forced cleanup of process resources during cancellation

Source code in prefect/utilities/processutils.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@asynccontextmanager
async def open_process(command: List[str], **kwargs):
    """
    Like `anyio.open_process` but with:
    - Support for Windows command joining
    - Termination of the process on exception during yield
    - Forced cleanup of process resources during cancellation

    Args:
        command: The command to run, as a list of arguments. On Windows the
            items are joined with single spaces into one command string.
        **kwargs: Passed through to the underlying process opener. On Windows,
            a `creationflags` value that includes
            `subprocess.CREATE_NEW_PROCESS_GROUP` also registers a console
            control handler and tracks the process PID while it is open.

    Yields:
        The opened process. It is terminated and closed when the context
        exits, whether normally or through an exception.

    Raises:
        TypeError: If `command` is not a list.
    """
    # Passing a string to open_process is equivalent to shell=True which is
    # generally necessary for Unix-like commands on Windows but otherwise should
    # be avoided
    if not isinstance(command, list):
        raise TypeError(
            "The command passed to open process must be a list. You passed the command "
            f"'{command}', which is type '{type(command)}'."
        )

    if sys.platform == "win32":
        command = " ".join(command)
        process = await _open_anyio_process(command, **kwargs)
    else:
        process = await anyio.open_process(command, **kwargs)

    # if there's a creationflags kwarg and it contains CREATE_NEW_PROCESS_GROUP,
    # use SetConsoleCtrlHandler to handle CTRL-C
    win32_process_group = False
    if (
        sys.platform == "win32"
        and "creationflags" in kwargs
        and kwargs["creationflags"] & subprocess.CREATE_NEW_PROCESS_GROUP
    ):
        win32_process_group = True
        _windows_process_group_pids.add(process.pid)
        # Add a handler for CTRL-C. Re-adding the handler is safe as Windows
        # will not add a duplicate handler if _win32_ctrl_handler is
        # already registered.
        windll.kernel32.SetConsoleCtrlHandler(_win32_ctrl_handler, 1)

    try:
        async with process:
            yield process
    finally:
        try:
            process.terminate()
        except OSError:
            # Occurs if the process is already terminated
            pass
        finally:
            # Stop tracking the PID even when terminate() raised; otherwise a
            # stale PID would stay in the set after the process is gone.
            # `discard` also tolerates the PID having been removed already.
            if win32_process_group:
                _windows_process_group_pids.discard(process.pid)

        # Ensure the process resource is closed. If not shielded from cancellation,
        # this resource can be left open and the subprocess output can appear after
        # the parent process has exited.
        with anyio.CancelScope(shield=True):
            await process.aclose()

run_process async

Like anyio.run_process but with:

  • Use of our open_process utility to ensure resources are cleaned up
  • Simple stream_output support to connect the subprocess to the parent stdout/err
  • Support for submission with TaskGroup.start marking as 'started' after the process has been created. When used, the PID is returned to the task status.
Source code in prefect/utilities/processutils.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def run_process(
    command: List[str],
    stream_output: Union[bool, Tuple[Optional[TextSink], Optional[TextSink]]] = False,
    task_status: Optional[anyio.abc.TaskStatus] = None,
    task_status_handler: Optional[Callable[[anyio.abc.Process], Any]] = None,
    **kwargs,
):
    """
    Like `anyio.run_process` but with:

    - Use of our `open_process` utility to ensure resources are cleaned up
    - Simple `stream_output` support to connect the subprocess to the parent stdout/err
    - Support for submission with `TaskGroup.start` marking as 'started' after the
        process has been created. When used, the PID is returned to the task status.

    Args:
        command: The command to run, as a list of arguments.
        stream_output: `True` streams to the parent's `sys.stdout` and
            `sys.stderr`; a `(stdout_sink, stderr_sink)` pair streams to the
            given sinks; a falsy value sends both streams to `DEVNULL`.
        task_status: If given, `started()` is called on it once the process
            has been opened.
        task_status_handler: Computes the value passed to
            `task_status.started` from the process. Defaults to the process
            PID.
        **kwargs: Forwarded to `open_process`.

    Returns:
        The process object, after the process has been waited on.
    """
    if stream_output is True:
        stream_output = (sys.stdout, sys.stderr)

    # Both streams are piped when output is streamed and discarded otherwise.
    stream_target = subprocess.PIPE if stream_output else subprocess.DEVNULL

    async with open_process(
        command, stdout=stream_target, stderr=stream_target, **kwargs
    ) as process:
        if task_status is not None:
            # Report the PID unless the caller supplied its own handler.
            started_value = (
                task_status_handler(process) if task_status_handler else process.pid
            )
            task_status.started(started_value)

        if stream_output:
            await consume_process_output(
                process, stdout_sink=stream_output[0], stderr_sink=stream_output[1]
            )

        await process.wait()

    return process

setup_signal_handlers_server

Handle interrupts of the server gracefully.

Source code in prefect/utilities/processutils.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def setup_signal_handlers_server(pid: int, process_name: str, print_fn: Callable):
    """Handle interrupts of the server gracefully.

    Registers handlers in the current process that forward interrupt and
    termination signals to the server process `pid`.

    Args:
        pid: ID of the server process that receives the forwarded signals.
        process_name: Name of the server process, passed on for use in the
            printed messages.
        print_fn: Callable passed on to `forward_signal_handler` for printing
            messages.
    """
    # Each route is (signal to listen for, signals to forward in order).
    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        routes = [(signal.SIGINT, (signal.CTRL_BREAK_EVENT,))]
    else:
        routes = [
            # first interrupt: SIGTERM, second interrupt: SIGKILL
            (signal.SIGINT, (signal.SIGTERM, signal.SIGKILL)),
            # forward first SIGTERM directly, send SIGKILL on subsequent SIGTERM
            (signal.SIGTERM, (signal.SIGTERM, signal.SIGKILL)),
        ]

    for incoming, outgoing in routes:
        forward_signal_handler(
            pid, incoming, *outgoing, process_name=process_name, print_fn=print_fn
        )