diff --git a/selfdrive/monitoring/helpers.py b/selfdrive/monitoring/helpers.py index 002e01ae3..4f068c4f5 100644 --- a/selfdrive/monitoring/helpers.py +++ b/selfdrive/monitoring/helpers.py @@ -449,7 +449,6 @@ class DriverMonitoring: rpyCalib = [0., 0., 0.] else: highway_speed = sm['carState'].vEgo - # TODO-SP: unit test to assert both control checks are always present enabled = sm['selfdriveState'].enabled or sm['carControl'].latActive wrong_gear = sm['carState'].gearShifter not in (car.CarState.GearShifter.drive, car.CarState.GearShifter.low) standstill = sm['carState'].standstill diff --git a/selfdrive/monitoring/test_monitoring.py b/selfdrive/monitoring/test_monitoring.py index 6ea9b8028..75adb6a2c 100644 --- a/selfdrive/monitoring/test_monitoring.py +++ b/selfdrive/monitoring/test_monitoring.py @@ -1,6 +1,7 @@ import numpy as np +import pytest -from cereal import log +from cereal import log, car from openpilot.common.realtime import DT_DMON from openpilot.selfdrive.monitoring.helpers import DriverMonitoring, DRIVER_MONITOR_SETTINGS from openpilot.system.hardware import HARDWARE @@ -204,3 +205,66 @@ class TestMonitoring: assert EventName.driverUnresponsive in \ events[int((INVISIBLE_SECONDS_TO_RED-1+DT_DMON*d_status.settings._HI_STD_FALLBACK_TIME+0.1)/DT_DMON)].names + +@pytest.mark.parametrize("enabled_state, lat_active_state, expected", [ + (False, False, False), # Both Disabled + (True, False, True), # OP Enabled, Lat Inactive + (False, True, True), # OP Disabled, Lat Active (e.g. MADS) + (True, True, True) # Both Active +]) +def test_enabled_states(enabled_state, lat_active_state, expected): + """ + Test DriverMonitoring.run_step with all 4 combinations of: + - selfdriveState.enabled (True/False) + - carControl.latActive (True/False) + """ + cs = car.CarState.new_message() + cs.vEgo = 30.0 + cs.gearShifter = car.CarState.GearShifter.drive + cs.standstill = False + cs.steeringPressed = False + cs.gasPressed = False + + ss = log.SelfdriveState.new_message() + ss.enabled = enabled_state + + cc = car.CarControl.new_message() + cc.latActive = lat_active_state + + mv2 = log.ModelDataV2.new_message() + mv2.meta.disengagePredictions.brakeDisengageProbs = [0.0] + + lc = log.LiveCalibrationData.new_message() + lc.rpyCalib = [0.0, 0.0, 0.0] + + ds = make_msg(False) + + sm = { + 'carState': cs, + 'selfdriveState': ss, + 'carControl': cc, + 'modelV2': mv2, + 'liveCalibration': lc, + 'driverStateV2': ds + } + + driver_monitoring = DriverMonitoring() + + # run_test doesn't assign enabled to a variable, so we need to spy on _update_events to see its value + captured_args = [] + original_update_events = driver_monitoring._update_events + + def spy_update_events(driver_engaged, op_engaged, standstill, wrong_gear, car_speed): + captured_args.append(op_engaged) + return original_update_events(driver_engaged, op_engaged, standstill, wrong_gear, car_speed) + + driver_monitoring._update_events = spy_update_events + + driver_monitoring.run_step(sm, demo=False) + + # Assertion + assert len(captured_args) == 1, "Expected _update_events to be called exactly once" + actual_enabled = captured_args[0] + + assert actual_enabled == expected, f"Expected op_engaged={expected}, but got {actual_enabled}" +