fix changed events

This commit is contained in:
Jason Wen
2024-11-16 22:58:35 -05:00
parent 3ac6fa5c5c
commit 31d8c97f6a
2 changed files with 48 additions and 27 deletions

View File

@@ -226,31 +226,36 @@ AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster,
def soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
if soft_disable_time < int(0.5 / DT_CTRL):
return ImmediateDisableAlert(alert_text_2)
return SoftDisableAlert(alert_text_2)
return func
def user_soft_disable_alert(alert_text_2: str) -> AlertCallbackType:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
if soft_disable_time < int(0.5 / DT_CTRL):
return ImmediateDisableAlert(alert_text_2)
return UserSoftDisableAlert(alert_text_2)
return func
def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
branch = get_short_branch() # Ensure get_short_branch is cached to avoid lags on startup
if "REPLAY" in os.environ:
branch = "replay"
return StartupAlert("WARNING: This branch is not tested", branch, alert_status=AlertStatus.userPrompt)
def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
return NoEntryAlert(f"Drive above {get_display_speed(CP.minEnableSpeed, metric)} to engage")
def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
return Alert(
f"Steer Unavailable Below {get_display_speed(CP.minSteerSpeed, metric)}",
"",
@@ -258,7 +263,8 @@ def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.S
Priority.LOW, VisualAlert.steerRequired, AudibleAlert.prompt, 0.4)
def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
first_word = 'Recalibration' if sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.recalibrating else 'Calibration'
return Alert(
f"{first_word} in Progress: {sm['liveCalibration'].calPerc:.0f}%",
@@ -269,37 +275,43 @@ def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messag
# *** debug alerts ***
def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
full_perc = round(100. - sm['deviceState'].freeSpacePercent)
return NormalPermanentAlert("Out of Storage", f"{full_perc}% full")
def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
mdl = sm['modelV2'].velocity.x[0] if len(sm['modelV2'].velocity.x) else math.nan
err = CS.vEgo - mdl
msg = f"Speed Error: {err:.1f} m/s"
return NoEntryAlert(msg, alert_text_1="Posenet Speed Invalid")
def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning]
msg = ', '.join(not_running)
return NoEntryAlert(msg, alert_text_1="Process Not Running")
def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
bs = [s for s in sm.data.keys() if not sm.all_checks([s, ])]
msg = ', '.join(bs[:4]) # can't fit too many on one line
return NoEntryAlert(msg, alert_text_1="Communication Issue Between Processes")
def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
all_cams = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState')
bad_cams = [s.replace('State', '') for s in all_cams if s in sm.data.keys() and not sm.all_checks([s, ])]
return NormalPermanentAlert("Camera Malfunction", ', '.join(bad_cams))
def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
rpy = sm['liveCalibration'].rpyCalib
yaw = math.degrees(rpy[2] if len(rpy) == 3 else math.nan)
pitch = math.degrees(rpy[1] if len(rpy) == 3 else math.nan)
@@ -307,41 +319,48 @@ def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging
return NormalPermanentAlert("Calibration Invalid", angles)
def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
cpu = max(sm['deviceState'].cpuTempC, default=0.)
gpu = max(sm['deviceState'].gpuTempC, default=0.)
temp = max((cpu, gpu, sm['deviceState'].memoryTempC))
return NormalPermanentAlert("System Overheated", f"{temp:.0f} °C")
def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
return NormalPermanentAlert("Low Memory", f"{sm['deviceState'].memoryUsagePercent}% used")
def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
x = max(sm['deviceState'].cpuUsagePercent, default=0.)
return NormalPermanentAlert("High CPU Usage", f"{x}% used")
def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
return NormalPermanentAlert("Driving Model Lagging", f"{sm['modelV2'].frameDropPerc:.1f}% frames dropped")
def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
text = "Enable Adaptive Cruise to Engage"
if CP.carName == "honda":
text = "Enable Main Switch to Engage"
return NoEntryAlert(text)
def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
gb = sm['carControl'].actuators.accel / 4.
steer = sm['carControl'].actuators.steer
vals = f"Gas: {round(gb * 100.)}%, Steer: {round(steer * 100.)}%"
return NormalPermanentAlert("Joystick Mode", vals)
def longitudinal_maneuver_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def longitudinal_maneuver_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
ad = sm['alertDebug']
audible_alert = AudibleAlert.prompt if 'Active' in ad.alertText1 else AudibleAlert.none
alert_status = AlertStatus.userPrompt if 'Active' in ad.alertText1 else AlertStatus.normal
@@ -351,17 +370,18 @@ def longitudinal_maneuver_alert(CP: car.CarParams, CS: car.CarState, sm: messagi
Priority.LOW, VisualAlert.none, audible_alert, 0.2)
def personality_changed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
def personality_changed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
personality = str(personality).title()
return NormalPermanentAlert(f"Driving Personality: {personality}", duration=1.5)
def mads_status_changed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
ss = sm["selfdriveState"]
mads = sm["selfdriveStateSP"].mads
lat_active_str = "ON" if mads.active else "OFF"
long_active_str = "ON" if ss.active else "OFF"
return NormalPermanentAlert(f"Lateral: {lat_active_str} | Longitudinal: {long_active_str}", duration=1.5)
def mads_status_changed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality,
mads_status: tuple[bool, bool]) -> Alert:
lat_active, long_active = mads_status
lat_active_str = "ON" if lat_active else "OFF"
long_active_str = "ON" if long_active else "OFF"
return NormalPermanentAlert(f"Steering: {lat_active_str} | ACC: {long_active_str}", duration=1.5)
EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {

View File

@@ -424,7 +424,8 @@ class SelfdriveD:
pers = LONGITUDINAL_PERSONALITY_MAP[self.personality]
alerts = self.events.create_alerts(self.state_machine.current_alert_types, [self.CP, CS, self.sm, self.is_metric,
self.state_machine.soft_disable_timer, pers])
self.state_machine.soft_disable_timer, pers,
(self.mads.active, self.enabled)])
self.AM.add_many(self.sm.frame, alerts)
self.AM.process_alerts(self.sm.frame, clear_event_types)