mirror of
https://github.com/infiniteCable2/panda.git
synced 2026-02-18 17:23:52 +08:00
* alternative experience * safety init * fix * more update * not really * misra * Add Custom MIT License (#38) * brake check was not handled * revert * alt -> lkas * explicit checks * support toyota and ford * rename * hyundai can-fd support * only allow lkas if enabled * hyundai: main button handling * revert * hyundai: main button heartbeat * add logging for controls allowed lateral * fix panda safety * ford btn * toyota btn * fca btn * honda btn * mads safety tests * more tests * safety misra * safety mutation * misra * mutation experiment * fix * ford test main button * ford test lkas button * more ford test * hyundai lkas and main * more ford * hyundai canfd * rename * rename * cleaner * more fixes * more hyundai tests * no longer needed * thanks for tests! * more tests for lat * more explicit * make sure to reset * try this out * probably needed * move * misra * not needed * move to safety_mads * not really needed * remove * MADS: Refactor MADS safety with improved state management (pull request #46) Refactor MADS safety with improved state management This commit introduces a major refactoring of the MADS safety module, improving state management and control flow. Key changes include: Core Changes: - Introduced a MADSState struct to centralize state management - Removed global state variables in favor of structured state - Implemented button transition handling with explicit state tracking (PRESSED/RELEASED/NO_CHANGE) - Added state flags for button availability detection - Simplified lateral control permission logic Button Handling: - Separated main button and LKAS button state tracking - Added independent engagement states for each button - Improved button press detection across multiple platforms - Added support for main and LKAS buttons on Hyundai platforms - Modified ACC main state handling Testing: - Added comprehensive test coverage for MADS state transitions - Added new MADS-specific test base class for consistent testing across platforms - Added mutation testing for state management - Extended timeout for mutation tests from 5 to 8 minutes - Added extensive button press validation tests - Enhanced debugging output in replay drive tests The refactored code provides a more organized implementation of MADS safety features while maintaining compatibility with existing safety checks. * adding note * adding ford (WIP) * adding honda (WIP) * adding toyota (WIP) * adding chrysler (WIP) * Standardize Button State Handling Across Platforms Refactor button state handling by replacing integer constants with an enumerated `ButtonState` type and updating logic to improve readability and maintainability. This change affects button press detection in Ford, Honda, Hyundai, and Toyota safety modules and aligns them with a unified MADS button state approach. Enums provide a clearer understanding of button states and transitions, facilitating easier maintenance and future enhancements. * Disable LKAS button press logic in Honda and Toyota safety. The code for processing LKAS button presses has been commented out in both Honda and Toyota safety implementations. This change aims to investigate or temporarily halt the button press effects without removing the logic altogether. It will be important to test for any impacts this may have on vehicle control functionality. * Remove commented out code in toyota_rx_hook function This commit cleans up the toyota_rx_hook function by removing unnecessary commented-out code that checks for LKAS button presses on bus 2. This helps improve code readability and maintainability without altering the existing functionality. * GM, mazda, nissan, subaru (global & preglobal) * Honda LKAS * Revert "Remove commented out code in toyota_rx_hook function" This reverts commit d6b012c01a08118d91fad56c9f6ac2f92b671968. * Toyota, Subaru Global LKAS * nissan fix * gm fix * use speed msg to force rx * im bored * misra * subaru/toyota/honda * nope * attempt * go through all buttons * try nissan * more nissan * nissan tests passed! * subaru lkas test (not sure why it's not passing 2 and 3 values) * Improved code organization in safety_subaru.h and test_subaru.py This commit includes a minor restructuring in safety_subaru.h and test_subaru.py for better readability and flow. The condition check in safety_subaru.h for lkas_hud now has explicit parentheses. With regard to test_subaru.py, an unnecessary import was removed, and the sequence of steps in the test was reordered - now enabling mads and cleaning up mads_states happens before each subtest. * Refactor tests to use _speed_msg instead of _user_brake_msg. Updated the MADS safety tests to utilize the _speed_msg(0) function call in place of _user_brake_msg(False). * Reworking the tests a little for clarity * disabling lkas again on toyota temporarily * fix mads condition to engage * hyundai and honda good with new tests * Redoing more tests * update for safety tick ensuring mads control is exited while lagging * Updating tests for toyota * cleaning up tests on hkg * commenting out temp_debug for future use * revert * constants * cleanup * format! * match yota * Apply suggestions from code review * force * explicit checks * revert --------- Co-authored-by: Jason Wen <haibin.wen3@gmail.com>
994 lines
40 KiB
Python
994 lines
40 KiB
Python
import os
|
|
import abc
|
|
import unittest
|
|
import importlib
|
|
import numpy as np
|
|
from collections.abc import Callable
|
|
|
|
from opendbc.can.packer import CANPacker # pylint: disable=import-error
|
|
from panda import ALTERNATIVE_EXPERIENCE
|
|
from panda.tests.libpanda import libpanda_py
|
|
from panda.tests.safety.mads_common import MadsCommonBase
|
|
|
|
MAX_WRONG_COUNTERS = 5
|
|
MAX_SAMPLE_VALS = 6
|
|
VEHICLE_SPEED_FACTOR = 100
|
|
|
|
MessageFunction = Callable[[float], libpanda_py.CANPacket]
|
|
|
|
def sign_of(a):
|
|
return 1 if a > 0 else -1
|
|
|
|
|
|
def make_msg(bus, addr, length=8, dat=None):
|
|
if dat is None:
|
|
dat = b'\x00' * length
|
|
return libpanda_py.make_CANPacket(addr, bus, dat)
|
|
|
|
|
|
class CANPackerPanda(CANPacker):
|
|
def make_can_msg_panda(self, name_or_addr, bus, values, fix_checksum=None):
|
|
msg = self.make_can_msg(name_or_addr, bus, values)
|
|
if fix_checksum is not None:
|
|
msg = fix_checksum(msg)
|
|
addr, dat, bus = msg
|
|
return libpanda_py.make_CANPacket(addr, bus, dat)
|
|
|
|
|
|
def add_regen_tests(cls):
|
|
"""Dynamically adds regen tests for all user brake tests."""
|
|
|
|
# only rx/user brake tests, not brake command
|
|
found_tests = [func for func in dir(cls) if func.startswith("test_") and "user_brake" in func]
|
|
assert len(found_tests) >= 3, "Failed to detect known brake tests"
|
|
|
|
for test in found_tests:
|
|
def _make_regen_test(brake_func):
|
|
def _regen_test(self):
|
|
# only for safety modes with a regen message
|
|
if self._user_regen_msg(0) is None:
|
|
raise unittest.SkipTest("Safety mode implements no _user_regen_msg")
|
|
|
|
getattr(self, brake_func)(self._user_regen_msg, self.safety.get_regen_braking_prev)
|
|
return _regen_test
|
|
|
|
setattr(cls, test.replace("brake", "regen"), _make_regen_test(test))
|
|
|
|
return cls
|
|
|
|
|
|
class PandaSafetyTestBase(unittest.TestCase):
|
|
safety: libpanda_py.Panda
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "PandaSafetyTestBase":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
def _reset_safety_hooks(self):
|
|
self.safety.set_safety_hooks(self.safety.get_current_safety_mode(),
|
|
self.safety.get_current_safety_param())
|
|
|
|
def _rx(self, msg):
|
|
return self.safety.safety_rx_hook(msg)
|
|
|
|
def _tx(self, msg):
|
|
return self.safety.safety_tx_hook(msg)
|
|
|
|
def _generic_limit_safety_check(self, msg_function: MessageFunction, min_allowed_value: float, max_allowed_value: float,
|
|
min_possible_value: float, max_possible_value: float, test_delta: float = 1, inactive_value: float = 0,
|
|
msg_allowed = True, additional_setup: Callable[[float], None] | None = None):
|
|
"""
|
|
Enforces that a signal within a message is only allowed to be sent within a specific range, min_allowed_value -> max_allowed_value.
|
|
Tests the range of min_possible_value -> max_possible_value with a delta of test_delta.
|
|
Message is also only allowed to be sent when controls_allowed is true, unless the value is equal to inactive_value.
|
|
Message is never allowed if msg_allowed is false, for example when stock longitudinal is enabled and you are sending acceleration requests.
|
|
additional_setup is used for extra setup before each _tx, ex: for setting the previous torque for rate limits
|
|
"""
|
|
|
|
# Ensure that we at least test the allowed_value range
|
|
self.assertGreater(max_possible_value, max_allowed_value)
|
|
self.assertLessEqual(min_possible_value, min_allowed_value)
|
|
|
|
for controls_allowed in [False, True]:
|
|
# enforce we don't skip over 0 or inactive
|
|
for v in np.concatenate((np.arange(min_possible_value, max_possible_value, test_delta), np.array([0, inactive_value]))):
|
|
v = round(v, 2) # floats might not hit exact boundary conditions without rounding
|
|
self.safety.set_controls_allowed(controls_allowed)
|
|
if additional_setup is not None:
|
|
additional_setup(v)
|
|
should_tx = controls_allowed and min_allowed_value <= v <= max_allowed_value
|
|
should_tx = (should_tx or v == inactive_value) and msg_allowed
|
|
self.assertEqual(self._tx(msg_function(v)), should_tx, (controls_allowed, should_tx, v))
|
|
|
|
def _common_measurement_test(self, msg_func: Callable, min_value: float, max_value: float, factor: float,
|
|
meas_min_func: Callable[[], int], meas_max_func: Callable[[], int]):
|
|
"""Tests accurate measurement parsing, and that the struct is reset on safety mode init"""
|
|
for val in np.arange(min_value, max_value, 0.5):
|
|
for i in range(MAX_SAMPLE_VALS):
|
|
self.assertTrue(self._rx(msg_func(val + i * 0.1)))
|
|
|
|
# assert close by one decimal place
|
|
self.assertAlmostEqual(meas_min_func() / factor, val, delta=0.1)
|
|
self.assertAlmostEqual(meas_max_func() / factor - 0.5, val, delta=0.1)
|
|
|
|
# ensure sample_t is reset on safety init
|
|
self._reset_safety_hooks()
|
|
self.assertEqual(meas_min_func(), 0)
|
|
self.assertEqual(meas_max_func(), 0)
|
|
|
|
|
|
class LongitudinalAccelSafetyTest(PandaSafetyTestBase, abc.ABC):
|
|
|
|
MAX_ACCEL: float = 2.0
|
|
MIN_ACCEL: float = -3.5
|
|
INACTIVE_ACCEL: float = 0.0
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "LongitudinalAccelSafetyTest":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
@abc.abstractmethod
|
|
def _accel_msg(self, accel: float):
|
|
pass
|
|
|
|
def test_accel_limits_correct(self):
|
|
self.assertGreater(self.MAX_ACCEL, 0)
|
|
self.assertLess(self.MIN_ACCEL, 0)
|
|
|
|
def test_accel_actuation_limits(self, stock_longitudinal=False):
|
|
limits = ((self.MIN_ACCEL, self.MAX_ACCEL, ALTERNATIVE_EXPERIENCE.DEFAULT),
|
|
(self.MIN_ACCEL, self.MAX_ACCEL, ALTERNATIVE_EXPERIENCE.RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX))
|
|
|
|
for min_accel, max_accel, alternative_experience in limits:
|
|
# enforce we don't skip over 0 or inactive accel
|
|
for accel in np.concatenate((np.arange(min_accel - 1, max_accel + 1, 0.05), [0, self.INACTIVE_ACCEL])):
|
|
accel = round(accel, 2) # floats might not hit exact boundary conditions without rounding
|
|
for controls_allowed in [True, False]:
|
|
self.safety.set_controls_allowed(controls_allowed)
|
|
self.safety.set_alternative_experience(alternative_experience)
|
|
if stock_longitudinal:
|
|
should_tx = False
|
|
else:
|
|
should_tx = controls_allowed and min_accel <= accel <= max_accel
|
|
should_tx = should_tx or accel == self.INACTIVE_ACCEL
|
|
self.assertEqual(should_tx, self._tx(self._accel_msg(accel)))
|
|
|
|
|
|
class LongitudinalGasBrakeSafetyTest(PandaSafetyTestBase, abc.ABC):
|
|
|
|
MIN_BRAKE: int = 0
|
|
MAX_BRAKE: int | None = None
|
|
MAX_POSSIBLE_BRAKE: int | None = None
|
|
|
|
MIN_GAS: int = 0
|
|
MAX_GAS: int | None = None
|
|
INACTIVE_GAS = 0
|
|
MAX_POSSIBLE_GAS: int | None = None
|
|
|
|
def test_gas_brake_limits_correct(self):
|
|
self.assertIsNotNone(self.MAX_POSSIBLE_BRAKE)
|
|
self.assertIsNotNone(self.MAX_POSSIBLE_GAS)
|
|
|
|
self.assertGreater(self.MAX_BRAKE, self.MIN_BRAKE)
|
|
self.assertGreater(self.MAX_GAS, self.MIN_GAS)
|
|
|
|
@abc.abstractmethod
|
|
def _send_gas_msg(self, gas: int):
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _send_brake_msg(self, brake: int):
|
|
pass
|
|
|
|
def test_brake_safety_check(self):
|
|
self._generic_limit_safety_check(self._send_brake_msg, self.MIN_BRAKE, self.MAX_BRAKE, 0, self.MAX_POSSIBLE_BRAKE, 1)
|
|
|
|
def test_gas_safety_check(self):
|
|
self._generic_limit_safety_check(self._send_gas_msg, self.MIN_GAS, self.MAX_GAS, 0, self.MAX_POSSIBLE_GAS, 1, self.INACTIVE_GAS)
|
|
|
|
|
|
class TorqueSteeringSafetyTestBase(PandaSafetyTestBase, abc.ABC):
|
|
|
|
MAX_RATE_UP = 0
|
|
MAX_RATE_DOWN = 0
|
|
MAX_TORQUE = 0
|
|
MAX_RT_DELTA = 0
|
|
RT_INTERVAL = 0
|
|
|
|
NO_STEER_REQ_BIT = False
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "TorqueSteeringSafetyTestBase":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
@abc.abstractmethod
|
|
def _torque_cmd_msg(self, torque, steer_req=1):
|
|
pass
|
|
|
|
def _set_prev_torque(self, t):
|
|
self.safety.set_desired_torque_last(t)
|
|
self.safety.set_rt_torque_last(t)
|
|
|
|
def test_steer_safety_check(self):
|
|
for enabled in [0, 1]:
|
|
for t in range(int(-self.MAX_TORQUE * 1.5), int(self.MAX_TORQUE * 1.5)):
|
|
self.safety.set_controls_allowed(enabled)
|
|
self._set_prev_torque(t)
|
|
if abs(t) > self.MAX_TORQUE or (not enabled and abs(t) > 0):
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(t)))
|
|
else:
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
|
|
|
|
def test_non_realtime_limit_up(self):
|
|
self.safety.set_controls_allowed(True)
|
|
|
|
self._set_prev_torque(0)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_RATE_UP)))
|
|
self._set_prev_torque(0)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(-self.MAX_RATE_UP)))
|
|
|
|
self._set_prev_torque(0)
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_RATE_UP + 1)))
|
|
self.safety.set_controls_allowed(True)
|
|
self._set_prev_torque(0)
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(-self.MAX_RATE_UP - 1)))
|
|
|
|
def test_steer_req_bit(self):
|
|
"""Asserts all torque safety modes check the steering request bit"""
|
|
if self.NO_STEER_REQ_BIT:
|
|
raise unittest.SkipTest("No steering request bit")
|
|
|
|
self.safety.set_controls_allowed(True)
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
|
|
# Send torque successfully, then only drop the request bit and ensure it stays blocked
|
|
for _ in range(10):
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, 1)))
|
|
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, 0)))
|
|
for _ in range(10):
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, 1)))
|
|
|
|
|
|
class SteerRequestCutSafetyTest(TorqueSteeringSafetyTestBase, abc.ABC):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "SteerRequestCutSafetyTest":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
# Safety around steering request bit mismatch tolerance
|
|
MIN_VALID_STEERING_FRAMES: int
|
|
MAX_INVALID_STEERING_FRAMES: int
|
|
MIN_VALID_STEERING_RT_INTERVAL: int
|
|
|
|
def test_steer_req_bit_frames(self):
|
|
"""
|
|
Certain safety modes implement some tolerance on their steer request bits matching the
|
|
requested torque to avoid a steering fault or lockout and maintain torque. This tests:
|
|
- We can't cut torque for more than one frame
|
|
- We can't cut torque until at least the minimum number of matching steer_req messages
|
|
- We can always recover from violations if steer_req=1
|
|
"""
|
|
|
|
for min_valid_steer_frames in range(self.MIN_VALID_STEERING_FRAMES * 2):
|
|
# Reset match count and rt timer to allow cut (valid_steer_req_count, ts_steer_req_mismatch_last)
|
|
self.safety.init_tests()
|
|
self.safety.set_timer(self.MIN_VALID_STEERING_RT_INTERVAL)
|
|
|
|
# Allow torque cut
|
|
self.safety.set_controls_allowed(True)
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
for _ in range(min_valid_steer_frames):
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
|
|
|
|
# should tx if we've sent enough valid frames, and we're not cutting torque for too many frames consecutively
|
|
should_tx = min_valid_steer_frames >= self.MIN_VALID_STEERING_FRAMES
|
|
for idx in range(self.MAX_INVALID_STEERING_FRAMES * 2):
|
|
tx = self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0))
|
|
self.assertEqual(should_tx and idx < self.MAX_INVALID_STEERING_FRAMES, tx)
|
|
|
|
# Keep blocking after one steer_req mismatch
|
|
for _ in range(100):
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
|
|
|
|
# Make sure we can recover
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(0, steer_req=1)))
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
|
|
|
|
def test_steer_req_bit_multi_invalid(self):
|
|
"""
|
|
For safety modes allowing multiple consecutive invalid frames, this ensures that once a valid frame
|
|
is sent after an invalid frame (even without sending the max number of allowed invalid frames),
|
|
all counters are reset.
|
|
"""
|
|
for max_invalid_steer_frames in range(1, self.MAX_INVALID_STEERING_FRAMES * 2):
|
|
self.safety.init_tests()
|
|
self.safety.set_timer(self.MIN_VALID_STEERING_RT_INTERVAL)
|
|
|
|
# Allow torque cut
|
|
self.safety.set_controls_allowed(True)
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
for _ in range(self.MIN_VALID_STEERING_FRAMES):
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
|
|
|
|
# Send partial amount of allowed invalid frames
|
|
for idx in range(max_invalid_steer_frames):
|
|
should_tx = idx < self.MAX_INVALID_STEERING_FRAMES
|
|
self.assertEqual(should_tx, self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
|
|
|
|
# Send one valid frame, and subsequent invalid should now be blocked
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
|
|
for _ in range(self.MIN_VALID_STEERING_FRAMES + 1):
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
|
|
|
|
def test_steer_req_bit_realtime(self):
|
|
"""
|
|
Realtime safety for cutting steer request bit. This tests:
|
|
- That we allow messages with mismatching steer request bit if time from last is >= MIN_VALID_STEERING_RT_INTERVAL
|
|
- That frame mismatch safety does not interfere with this test
|
|
"""
|
|
for rt_us in np.arange(self.MIN_VALID_STEERING_RT_INTERVAL - 50000, self.MIN_VALID_STEERING_RT_INTERVAL + 50000, 10000):
|
|
# Reset match count and rt timer (valid_steer_req_count, ts_steer_req_mismatch_last)
|
|
self.safety.init_tests()
|
|
|
|
# Make sure valid_steer_req_count doesn't affect this test
|
|
self.safety.set_controls_allowed(True)
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
for _ in range(self.MIN_VALID_STEERING_FRAMES):
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
|
|
|
|
# Normally, sending MIN_VALID_STEERING_FRAMES valid frames should always allow
|
|
self.safety.set_timer(max(rt_us, 0))
|
|
should_tx = rt_us >= self.MIN_VALID_STEERING_RT_INTERVAL
|
|
for _ in range(self.MAX_INVALID_STEERING_FRAMES):
|
|
self.assertEqual(should_tx, self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
|
|
|
|
# Keep blocking after one steer_req mismatch
|
|
for _ in range(100):
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
|
|
|
|
# Make sure we can recover
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(0, steer_req=1)))
|
|
self._set_prev_torque(self.MAX_TORQUE)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
|
|
|
|
|
|
class DriverTorqueSteeringSafetyTest(TorqueSteeringSafetyTestBase, abc.ABC):
|
|
|
|
DRIVER_TORQUE_ALLOWANCE = 0
|
|
DRIVER_TORQUE_FACTOR = 0
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "DriverTorqueSteeringSafetyTest":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
@abc.abstractmethod
|
|
def _torque_driver_msg(self, torque):
|
|
pass
|
|
|
|
def _reset_torque_driver_measurement(self, torque):
|
|
for _ in range(MAX_SAMPLE_VALS):
|
|
self._rx(self._torque_driver_msg(torque))
|
|
|
|
def test_non_realtime_limit_up(self):
|
|
self._reset_torque_driver_measurement(0)
|
|
super().test_non_realtime_limit_up()
|
|
|
|
def test_against_torque_driver(self):
|
|
# Tests down limits and driver torque blending
|
|
self.safety.set_controls_allowed(True)
|
|
|
|
# Cannot stay at MAX_TORQUE if above DRIVER_TORQUE_ALLOWANCE
|
|
for sign in [-1, 1]:
|
|
for driver_torque in np.arange(0, self.DRIVER_TORQUE_ALLOWANCE * 2, 1):
|
|
self._reset_torque_driver_measurement(-driver_torque * sign)
|
|
self._set_prev_torque(self.MAX_TORQUE * sign)
|
|
should_tx = abs(driver_torque) <= self.DRIVER_TORQUE_ALLOWANCE
|
|
self.assertEqual(should_tx, self._tx(self._torque_cmd_msg(self.MAX_TORQUE * sign)))
|
|
|
|
# arbitrary high driver torque to ensure max steer torque is allowed
|
|
max_driver_torque = int(self.MAX_TORQUE / self.DRIVER_TORQUE_FACTOR + self.DRIVER_TORQUE_ALLOWANCE + 1)
|
|
|
|
# spot check some individual cases
|
|
for sign in [-1, 1]:
|
|
# Ensure we wind down factor units for every unit above allowance
|
|
driver_torque = (self.DRIVER_TORQUE_ALLOWANCE + 10) * sign
|
|
torque_desired = (self.MAX_TORQUE - 10 * self.DRIVER_TORQUE_FACTOR) * sign
|
|
delta = 1 * sign
|
|
self._set_prev_torque(torque_desired)
|
|
self._reset_torque_driver_measurement(-driver_torque)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(torque_desired)))
|
|
self._set_prev_torque(torque_desired + delta)
|
|
self._reset_torque_driver_measurement(-driver_torque)
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(torque_desired + delta)))
|
|
|
|
# If we're well past the allowance, minimum wind down is MAX_RATE_DOWN
|
|
self._set_prev_torque(self.MAX_TORQUE * sign)
|
|
self._reset_torque_driver_measurement(-max_driver_torque * sign)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg((self.MAX_TORQUE - self.MAX_RATE_DOWN) * sign)))
|
|
self._set_prev_torque(self.MAX_TORQUE * sign)
|
|
self._reset_torque_driver_measurement(-max_driver_torque * sign)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(0)))
|
|
self._set_prev_torque(self.MAX_TORQUE * sign)
|
|
self._reset_torque_driver_measurement(-max_driver_torque * sign)
|
|
self.assertFalse(self._tx(self._torque_cmd_msg((self.MAX_TORQUE - self.MAX_RATE_DOWN + 1) * sign)))
|
|
|
|
def test_realtime_limits(self):
|
|
self.safety.set_controls_allowed(True)
|
|
|
|
for sign in [-1, 1]:
|
|
self.safety.init_tests()
|
|
self._set_prev_torque(0)
|
|
self._reset_torque_driver_measurement(0)
|
|
for t in np.arange(0, self.MAX_RT_DELTA, 1):
|
|
t *= sign
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
|
|
|
|
self._set_prev_torque(0)
|
|
for t in np.arange(0, self.MAX_RT_DELTA, 1):
|
|
t *= sign
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
|
|
|
|
# Increase timer to update rt_torque_last
|
|
self.safety.set_timer(self.RT_INTERVAL + 1)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA - 1))))
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
|
|
|
|
def test_reset_driver_torque_measurements(self):
|
|
# Tests that the driver torque measurement sample_t is reset on safety mode init
|
|
for t in np.linspace(-self.MAX_TORQUE, self.MAX_TORQUE, MAX_SAMPLE_VALS):
|
|
self.assertTrue(self._rx(self._torque_driver_msg(t)))
|
|
|
|
self.assertNotEqual(self.safety.get_torque_driver_min(), 0)
|
|
self.assertNotEqual(self.safety.get_torque_driver_max(), 0)
|
|
|
|
self._reset_safety_hooks()
|
|
self.assertEqual(self.safety.get_torque_driver_min(), 0)
|
|
self.assertEqual(self.safety.get_torque_driver_max(), 0)
|
|
|
|
|
|
class MotorTorqueSteeringSafetyTest(TorqueSteeringSafetyTestBase, abc.ABC):
|
|
|
|
MAX_TORQUE_ERROR = 0
|
|
TORQUE_MEAS_TOLERANCE = 0
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "MotorTorqueSteeringSafetyTest":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
@abc.abstractmethod
|
|
def _torque_meas_msg(self, torque):
|
|
pass
|
|
|
|
def _set_prev_torque(self, t):
|
|
super()._set_prev_torque(t)
|
|
self.safety.set_torque_meas(t, t)
|
|
|
|
def test_torque_absolute_limits(self):
|
|
for controls_allowed in [True, False]:
|
|
for torque in np.arange(-self.MAX_TORQUE - 1000, self.MAX_TORQUE + 1000, self.MAX_RATE_UP):
|
|
self.safety.set_controls_allowed(controls_allowed)
|
|
self.safety.set_rt_torque_last(torque)
|
|
self.safety.set_torque_meas(torque, torque)
|
|
self.safety.set_desired_torque_last(torque - self.MAX_RATE_UP)
|
|
|
|
if controls_allowed:
|
|
send = (-self.MAX_TORQUE <= torque <= self.MAX_TORQUE)
|
|
else:
|
|
send = torque == 0
|
|
|
|
self.assertEqual(send, self._tx(self._torque_cmd_msg(torque)))
|
|
|
|
def test_non_realtime_limit_down(self):
|
|
self.safety.set_controls_allowed(True)
|
|
|
|
torque_meas = self.MAX_TORQUE - self.MAX_TORQUE_ERROR - 50
|
|
|
|
self.safety.set_rt_torque_last(self.MAX_TORQUE)
|
|
self.safety.set_torque_meas(torque_meas, torque_meas)
|
|
self.safety.set_desired_torque_last(self.MAX_TORQUE)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE - self.MAX_RATE_DOWN)))
|
|
|
|
self.safety.set_rt_torque_last(self.MAX_TORQUE)
|
|
self.safety.set_torque_meas(torque_meas, torque_meas)
|
|
self.safety.set_desired_torque_last(self.MAX_TORQUE)
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE - self.MAX_RATE_DOWN + 1)))
|
|
|
|
def test_exceed_torque_sensor(self):
|
|
self.safety.set_controls_allowed(True)
|
|
|
|
for sign in [-1, 1]:
|
|
self._set_prev_torque(0)
|
|
for t in np.arange(0, self.MAX_TORQUE_ERROR + 2, 2): # step needs to be smaller than MAX_TORQUE_ERROR
|
|
t *= sign
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
|
|
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(sign * (self.MAX_TORQUE_ERROR + 2))))
|
|
|
|
def test_realtime_limit_up(self):
|
|
self.safety.set_controls_allowed(True)
|
|
|
|
for sign in [-1, 1]:
|
|
self.safety.init_tests()
|
|
self._set_prev_torque(0)
|
|
for t in np.arange(0, self.MAX_RT_DELTA + 1, 1):
|
|
t *= sign
|
|
self.safety.set_torque_meas(t, t)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
|
|
self.assertFalse(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
|
|
|
|
self._set_prev_torque(0)
|
|
for t in np.arange(0, self.MAX_RT_DELTA + 1, 1):
|
|
t *= sign
|
|
self.safety.set_torque_meas(t, t)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
|
|
|
|
# Increase timer to update rt_torque_last
|
|
self.safety.set_timer(self.RT_INTERVAL + 1)
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(sign * self.MAX_RT_DELTA)))
|
|
self.assertTrue(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
|
|
|
|
def test_torque_measurements(self):
|
|
trq = 50
|
|
for t in [trq, -trq, 0, 0, 0, 0]:
|
|
self._rx(self._torque_meas_msg(t))
|
|
|
|
max_range = range(trq, trq + self.TORQUE_MEAS_TOLERANCE + 1)
|
|
min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1)
|
|
self.assertTrue(self.safety.get_torque_meas_min() in min_range)
|
|
self.assertTrue(self.safety.get_torque_meas_max() in max_range)
|
|
|
|
max_range = range(self.TORQUE_MEAS_TOLERANCE + 1)
|
|
min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1)
|
|
self._rx(self._torque_meas_msg(0))
|
|
self.assertTrue(self.safety.get_torque_meas_min() in min_range)
|
|
self.assertTrue(self.safety.get_torque_meas_max() in max_range)
|
|
|
|
max_range = range(self.TORQUE_MEAS_TOLERANCE + 1)
|
|
min_range = range(-self.TORQUE_MEAS_TOLERANCE, 0 + 1)
|
|
self._rx(self._torque_meas_msg(0))
|
|
self.assertTrue(self.safety.get_torque_meas_min() in min_range)
|
|
self.assertTrue(self.safety.get_torque_meas_max() in max_range)
|
|
|
|
def test_reset_torque_measurements(self):
|
|
# Tests that the torque measurement sample_t is reset on safety mode init
|
|
for t in np.linspace(-self.MAX_TORQUE, self.MAX_TORQUE, MAX_SAMPLE_VALS):
|
|
self.assertTrue(self._rx(self._torque_meas_msg(t)))
|
|
|
|
self.assertNotEqual(self.safety.get_torque_meas_min(), 0)
|
|
self.assertNotEqual(self.safety.get_torque_meas_max(), 0)
|
|
|
|
self._reset_safety_hooks()
|
|
self.assertEqual(self.safety.get_torque_meas_min(), 0)
|
|
self.assertEqual(self.safety.get_torque_meas_max(), 0)
|
|
|
|
|
|
class AngleSteeringSafetyTest(PandaSafetyTestBase):
|
|
|
|
DEG_TO_CAN: float
|
|
ANGLE_RATE_BP: list[float]
|
|
ANGLE_RATE_UP: list[float] # windup limit
|
|
ANGLE_RATE_DOWN: list[float] # unwind limit
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "AngleSteeringSafetyTest":
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
@abc.abstractmethod
|
|
def _speed_msg(self, speed):
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _angle_cmd_msg(self, angle: float, enabled: bool):
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _angle_meas_msg(self, angle: float):
|
|
pass
|
|
|
|
def _set_prev_desired_angle(self, t):
|
|
t = round(t * self.DEG_TO_CAN)
|
|
self.safety.set_desired_angle_last(t)
|
|
|
|
def _reset_angle_measurement(self, angle):
|
|
for _ in range(MAX_SAMPLE_VALS):
|
|
self._rx(self._angle_meas_msg(angle))
|
|
|
|
def _reset_speed_measurement(self, speed):
|
|
for _ in range(MAX_SAMPLE_VALS):
|
|
self._rx(self._speed_msg(speed))
|
|
|
|
def test_vehicle_speed_measurements(self):
|
|
self._common_measurement_test(self._speed_msg, 0, 80, VEHICLE_SPEED_FACTOR, self.safety.get_vehicle_speed_min, self.safety.get_vehicle_speed_max)
|
|
|
|
def test_steering_angle_measurements(self, max_angle=300):
|
|
self._common_measurement_test(self._angle_meas_msg, -max_angle, max_angle, self.DEG_TO_CAN, self.safety.get_angle_meas_min, self.safety.get_angle_meas_max)
|
|
|
|
def test_angle_cmd_when_enabled(self, max_angle=300):
|
|
# when controls are allowed, angle cmd rate limit is enforced
|
|
speeds = [0., 1., 5., 10., 15., 50.]
|
|
angles = np.concatenate((np.arange(-max_angle, max_angle, 5), [0]))
|
|
for a in angles:
|
|
for s in speeds:
|
|
max_delta_up = np.interp(s, self.ANGLE_RATE_BP, self.ANGLE_RATE_UP)
|
|
max_delta_down = np.interp(s, self.ANGLE_RATE_BP, self.ANGLE_RATE_DOWN)
|
|
|
|
# first test against false positives
|
|
self._reset_angle_measurement(a)
|
|
self._reset_speed_measurement(s)
|
|
|
|
self._set_prev_desired_angle(a)
|
|
self.safety.set_controls_allowed(1)
|
|
|
|
# Stay within limits
|
|
# Up
|
|
self.assertTrue(self._tx(self._angle_cmd_msg(a + sign_of(a) * max_delta_up, True)))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
|
|
# Don't change
|
|
self.assertTrue(self._tx(self._angle_cmd_msg(a, True)))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
|
|
# Down
|
|
self.assertTrue(self._tx(self._angle_cmd_msg(a - sign_of(a) * max_delta_down, True)))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
|
|
# Inject too high rates
|
|
# Up
|
|
self.assertFalse(self._tx(self._angle_cmd_msg(a + sign_of(a) * (max_delta_up + 1.1), True)))
|
|
|
|
# Don't change
|
|
self.safety.set_controls_allowed(1)
|
|
self._set_prev_desired_angle(a)
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self.assertTrue(self._tx(self._angle_cmd_msg(a, True)))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
|
|
# Down
|
|
self.assertFalse(self._tx(self._angle_cmd_msg(a - sign_of(a) * (max_delta_down + 1.1), True)))
|
|
|
|
# Check desired steer should be the same as steer angle when controls are off
|
|
self.safety.set_controls_allowed(0)
|
|
self.assertTrue(self._tx(self._angle_cmd_msg(a, False)))
|
|
|
|
def test_angle_cmd_when_disabled(self):
|
|
# Tests that only angles close to the meas are allowed while
|
|
# steer actuation bit is 0, regardless of controls allowed.
|
|
for controls_allowed in (True, False):
|
|
self.safety.set_controls_allowed(controls_allowed)
|
|
|
|
for steer_control_enabled in (True, False):
|
|
for angle_meas in np.arange(-90, 91, 10):
|
|
self._reset_angle_measurement(angle_meas)
|
|
|
|
for angle_cmd in np.arange(-90, 91, 10):
|
|
self._set_prev_desired_angle(angle_cmd)
|
|
|
|
# controls_allowed is checked if actuation bit is 1, else the angle must be close to meas (inactive)
|
|
should_tx = controls_allowed if steer_control_enabled else angle_cmd == angle_meas
|
|
self.assertEqual(should_tx, self._tx(self._angle_cmd_msg(angle_cmd, steer_control_enabled)))
|
|
|
|
|
|
class PandaSafetyTest(PandaSafetyTestBase):
|
|
TX_MSGS: list[list[int]] | None = None
|
|
SCANNED_ADDRS = [*range(0x800), # Entire 11-bit CAN address space
|
|
*range(0x18DA00F1, 0x18DB00F1, 0x100), # 29-bit UDS physical addressing
|
|
*range(0x18DB00F1, 0x18DC00F1, 0x100), # 29-bit UDS functional addressing
|
|
*range(0x3300, 0x3400)] # Honda
|
|
FWD_BLACKLISTED_ADDRS: dict[int, list[int]] = {} # {bus: [addr]}
|
|
FWD_BUS_LOOKUP: dict[int, int] = {}
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "PandaSafetyTest" or cls.__name__.endswith('Base'):
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
# ***** standard tests for all safety modes *****
|
|
|
|
def test_tx_msg_in_scanned_range(self):
|
|
# the relay malfunction, fwd hook, and spam can tests don't exhaustively
|
|
# scan the entire 29-bit address space, only some known important ranges
|
|
# make sure SCANNED_ADDRS stays up to date with car port TX_MSGS; new
|
|
# model ports should expand the range if needed
|
|
for msg in self.TX_MSGS:
|
|
self.assertTrue(msg[0] in self.SCANNED_ADDRS, f"{msg[0]=:#x}")
|
|
|
|
def test_fwd_hook(self):
|
|
# some safety modes don't forward anything, while others blacklist msgs
|
|
for bus in range(3):
|
|
for addr in self.SCANNED_ADDRS:
|
|
# assume len 8
|
|
fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1)
|
|
if bus in self.FWD_BLACKLISTED_ADDRS and addr in self.FWD_BLACKLISTED_ADDRS[bus]:
|
|
fwd_bus = -1
|
|
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, addr), f"{addr=:#x} from {bus=} to {fwd_bus=}")
|
|
|
|
def test_spam_can_buses(self):
|
|
for bus in range(4):
|
|
for addr in self.SCANNED_ADDRS:
|
|
if [addr, bus] not in self.TX_MSGS:
|
|
self.assertFalse(self._tx(make_msg(bus, addr, 8)), f"allowed TX {addr=} {bus=}")
|
|
|
|
def test_default_controls_not_allowed(self):
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
|
|
def test_manually_enable_controls_allowed(self):
|
|
self.safety.set_controls_allowed(1)
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self.safety.set_controls_allowed(0)
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
|
|
def test_tx_hook_on_wrong_safety_mode(self):
|
|
files = os.listdir(os.path.dirname(os.path.realpath(__file__)))
|
|
test_files = [f for f in files if f.startswith("test_") and f.endswith(".py")]
|
|
|
|
current_test = self.__class__.__name__
|
|
|
|
all_tx = []
|
|
for tf in test_files:
|
|
test = importlib.import_module("panda.tests.safety."+tf[:-3])
|
|
for attr in dir(test):
|
|
if attr.startswith("Test") and attr != current_test:
|
|
tc = getattr(test, attr)
|
|
tx = tc.TX_MSGS
|
|
if tx is not None and not attr.endswith('Base'):
|
|
# No point in comparing different Tesla safety modes
|
|
if 'Tesla' in attr and 'Tesla' in current_test:
|
|
continue
|
|
# No point in comparing to ALLOUTPUT which allows all messages
|
|
if attr.startswith('TestAllOutput'):
|
|
continue
|
|
if attr.startswith('TestToyota') and current_test.startswith('TestToyota'):
|
|
continue
|
|
if attr.startswith('TestSubaruGen') and current_test.startswith('TestSubaruGen'):
|
|
continue
|
|
if attr.startswith('TestSubaruPreglobal') and current_test.startswith('TestSubaruPreglobal'):
|
|
continue
|
|
if {attr, current_test}.issubset({'TestVolkswagenPqSafety', 'TestVolkswagenPqStockSafety', 'TestVolkswagenPqLongSafety'}):
|
|
continue
|
|
if {attr, current_test}.issubset({'TestGmCameraSafety', 'TestGmCameraLongitudinalSafety'}):
|
|
continue
|
|
if attr.startswith('TestFord') and current_test.startswith('TestFord'):
|
|
continue
|
|
if attr.startswith('TestHyundaiCanfd') and current_test.startswith('TestHyundaiCanfd'):
|
|
continue
|
|
if {attr, current_test}.issubset({'TestVolkswagenMqbSafety', 'TestVolkswagenMqbStockSafety', 'TestVolkswagenMqbLongSafety'}):
|
|
continue
|
|
|
|
# overlapping TX addrs, but they're not actuating messages for either car
|
|
if attr == 'TestHyundaiCanfdHDA2LongEV' and current_test.startswith('TestToyota'):
|
|
tx = list(filter(lambda m: m[0] not in [0x160, ], tx))
|
|
|
|
# Volkswagen MQB longitudinal actuating message overlaps with the Subaru lateral actuating message
|
|
if attr == 'TestVolkswagenMqbLongSafety' and current_test.startswith('TestSubaru'):
|
|
tx = list(filter(lambda m: m[0] not in [0x122, ], tx))
|
|
|
|
# Volkswagen MQB and Honda Nidec ACC HUD messages overlap
|
|
if attr == 'TestVolkswagenMqbLongSafety' and current_test.startswith('TestHondaNidec'):
|
|
tx = list(filter(lambda m: m[0] not in [0x30c, ], tx))
|
|
|
|
# Volkswagen MQB and Honda Bosch Radarless ACC HUD messages overlap
|
|
if attr == 'TestVolkswagenMqbLongSafety' and current_test.startswith('TestHondaBoschRadarless'):
|
|
tx = list(filter(lambda m: m[0] not in [0x30c, ], tx))
|
|
|
|
# TODO: Temporary, should be fixed in panda firmware, safety_honda.h
|
|
if attr.startswith('TestHonda'):
|
|
# exceptions for common msgs across different hondas
|
|
tx = list(filter(lambda m: m[0] not in [0x1FA, 0x30C, 0x33D, 0x33DB], tx))
|
|
|
|
# TODO-SP: Temporary, should be fixed in panda firmware, safety_hyundai.h
|
|
if attr.startswith('TestHyundaiLongitudinal'):
|
|
# exceptions for common msgs across different Hyundai CAN platforms
|
|
tx = list(filter(lambda m: m[0] not in [0x420, 0x50A, 0x389, 0x4A2], tx))
|
|
all_tx.append([[m[0], m[1], attr] for m in tx])
|
|
|
|
# make sure we got all the msgs
|
|
self.assertTrue(len(all_tx) >= len(test_files)-1)
|
|
|
|
for tx_msgs in all_tx:
|
|
for addr, bus, test_name in tx_msgs:
|
|
msg = make_msg(bus, addr)
|
|
self.safety.set_controls_allowed(1)
|
|
# TODO: this should be blocked
|
|
if current_test in ["TestNissanSafety", "TestNissanSafetyAltEpsBus", "TestNissanLeafSafety"] and [addr, bus] in self.TX_MSGS:
|
|
continue
|
|
self.assertFalse(self._tx(msg), f"transmit of {addr=:#x} {bus=} from {test_name} during {current_test} was allowed")
|
|
|
|
|
|
@add_regen_tests
|
|
class PandaCarSafetyTest(PandaSafetyTest, MadsCommonBase):
|
|
STANDSTILL_THRESHOLD: float | None = None
|
|
GAS_PRESSED_THRESHOLD = 0
|
|
RELAY_MALFUNCTION_ADDRS: dict[int, tuple[int, ...]] | None = None
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
if cls.__name__ == "PandaCarSafetyTest" or cls.__name__.endswith('Base'):
|
|
cls.safety = None
|
|
raise unittest.SkipTest
|
|
|
|
@abc.abstractmethod
|
|
def _user_brake_msg(self, brake):
|
|
pass
|
|
|
|
def _user_regen_msg(self, regen):
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _speed_msg(self, speed):
|
|
pass
|
|
|
|
# Safety modes can override if vehicle_moving is driven by a different message
|
|
def _vehicle_moving_msg(self, speed: float):
|
|
return self._speed_msg(speed)
|
|
|
|
@abc.abstractmethod
|
|
def _user_gas_msg(self, gas):
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _pcm_status_msg(self, enable):
|
|
pass
|
|
|
|
# ***** standard tests for all car-specific safety modes *****
|
|
|
|
def test_relay_malfunction(self):
|
|
# each car has an addr that is used to detect relay malfunction
|
|
# if that addr is seen on specified bus, triggers the relay malfunction
|
|
# protection logic: both tx_hook and fwd_hook are expected to return failure
|
|
self.assertFalse(self.safety.get_relay_malfunction())
|
|
for bus in range(3):
|
|
for addr in self.SCANNED_ADDRS:
|
|
self.safety.set_relay_malfunction(False)
|
|
self._rx(make_msg(bus, addr, 8))
|
|
should_relay_malfunction = addr in self.RELAY_MALFUNCTION_ADDRS.get(bus, ())
|
|
self.assertEqual(should_relay_malfunction, self.safety.get_relay_malfunction(), (bus, addr))
|
|
|
|
# test relay malfunction protection logic
|
|
self.safety.set_relay_malfunction(True)
|
|
for bus in range(3):
|
|
for addr in self.SCANNED_ADDRS:
|
|
self.assertFalse(self._tx(make_msg(bus, addr, 8)))
|
|
self.assertEqual(-1, self.safety.safety_fwd_hook(bus, addr))
|
|
|
|
def test_prev_gas(self):
|
|
self.assertFalse(self.safety.get_gas_pressed_prev())
|
|
for pressed in [self.GAS_PRESSED_THRESHOLD + 1, 0]:
|
|
self._rx(self._user_gas_msg(pressed))
|
|
self.assertEqual(bool(pressed), self.safety.get_gas_pressed_prev())
|
|
|
|
def test_allow_engage_with_gas_pressed(self):
|
|
self._rx(self._user_gas_msg(1))
|
|
self.safety.set_controls_allowed(True)
|
|
self._rx(self._user_gas_msg(1))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self._rx(self._user_gas_msg(1))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
|
|
def test_disengage_on_gas(self):
|
|
self._rx(self._user_gas_msg(0))
|
|
self.safety.set_controls_allowed(True)
|
|
self._rx(self._user_gas_msg(self.GAS_PRESSED_THRESHOLD + 1))
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
|
|
def test_alternative_experience_no_disengage_on_gas(self):
|
|
self._rx(self._user_gas_msg(0))
|
|
self.safety.set_controls_allowed(True)
|
|
self.safety.set_alternative_experience(ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS)
|
|
self._rx(self._user_gas_msg(self.GAS_PRESSED_THRESHOLD + 1))
|
|
# Test we allow lateral, but not longitudinal
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self.assertFalse(self.safety.get_longitudinal_allowed())
|
|
# Make sure we can re-gain longitudinal actuation
|
|
self._rx(self._user_gas_msg(0))
|
|
self.assertTrue(self.safety.get_longitudinal_allowed())
|
|
|
|
def test_prev_user_brake(self, _user_brake_msg=None, get_brake_pressed_prev=None):
|
|
if _user_brake_msg is None:
|
|
_user_brake_msg = self._user_brake_msg
|
|
get_brake_pressed_prev = self.safety.get_brake_pressed_prev
|
|
|
|
self.assertFalse(get_brake_pressed_prev())
|
|
for pressed in [True, False]:
|
|
self._rx(_user_brake_msg(not pressed))
|
|
self.assertEqual(not pressed, get_brake_pressed_prev())
|
|
self._rx(_user_brake_msg(pressed))
|
|
self.assertEqual(pressed, get_brake_pressed_prev())
|
|
|
|
def test_enable_control_allowed_from_cruise(self):
|
|
self._rx(self._pcm_status_msg(False))
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
self._rx(self._pcm_status_msg(True))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
|
|
def test_disable_control_allowed_from_cruise(self):
|
|
self.safety.set_controls_allowed(1)
|
|
self._rx(self._pcm_status_msg(False))
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
|
|
def test_cruise_engaged_prev(self):
|
|
for engaged in [True, False]:
|
|
self._rx(self._pcm_status_msg(engaged))
|
|
self.assertEqual(engaged, self.safety.get_cruise_engaged_prev())
|
|
self._rx(self._pcm_status_msg(not engaged))
|
|
self.assertEqual(not engaged, self.safety.get_cruise_engaged_prev())
|
|
|
|
def test_allow_user_brake_at_zero_speed(self, _user_brake_msg=None, get_brake_pressed_prev=None):
|
|
if _user_brake_msg is None:
|
|
_user_brake_msg = self._user_brake_msg
|
|
|
|
# Brake was already pressed
|
|
self._rx(self._vehicle_moving_msg(0))
|
|
self._rx(_user_brake_msg(1))
|
|
self.safety.set_controls_allowed(1)
|
|
self._rx(_user_brake_msg(1))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self.assertTrue(self.safety.get_longitudinal_allowed())
|
|
self._rx(_user_brake_msg(0))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self.assertTrue(self.safety.get_longitudinal_allowed())
|
|
# rising edge of brake should disengage
|
|
self._rx(_user_brake_msg(1))
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
self.assertFalse(self.safety.get_longitudinal_allowed())
|
|
self._rx(_user_brake_msg(0)) # reset no brakes
|
|
|
|
def test_not_allow_user_brake_when_moving(self, _user_brake_msg=None, get_brake_pressed_prev=None):
|
|
if _user_brake_msg is None:
|
|
_user_brake_msg = self._user_brake_msg
|
|
|
|
# Brake was already pressed
|
|
self._rx(_user_brake_msg(1))
|
|
self.safety.set_controls_allowed(1)
|
|
self._rx(self._vehicle_moving_msg(self.STANDSTILL_THRESHOLD))
|
|
self._rx(_user_brake_msg(1))
|
|
self.assertTrue(self.safety.get_controls_allowed())
|
|
self.assertTrue(self.safety.get_longitudinal_allowed())
|
|
self._rx(self._vehicle_moving_msg(self.STANDSTILL_THRESHOLD + 1))
|
|
self._rx(_user_brake_msg(1))
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
self.assertFalse(self.safety.get_longitudinal_allowed())
|
|
self._rx(self._vehicle_moving_msg(0))
|
|
|
|
def test_vehicle_moving(self):
|
|
self.assertFalse(self.safety.get_vehicle_moving())
|
|
|
|
# not moving
|
|
self._rx(self._vehicle_moving_msg(0))
|
|
self.assertFalse(self.safety.get_vehicle_moving())
|
|
|
|
# speed is at threshold
|
|
self._rx(self._vehicle_moving_msg(self.STANDSTILL_THRESHOLD))
|
|
self.assertFalse(self.safety.get_vehicle_moving())
|
|
|
|
# past threshold
|
|
self._rx(self._vehicle_moving_msg(self.STANDSTILL_THRESHOLD + 1))
|
|
self.assertTrue(self.safety.get_vehicle_moving())
|
|
|
|
def test_safety_tick(self):
|
|
self.safety.set_timer(int(2e6))
|
|
self.safety.set_controls_allowed(True)
|
|
self.safety.set_controls_allowed_lat(True)
|
|
self.safety.safety_tick_current_safety_config()
|
|
self.assertFalse(self.safety.get_controls_allowed())
|
|
self.assertFalse(self.safety.get_controls_allowed_lat())
|
|
self.assertFalse(self.safety.safety_config_valid())
|