"""Run-condition predicates and the table of processes handled by the manager.

Every predicate shares the signature ``(started, params, CP) -> bool`` so it can
be passed as the run condition of a ``PythonProcess`` / ``NativeProcess``.
"""
import functools
import os
import operator
import platform

from cereal import car, custom
from openpilot.common.params import Params
from openpilot.system.hardware import PC, TICI
from openpilot.system.manager.process import PythonProcess, NativeProcess, DaemonProcess
from openpilot.system.hardware.hw import Paths

from openpilot.sunnypilot.mapd.mapd_manager import MAPD_PATH
from sunnypilot.models.helpers import get_active_model_runner
from sunnypilot.sunnylink.utils import sunnylink_need_register, sunnylink_ready, use_sunnylink_uploader

# True when the USE_WEBCAM environment variable is set (to any value).
WEBCAM = os.getenv("USE_WEBCAM") is not None


def driverview(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started or when the "IsDriverViewEnabled" param is set."""
  return started or params.get_bool("IsDriverViewEnabled")


def notcar(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and ``CP.notCar`` is set."""
  return started and CP.notCar


def iscar(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and ``CP.notCar`` is not set."""
  return started and not CP.notCar


def logging(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started, unless ``CP.notCar`` and "DisableLogging" are both set."""
  run = (not CP.notCar) or not params.get_bool("DisableLogging")
  return started and run


def ublox_available() -> bool:
  """Return True when '/dev/ttyHS0' exists and the quectel override file does not."""
  return os.path.exists('/dev/ttyHS0') and not os.path.exists('/persist/comma/use-quectel-gps')


def ublox(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and ublox is available.

  Side effect: keeps the "UbloxAvailable" param in sync with the detected
  availability, writing it only when the stored value differs.
  """
  use_ublox = ublox_available()
  if use_ublox != params.get_bool("UbloxAvailable"):
    params.put_bool("UbloxAvailable", use_ublox)
  return started and use_ublox


def joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and the "JoystickDebugMode" param is set."""
  return started and params.get_bool("JoystickDebugMode")


def not_joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and the "JoystickDebugMode" param is not set."""
  return started and not params.get_bool("JoystickDebugMode")


def long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and the "LongitudinalManeuverMode" param is set."""
  return started and params.get_bool("LongitudinalManeuverMode")


def not_long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and the "LongitudinalManeuverMode" param is not set."""
  return started and not params.get_bool("LongitudinalManeuverMode")


def qcomgps(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True when started and ublox is not available."""
  return started and not ublox_available()


def always_run(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True unconditionally."""
  return True


def only_onroad(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True only while started."""
  return started


def only_offroad(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True only while not started."""
  return not started


def use_github_runner(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Return True off-PC when "EnableGithubRunner" is set and "NetworkMetered" is not."""
  return not PC and params.get_bool("EnableGithubRunner") and not params.get_bool("NetworkMetered")


def sunnylink_ready_shim(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Shim for sunnylink_ready to match the process manager signature."""
  return sunnylink_ready(params)


def sunnylink_need_register_shim(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Shim for sunnylink_need_register to match the process manager signature."""
  return sunnylink_need_register(params)


def use_sunnylink_uploader_shim(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Shim for use_sunnylink_uploader to match the process manager signature."""
  return use_sunnylink_uploader(params)


# NOTE(review): the three model predicates pass `not started` as the second
# argument of get_active_model_runner; its meaning is defined by that helper
# and is not visible from this file.
def is_snpe_model(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Check if the active model runner is SNPE."""
  return bool(get_active_model_runner(params, not started) == custom.ModelManagerSP.Runner.snpe)


def is_tinygrad_model(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Check if the active model runner is tinygrad."""
  return bool(get_active_model_runner(params, not started) == custom.ModelManagerSP.Runner.tinygrad)


def is_stock_model(started: bool, params: Params, CP: car.CarParams) -> bool:
  """Check if the active model runner is stock."""
  return bool(get_active_model_runner(params, not started) == custom.ModelManagerSP.Runner.stock)


def or_(*fns):
  """Combine predicates into one that ORs (``|``) their results.

  Args:
    *fns: one or more callables sharing the same call signature.

  Returns:
    A callable that forwards its arguments to every predicate (all of them are
    always called, there is no short-circuiting) and folds the results with
    ``operator.or_``.

  Raises:
    TypeError: when the returned callable is invoked and no predicates were given.
  """
  def combined(*args):
    # Build the list first so every predicate runs, then fold pairwise; this
    # works for any number of predicates, not just two.
    return functools.reduce(operator.or_, [fn(*args) for fn in fns])
  return combined


def and_(*fns):
  """Combine predicates into one that ANDs (``&``) their results.

  Args:
    *fns: one or more callables sharing the same call signature.

  Returns:
    A callable that forwards its arguments to every predicate (all of them are
    always called, there is no short-circuiting) and folds the results with
    ``operator.and_``.

  Raises:
    TypeError: when the returned callable is invoked and no predicates were given.
  """
  def combined(*args):
    # Build the list first so every predicate runs, then fold pairwise; this
    # works for any number of predicates, not just two.
    return functools.reduce(operator.and_, [fn(*args) for fn in fns])
  return combined


procs = [
  DaemonProcess("manage_athenad", "system.athena.manage_athenad", "AthenadPid"),

  NativeProcess("loggerd", "system/loggerd", ["./loggerd"], logging),
  NativeProcess("encoderd", "system/loggerd", ["./encoderd"], only_onroad),
  NativeProcess("stream_encoderd", "system/loggerd", ["./encoderd", "--stream"], notcar),
  PythonProcess("logmessaged", "system.logmessaged", always_run),

  NativeProcess("camerad", "system/camerad", ["./camerad"], driverview, enabled=not WEBCAM),
  PythonProcess("webcamerad", "tools.webcam.camerad", driverview, enabled=WEBCAM),
  NativeProcess("logcatd", "system/logcatd", ["./logcatd"], only_onroad, platform.system() != "Darwin"),
  NativeProcess("proclogd", "system/proclogd", ["./proclogd"], only_onroad, platform.system() != "Darwin"),
  PythonProcess("micd", "system.micd", iscar),
  PythonProcess("timed", "system.timed", always_run, enabled=not PC),

  PythonProcess("modeld", "selfdrive.modeld.modeld", and_(only_onroad, is_stock_model)),
  PythonProcess("dmonitoringmodeld", "selfdrive.modeld.dmonitoringmodeld", driverview, enabled=(WEBCAM or not PC)),

  PythonProcess("sensord", "system.sensord.sensord", only_onroad, enabled=not PC),
  NativeProcess("ui", "selfdrive/ui", ["./ui"], always_run, watchdog_max_dt=(5 if not PC else None)),
  PythonProcess("soundd", "selfdrive.ui.soundd", only_onroad),
  PythonProcess("locationd", "selfdrive.locationd.locationd", only_onroad),
  NativeProcess("_pandad", "selfdrive/pandad", ["./pandad"], always_run, enabled=False),
  PythonProcess("calibrationd", "selfdrive.locationd.calibrationd", only_onroad),
  PythonProcess("torqued", "selfdrive.locationd.torqued", only_onroad),
  PythonProcess("controlsd", "selfdrive.controls.controlsd", and_(not_joystick, iscar)),
  PythonProcess("joystickd", "tools.joystick.joystickd", or_(joystick, notcar)),
  PythonProcess("selfdrived", "selfdrive.selfdrived.selfdrived", only_onroad),
  PythonProcess("card", "selfdrive.car.card", only_onroad),
  PythonProcess("deleter", "system.loggerd.deleter", always_run),
  PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview, enabled=(WEBCAM or not PC)),
  PythonProcess("qcomgpsd", "system.qcomgpsd.qcomgpsd", qcomgps, enabled=TICI),
  PythonProcess("pandad", "selfdrive.pandad.pandad", always_run),
  PythonProcess("paramsd", "selfdrive.locationd.paramsd", only_onroad),
  PythonProcess("lagd", "selfdrive.locationd.lagd", only_onroad),
  NativeProcess("ubloxd", "system/ubloxd", ["./ubloxd"], ublox, enabled=TICI),
  PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
  PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
  PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
  PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
  PythonProcess("hardwared", "system.hardware.hardwared", always_run),
  PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),
  PythonProcess("updated", "system.updated.updated", only_offroad, enabled=not PC),
  PythonProcess("uploader", "system.loggerd.uploader", always_run),
  PythonProcess("statsd", "system.statsd", always_run),

  # debug procs
  NativeProcess("bridge", "cereal/messaging", ["./bridge"], notcar),
  PythonProcess("webrtcd", "system.webrtc.webrtcd", notcar),
  PythonProcess("webjoystick", "tools.bodyteleop.web", notcar),
  PythonProcess("joystick", "tools.joystick.joystick_control", and_(joystick, iscar)),

  # sunnylink <3
  DaemonProcess("manage_sunnylinkd", "sunnypilot.sunnylink.athena.manage_sunnylinkd", "SunnylinkdPid"),
  PythonProcess("sunnylink_registration_manager", "sunnypilot.sunnylink.registration_manager", sunnylink_need_register_shim),
]

# sunnypilot
procs += [
  # Models
  PythonProcess("models_manager", "sunnypilot.models.manager", only_offroad),
  NativeProcess("modeld_snpe", "sunnypilot/modeld", ["./modeld"], and_(only_onroad, is_snpe_model)),
  NativeProcess("modeld_tinygrad", "sunnypilot/modeld_v2", ["./modeld"], and_(only_onroad, is_tinygrad_model)),

  # Backup
  PythonProcess("backup_manager", "sunnypilot.sunnylink.backups.manager", and_(only_offroad, sunnylink_ready_shim)),

  # mapd
  NativeProcess("mapd", Paths.mapd_root(), [MAPD_PATH], always_run),
  PythonProcess("mapd_manager", "sunnypilot.mapd.mapd_manager", always_run),
]

# Optional processes, registered only when their entry point exists on disk.
# Both paths are relative, so they resolve against the current working
# directory at import time.
if os.path.exists("./github_runner.sh"):
  procs += [NativeProcess("github_runner_start", "system/manager", ["./github_runner.sh", "start"], and_(only_offroad, use_github_runner), sigkill=False)]

if os.path.exists("../sunnypilot/sunnylink/uploader.py"):
  procs += [PythonProcess("sunnylink_uploader", "sunnypilot.sunnylink.uploader", use_sunnylink_uploader_shim)]

# Lookup table of every registered process keyed by its name.
managed_processes = {p.name: p for p in procs}