Events: Migrate sunnypilot onroad events to its own cereal (#603)

* Events: Migrate sunnypilot onroad events to its own cereal

* more

* slightly more

* typing

* fix more

* fix mads state machine tests

* readjust order

* fix event

* abstract

* need these

* move around

* let's make sure it cleared on every loop

* Update selfdrive/selfdrived/alertmanager.py

Co-authored-by: DevTekVE <devtekve@gmail.com>

* use upstream custom struct

---------

Co-authored-by: DevTekVE <devtekve@gmail.com>
This commit is contained in:
Jason Wen
2025-01-20 22:18:19 -05:00
committed by GitHub
parent 4730a192b1
commit d08fd25784
15 changed files with 538 additions and 453 deletions

View File

@@ -99,7 +99,38 @@ struct LongitudinalPlanSP @0xf35cc4560bbf6ec2 {
}
}
struct CustomReserved3 @0xda96579883444c35 {
struct OnroadEventSP @0xda96579883444c35 {
name @0 :EventName;
# event types
enable @1 :Bool;
noEntry @2 :Bool;
warning @3 :Bool; # alerts presented only when enabled or soft disabling
userDisable @4 :Bool;
softDisable @5 :Bool;
immediateDisable @6 :Bool;
preEnable @7 :Bool;
permanent @8 :Bool; # alerts presented regardless of openpilot state
overrideLateral @10 :Bool;
overrideLongitudinal @9 :Bool;
enum EventName {
lkasEnable @0;
lkasDisable @1;
manualSteeringRequired @2;
manualLongitudinalRequired @3;
silentLkasEnable @4;
silentLkasDisable @5;
silentBrakeHold @6;
silentWrongGear @7;
silentReverseGear @8;
silentDoorOpen @9;
silentSeatbeltNotLatched @10;
silentParkBrake @11;
controlsMismatchLateral @12;
hyundaiRadarTracksConfirmed @13;
experimentalModeSwitched @14;
}
}
struct CustomReserved4 @0x80ae746ee2596b11 {

View File

@@ -125,81 +125,6 @@ struct OnroadEvent @0xc4fa6047f024e718 {
espActive @90;
personalityChanged @91;
aeb @92;
eventReserved93 @93;
eventReserved94 @94;
eventReserved95 @95;
eventReserved96 @96;
eventReserved97 @97;
eventReserved98 @98;
eventReserved99 @99;
eventReserved100 @100;
eventReserved101 @101;
eventReserved102 @102;
eventReserved103 @103;
eventReserved104 @104;
eventReserved105 @105;
eventReserved106 @106;
eventReserved107 @107;
eventReserved108 @108;
eventReserved109 @109;
eventReserved110 @110;
eventReserved111 @111;
eventReserved112 @112;
eventReserved113 @113;
eventReserved114 @114;
eventReserved115 @115;
eventReserved116 @116;
eventReserved117 @117;
eventReserved118 @118;
eventReserved119 @119;
eventReserved120 @120;
eventReserved121 @121;
eventReserved122 @122;
eventReserved123 @123;
eventReserved124 @124;
eventReserved125 @125;
eventReserved126 @126;
eventReserved127 @127;
eventReserved128 @128;
eventReserved129 @129;
eventReserved130 @130;
eventReserved131 @131;
eventReserved132 @132;
eventReserved133 @133;
eventReserved134 @134;
eventReserved135 @135;
eventReserved136 @136;
eventReserved137 @137;
eventReserved138 @138;
eventReserved139 @139;
eventReserved140 @140;
eventReserved141 @141;
eventReserved142 @142;
eventReserved143 @143;
eventReserved144 @144;
eventReserved145 @145;
eventReserved146 @146;
eventReserved147 @147;
eventReserved148 @148;
eventReserved149 @149;
eventReserved150 @150;
# sunnypilot
lkasEnable @151;
lkasDisable @152;
manualSteeringRequired @153;
manualLongitudinalRequired @154;
silentLkasEnable @155;
silentLkasDisable @156;
silentBrakeHold @157;
silentWrongGear @158;
silentReverseGear @159;
silentDoorOpen @160;
silentSeatbeltNotLatched @161;
silentParkBrake @162;
controlsMismatchLateral @163;
hyundaiRadarTracksConfirmed @164;
experimentalModeSwitched @165;
soundsUnavailableDEPRECATED @47;
}
@@ -2643,7 +2568,7 @@ struct Event {
selfdriveStateSP @107 :Custom.SelfdriveStateSP;
modelManagerSP @108 :Custom.ModelManagerSP;
longitudinalPlanSP @109 :Custom.LongitudinalPlanSP;
customReserved3 @110 :Custom.CustomReserved3;
onroadEventsSP @110 :List(Custom.OnroadEventSP);
customReserved4 @111 :Custom.CustomReserved4;
customReserved5 @112 :Custom.CustomReserved5;
customReserved6 @113 :Custom.CustomReserved6;

View File

@@ -78,6 +78,7 @@ _services: dict[str, tuple] = {
"modelManagerSP": (False, 1., 1),
"selfdriveStateSP": (True, 100., 10),
"longitudinalPlanSP": (True, 20., 10),
"onroadEventsSP": (True, 1., 1),
# debug
"uiDebug": (True, 0., 1),

View File

@@ -6,7 +6,8 @@ from dataclasses import dataclass
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.selfdrived.events import Alert, EmptyAlert
from openpilot.selfdrive.selfdrived.events import Alert
from openpilot.sunnypilot.selfdrive.selfdrived.events_base import EmptyAlert
with open(os.path.join(BASEDIR, "selfdrive/selfdrived/alerts_offroad.json")) as f:

View File

@@ -1,9 +1,6 @@
#!/usr/bin/env python3
import bisect
import math
import os
from enum import IntEnum
from collections.abc import Callable
from cereal import log, car
import cereal.messaging as messaging
@@ -12,6 +9,11 @@ from openpilot.common.git import get_short_branch
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.locationd.calibrationd import MIN_SPEED_FILTER
from openpilot.sunnypilot.selfdrive.selfdrived.events_base import EventsBase, Priority, ET, Alert, \
NoEntryAlert, SoftDisableAlert, UserSoftDisableAlert, ImmediateDisableAlert, EngagementAlert, NormalPermanentAlert, \
StartupAlert, AlertCallbackType
AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus
VisualAlert = car.CarControl.HUDControl.VisualAlert
@@ -19,201 +21,23 @@ AudibleAlert = car.CarControl.HUDControl.AudibleAlert
EventName = log.OnroadEvent.EventName
# Alert priorities
class Priority(IntEnum):
LOWEST = 0
LOWER = 1
LOW = 2
MID = 3
HIGH = 4
HIGHEST = 5
# Event types
class ET:
ENABLE = 'enable'
PRE_ENABLE = 'preEnable'
OVERRIDE_LATERAL = 'overrideLateral'
OVERRIDE_LONGITUDINAL = 'overrideLongitudinal'
NO_ENTRY = 'noEntry'
WARNING = 'warning'
USER_DISABLE = 'userDisable'
SOFT_DISABLE = 'softDisable'
IMMEDIATE_DISABLE = 'immediateDisable'
PERMANENT = 'permanent'
# get event name from enum
EVENT_NAME = {v: k for k, v in EventName.schema.enumerants.items()}
class Events:
class Events(EventsBase):
def __init__(self):
self.events: list[int] = []
self.static_events: list[int] = []
super().__init__()
self.event_counters = dict.fromkeys(EVENTS.keys(), 0)
@property
def names(self) -> list[int]:
return self.events
def get_events_mapping(self) -> dict[int, dict[str, Alert | AlertCallbackType]]:
return EVENTS
def __len__(self) -> int:
return len(self.events)
def get_event_name(self, event: int):
return EVENT_NAME[event]
def add(self, event_name: int, static: bool=False) -> None:
if static:
bisect.insort(self.static_events, event_name)
bisect.insort(self.events, event_name)
def clear(self) -> None:
self.event_counters = {k: (v + 1 if k in self.events else 0) for k, v in self.event_counters.items()}
self.events = self.static_events.copy()
def contains(self, event_type: str) -> bool:
return any(event_type in EVENTS.get(e, {}) for e in self.events)
def create_alerts(self, event_types: list[str], callback_args=None):
if callback_args is None:
callback_args = []
ret = []
for e in self.events:
types = EVENTS[e].keys()
for et in event_types:
if et in types:
alert = EVENTS[e][et]
if not isinstance(alert, Alert):
alert = alert(*callback_args)
if DT_CTRL * (self.event_counters[e] + 1) >= alert.creation_delay:
alert.alert_type = f"{EVENT_NAME[e]}/{et}"
alert.event_type = et
ret.append(alert)
return ret
def add_from_msg(self, events):
for e in events:
bisect.insort(self.events, e.name.raw)
def to_msg(self):
ret = []
for event_name in self.events:
event = log.OnroadEvent.new_message()
event.name = event_name
for event_type in EVENTS.get(event_name, {}):
setattr(event, event_type, True)
ret.append(event)
return ret
def has(self, event_name: int) -> bool:
return event_name in self.events
def contains_in_list(self, events_list: list[int]) -> bool:
return any(event_name in self.events for event_name in events_list)
def remove(self, event_name: int, static: bool = False) -> None:
if static and event_name in self.static_events:
self.static_events.remove(event_name)
if event_name in self.events:
self.event_counters[event_name] = self.event_counters[event_name] + 1
self.events.remove(event_name)
def replace(self, prev_event_name: int, cur_event_name: int, static: bool = False) -> None:
self.remove(prev_event_name, static)
self.add(cur_event_name, static)
class Alert:
def __init__(self,
alert_text_1: str,
alert_text_2: str,
alert_status: log.SelfdriveState.AlertStatus,
alert_size: log.SelfdriveState.AlertSize,
priority: Priority,
visual_alert: car.CarControl.HUDControl.VisualAlert,
audible_alert: car.CarControl.HUDControl.AudibleAlert,
duration: float,
creation_delay: float = 0.):
self.alert_text_1 = alert_text_1
self.alert_text_2 = alert_text_2
self.alert_status = alert_status
self.alert_size = alert_size
self.priority = priority
self.visual_alert = visual_alert
self.audible_alert = audible_alert
self.duration = int(duration / DT_CTRL)
self.creation_delay = creation_delay
self.alert_type = ""
self.event_type: str | None = None
def __str__(self) -> str:
return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}"
def __gt__(self, alert2) -> bool:
if not isinstance(alert2, Alert):
return False
return self.priority > alert2.priority
EmptyAlert = Alert("" , "", AlertStatus.normal, AlertSize.none, Priority.LOWEST,
VisualAlert.none, AudibleAlert.none, 0)
class NoEntryAlert(Alert):
def __init__(self, alert_text_2: str,
alert_text_1: str = "openpilot Unavailable",
visual_alert: car.CarControl.HUDControl.VisualAlert=VisualAlert.none):
super().__init__(alert_text_1, alert_text_2, AlertStatus.normal,
AlertSize.mid, Priority.LOW, visual_alert,
AudibleAlert.refuse, 3.)
class SoftDisableAlert(Alert):
def __init__(self, alert_text_2: str):
super().__init__("TAKE CONTROL IMMEDIATELY", alert_text_2,
AlertStatus.userPrompt, AlertSize.full,
Priority.MID, VisualAlert.steerRequired,
AudibleAlert.warningSoft, 2.),
# less harsh version of SoftDisable, where the condition is user-triggered
class UserSoftDisableAlert(SoftDisableAlert):
def __init__(self, alert_text_2: str):
super().__init__(alert_text_2),
self.alert_text_1 = "openpilot will disengage"
class ImmediateDisableAlert(Alert):
def __init__(self, alert_text_2: str):
super().__init__("TAKE CONTROL IMMEDIATELY", alert_text_2,
AlertStatus.critical, AlertSize.full,
Priority.HIGHEST, VisualAlert.steerRequired,
AudibleAlert.warningImmediate, 4.),
class EngagementAlert(Alert):
def __init__(self, audible_alert: car.CarControl.HUDControl.AudibleAlert):
super().__init__("", "",
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none,
audible_alert, .2),
class NormalPermanentAlert(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str = "", duration: float = 0.2, priority: Priority = Priority.LOWER, creation_delay: float = 0.):
super().__init__(alert_text_1, alert_text_2,
AlertStatus.normal, AlertSize.mid if len(alert_text_2) else AlertSize.small,
priority, VisualAlert.none, AudibleAlert.none, duration, creation_delay=creation_delay),
class StartupAlert(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str = "Always keep hands on wheel and eyes on road", alert_status=AlertStatus.normal):
super().__init__(alert_text_1, alert_text_2,
alert_status, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, 5.),
def get_event_msg_type(self):
return log.OnroadEvent
# ********** helper functions **********
@@ -225,8 +49,6 @@ def get_display_speed(speed_ms: float, metric: bool) -> str:
# ********** alert callback functions **********
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int, log.ControlsState], Alert]
def soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
@@ -972,106 +794,6 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
ET.WARNING: personality_changed_alert,
},
# sunnypilot
EventName.lkasEnable: {
ET.ENABLE: EngagementAlert(AudibleAlert.engage),
},
EventName.lkasDisable: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
},
EventName.manualSteeringRequired: {
ET.USER_DISABLE: Alert(
"Automatic Lane Centering is OFF",
"Manual Steering Required",
AlertStatus.normal, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.disengage, 1.),
},
EventName.manualLongitudinalRequired: {
ET.WARNING: Alert(
"Smart/Adaptive Cruise Control: OFF",
"Manual Speed Control Required",
AlertStatus.normal, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 1.),
},
EventName.silentLkasEnable: {
ET.ENABLE: EngagementAlert(AudibleAlert.none),
},
EventName.silentLkasDisable: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.none),
},
EventName.silentBrakeHold: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.none),
ET.NO_ENTRY: NoEntryAlert("Brake Hold Active"),
},
EventName.silentWrongGear: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: Alert(
"Gear not D",
"openpilot Unavailable",
AlertStatus.normal, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 0.),
},
EventName.silentReverseGear: {
ET.PERMANENT: Alert(
"Reverse\nGear",
"",
AlertStatus.normal, AlertSize.full,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2, creation_delay=0.5),
ET.NO_ENTRY: NoEntryAlert("Reverse Gear"),
},
EventName.silentDoorOpen: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: NoEntryAlert("Door Open"),
},
EventName.silentSeatbeltNotLatched: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: NoEntryAlert("Seatbelt Unlatched"),
},
EventName.silentParkBrake: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: NoEntryAlert("Parking Brake Engaged"),
},
EventName.controlsMismatchLateral: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Controls Mismatch: Lateral"),
ET.NO_ENTRY: NoEntryAlert("Controls Mismatch: Lateral"),
},
EventName.hyundaiRadarTracksConfirmed: {
ET.PERMANENT: NormalPermanentAlert("Radar tracks available. Restart the car to initialize")
},
EventName.experimentalModeSwitched: {
ET.WARNING: NormalPermanentAlert("Experimental Mode Switched", duration=1.5)
}
}

