Files
sunnypilot/selfdrive/controls/lib/alertmanager.py
Adeeb Shihadeh a9626f95b6 add openpilot prefix to imports (#29498)
* add openpilot prefix to imports

* more

* more

* fix docs

* fix linter

* bump submodules

* fix patched tests

* update dynamic imports

* debug

* Revert "debug"

This reverts commit db5e13b9911cc74438bee123bc3430da6c31b24b.

* fix pm test
2023-08-20 20:49:55 -07:00

63 lines
1.8 KiB
Python

import copy
import os
import json
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Optional
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.events import Alert
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f:
OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''
Params().put(alert, json.dumps(a))
else:
Params().remove(alert)
@dataclass
class AlertEntry:
alert: Optional[Alert] = None
start_frame: int = -1
end_frame: int = -1
def active(self, frame: int) -> bool:
return frame <= self.end_frame
class AlertManager:
def __init__(self):
self.alerts: Dict[str, AlertEntry] = defaultdict(AlertEntry)
def add_many(self, frame: int, alerts: List[Alert]) -> None:
for alert in alerts:
entry = self.alerts[alert.alert_type]
entry.alert = alert
if not entry.active(frame):
entry.start_frame = frame
min_end_frame = entry.start_frame + alert.duration
entry.end_frame = max(frame + 1, min_end_frame)
def process_alerts(self, frame: int, clear_event_types: set) -> Optional[Alert]:
current_alert = AlertEntry()
for v in self.alerts.values():
if not v.alert:
continue
if v.alert.event_type in clear_event_types:
v.end_frame = -1
# sort by priority first and then by start_frame
greater = current_alert.alert is None or (v.alert.priority, v.start_frame) > (current_alert.alert.priority, current_alert.start_frame)
if v.active(frame) and greater:
current_alert = v
return current_alert.alert