Files
sunnypilot/selfdrive/controls/lib/alertmanager.py
Adeeb Shihadeh 017b084154 Alert when updated consistently fails (#2013)
* alert when update fails more than 10 times

* bring over offroad alert refactor from other branch

* and we have tests

* use it in snapshot

* bump apk

* don't show exceptions on release branches

* only write when changed

* why does delete use so much cpu

* clean that up

* little more
old-commit-hash: 8e63f06540
2020-08-11 16:23:57 -07:00

91 lines
2.9 KiB
Python

import os
import copy
import json
from cereal import car, log
from common.basedir import BASEDIR
from common.params import Params
from common.realtime import DT_CTRL
from selfdrive.swaglog import cloudlog
# Short aliases for the alert-related types exposed by cereal's `log` and
# `car` modules; AlertManager uses them for its "no alert" defaults.
AlertSize = log.ControlsState.AlertSize
AlertStatus = log.ControlsState.AlertStatus
VisualAlert = car.CarControl.HUDControl.VisualAlert
AudibleAlert = car.CarControl.HUDControl.AudibleAlert

# Offroad alert definitions, parsed once at import time from the JSON file
# that lives next to this module (path is resolved relative to BASEDIR).
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f:
  OFFROAD_ALERTS = json.loads(f.read())
def set_offroad_alert(alert, show_alert, extra_text=None):
  """Show or clear one of the offroad alerts through Params.

  Args:
    alert: key of the alert; used both to look up its definition in
      OFFROAD_ALERTS and as the Params key that is written or deleted.
    show_alert: when truthy the alert definition is serialized to JSON and
      stored under `alert`; otherwise the `alert` key is deleted.
    extra_text: optional text appended to the alert's 'text' field before it
      is stored. Ignored when the alert is being cleared.

  Raises:
    KeyError: if `show_alert` is truthy and `alert` is not in OFFROAD_ALERTS.
  """
  if not show_alert:
    Params().delete(alert)
    return

  payload = OFFROAD_ALERTS[alert]
  if extra_text is not None:
    # Append to a copy so the shared definition in OFFROAD_ALERTS keeps its
    # original text for later calls.
    payload = copy.copy(payload)
    payload['text'] += extra_text
  Params().put(alert, json.dumps(payload))
class AlertManager():
  """Keeps the list of active alerts and publishes the one to present.

  Alerts are added with `add` / `add_many` and kept in `activealerts`, ordered
  by priority (highest first) and then by start time (most recent first).
  `process_alerts` drops expired alerts and copies the fields of the first
  remaining alert into the `alert_*`, `visual_alert` and `audible_alert`
  attributes of this object.
  """

  def __init__(self):
    self.activealerts = []
    # Create the output attributes up front so they can be read safely even
    # before the first call to process_alerts().
    self._reset_current_alert()

  def _reset_current_alert(self):
    """Set every output attribute to its "no alert" value."""
    self.alert_type = ""
    self.alert_text_1 = ""
    self.alert_text_2 = ""
    self.alert_status = AlertStatus.normal
    self.alert_size = AlertSize.none
    self.visual_alert = VisualAlert.none
    self.audible_alert = AudibleAlert.none
    self.alert_rate = 0.

  def alert_present(self):
    """Return True when at least one alert is in `activealerts`."""
    return len(self.activealerts) > 0

  def add_many(self, frame, alerts, enabled=True):
    """Add every alert in `alerts`, all stamped with the same `frame`."""
    for a in alerts:
      self.add(frame, a, enabled=enabled)

  def add(self, frame, alert, enabled=True):
    """Activate a copy of `alert`, starting at time `frame * DT_CTRL`.

    Args:
      frame: frame counter; multiplied by DT_CTRL to get the start time.
      alert: alert object to activate. It is shallow-copied, so the caller's
        object does not get a `start_time` written onto it.
      enabled: only forwarded to the 'alert_add' log event.
    """
    added_alert = copy.copy(alert)
    added_alert.start_time = frame * DT_CTRL

    # if new alert is higher priority, log it
    outranks_current = (not self.alert_present() or
                        added_alert.alert_priority > self.activealerts[0].alert_priority)
    if outranks_current:
      cloudlog.event('alert_add', alert_type=added_alert.alert_type, enabled=enabled)

    self.activealerts.append(added_alert)

    # sort by priority first and then by start_time
    self.activealerts.sort(key=lambda k: (k.alert_priority, k.start_time), reverse=True)

  def process_alerts(self, frame):
    """Expire old alerts and refresh the output attributes for `frame`.

    An alert stays active until the longest of its sound, HUD and text
    durations has elapsed. Each of those three parts of the first active
    alert is only published while its own duration has not yet elapsed.
    """
    cur_time = frame * DT_CTRL

    # first get rid of all the expired alerts
    self.activealerts = [a for a in self.activealerts if a.start_time +
                         max(a.duration_sound, a.duration_hud_alert, a.duration_text) > cur_time]

    current_alert = self.activealerts[0] if self.alert_present() else None

    # start with assuming no alerts
    self._reset_current_alert()

    if not current_alert:
      return

    self.alert_type = current_alert.alert_type

    if current_alert.start_time + current_alert.duration_sound > cur_time:
      self.audible_alert = current_alert.audible_alert

    if current_alert.start_time + current_alert.duration_hud_alert > cur_time:
      self.visual_alert = current_alert.visual_alert

    if current_alert.start_time + current_alert.duration_text > cur_time:
      self.alert_text_1 = current_alert.alert_text_1
      self.alert_text_2 = current_alert.alert_text_2
      self.alert_status = current_alert.alert_status
      self.alert_size = current_alert.alert_size
      self.alert_rate = current_alert.alert_rate