View File

@@ -26,6 +26,7 @@ from openpilot.system.version import get_build_metadata
from openpilot.sunnypilot.mads.mads import ModularAssistiveDrivingSystem
from openpilot.sunnypilot.selfdrive.car.car_specific import CarSpecificEventsSP
from openpilot.sunnypilot.selfdrive.car.cruise_helpers import CruiseHelper
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
@@ -135,8 +136,11 @@ class SelfdriveD(CruiseHelper):
elif self.CP.passive:
self.events.add(EventName.dashcamMode, static=True)
self.events_sp = EventsSP()
self.events_sp_prev = []
self.mads = ModularAssistiveDrivingSystem(self)
sock_services = list(self.pm.sock.keys()) + ['selfdriveStateSP']
sock_services = list(self.pm.sock.keys()) + ['selfdriveStateSP', 'onroadEventsSP']
self.pm = messaging.PubMaster(sock_services)
self.car_events_sp = CarSpecificEventsSP(self.CP, self.params)
@@ -147,6 +151,7 @@ class SelfdriveD(CruiseHelper):
"""Compute onroadEvents from carState"""
self.events.clear()
self.events_sp.clear()
if self.sm['controlsState'].lateralControlState.which() == 'debugState':
self.events.add(EventName.joystickDebug)
@@ -184,7 +189,7 @@ class SelfdriveD(CruiseHelper):
self.events.add_from_msg(car_events)
car_events_sp = self.car_events_sp.update().to_msg()
self.events.add_from_msg(car_events_sp)
self.events_sp.add_from_msg(car_events_sp)
if self.CP.notCar:
# wait for everything to init first
@@ -370,7 +375,7 @@ class SelfdriveD(CruiseHelper):
if self.sm['modelV2'].frameDropPerc > 20:
self.events.add(EventName.modeldLagging)
CruiseHelper.update(self, CS, self.events, self.experimental_mode)
CruiseHelper.update(self, CS, self.events_sp, self.experimental_mode)
# decrement personality on distance button press
if self.CP.openpilotLongitudinalControl:
@@ -436,9 +441,13 @@ class SelfdriveD(CruiseHelper):
clear_event_types.add(ET.NO_ENTRY)
pers = LONGITUDINAL_PERSONALITY_MAP[self.personality]
alerts = self.events.create_alerts(self.state_machine.current_alert_types, [self.CP, CS, self.sm, self.is_metric,
self.state_machine.soft_disable_timer, pers])
self.AM.add_many(self.sm.frame, alerts)
callback_args = [self.CP, CS, self.sm, self.is_metric,
self.state_machine.soft_disable_timer, pers]
alerts = self.events.create_alerts(self.state_machine.current_alert_types, callback_args)
alerts_sp = self.events_sp.create_alerts(self.state_machine.current_alert_types, callback_args)
self.AM.add_many(self.sm.frame, alerts + alerts_sp)
self.AM.process_alerts(self.sm.frame, clear_event_types)
def publish_selfdriveState(self, CS):
@@ -483,6 +492,14 @@ class SelfdriveD(CruiseHelper):
self.pm.send('selfdriveStateSP', ss_sp_msg)
# onroadEventsSP - logged every second or on change
if (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events_sp.names != self.events_sp_prev):
ce_send_sp = messaging.new_message('onroadEventsSP', len(self.events_sp))
ce_send_sp.valid = True
ce_send_sp.onroadEventsSP = self.events_sp.to_msg()
self.pm.send('onroadEventsSP', ce_send_sp)
self.events_sp_prev = self.events_sp.names.copy()
def step(self):
CS = self.data_sample()
self.update_events(CS)

