Files
sunnypilot/selfdrive/selfdrived/alertmanager.py
Shane Smiskol cf50d4ae19 AlertManager: reset duration when re-adding an event (#33478)
* fix alert duration when re-adding an alert before it's ended (personality)

* clean up

* messy test

* clean up test

* Revert "clean up test"

This reverts commit e7c0f80cb3d0061558920cf2baf37c6ea6935c81.

* better name

* debug

* Revert "debug"

This reverts commit da8d561445f0f0d142cec99c583dda798973544c.

* Reapply "clean up test"

This reverts commit a7dba540f78d1c32317e5f89e83de5aac1136c38.

* update refs
2024-10-04 16:57:55 -07:00

68 lines
1.9 KiB
Python

import copy
import os
import json
from collections import defaultdict
from dataclasses import dataclass
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.selfdrived.events import Alert, EmptyAlert
# Offroad alert definitions are read once, at import time, from the JSON file
# that lives under BASEDIR.
_OFFROAD_ALERTS_PATH = os.path.join(BASEDIR, "selfdrive/selfdrived/alerts_offroad.json")
with open(_OFFROAD_ALERTS_PATH) as f:
  OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''
Params().put(alert, json.dumps(a))
else:
Params().remove(alert)
@dataclass
class AlertEntry:
  """Bookkeeping record for one alert: the alert itself plus frame markers.

  All frame fields default to -1, so a freshly created entry is not active
  for any non-negative frame.
  """
  # The alert held by this entry, or None when nothing has been stored yet.
  alert: Alert | None = None
  # Frame from which the alert's duration is counted.
  start_frame: int = -1
  # Last frame (inclusive) on which the entry counts as active.
  end_frame: int = -1
  # Frame on which the entry was most recently added.
  added_frame: int = -1

  def active(self, frame: int) -> bool:
    """Return True while ``frame`` has not passed ``end_frame`` (inclusive)."""
    return self.end_frame >= frame

  def just_added(self, frame: int) -> bool:
    """Return True if the entry is active and was added on the frame right before ``frame``."""
    if not self.active(frame):
      return False
    return (self.added_frame + 1) == frame
class AlertManager:
  """Keeps one AlertEntry per alert type and picks the alert to expose as current."""

  def __init__(self):
    # defaultdict: the first lookup of an unseen alert type creates an empty entry.
    self.alerts: dict[str, AlertEntry] = defaultdict(AlertEntry)
    self.current_alert = EmptyAlert

  def add_many(self, frame: int, alerts: list[Alert]) -> None:
    """Record ``alerts`` as added on ``frame``.

    For every alert the entry keyed by ``alert.alert_type`` is updated:
    ``start_frame`` is reset to ``frame`` unless the entry was already added on
    the previous frame and is still active, and ``end_frame`` is pushed to
    whichever is later of ``frame + 1`` and ``start_frame + alert.duration``.
    """
    for new_alert in alerts:
      slot = self.alerts[new_alert.alert_type]
      slot.alert = new_alert

      # just_added() reads added_frame/end_frame, so it must be evaluated
      # before those fields are overwritten below.
      was_just_added = slot.just_added(frame)
      if not was_just_added:
        slot.start_frame = frame

      earliest_end = slot.start_frame + new_alert.duration
      slot.end_frame = max(frame + 1, earliest_end)
      slot.added_frame = frame

  def process_alerts(self, frame: int, clear_event_types: set):
    """Choose ``current_alert`` for ``frame``.

    Entries whose alert's ``event_type`` is in ``clear_event_types`` are ended
    (``end_frame`` set to -1). Among the entries still active on ``frame``, the
    one with the greatest ``(priority, start_frame)`` wins; on a tie the entry
    seen first is kept. ``EmptyAlert`` is used when nothing is active.
    """
    best = AlertEntry()
    for candidate in self.alerts.values():
      if not candidate.alert:
        continue

      if candidate.alert.event_type in clear_event_types:
        candidate.end_frame = -1

      if not candidate.active(frame):
        continue

      # rank by priority first and then by start_frame
      if best.alert is None:
        best = candidate
      elif (candidate.alert.priority, candidate.start_frame) > (best.alert.priority, best.start_frame):
        best = candidate

    self.current_alert = EmptyAlert if best.alert is None else best.alert