Files
carrotpilot/system/manager/process_config.py
carrot 0423d12c9c Nuggets In Dijon model, C3xLite, livePose (#221)
* Update carrot_functions.py

* fix.. atc..

* TR16 model.

* fix.. atc

* Adjust interpolation value for t_follow calculation

* fix safeMode..

* fix safeMode2

* fix.. v_cruise

* model_turn_speed..

* fix..

* fix..

* fix.. cruise.py

* fix.. modelTurn..

* fix stopped car safe mode.

* model turn 120%

* remove model turn speed..

* paramsd...

* Revert "remove model turn speed.."

This reverts commit 564e9dd609.

* model_turn_speed...  120 -> 115%

* starting achange cost 30 -> 10

* fix..

* aChangeCostStarting

* fix..

* gwm v7

* Adjust traffic stop distance parameter

* Update carrot_functions.py

* update gwm 250929

* trafficStopDistance adjust

* localizer_roll_std

* scc13

* fix...

* fix.. scc13

* scc14

* bypass scc13

* fix scc13

* TheCoolPeople's Model

* North Nevada Model

* Revert "model_turn_speed...  120 -> 115%"

This reverts commit e842a7e99f.

* Reapply "remove model turn speed.."

This reverts commit 544ac16811.

* for c3x lite (#218)

add hardware c3x lite

* NNV(North Nevada) v2

* fix..

* Nuggets In Dijon Model

* toyota accel pid long

* for c3xlite fix (#219)

* LatSmoothSec

* Revert "Reapply "remove model turn speed..""

This reverts commit 2c10aae495.

* apply livePose

* releases 251017

---------

Co-authored-by: 「 crwusiz 」 <43285072+crwusiz@users.noreply.github.com>
2025-10-17 12:34:13 +09:00

148 lines
7.2 KiB
Python

import os
import operator
import importlib.util
from cereal import car
from openpilot.common.params import Params
from openpilot.system.hardware import PC, TICI
from openpilot.system.manager.process import PythonProcess, NativeProcess, DaemonProcess
FLASK_AVAILABLE = importlib.util.find_spec("flask") is not None
WEBCAM = os.getenv("USE_WEBCAM") is not None
def driverview(started: bool, params: Params, CP: car.CarParams) -> bool:
return started or params.get_bool("IsDriverViewEnabled")
def notcar(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and CP.notCar
def iscar(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not CP.notCar
def logging(started: bool, params: Params, CP: car.CarParams) -> bool:
run = (not CP.notCar) or not params.get_bool("DisableLogging")
return started and run
def ublox_available() -> bool:
return os.path.exists('/dev/ttyHS0') and not os.path.exists('/persist/comma/use-quectel-gps')
def ublox(started: bool, params: Params, CP: car.CarParams) -> bool:
use_ublox = ublox_available()
if use_ublox != params.get_bool("UbloxAvailable"):
params.put_bool("UbloxAvailable", use_ublox)
return started and use_ublox
def joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("JoystickDebugMode")
def not_joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("JoystickDebugMode")
def long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("LongitudinalManeuverMode")
def not_long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("LongitudinalManeuverMode")
def qcomgps(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not ublox_available()
def always_run(started: bool, params: Params, CP: car.CarParams) -> bool:
return True
def only_onroad(started: bool, params: Params, CP: car.CarParams) -> bool:
return started
def only_offroad(started: bool, params: Params, CP: car.CarParams) -> bool:
return not started
def enable_updated(started: bool, params: Params, CP: car.CarParams) -> bool:
return not started and params.get_bool("SoftwareMenu")
def check_fleet(started, params, CP: car.CarParams) -> bool:
return FLASK_AVAILABLE
def or_(*fns):
return lambda *args: operator.or_(*(fn(*args) for fn in fns))
def and_(*fns):
return lambda *args: operator.and_(*(fn(*args) for fn in fns))
def enable_dm(started, params, CP: car.CarParams) -> bool:
return (started or params.get_bool("IsDriverViewEnabled")) and params.get_int("DisableDM") == 0
def enable_connect(started, params, CP: car.CarParams) -> bool:
return params.get_int("EnableConnect") > 0
def c3x_lite(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("HardwareC3xLite")
procs = [
DaemonProcess("manage_athenad", "system.athena.manage_athenad", "AthenadPid"),
NativeProcess("loggerd", "system/loggerd", ["./loggerd"], logging),
NativeProcess("encoderd", "system/loggerd", ["./encoderd"], only_onroad),
NativeProcess("stream_encoderd", "system/loggerd", ["./encoderd", "--stream"], notcar),
PythonProcess("logmessaged", "system.logmessaged", always_run),
NativeProcess("camerad", "system/camerad", ["./camerad"], driverview, enabled=not WEBCAM),
PythonProcess("webcamerad", "tools.webcam.camerad", driverview, enabled=WEBCAM),
NativeProcess("logcatd", "system/logcatd", ["./logcatd"], only_onroad),
NativeProcess("proclogd", "system/proclogd", ["./proclogd"], only_onroad),
PythonProcess("micd", "system.micd", iscar),
PythonProcess("timed", "system.timed", always_run, enabled=not PC),
# TODO: Make python process once TG allows opening QCOM from child pro
# https://github.com/tinygrad/tinygrad/blob/ac9c96dae1656dc220ee4acc39cef4dd449aa850/tinygrad/device.py#L26
NativeProcess("modeld", "selfdrive/modeld", ["./modeld.py"], only_onroad),
NativeProcess("dmonitoringmodeld", "selfdrive/modeld", ["./dmonitoringmodeld.py"], enable_dm, enabled=(WEBCAM or not PC)),
#NativeProcess("mapsd", "selfdrive/navd", ["./mapsd"], only_onroad),
#NativeProcess("mapsd", "selfdrive/navd", ["./mapsd"], always_run),
#PythonProcess("navmodeld", "selfdrive.modeld.navmodeld", only_onroad),
NativeProcess("sensord", "system/sensord", ["./sensord"], only_onroad, enabled=not PC),
NativeProcess("ui", "selfdrive/ui", ["./ui"], always_run, watchdog_max_dt=(5 if not PC else None)),
PythonProcess("soundd", "selfdrive.ui.soundd", only_onroad),
NativeProcess("locationd2", "selfdrive/locationd", ["./locationd"], only_onroad),
PythonProcess("locationd", "selfdrive.locationd.locationd", only_onroad),
NativeProcess("_pandad", "selfdrive/pandad", ["./pandad"], always_run, enabled=False),
PythonProcess("calibrationd", "selfdrive.locationd.calibrationd", only_onroad),
PythonProcess("torqued", "selfdrive.locationd.torqued", only_onroad),
PythonProcess("controlsd", "selfdrive.controls.controlsd", and_(not_joystick, iscar)),
PythonProcess("joystickd", "tools.joystick.joystickd", or_(joystick, notcar)),
PythonProcess("selfdrived", "selfdrive.selfdrived.selfdrived", only_onroad),
PythonProcess("card", "selfdrive.car.card", only_onroad),
PythonProcess("deleter", "system.loggerd.deleter", always_run),
PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", enable_dm, enabled=(WEBCAM or not PC)),
PythonProcess("qcomgpsd", "system.qcomgpsd.qcomgpsd", qcomgps, enabled=TICI),
PythonProcess("navd", "selfdrive.navd.navd", only_onroad),
PythonProcess("pandad", "selfdrive.pandad.pandad", always_run),
PythonProcess("paramsd", "selfdrive.locationd.paramsd", only_onroad),
PythonProcess("lagd", "selfdrive.locationd.lagd", only_onroad),
NativeProcess("ubloxd", "system/ubloxd", ["./ubloxd"], ublox, enabled=TICI),
PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
PythonProcess("hardwared", "system.hardware.hardwared", always_run),
PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),
PythonProcess("updated", "system.updated.updated", enable_updated, enabled=not PC),
PythonProcess("uploader", "system.loggerd.uploader", enable_connect),
PythonProcess("statsd", "system.statsd", always_run),
# debug procs
NativeProcess("bridge", "cereal/messaging", ["./bridge"], notcar),
PythonProcess("webrtcd", "system.webrtc.webrtcd", notcar),
PythonProcess("webjoystick", "tools.bodyteleop.web", notcar),
PythonProcess("joystick", "tools.joystick.joystick_control", and_(joystick, iscar)),
#PythonProcess("fleet_manager", "selfdrive.frogpilot.fleetmanager.fleet_manager", check_fleet, enabled=not PC),
PythonProcess("fleet_manager", "selfdrive.frogpilot.fleetmanager.fleet_manager", check_fleet),
PythonProcess("carrot_man", "selfdrive.carrot.carrot_man", always_run),#, enabled=not PC),
# c3x lite
PythonProcess("beep", "selfdrive.controls.beep", c3x_lite, enabled=TICI),
]
managed_processes = {p.name: p for p in procs}