View File

@@ -1,8 +1,10 @@
import random
from openpilot.selfdrive.selfdrived.events import Alert, EmptyAlert, EVENTS
from openpilot.selfdrive.selfdrived.events import Alert, EVENTS
from openpilot.selfdrive.selfdrived.alertmanager import AlertManager
from openpilot.sunnypilot.selfdrive.selfdrived.events_base import EmptyAlert
class TestAlertManager:

View File

@@ -35,6 +35,7 @@ from openpilot.sunnypilot.mads.state import StateMachine, GEARS_ALLOW_PAUSED_SIL
State = custom.SelfdriveStateSP.ModularAssistiveDrivingSystem.ModularAssistiveDrivingSystemState
ButtonType = car.CarState.ButtonEvent.Type
EventName = log.OnroadEvent.EventName
EventNameSP = custom.OnroadEventSP.EventName
SafetyModel = car.CarParams.SafetyModel
SET_SPEED_BUTTONS = (ButtonType.accelCruise, ButtonType.resumeCruise, ButtonType.decelCruise, ButtonType.setCruise)
@@ -53,6 +54,7 @@ class ModularAssistiveDrivingSystem:
self.selfdrive.enabled_prev = False
self.state_machine = StateMachine(self)
self.events = self.selfdrive.events
self.events_sp = self.selfdrive.events_sp
if self.selfdrive.CP.carName == "hyundai":
if (self.selfdrive.CP.sunnypilotFlags & HyundaiFlagsSP.HAS_LFA_BUTTON) or \
@@ -78,26 +80,30 @@ class ModularAssistiveDrivingSystem:
def transition_paused_state():
if self.state_machine.state != State.paused:
self.events.add(EventName.silentLkasDisable)
self.events_sp.add(EventNameSP.silentLkasDisable)
def replace_event(old_event: int, new_event: int):
self.events.remove(old_event)
self.events_sp.add(new_event)
if not self.selfdrive.enabled and self.enabled:
if self.events.has(EventName.doorOpen):
self.events.replace(EventName.doorOpen, EventName.silentDoorOpen)
replace_event(EventName.doorOpen, EventNameSP.silentDoorOpen)
transition_paused_state()
if self.events.has(EventName.seatbeltNotLatched):
self.events.replace(EventName.seatbeltNotLatched, EventName.silentSeatbeltNotLatched)
replace_event(EventName.seatbeltNotLatched, EventNameSP.silentSeatbeltNotLatched)
transition_paused_state()
if self.events.has(EventName.wrongGear):
self.events.replace(EventName.wrongGear, EventName.silentWrongGear)
replace_event(EventName.wrongGear, EventNameSP.silentWrongGear)
transition_paused_state()
if self.events.has(EventName.reverseGear):
self.events.replace(EventName.reverseGear, EventName.silentReverseGear)
replace_event(EventName.reverseGear, EventNameSP.silentReverseGear)
transition_paused_state()
if self.events.has(EventName.brakeHold):
self.events.replace(EventName.brakeHold, EventName.silentBrakeHold)
replace_event(EventName.brakeHold, EventNameSP.silentBrakeHold)
transition_paused_state()
if self.events.has(EventName.parkBrake):
self.events.replace(EventName.parkBrake, EventName.silentParkBrake)
replace_event(EventName.parkBrake, EventNameSP.silentParkBrake)
transition_paused_state()
if self.pause_lateral_on_brake_toggle:
@@ -107,7 +113,7 @@ class ModularAssistiveDrivingSystem:
if not (self.pause_lateral_on_brake_toggle and CS.brakePressed) and \
not self.events.contains_in_list(GEARS_ALLOW_PAUSED_SILENT):
if self.state_machine.state == State.paused:
self.events.add(EventName.silentLkasEnable)
self.events_sp.add(EventNameSP.silentLkasEnable)
self.events.remove(EventName.preEnableStandstill)
self.events.remove(EventName.belowEngageSpeed)
@@ -120,25 +126,25 @@ class ModularAssistiveDrivingSystem:
else:
if self.main_enabled_toggle:
if CS.cruiseState.available and not self.selfdrive.CS_prev.cruiseState.available:
self.events.add(EventName.lkasEnable)
self.events_sp.add(EventNameSP.lkasEnable)
for be in CS.buttonEvents:
if be.type == ButtonType.cancel:
if not self.selfdrive.enabled and self.selfdrive.enabled_prev:
self.events.add(EventName.manualLongitudinalRequired)
self.events_sp.add(EventNameSP.manualLongitudinalRequired)
if be.type == ButtonType.lkas and be.pressed and (CS.cruiseState.available or self.allow_always):
if self.enabled:
if self.selfdrive.enabled:
self.events.add(EventName.manualSteeringRequired)
self.events_sp.add(EventNameSP.manualSteeringRequired)
else:
self.events.add(EventName.lkasDisable)
self.events_sp.add(EventNameSP.lkasDisable)
else:
self.events.add(EventName.lkasEnable)
self.events_sp.add(EventNameSP.lkasEnable)
if not CS.cruiseState.available:
self.events.remove(EventName.buttonEnable)
if self.selfdrive.CS_prev.cruiseState.available:
self.events.add(EventName.lkasDisable)
self.events_sp.add(EventNameSP.lkasDisable)
self.events.remove(EventName.pcmDisable)
self.events.remove(EventName.buttonCancel)
@@ -154,7 +160,7 @@ class ModularAssistiveDrivingSystem:
self.update_events(CS)
if not self.selfdrive.CP.passive and self.selfdrive.initialized:
self.enabled, self.active = self.state_machine.update(self.events)
self.enabled, self.active = self.state_machine.update(self.events, self.events_sp)
# Copy of previous SelfdriveD states for MADS events handling
self.selfdrive.enabled_prev = self.selfdrive.enabled

