Source code for otx.utils.signal

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Functions to append a signal handler."""

from __future__ import annotations

import os
import signal
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
    from types import FrameType


@dataclass
class SigHandler:
    """Signal handler dataclass having handler function and pid which registers the handler."""

    handler: Callable
    pid: int


_SIGNAL_HANDLERS: dict[int, list] = {}


[docs] def append_signal_handler(sig_num: int, sig_handler: Callable) -> None: """Append the handler for a signal. The function appended at last is called first. Args: sig_num (signal.Signals): Signal number to add a handler to. sig_handler (Callable): Callable function to be executed when the signal is sent. """ _register_signal_handler(sig_num, sig_handler, -1)
[docs] def append_main_proc_signal_handler(sig_num: int, sig_handler: Callable) -> None: """Append the handler for a signal triggered only by main process. The function appended at last is called first. It's almost same as append_signal_handler except that handler will be executed only by signal to process which registers handler. Args: sig_num (signal.Signals): Signal number to add a handler to. sig_handler (Callable): Callable function to be executed when the signal is sent. """ _register_signal_handler(sig_num, sig_handler, os.getpid())
def _register_signal_handler(sig_num: int, sig_handler: Callable, pid: int) -> None: if sig_num not in _SIGNAL_HANDLERS: old_sig_handler = signal.getsignal(sig_num) _SIGNAL_HANDLERS[sig_num] = [old_sig_handler] signal.signal(sig_num, _run_signal_handlers) _SIGNAL_HANDLERS[sig_num].insert(0, SigHandler(sig_handler, pid)) def _run_signal_handlers(sig_num: int, frame: FrameType | None) -> None: pid = os.getpid() for handler in _SIGNAL_HANDLERS[sig_num]: if handler == signal.SIG_DFL: signal.signal(sig_num, signal.SIG_DFL) signal.raise_signal(sig_num) elif isinstance(handler, SigHandler): if handler.pid < 0 or handler.pid == pid: handler.handler(sig_num, frame) else: handler(sig_num, frame)