import importlib import os import signal import struct import time import subprocess from collections.abc import Callable, ValuesView from abc import ABC, abstractmethod from multiprocessing import Process from setproctitle import setproctitle from cereal import car, log import cereal.messaging as messaging import openpilot.system.sentry as sentry from openpilot.common.basedir import BASEDIR from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog WATCHDOG_FN = "/dev/shm/wd_" ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None def launcher(proc: str, name: str) -> None: try: # import the process mod = importlib.import_module(proc) # rename the process setproctitle(proc) # create new context since we forked messaging.reset_context() # add daemon name tag to logs cloudlog.bind(daemon=name) sentry.set_tag("daemon", name) # exec the process mod.main() except KeyboardInterrupt: cloudlog.warning(f"child {proc} got SIGINT") except Exception: # can't install the crash handler because sys.excepthook doesn't play nice # with threads, so catch it here. sentry.capture_exception() raise def nativelauncher(pargs: list[str], cwd: str, name: str) -> None: os.environ['MANAGER_DAEMON'] = name # exec the process os.chdir(cwd) os.execvp(pargs[0], pargs) def join_process(process: Process, timeout: float) -> None: # Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382 # We have to poll the exitcode instead t = time.monotonic() while time.monotonic() - t < timeout and process.exitcode is None: time.sleep(0.001) class ManagerProcess(ABC): daemon = False sigkill = False should_run: Callable[[bool, Params, car.CarParams], bool] proc: Process | None = None enabled = True name = "" last_watchdog_time = 0 watchdog_max_dt: int | None = None watchdog_seen = False shutting_down = False @abstractmethod def prepare(self) -> None: pass @abstractmethod def start(self) -> None: pass def restart(self) -> None: self.stop(sig=signal.SIGKILL) self.start() def check_watchdog(self, started: bool) -> None: if self.watchdog_max_dt is None or self.proc is None: return try: fn = WATCHDOG_FN + str(self.proc.pid) with open(fn, "rb") as f: # TODO: why can't pylint find struct.unpack? self.last_watchdog_time = struct.unpack('Q', f.read())[0] except Exception: pass dt = time.monotonic() - self.last_watchdog_time / 1e9 if dt > self.watchdog_max_dt: if self.watchdog_seen and ENABLE_WATCHDOG: cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})") self.restart() else: self.watchdog_seen = True def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None: if self.proc is None: return None if self.proc.exitcode is None: if not self.shutting_down: cloudlog.info(f"killing {self.name}") if sig is None: sig = signal.SIGKILL if self.sigkill else signal.SIGINT self.signal(sig) self.shutting_down = True if not block: return None join_process(self.proc, 5) # If process failed to die send SIGKILL if self.proc.exitcode is None and retry: cloudlog.info(f"killing {self.name} with SIGKILL") self.signal(signal.SIGKILL) self.proc.join() ret = self.proc.exitcode cloudlog.info(f"{self.name} is dead with {ret}") if self.proc.exitcode is not None: self.shutting_down = False self.proc = None return ret def signal(self, sig: int) -> None: if self.proc is None: return # Don't signal if already exited if self.proc.exitcode is not None and self.proc.pid is not None: return # Can't signal if we don't have a pid if self.proc.pid is None: return cloudlog.info(f"sending signal {sig} to {self.name}") os.kill(self.proc.pid, sig) def get_process_state_msg(self): state = log.ManagerState.ProcessState.new_message() state.name = self.name if self.proc: state.running = self.proc.is_alive() state.shouldBeRunning = self.proc is not None and not self.shutting_down state.pid = self.proc.pid or 0 state.exitCode = self.proc.exitcode or 0 return state class NativeProcess(ManagerProcess): def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None): self.name = name self.cwd = cwd self.cmdline = cmdline self.should_run = should_run self.enabled = enabled self.sigkill = sigkill self.watchdog_max_dt = watchdog_max_dt self.launcher = nativelauncher def prepare(self) -> None: pass def start(self) -> None: # In case we only tried a non blocking stop we need to stop it before restarting if self.shutting_down: self.stop() if self.proc is not None: return cwd = os.path.join(BASEDIR, self.cwd) cloudlog.info(f"starting process {self.name}") self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name)) self.proc.start() self.watchdog_seen = False self.shutting_down = False class PythonProcess(ManagerProcess): def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None): self.name = name self.module = module self.should_run = should_run self.enabled = enabled self.sigkill = sigkill self.watchdog_max_dt = watchdog_max_dt self.launcher = launcher def prepare(self) -> None: if self.enabled: cloudlog.info(f"preimporting {self.module}") importlib.import_module(self.module) def start(self) -> None: # In case we only tried a non blocking stop we need to stop it before restarting if self.shutting_down: self.stop() if self.proc is not None: return cloudlog.info(f"starting python {self.module}") self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name)) self.proc.start() self.watchdog_seen = False self.shutting_down = False class DaemonProcess(ManagerProcess): """Python process that has to stay running across manager restart. This is used for athena so you don't lose SSH access when restarting manager.""" def __init__(self, name, module, param_name, enabled=True): self.name = name self.module = module self.param_name = param_name self.enabled = enabled self.params = None @staticmethod def should_run(started, params, CP): return True def prepare(self) -> None: pass def start(self) -> None: if self.params is None: self.params = Params() pid = self.params.get(self.param_name, encoding='utf-8') if pid is not None: try: os.kill(int(pid), 0) with open(f'/proc/{pid}/cmdline') as f: if self.module in f.read(): # daemon is running return except (OSError, FileNotFoundError): # process is dead pass cloudlog.info(f"starting daemon {self.name}") proc = subprocess.Popen(['python', '-m', self.module], stdin=open('/dev/null'), stdout=open('/dev/null', 'w'), stderr=open('/dev/null', 'w'), preexec_fn=os.setpgrp) self.params.put(self.param_name, str(proc.pid)) def stop(self, retry=True, block=True, sig=None) -> None: pass def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None, not_run: list[str] | None=None) -> list[ManagerProcess]: if not_run is None: not_run = [] running = [] for p in procs: if p.enabled and p.name not in not_run and p.should_run(started, params, CP): running.append(p) else: p.stop(block=False) p.check_watchdog(started) for p in running: p.start() return running