View File

@@ -29,14 +29,17 @@ from openpilot.selfdrive.selfdrived.events import ET, Events
from openpilot.selfdrive.selfdrived.state import SOFT_DISABLE_TIME
from openpilot.common.realtime import DT_CTRL
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
State = custom.SelfdriveStateSP.ModularAssistiveDrivingSystem.ModularAssistiveDrivingSystemState
EventName = log.OnroadEvent.EventName
EventNameSP = custom.OnroadEventSP.EventName
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
ENABLED_STATES = (State.paused, *ACTIVE_STATES)
GEARS_ALLOW_PAUSED_SILENT = [EventName.silentWrongGear, EventName.silentReverseGear, EventName.silentBrakeHold,
EventName.silentDoorOpen, EventName.silentSeatbeltNotLatched, EventName.silentParkBrake]
GEARS_ALLOW_PAUSED_SILENT = [EventNameSP.silentWrongGear, EventNameSP.silentReverseGear, EventNameSP.silentBrakeHold,
EventNameSP.silentDoorOpen, EventNameSP.silentSeatbeltNotLatched, EventNameSP.silentParkBrake]
GEARS_ALLOW_PAUSED = [EventName.wrongGear, EventName.reverseGear, EventName.brakeHold,
EventName.doorOpen, EventName.seatbeltNotLatched, EventName.parkBrake,
*GEARS_ALLOW_PAUSED_SILENT]
@@ -49,45 +52,57 @@ class StateMachine:
self.state = State.disabled
self._events = Events()
self._events_sp = EventsSP()
def add_current_alert_types(self, alert_type):
if not self.selfdrive.enabled:
self.ss_state_machine.current_alert_types.append(alert_type)
def update(self, events: Events):
def check_contains(self, event_type: str) -> bool:
return bool(self._events.contains(event_type) or self._events_sp.contains(event_type))
def check_contains_in_list(self, events_list: list[int]) -> bool:
return bool(self._events.contains_in_list(events_list) or self._events_sp.contains_in_list(events_list))
def update(self, events: Events, events_sp: EventsSP):
# soft disable timer and current alert types are from the state machine of openpilot
# decrement the soft disable timer at every step, as it's reset on
# entrance in SOFT_DISABLING state
self._events = events
self._events_sp = events_sp
# ENABLED, SOFT DISABLING, PAUSED, OVERRIDING
if self.state != State.disabled:
# user and immediate disable always have priority in a non-disabled state
if events.contains(ET.USER_DISABLE):
if events.has(EventName.silentLkasDisable) or events.has(EventName.silentBrakeHold):
if self.check_contains(ET.USER_DISABLE):
if events_sp.has(EventNameSP.silentLkasDisable) or events_sp.has(EventNameSP.silentBrakeHold):
self.state = State.paused
else:
self.state = State.disabled
self.ss_state_machine.current_alert_types.append(ET.USER_DISABLE)
elif events.contains(ET.IMMEDIATE_DISABLE):
elif self.check_contains(ET.IMMEDIATE_DISABLE):
self.state = State.disabled
self.add_current_alert_types(ET.IMMEDIATE_DISABLE)
else:
# ENABLED
if self.state == State.enabled:
if events.contains(ET.SOFT_DISABLE):
if self.check_contains(ET.SOFT_DISABLE):
self.state = State.softDisabling
if not self.selfdrive.enabled:
self.ss_state_machine.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.ss_state_machine.current_alert_types.append(ET.SOFT_DISABLE)
elif events.contains(ET.OVERRIDE_LATERAL):
elif self.check_contains(ET.OVERRIDE_LATERAL):
self.state = State.overriding
self.add_current_alert_types(ET.OVERRIDE_LATERAL)
# SOFT DISABLING
elif self.state == State.softDisabling:
if not events.contains(ET.SOFT_DISABLE):
if not self.check_contains(ET.SOFT_DISABLE):
# no more soft disabling condition, so go back to ENABLED
self.state = State.enabled
@@ -99,12 +114,12 @@ class StateMachine:
# PAUSED
elif self.state == State.paused:
if events.contains(ET.ENABLE):
if events.contains(ET.NO_ENTRY):
if self.check_contains(ET.ENABLE):
if self.check_contains(ET.NO_ENTRY):
self.add_current_alert_types(ET.NO_ENTRY)
else:
if events.contains(ET.OVERRIDE_LATERAL):
if self.check_contains(ET.OVERRIDE_LATERAL):
self.state = State.overriding
else:
self.state = State.enabled
@@ -112,26 +127,26 @@ class StateMachine:
# OVERRIDING
elif self.state == State.overriding:
if events.contains(ET.SOFT_DISABLE):
if self.check_contains(ET.SOFT_DISABLE):
self.state = State.softDisabling
if not self.selfdrive.enabled:
self.ss_state_machine.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.ss_state_machine.current_alert_types.append(ET.SOFT_DISABLE)
elif not events.contains(ET.OVERRIDE_LATERAL):
elif not self.check_contains(ET.OVERRIDE_LATERAL):
self.state = State.enabled
else:
self.ss_state_machine.current_alert_types += [ET.OVERRIDE_LATERAL]
# DISABLED
elif self.state == State.disabled:
if events.contains(ET.ENABLE):
if events.contains(ET.NO_ENTRY):
if events.contains_in_list(GEARS_ALLOW_PAUSED):
if self.check_contains(ET.ENABLE):
if self.check_contains(ET.NO_ENTRY):
if self.check_contains_in_list(GEARS_ALLOW_PAUSED):
self.state = State.paused
self.add_current_alert_types(ET.NO_ENTRY)
else:
if events.contains(ET.OVERRIDE_LATERAL):
if self.check_contains(ET.OVERRIDE_LATERAL):
self.state = State.overriding
else:
self.state = State.enabled

