Files
sunnypilot/system/manager/process_config.py
Shane Smiskol 5117a8c3a6 ui: test raylib ui (#35949)
* add raylib ui

* test

* this is better for now

* rm

rm

* finalize it

* need this?

* ?

* shite

shite

* try

* ?

* huh

* simp

* ?

* wtf is going on

* ???????????????

* lock

* stash

* no 2 packages

* Revert "stash"

This reverts commit 9efb0d9bda6a6309e7a567634d1921bf1cd0fb59.

* debug

* noo

* debug

* ?

* and

* yeah yeah

* init one

* 2

* i wonder

* oooh

* make sure

* fix dat

* try this

* see if wifiman

* forgot

* ?

* ???

* fuck this we can rewrite it later
2025-08-08 00:34:53 -07:00

120 lines
5.9 KiB
Python

import os
import operator
import platform
from cereal import car
from openpilot.common.params import Params
from openpilot.system.hardware import PC, TICI
from openpilot.system.manager.process import PythonProcess, NativeProcess, DaemonProcess
WEBCAM = os.getenv("USE_WEBCAM") is not None
def driverview(started: bool, params: Params, CP: car.CarParams) -> bool:
return started or params.get_bool("IsDriverViewEnabled")
def notcar(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and CP.notCar
def iscar(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not CP.notCar
def logging(started: bool, params: Params, CP: car.CarParams) -> bool:
run = (not CP.notCar) or not params.get_bool("DisableLogging")
return started and run
def ublox_available() -> bool:
return os.path.exists('/dev/ttyHS0') and not os.path.exists('/persist/comma/use-quectel-gps')
def ublox(started: bool, params: Params, CP: car.CarParams) -> bool:
use_ublox = ublox_available()
if use_ublox != params.get_bool("UbloxAvailable"):
params.put_bool("UbloxAvailable", use_ublox)
return started and use_ublox
def joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("JoystickDebugMode")
def not_joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("JoystickDebugMode")
def long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("LongitudinalManeuverMode")
def not_long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("LongitudinalManeuverMode")
def qcomgps(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not ublox_available()
def always_run(started: bool, params: Params, CP: car.CarParams) -> bool:
return True
def only_onroad(started: bool, params: Params, CP: car.CarParams) -> bool:
return started
def only_offroad(started: bool, params: Params, CP: car.CarParams) -> bool:
return not started
def or_(*fns):
return lambda *args: operator.or_(*(fn(*args) for fn in fns))
def and_(*fns):
return lambda *args: operator.and_(*(fn(*args) for fn in fns))
procs = [
DaemonProcess("manage_athenad", "system.athena.manage_athenad", "AthenadPid"),
NativeProcess("loggerd", "system/loggerd", ["./loggerd"], logging),
NativeProcess("encoderd", "system/loggerd", ["./encoderd"], only_onroad),
NativeProcess("stream_encoderd", "system/loggerd", ["./encoderd", "--stream"], notcar),
PythonProcess("logmessaged", "system.logmessaged", always_run),
NativeProcess("camerad", "system/camerad", ["./camerad"], driverview, enabled=not WEBCAM),
PythonProcess("webcamerad", "tools.webcam.camerad", driverview, enabled=WEBCAM),
NativeProcess("logcatd", "system/logcatd", ["./logcatd"], only_onroad, platform.system() != "Darwin"),
NativeProcess("proclogd", "system/proclogd", ["./proclogd"], only_onroad, platform.system() != "Darwin"),
PythonProcess("micd", "system.micd", iscar),
PythonProcess("timed", "system.timed", always_run, enabled=not PC),
PythonProcess("modeld", "selfdrive.modeld.modeld", only_onroad),
PythonProcess("dmonitoringmodeld", "selfdrive.modeld.dmonitoringmodeld", driverview, enabled=(WEBCAM or not PC)),
PythonProcess("sensord", "system.sensord.sensord", only_onroad, enabled=not PC),
NativeProcess("ui", "selfdrive/ui", ["./ui"], always_run, watchdog_max_dt=(5 if not PC else None)),
PythonProcess("raylib_ui", "selfdrive.ui.ui", always_run, enabled=False, watchdog_max_dt=(5 if not PC else None)),
PythonProcess("soundd", "selfdrive.ui.soundd", only_onroad),
PythonProcess("locationd", "selfdrive.locationd.locationd", only_onroad),
NativeProcess("_pandad", "selfdrive/pandad", ["./pandad"], always_run, enabled=False),
PythonProcess("calibrationd", "selfdrive.locationd.calibrationd", only_onroad),
PythonProcess("torqued", "selfdrive.locationd.torqued", only_onroad),
PythonProcess("controlsd", "selfdrive.controls.controlsd", and_(not_joystick, iscar)),
PythonProcess("joystickd", "tools.joystick.joystickd", or_(joystick, notcar)),
PythonProcess("selfdrived", "selfdrive.selfdrived.selfdrived", only_onroad),
PythonProcess("card", "selfdrive.car.card", only_onroad),
PythonProcess("deleter", "system.loggerd.deleter", always_run),
PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview, enabled=(WEBCAM or not PC)),
PythonProcess("qcomgpsd", "system.qcomgpsd.qcomgpsd", qcomgps, enabled=TICI),
PythonProcess("pandad", "selfdrive.pandad.pandad", always_run),
PythonProcess("paramsd", "selfdrive.locationd.paramsd", only_onroad),
PythonProcess("lagd", "selfdrive.locationd.lagd", only_onroad),
NativeProcess("ubloxd", "system/ubloxd", ["./ubloxd"], ublox, enabled=TICI),
PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
PythonProcess("hardwared", "system.hardware.hardwared", always_run),
PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),
PythonProcess("updated", "system.updated.updated", only_offroad, enabled=not PC),
PythonProcess("uploader", "system.loggerd.uploader", always_run),
PythonProcess("statsd", "system.statsd", always_run),
PythonProcess("feedbackd", "selfdrive.ui.feedback.feedbackd", only_onroad),
# debug procs
NativeProcess("bridge", "cereal/messaging", ["./bridge"], notcar),
PythonProcess("webrtcd", "system.webrtc.webrtcd", notcar),
PythonProcess("webjoystick", "tools.bodyteleop.web", notcar),
PythonProcess("joystick", "tools.joystick.joystick_control", and_(joystick, iscar)),
]
managed_processes = {p.name: p for p in procs}