View File

@@ -27,13 +27,14 @@ Last updated: July 29, 2024
import pytest
from pytest_mock import MockerFixture
from cereal import log, custom
from cereal import custom
from openpilot.common.realtime import DT_CTRL
from openpilot.sunnypilot.mads.state import StateMachine, SOFT_DISABLE_TIME, GEARS_ALLOW_PAUSED
from openpilot.selfdrive.selfdrived.events import Events, ET, EVENTS, NormalPermanentAlert
from openpilot.selfdrive.selfdrived.events import ET, NormalPermanentAlert
from openpilot.sunnypilot.selfdrive.selfdrived.events import EVENTS_SP
State = custom.SelfdriveStateSP.ModularAssistiveDrivingSystem.ModularAssistiveDrivingSystemState
EventName = log.OnroadEvent.EventName
EventNameSP = custom.OnroadEventSP.EventName
# The event types that maintain the current state
MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,),
@@ -47,7 +48,7 @@ def make_event(event_types):
event = {}
for ev in event_types:
event[ev] = NormalPermanentAlert("alert")
EVENTS[0] = event
EVENTS_SP[0] = event
return 0
@@ -62,87 +63,93 @@ class TestMADSStateMachine:
@pytest.fixture(autouse=True)
def setup_method(self, mocker: MockerFixture):
self.mads = MockMADS(mocker)
self.events = Events()
self.state_machine = StateMachine(self.mads)
self.events = self.state_machine._events
self.events_sp = self.state_machine._events_sp
self.mads.selfdrive.state_machine.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
def reset(self):
self.events.clear()
self.events_sp.clear()
self.state_machine.state = State.disabled
def test_immediate_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.IMMEDIATE_DISABLE]))
self.events_sp.add(make_event([et, ET.IMMEDIATE_DISABLE]))
self.state_machine.state = state
self.state_machine.update(self.events)
self.state_machine.update(self.events, self.events_sp)
assert State.disabled == self.state_machine.state
self.events.clear()
self.reset()
def test_user_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.USER_DISABLE]))
self.events_sp.add(make_event([et, ET.USER_DISABLE]))
self.state_machine.state = state
self.state_machine.update(self.events)
self.state_machine.update(self.events, self.events_sp)
assert State.disabled == self.state_machine.state
self.events.clear()
self.reset()
def test_user_disable_to_paused(self):
paused_events = (EventName.silentLkasDisable, EventName.silentBrakeHold)
paused_events = (EventNameSP.silentLkasDisable, EventNameSP.silentBrakeHold)
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.USER_DISABLE]))
self.events_sp.add(make_event([et, ET.USER_DISABLE]))
for en in paused_events:
self.events.add(en)
self.events_sp.add(en)
self.state_machine.state = state
self.state_machine.update(self.events)
final_state = State.paused if self.events.has(en) and state != State.disabled else State.disabled
self.state_machine.update(self.events, self.events_sp)
final_state = State.paused if self.events_sp.has(en) and state != State.disabled else State.disabled
assert self.state_machine.state == final_state
self.events.clear()
self.reset()
def test_soft_disable(self):
for state in ALL_STATES:
if state == State.paused: # paused considers USER_DISABLE instead
continue
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.SOFT_DISABLE]))
self.events_sp.add(make_event([et, ET.SOFT_DISABLE]))
self.state_machine.state = state
self.state_machine.update(self.events)
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == State.disabled if state == State.disabled else State.softDisabling
self.events.clear()
self.reset()
def test_soft_disable_timer(self):
self.state_machine.state = State.enabled
self.events.add(make_event([ET.SOFT_DISABLE]))
self.state_machine.update(self.events)
self.events_sp.add(make_event([ET.SOFT_DISABLE]))
self.state_machine.update(self.events, self.events_sp)
for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)):
assert self.state_machine.state == State.softDisabling
self.mads.selfdrive.state_machine.soft_disable_timer -= 1
self.state_machine.update(self.events)
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == State.disabled
def test_no_entry(self):
for et in ENABLE_EVENT_TYPES:
self.events.add(make_event([ET.NO_ENTRY, et]))
if not self.events.contains_in_list(GEARS_ALLOW_PAUSED):
self.state_machine.update(self.events)
self.events_sp.add(make_event([ET.NO_ENTRY, et]))
if not self.state_machine.check_contains_in_list(GEARS_ALLOW_PAUSED):
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == State.disabled
self.events.clear()
self.reset()
def test_no_entry_paused(self):
self.state_machine.state = State.paused
self.events.add(make_event([ET.NO_ENTRY]))
self.state_machine.update(self.events)
self.events_sp.add(make_event([ET.NO_ENTRY]))
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == State.paused
def test_override_lateral(self):
self.state_machine.state = State.enabled
self.events.add(make_event([ET.OVERRIDE_LATERAL]))
self.state_machine.update(self.events)
self.events_sp.add(make_event([ET.OVERRIDE_LATERAL]))
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == State.overriding
def test_paused_to_enabled(self):
self.state_machine.state = State.paused
self.events.add(make_event([ET.ENABLE]))
self.state_machine.update(self.events)
self.events_sp.add(make_event([ET.ENABLE]))
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == State.enabled
def test_maintain_states(self):
@@ -150,7 +157,7 @@ class TestMADSStateMachine:
for et in MAINTAIN_STATES[state]:
self.state_machine.state = state
if et is not None:
self.events.add(make_event([et]))
self.state_machine.update(self.events)
self.events_sp.add(make_event([et]))
self.state_machine.update(self.events, self.events_sp)
assert self.state_machine.state == state
self.events.clear()
self.reset()

View File

@@ -5,12 +5,12 @@ This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from cereal import log
from cereal import custom
from opendbc.car import structs
from openpilot.selfdrive.selfdrived.events import Events
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
EventName = log.OnroadEvent.EventName
EventNameSP = custom.OnroadEventSP.EventName
class CarSpecificEventsSP:
@@ -26,9 +26,9 @@ class CarSpecificEventsSP:
self.hyundai_radar_tracks_confirmed = self.params.get_bool("HyundaiRadarTracksConfirmed")
def update(self):
events = Events()
events = EventsSP()
if self.CP.carName == 'hyundai':
if self.hyundai_radar_tracks and not self.hyundai_radar_tracks_confirmed:
events.add(EventName.hyundaiRadarTracksConfirmed)
events.add(EventNameSP.hyundaiRadarTracksConfirmed)
return events

View File

@@ -5,12 +5,12 @@ This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from cereal import car, log
from cereal import car, custom
from opendbc.car import structs
from openpilot.common.params import Params
ButtonType = car.CarState.ButtonEvent.Type
EventName = log.OnroadEvent.EventName
EventNameSP = custom.OnroadEventSP.EventName
DISTANCE_LONG_PRESS = 50
@@ -46,5 +46,5 @@ class CruiseHelper:
if self.button_frame_counts[ButtonType.gapAdjustCruise] >= DISTANCE_LONG_PRESS and not self.experimental_mode_switched:
self._experimental_mode = not experimental_mode
self.params.put_bool_nonblocking("ExperimentalMode", self._experimental_mode)
events.add(EventName.experimentalModeSwitched)
events.add(EventNameSP.experimentalModeSwitched)
self.experimental_mode_switched = True

View File

@@ -0,0 +1,133 @@
from cereal import log, car, custom
from openpilot.sunnypilot.selfdrive.selfdrived.events_base import EventsBase, Priority, ET, Alert, \
NoEntryAlert, ImmediateDisableAlert, EngagementAlert, NormalPermanentAlert, AlertCallbackType
AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus
VisualAlert = car.CarControl.HUDControl.VisualAlert
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
EventNameSP = custom.OnroadEventSP.EventName
# get event name from enum
EVENT_NAME_SP = {v: k for k, v in EventNameSP.schema.enumerants.items()}
class EventsSP(EventsBase):
def __init__(self):
super().__init__()
self.event_counters = dict.fromkeys(EVENTS_SP.keys(), 0)
def get_events_mapping(self) -> dict[int, dict[str, Alert | AlertCallbackType]]:
return EVENTS_SP
def get_event_name(self, event: int):
return EVENT_NAME_SP[event]
def get_event_msg_type(self):
return custom.OnroadEventSP
EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
# sunnypilot
EventNameSP.lkasEnable: {
ET.ENABLE: EngagementAlert(AudibleAlert.engage),
},
EventNameSP.lkasDisable: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.disengage),
},
EventNameSP.manualSteeringRequired: {
ET.USER_DISABLE: Alert(
"Automatic Lane Centering is OFF",
"Manual Steering Required",
AlertStatus.normal, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.disengage, 1.),
},
EventNameSP.manualLongitudinalRequired: {
ET.WARNING: Alert(
"Smart/Adaptive Cruise Control: OFF",
"Manual Speed Control Required",
AlertStatus.normal, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 1.),
},
EventNameSP.silentLkasEnable: {
ET.ENABLE: EngagementAlert(AudibleAlert.none),
},
EventNameSP.silentLkasDisable: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.none),
},
EventNameSP.silentBrakeHold: {
ET.USER_DISABLE: EngagementAlert(AudibleAlert.none),
ET.NO_ENTRY: NoEntryAlert("Brake Hold Active"),
},
EventNameSP.silentWrongGear: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: Alert(
"Gear not D",
"openpilot Unavailable",
AlertStatus.normal, AlertSize.mid,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 0.),
},
EventNameSP.silentReverseGear: {
ET.PERMANENT: Alert(
"Reverse\nGear",
"",
AlertStatus.normal, AlertSize.full,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2, creation_delay=0.5),
ET.NO_ENTRY: NoEntryAlert("Reverse Gear"),
},
EventNameSP.silentDoorOpen: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: NoEntryAlert("Door Open"),
},
EventNameSP.silentSeatbeltNotLatched: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: NoEntryAlert("Seatbelt Unlatched"),
},
EventNameSP.silentParkBrake: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, 0.),
ET.NO_ENTRY: NoEntryAlert("Parking Brake Engaged"),
},
EventNameSP.controlsMismatchLateral: {
ET.IMMEDIATE_DISABLE: ImmediateDisableAlert("Controls Mismatch: Lateral"),
ET.NO_ENTRY: NoEntryAlert("Controls Mismatch: Lateral"),
},
EventNameSP.hyundaiRadarTracksConfirmed: {
ET.PERMANENT: NormalPermanentAlert("Radar tracks available. Restart the car to initialize")
},
EventNameSP.experimentalModeSwitched: {
ET.WARNING: NormalPermanentAlert("Experimental Mode Switched", duration=1.5)
}
}

View File

@@ -0,0 +1,225 @@
import bisect
from enum import IntEnum
from abc import abstractmethod
from collections.abc import Callable
from cereal import log, car
import cereal.messaging as messaging
from openpilot.common.realtime import DT_CTRL
AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus
VisualAlert = car.CarControl.HUDControl.VisualAlert
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
# Alert priorities
class Priority(IntEnum):
LOWEST = 0
LOWER = 1
LOW = 2
MID = 3
HIGH = 4
HIGHEST = 5
# Event types
class ET:
ENABLE = 'enable'
PRE_ENABLE = 'preEnable'
OVERRIDE_LATERAL = 'overrideLateral'
OVERRIDE_LONGITUDINAL = 'overrideLongitudinal'
NO_ENTRY = 'noEntry'
WARNING = 'warning'
USER_DISABLE = 'userDisable'
SOFT_DISABLE = 'softDisable'
IMMEDIATE_DISABLE = 'immediateDisable'
PERMANENT = 'permanent'
class Alert:
def __init__(self,
alert_text_1: str,
alert_text_2: str,
alert_status: log.SelfdriveState.AlertStatus,
alert_size: log.SelfdriveState.AlertSize,
priority: Priority,
visual_alert: car.CarControl.HUDControl.VisualAlert,
audible_alert: car.CarControl.HUDControl.AudibleAlert,
duration: float,
creation_delay: float = 0.):
self.alert_text_1 = alert_text_1
self.alert_text_2 = alert_text_2
self.alert_status = alert_status
self.alert_size = alert_size
self.priority = priority
self.visual_alert = visual_alert
self.audible_alert = audible_alert
self.duration = int(duration / DT_CTRL)
self.creation_delay = creation_delay
self.alert_type = ""
self.event_type: str | None = None
def __str__(self) -> str:
return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}"
def __gt__(self, alert2) -> bool:
if not isinstance(alert2, Alert):
return False
return self.priority > alert2.priority
class AlertBase(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str, alert_status: log.SelfdriveState.AlertStatus,
alert_size: log.SelfdriveState.AlertSize, priority: Priority,
visual_alert: car.CarControl.HUDControl.VisualAlert,
audible_alert: car.CarControl.HUDControl.AudibleAlert, duration: float):
super().__init__(alert_text_1, alert_text_2, alert_status, alert_size, priority, visual_alert, audible_alert, duration)
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int, log.ControlsState], Alert]
class EventsBase:
def __init__(self):
self.events: list[int] = []
self.static_events: list[int] = []
self.event_counters = {}
@property
def names(self) -> list[int]:
return self.events
def __len__(self) -> int:
return len(self.events)
def add(self, event_name: int, static: bool = False) -> None:
if static:
bisect.insort(self.static_events, event_name)
bisect.insort(self.events, event_name)
def clear(self) -> None:
self.event_counters = {k: (v + 1 if k in self.events else 0) for k, v in self.event_counters.items()}
self.events = self.static_events.copy()
def contains(self, event_type: str) -> bool:
return any(event_type in self.get_events_mapping().get(e, {}) for e in self.events)
def create_alerts(self, event_types: list[str], callback_args=None):
if callback_args is None:
callback_args = []
ret = []
for e in self.events:
types = self.get_events_mapping()[e].keys()
for et in event_types:
if et in types:
alert = self.get_events_mapping()[e][et]
if not isinstance(alert, Alert):
alert = alert(*callback_args)
if DT_CTRL * (self.event_counters[e] + 1) >= alert.creation_delay:
alert.alert_type = f"{self.get_event_name(e)}/{et}"
alert.event_type = et
ret.append(alert)
return ret
def add_from_msg(self, events):
for e in events:
bisect.insort(self.events, e.name.raw)
def to_msg(self):
ret = []
for event_name in self.events:
event = self.get_event_msg_type().new_message()
event.name = event_name
for event_type in self.get_events_mapping().get(event_name, {}):
setattr(event, event_type, True)
ret.append(event)
return ret
def has(self, event_name: int) -> bool:
return event_name in self.events
def contains_in_list(self, events_list: list[int]) -> bool:
return any(event_name in self.events for event_name in events_list)
def remove(self, event_name: int, static: bool = False) -> None:
if static and event_name in self.static_events:
self.static_events.remove(event_name)
if event_name in self.events:
self.event_counters[event_name] = self.event_counters[event_name] + 1
self.events.remove(event_name)
@abstractmethod
def get_events_mapping(self) -> dict[int, dict[str, Alert | AlertCallbackType]]:
raise NotImplementedError
@abstractmethod
def get_event_name(self, event: int) -> str:
raise NotImplementedError
@abstractmethod
def get_event_msg_type(self):
raise NotImplementedError
EmptyAlert = Alert("" , "", AlertStatus.normal, AlertSize.none, Priority.LOWEST,
VisualAlert.none, AudibleAlert.none, 0)
class NoEntryAlert(Alert):
def __init__(self, alert_text_2: str,
alert_text_1: str = "openpilot Unavailable",
visual_alert: car.CarControl.HUDControl.VisualAlert=VisualAlert.none):
super().__init__(alert_text_1, alert_text_2, AlertStatus.normal,
AlertSize.mid, Priority.LOW, visual_alert,
AudibleAlert.refuse, 3.)
class SoftDisableAlert(Alert):
def __init__(self, alert_text_2: str):
super().__init__("TAKE CONTROL IMMEDIATELY", alert_text_2,
AlertStatus.userPrompt, AlertSize.full,
Priority.MID, VisualAlert.steerRequired,
AudibleAlert.warningSoft, 2.),
# less harsh version of SoftDisable, where the condition is user-triggered
class UserSoftDisableAlert(SoftDisableAlert):
def __init__(self, alert_text_2: str):
super().__init__(alert_text_2),
self.alert_text_1 = "openpilot will disengage"
class ImmediateDisableAlert(Alert):
def __init__(self, alert_text_2: str):
super().__init__("TAKE CONTROL IMMEDIATELY", alert_text_2,
AlertStatus.critical, AlertSize.full,
Priority.HIGHEST, VisualAlert.steerRequired,
AudibleAlert.warningImmediate, 4.),
class EngagementAlert(Alert):
def __init__(self, audible_alert: car.CarControl.HUDControl.AudibleAlert):
super().__init__("", "",
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none,
audible_alert, .2),
class NormalPermanentAlert(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str = "", duration: float = 0.2, priority: Priority = Priority.LOWER, creation_delay: float = 0.):
super().__init__(alert_text_1, alert_text_2,
AlertStatus.normal, AlertSize.mid if len(alert_text_2) else AlertSize.small,
priority, VisualAlert.none, AudibleAlert.none, duration, creation_delay=creation_delay),
class StartupAlert(Alert):
def __init__(self, alert_text_1: str, alert_text_2: str = "Always keep hands on wheel and eyes on road", alert_status=AlertStatus.normal):
super().__init__(alert_text_1, alert_text_2,
alert_status, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, 5.),