Files
panda-meb/tests/safety/common.py

749 lines
30 KiB
Python
Raw Normal View History

import os
import abc
import unittest
import importlib
import numpy as np
from typing import Optional, List, Dict
from opendbc.can.packer import CANPacker # pylint: disable=import-error
from panda import ALTERNATIVE_EXPERIENCE, LEN_TO_DLC
2019-11-15 00:52:34 -08:00
from panda.tests.safety import libpandasafety_py
MAX_WRONG_COUNTERS = 5
def package_can_msg(msg):
addr, _, dat, bus = msg
CAN_FIFOMailBox to CANPacket struct + USB dynamic packet size (#739) * Squashed commits, no cleanup * Few fixes * No init = garbage * Only receive with new canpacket * Add send with canpacket * Revert "Add send with canpacket" This reverts commit 7d06686ddd6d447c714b5289d31af24403d36931. * Packet must be aligned to word, or bad performance * Cleaner * Fix tests * Tests... * MISRA 10.4 * More MISRA * libpandasafety_py * cffi * even more tests... * typo * ... * ... * ... * Slight cleanup * MISRA 6.1 * MISRA 17.7 * Bug in bxcan + even style * MISRA 10.1 * Revert "MISRA 10.1" This reverts commit 404ae7fcc39556f80f528de9015702e69f4ea0a5. * ... * MISRA 10.1 and 10.4 suppress until next PR * MISRA 20.1 * ... * test_honda * ... * ... * test_toyota * test_volkswagen_mqb * test_volkswagen_pq * Sketchy thing... * Revert "Sketchy thing..." This reverts commit 3b2e5715bdc1954f7b7b3b7469ba3d0eaa06bdf9. * remove comment * bxcan extended address bug * Concept, experimental dynamic usb packet size * increase each buffer to 10240 bytes * raise python bulk read/write limits * ... * Move packet size to start * Experimental send, stream-like * New receive test, stream-like * cleanup * cleanup + rebase fixes * MISRA * Extra receive method, stream-like, commented out * type change * Revert back to buffer for send, stream commented * forgot ZLP * lower buffer, add rx failsafe * ... remove ZLP * return ZLP back * Add tx checks to panda fw * TX stream with counter * fix counter overflow * 13 free slots should be enough * limit tx usb packet * ... * Revert max_bulk_msg doubling * python lib improve speed * Stream with counter for RX, dirty, needs cleanup * Increase chunk length to 4096 bytes * cleanup fdcan.h * cleanup __init__.py * MISRA 12.1 * MISRA 10.8 * remove non-streaming usb functions * more main.c cleanup * MISRA 15.6 * MISRA 15.5 * MISRA 18.4 and suppress objectIndex * handling usb pakcets > 63bytes, naming and cleanup * Cleanup old from tests and update CANPacket_t struct * Switch to 4 bit DLC instead of 6 bit length * ops) * ... * pylint * receive python buffer increase * USB increase receive packet len * tweak buffers * No need for so high limits * MISRA 20.1 workaround * performance tweaks * cleanup, dlc to data_len_code naming * main.c naming * comments and cleanup for main.c usb * clean py lib * pylint * do not discard good rx messages on stream fail * cleanups * naming * remove bitstruct lib and lower tx limit * bitstruct lefovers * fix bug in VW test * remove adjusting data size and assert on wrong len * ... * test new memcpy before merging * Revert "test new memcpy before merging" This reverts commit 399465a264835061adabdd785718c4b6fc18c267. * macros for to/fromuint8_t array * MISRA hates me! * tests.c include macros instead * move CANPacket to can_definitions.h * vw_pq python test fix * new memcpy test, REMOVE * check without alignment * revert macros for uint8 arrays * Revert "revert macros for uint8 arrays" This reverts commit 581a9db735a42d0d68200bd270d87a8fd34e43fe. * check assert * Revert "check assert" This reverts commit 9e970d029a50597a1718b2bb0260196c050fd77f. * one more variation * Revert "one more variation" This reverts commit f6c0528b7ac7e125750dc0d9445c7ce97f6954b5. * what about read performance * Revert "what about read performance" This reverts commit d2610f90958a816fe7f1822157a84f85e97d9249. * check struct alignment to word * check for aligned memcpy again * cleanup * add CANPacket structure diagram * update CANPacket and add USB packet struct * bugfix + refactoring of EP1 * move dlc_to_len to header * missed include * typo... * MISRA * fk * lower MAX_CAN_MSGS_PER_BULK_TRANSFER * bump CAN_PACKET_VERSION to 2 * bump python lib CAN_PACKET_VERSION to 2 * rename parse_can_buffer to unpack_can_buffer * CANPacket_t const fields * Revert "CANPacket_t const fields" This reverts commit cf91c035b7706a14e317550c5f0501ae3fce7c70. * test.c relative path * cleanup * move macros to safety_declarations * Refactor pack/unpack funcs and add unittest * usb_protocol.h * oops * Update .github/workflows/test.yaml Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * remove print from unittest Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
2021-11-12 16:36:34 -08:00
ret = libpandasafety_py.ffi.new('CANPacket_t *')
ret[0].extended = 1 if addr >= 0x800 else 0
ret[0].addr = addr
ret[0].data_len_code = LEN_TO_DLC[len(dat)]
ret[0].bus = bus
ret[0].data = bytes(dat)
return ret
def make_msg(bus, addr, length=8):
return package_can_msg([addr, 0, b'\x00' * length, bus])
2019-11-15 00:52:34 -08:00
class CANPackerPanda(CANPacker):
def make_can_msg_panda(self, name_or_addr, bus, values, fix_checksum=None):
msg = self.make_can_msg(name_or_addr, bus, values)
if fix_checksum is not None:
msg = fix_checksum(msg)
return package_can_msg(msg)
def add_regen_tests(cls):
"""Dynamically adds regen tests for all user brake tests."""
# only rx/user brake tests, not brake command
found_tests = [func for func in dir(cls) if func.startswith("test_") and "user_brake" in func]
assert len(found_tests) >= 3, "Failed to detect known brake tests"
for test in found_tests:
def _make_regen_test(brake_func):
def _regen_test(self):
# only for safety modes with a regen message
if self._user_regen_msg(0) is None:
raise unittest.SkipTest("Safety mode implements no _user_regen_msg")
getattr(self, brake_func)(self._user_regen_msg, self.safety.get_regen_braking_prev)
return _regen_test
setattr(cls, test.replace("brake", "regen"), _make_regen_test(test))
return cls
class PandaSafetyTestBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
if cls.__name__ == "PandaSafetyTestBase":
cls.safety = None
raise unittest.SkipTest
def _rx(self, msg):
return self.safety.safety_rx_hook(msg)
def _tx(self, msg):
return self.safety.safety_tx_hook(msg)
class InterceptorSafetyTest(PandaSafetyTestBase):
INTERCEPTOR_THRESHOLD = 0
@classmethod
def setUpClass(cls):
if cls.__name__ == "InterceptorSafetyTest":
cls.safety = None
raise unittest.SkipTest
@abc.abstractmethod
def _interceptor_gas_cmd(self, gas):
pass
@abc.abstractmethod
def _interceptor_user_gas(self, gas):
pass
def test_prev_gas_interceptor(self):
self._rx(self._interceptor_user_gas(0x0))
self.assertFalse(self.safety.get_gas_interceptor_prev())
self._rx(self._interceptor_user_gas(0x1000))
self.assertTrue(self.safety.get_gas_interceptor_prev())
self._rx(self._interceptor_user_gas(0x0))
self.safety.set_gas_interceptor_detected(False)
def test_disengage_on_gas_interceptor(self):
for g in range(0, 0x1000):
self._rx(self._interceptor_user_gas(0))
self.safety.set_controls_allowed(True)
self._rx(self._interceptor_user_gas(g))
remain_enabled = g <= self.INTERCEPTOR_THRESHOLD
self.assertEqual(remain_enabled, self.safety.get_controls_allowed())
self._rx(self._interceptor_user_gas(0))
self.safety.set_gas_interceptor_detected(False)
def test_alternative_experience_no_disengage_on_gas_interceptor(self):
self.safety.set_controls_allowed(True)
self.safety.set_alternative_experience(ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS)
for g in range(0, 0x1000):
self._rx(self._interceptor_user_gas(g))
# Test we allow lateral, but not longitudinal
self.assertTrue(self.safety.get_controls_allowed())
self.assertEqual(g <= self.INTERCEPTOR_THRESHOLD, self.safety.get_longitudinal_allowed())
# Make sure we can re-gain longitudinal actuation
self._rx(self._interceptor_user_gas(0))
self.assertTrue(self.safety.get_longitudinal_allowed())
def test_allow_engage_with_gas_interceptor_pressed(self):
self._rx(self._interceptor_user_gas(0x1000))
self.safety.set_controls_allowed(1)
self._rx(self._interceptor_user_gas(0x1000))
self.assertTrue(self.safety.get_controls_allowed())
self._rx(self._interceptor_user_gas(0))
def test_gas_interceptor_safety_check(self):
for gas in np.arange(0, 4000, 100):
for controls_allowed in [True, False]:
self.safety.set_controls_allowed(controls_allowed)
if controls_allowed:
send = True
else:
send = gas == 0
self.assertEqual(send, self._tx(self._interceptor_gas_cmd(gas)))
class TorqueSteeringSafetyTestBase(PandaSafetyTestBase):
MAX_RATE_UP = 0
MAX_RATE_DOWN = 0
MAX_TORQUE = 0
MAX_RT_DELTA = 0
RT_INTERVAL = 0
# Safety around steering req bit
MIN_VALID_STEERING_FRAMES = 0
MAX_INVALID_STEERING_FRAMES = 0
MIN_VALID_STEERING_RT_INTERVAL = 0
@classmethod
def setUpClass(cls):
if cls.__name__ == "TorqueSteeringSafetyTestBase":
cls.safety = None
raise unittest.SkipTest
@abc.abstractmethod
def _torque_cmd_msg(self, torque, steer_req=1):
pass
def _set_prev_torque(self, t):
self.safety.set_desired_torque_last(t)
self.safety.set_rt_torque_last(t)
def test_steer_safety_check(self):
for enabled in [0, 1]:
for t in range(-self.MAX_TORQUE * 2, self.MAX_TORQUE * 2):
self.safety.set_controls_allowed(enabled)
self._set_prev_torque(t)
if abs(t) > self.MAX_TORQUE or (not enabled and abs(t) > 0):
self.assertFalse(self._tx(self._torque_cmd_msg(t)))
else:
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
def test_non_realtime_limit_up(self):
self.safety.set_controls_allowed(True)
self._set_prev_torque(0)
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_RATE_UP)))
self._set_prev_torque(0)
self.assertTrue(self._tx(self._torque_cmd_msg(-self.MAX_RATE_UP)))
self._set_prev_torque(0)
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_RATE_UP + 1)))
self.safety.set_controls_allowed(True)
self._set_prev_torque(0)
self.assertFalse(self._tx(self._torque_cmd_msg(-self.MAX_RATE_UP - 1)))
def test_steer_req_bit_frames(self):
"""
Certain safety modes implement some tolerance on their steer request bits matching the
requested torque to avoid a steering fault or lockout and maintain torque. This tests:
- We can't cut torque for more than one frame
- We can't cut torque until at least the minimum number of matching steer_req messages
- We can always recover from violations if steer_req=1
"""
for min_valid_steer_frames in range(self.MIN_VALID_STEERING_FRAMES * 2):
# Reset match count and rt timer to allow cut (valid_steer_req_count, ts_steer_req_mismatch_last)
self.safety.init_tests()
self.safety.set_timer(self.MIN_VALID_STEERING_RT_INTERVAL)
# Allow torque cut
self.safety.set_controls_allowed(True)
self._set_prev_torque(self.MAX_TORQUE)
for _ in range(min_valid_steer_frames):
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
# should tx if we've sent enough valid frames, and we're not cutting torque for too many frames consecutively
should_tx = min_valid_steer_frames >= self.MIN_VALID_STEERING_FRAMES
for idx in range(self.MAX_INVALID_STEERING_FRAMES * 2):
tx = self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0))
self.assertEqual(should_tx and idx < self.MAX_INVALID_STEERING_FRAMES, tx)
# Keep blocking after one steer_req mismatch
for _ in range(100):
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
# Make sure we can recover
self.assertTrue(self._tx(self._torque_cmd_msg(0, steer_req=1)))
self._set_prev_torque(self.MAX_TORQUE)
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
def test_steer_req_bit_multi_invalid(self):
"""
For safety modes allowing multiple consecutive invalid frames, this ensures that once a valid frame
is sent after an invalid frame (even without sending the max number of allowed invalid frames),
all counters are reset.
"""
# TODO: Add safety around steer request bits for all safety modes and remove exception
if self.MIN_VALID_STEERING_FRAMES == 0:
raise unittest.SkipTest("Safety mode does not implement tolerance for steer request bit safety")
for max_invalid_steer_frames in range(1, self.MAX_INVALID_STEERING_FRAMES * 2):
self.safety.init_tests()
self.safety.set_timer(self.MIN_VALID_STEERING_RT_INTERVAL)
# Allow torque cut
self.safety.set_controls_allowed(True)
self._set_prev_torque(self.MAX_TORQUE)
for _ in range(self.MIN_VALID_STEERING_FRAMES):
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
# Send partial amount of allowed invalid frames
for idx in range(max_invalid_steer_frames):
should_tx = idx < self.MAX_INVALID_STEERING_FRAMES
self.assertEqual(should_tx, self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
# Send one valid frame, and subsequent invalid should now be blocked
self._set_prev_torque(self.MAX_TORQUE)
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
for _ in range(self.MIN_VALID_STEERING_FRAMES + 1):
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
def test_steer_req_bit_realtime(self):
"""
Realtime safety for cutting steer request bit. This tests:
- That we allow messages with mismatching steer request bit if time from last is >= MIN_VALID_STEERING_RT_INTERVAL
- That frame mismatch safety does not interfere with this test
"""
# TODO: Add safety around steer request bits for all safety modes and remove exception
if self.MIN_VALID_STEERING_RT_INTERVAL == 0:
raise unittest.SkipTest("Safety mode does not implement tolerance for steer request bit safety")
for rt_us in np.arange(self.MIN_VALID_STEERING_RT_INTERVAL - 50000, self.MIN_VALID_STEERING_RT_INTERVAL + 50000, 10000):
# Reset match count and rt timer (valid_steer_req_count, ts_steer_req_mismatch_last)
self.safety.init_tests()
# Make sure valid_steer_req_count doesn't affect this test
self.safety.set_controls_allowed(True)
self._set_prev_torque(self.MAX_TORQUE)
for _ in range(self.MIN_VALID_STEERING_FRAMES):
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
# Normally, sending MIN_VALID_STEERING_FRAMES valid frames should always allow
self.safety.set_timer(max(rt_us, 0))
should_tx = rt_us >= self.MIN_VALID_STEERING_RT_INTERVAL
for _ in range(self.MAX_INVALID_STEERING_FRAMES):
self.assertEqual(should_tx, self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
# Keep blocking after one steer_req mismatch
for _ in range(100):
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=0)))
# Make sure we can recover
self.assertTrue(self._tx(self._torque_cmd_msg(0, steer_req=1)))
self._set_prev_torque(self.MAX_TORQUE)
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE, steer_req=1)))
class DriverTorqueSteeringSafetyTest(TorqueSteeringSafetyTestBase):
DRIVER_TORQUE_ALLOWANCE = 0
DRIVER_TORQUE_FACTOR = 0
@classmethod
def setUpClass(cls):
if cls.__name__ == "DriverTorqueSteeringSafetyTest":
cls.safety = None
raise unittest.SkipTest
@abc.abstractmethod
def _torque_cmd_msg(self, torque, steer_req=1):
pass
def test_non_realtime_limit_up(self):
self.safety.set_torque_driver(0, 0)
super().test_non_realtime_limit_up()
# TODO: make this test something
def test_non_realtime_limit_down(self):
self.safety.set_torque_driver(0, 0)
self.safety.set_controls_allowed(True)
def test_against_torque_driver(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
for t in np.arange(0, self.DRIVER_TORQUE_ALLOWANCE + 1, 1):
t *= -sign
self.safety.set_torque_driver(t, t)
self._set_prev_torque(self.MAX_TORQUE * sign)
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE * sign)))
self.safety.set_torque_driver(self.DRIVER_TORQUE_ALLOWANCE + 1, self.DRIVER_TORQUE_ALLOWANCE + 1)
self.assertFalse(self._tx(self._torque_cmd_msg(-self.MAX_TORQUE)))
# arbitrary high driver torque to ensure max steer torque is allowed
max_driver_torque = int(self.MAX_TORQUE / self.DRIVER_TORQUE_FACTOR + self.DRIVER_TORQUE_ALLOWANCE + 1)
# spot check some individual cases
for sign in [-1, 1]:
driver_torque = (self.DRIVER_TORQUE_ALLOWANCE + 10) * sign
torque_desired = (self.MAX_TORQUE - 10 * self.DRIVER_TORQUE_FACTOR) * sign
delta = 1 * sign
self._set_prev_torque(torque_desired)
self.safety.set_torque_driver(-driver_torque, -driver_torque)
self.assertTrue(self._tx(self._torque_cmd_msg(torque_desired)))
self._set_prev_torque(torque_desired + delta)
self.safety.set_torque_driver(-driver_torque, -driver_torque)
self.assertFalse(self._tx(self._torque_cmd_msg(torque_desired + delta)))
self._set_prev_torque(self.MAX_TORQUE * sign)
self.safety.set_torque_driver(-max_driver_torque * sign, -max_driver_torque * sign)
self.assertTrue(self._tx(self._torque_cmd_msg((self.MAX_TORQUE - self.MAX_RATE_DOWN) * sign)))
self._set_prev_torque(self.MAX_TORQUE * sign)
self.safety.set_torque_driver(-max_driver_torque * sign, -max_driver_torque * sign)
self.assertTrue(self._tx(self._torque_cmd_msg(0)))
self._set_prev_torque(self.MAX_TORQUE * sign)
self.safety.set_torque_driver(-max_driver_torque * sign, -max_driver_torque * sign)
self.assertFalse(self._tx(self._torque_cmd_msg((self.MAX_TORQUE - self.MAX_RATE_DOWN + 1) * sign)))
def test_realtime_limits(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
self.safety.init_tests()
self._set_prev_torque(0)
self.safety.set_torque_driver(0, 0)
for t in np.arange(0, self.MAX_RT_DELTA, 1):
t *= sign
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
self.assertFalse(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
self._set_prev_torque(0)
for t in np.arange(0, self.MAX_RT_DELTA, 1):
t *= sign
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
# Increase timer to update rt_torque_last
self.safety.set_timer(self.RT_INTERVAL + 1)
self.assertTrue(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA - 1))))
self.assertTrue(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
class MotorTorqueSteeringSafetyTest(TorqueSteeringSafetyTestBase):
MAX_TORQUE_ERROR = 0
TORQUE_MEAS_TOLERANCE = 0
@classmethod
def setUpClass(cls):
if cls.__name__ == "MotorTorqueSteeringSafetyTest":
cls.safety = None
raise unittest.SkipTest
@abc.abstractmethod
def _torque_meas_msg(self, torque):
pass
@abc.abstractmethod
def _torque_cmd_msg(self, torque, steer_req=1):
pass
def _set_prev_torque(self, t):
super()._set_prev_torque(t)
self.safety.set_torque_meas(t, t)
def test_torque_absolute_limits(self):
for controls_allowed in [True, False]:
for torque in np.arange(-self.MAX_TORQUE - 1000, self.MAX_TORQUE + 1000, self.MAX_RATE_UP):
self.safety.set_controls_allowed(controls_allowed)
self.safety.set_rt_torque_last(torque)
self.safety.set_torque_meas(torque, torque)
self.safety.set_desired_torque_last(torque - self.MAX_RATE_UP)
if controls_allowed:
send = (-self.MAX_TORQUE <= torque <= self.MAX_TORQUE)
else:
send = torque == 0
self.assertEqual(send, self._tx(self._torque_cmd_msg(torque)))
def test_non_realtime_limit_down(self):
self.safety.set_controls_allowed(True)
torque_meas = self.MAX_TORQUE - self.MAX_TORQUE_ERROR - 50
self.safety.set_rt_torque_last(self.MAX_TORQUE)
self.safety.set_torque_meas(torque_meas, torque_meas)
self.safety.set_desired_torque_last(self.MAX_TORQUE)
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_TORQUE - self.MAX_RATE_DOWN)))
self.safety.set_rt_torque_last(self.MAX_TORQUE)
self.safety.set_torque_meas(torque_meas, torque_meas)
self.safety.set_desired_torque_last(self.MAX_TORQUE)
self.assertFalse(self._tx(self._torque_cmd_msg(self.MAX_TORQUE - self.MAX_RATE_DOWN + 1)))
def test_exceed_torque_sensor(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
self._set_prev_torque(0)
for t in np.arange(0, self.MAX_TORQUE_ERROR + 2, 2): # step needs to be smaller than MAX_TORQUE_ERROR
t *= sign
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
self.assertFalse(self._tx(self._torque_cmd_msg(sign * (self.MAX_TORQUE_ERROR + 2))))
def test_realtime_limit_up(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
self.safety.init_tests()
self._set_prev_torque(0)
for t in np.arange(0, self.MAX_RT_DELTA + 1, 1):
t *= sign
self.safety.set_torque_meas(t, t)
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
self.assertFalse(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
self._set_prev_torque(0)
for t in np.arange(0, self.MAX_RT_DELTA + 1, 1):
t *= sign
self.safety.set_torque_meas(t, t)
self.assertTrue(self._tx(self._torque_cmd_msg(t)))
# Increase timer to update rt_torque_last
self.safety.set_timer(self.RT_INTERVAL + 1)
self.assertTrue(self._tx(self._torque_cmd_msg(sign * self.MAX_RT_DELTA)))
self.assertTrue(self._tx(self._torque_cmd_msg(sign * (self.MAX_RT_DELTA + 1))))
def test_torque_measurements(self):
trq = 50
for t in [trq, -trq, 0, 0, 0, 0]:
self._rx(self._torque_meas_msg(t))
max_range = range(trq, trq + self.TORQUE_MEAS_TOLERANCE + 1)
min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1)
self.assertTrue(self.safety.get_torque_meas_min() in min_range)
self.assertTrue(self.safety.get_torque_meas_max() in max_range)
max_range = range(0, self.TORQUE_MEAS_TOLERANCE + 1)
min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1)
self._rx(self._torque_meas_msg(0))
self.assertTrue(self.safety.get_torque_meas_min() in min_range)
self.assertTrue(self.safety.get_torque_meas_max() in max_range)
max_range = range(0, self.TORQUE_MEAS_TOLERANCE + 1)
min_range = range(-self.TORQUE_MEAS_TOLERANCE, 0 + 1)
self._rx(self._torque_meas_msg(0))
self.assertTrue(self.safety.get_torque_meas_min() in min_range)
self.assertTrue(self.safety.get_torque_meas_max() in max_range)
@add_regen_tests
class PandaSafetyTest(PandaSafetyTestBase):
TX_MSGS: Optional[List[List[int]]] = None
SCANNED_ADDRS = [*range(0x0, 0x800), # Entire 11-bit CAN address space
*range(0x18DA00F1, 0x18DB00F1, 0x100), # 29-bit UDS physical addressing
*range(0x18DB00F1, 0x18DC00F1, 0x100), # 29-bit UDS functional addressing
*range(0x3300, 0x3400), # Honda
0x10400060, 0x104c006c] # GMLAN (exceptions, range/format unclear)
STANDSTILL_THRESHOLD: Optional[float] = None
GAS_PRESSED_THRESHOLD = 0
RELAY_MALFUNCTION_ADDR: Optional[int] = None
RELAY_MALFUNCTION_BUS: Optional[int] = None
2020-05-31 14:07:01 -07:00
FWD_BLACKLISTED_ADDRS: Dict[int, List[int]] = {} # {bus: [addr]}
FWD_BUS_LOOKUP: Dict[int, int] = {}
@classmethod
def setUpClass(cls):
if cls.__name__ == "PandaSafetyTest" or cls.__name__.endswith('Base'):
cls.safety = None
raise unittest.SkipTest
@abc.abstractmethod
def _user_brake_msg(self, brake):
pass
def _user_regen_msg(self, regen):
pass
@abc.abstractmethod
def _speed_msg(self, speed):
pass
@abc.abstractmethod
def _user_gas_msg(self, gas):
pass
@abc.abstractmethod
def _pcm_status_msg(self, enable):
pass
# ***** standard tests for all safety modes *****
def test_tx_msg_in_scanned_range(self):
# the relay malfunction, fwd hook, and spam can tests don't exhaustively
# scan the entire 29-bit address space, only some known important ranges
# make sure SCANNED_ADDRS stays up to date with car port TX_MSGS; new
# model ports should expand the range if needed
for msg in self.TX_MSGS:
self.assertTrue(msg[0] in self.SCANNED_ADDRS, f"{msg[0]=:#x}")
def test_relay_malfunction(self):
# each car has an addr that is used to detect relay malfunction
# if that addr is seen on specified bus, triggers the relay malfunction
# protection logic: both tx_hook and fwd_hook are expected to return failure
self.assertFalse(self.safety.get_relay_malfunction())
self._rx(make_msg(self.RELAY_MALFUNCTION_BUS, self.RELAY_MALFUNCTION_ADDR, 8))
self.assertTrue(self.safety.get_relay_malfunction())
for bus in range(0, 3):
for addr in self.SCANNED_ADDRS:
self.assertEqual(-1, self._tx(make_msg(bus, addr, 8)))
self.assertEqual(-1, self.safety.safety_fwd_hook(bus, make_msg(bus, addr, 8)))
def test_fwd_hook(self):
# some safety modes don't forward anything, while others blacklist msgs
for bus in range(0, 3):
for addr in self.SCANNED_ADDRS:
# assume len 8
msg = make_msg(bus, addr, 8)
fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1)
if bus in self.FWD_BLACKLISTED_ADDRS and addr in self.FWD_BLACKLISTED_ADDRS[bus]:
fwd_bus = -1
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, msg), f"{addr=:#x} from {bus=} to {fwd_bus=}")
def test_spam_can_buses(self):
for bus in range(0, 4):
for addr in self.SCANNED_ADDRS:
if all(addr != m[0] or bus != m[1] for m in self.TX_MSGS):
self.assertFalse(self._tx(make_msg(bus, addr, 8)), f"allowed TX {addr=} {bus=}")
def test_default_controls_not_allowed(self):
self.assertFalse(self.safety.get_controls_allowed())
def test_manually_enable_controls_allowed(self):
self.safety.set_controls_allowed(1)
self.assertTrue(self.safety.get_controls_allowed())
self.safety.set_controls_allowed(0)
self.assertFalse(self.safety.get_controls_allowed())
def test_prev_gas(self):
self.assertFalse(self.safety.get_gas_pressed_prev())
for pressed in [self.GAS_PRESSED_THRESHOLD + 1, 0]:
self._rx(self._user_gas_msg(pressed))
self.assertEqual(bool(pressed), self.safety.get_gas_pressed_prev())
def test_allow_engage_with_gas_pressed(self):
self._rx(self._user_gas_msg(1))
self.safety.set_controls_allowed(True)
self._rx(self._user_gas_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self._rx(self._user_gas_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
def test_disengage_on_gas(self):
self._rx(self._user_gas_msg(0))
self.safety.set_controls_allowed(True)
self._rx(self._user_gas_msg(self.GAS_PRESSED_THRESHOLD + 1))
self.assertFalse(self.safety.get_controls_allowed())
def test_alternative_experience_no_disengage_on_gas(self):
self._rx(self._user_gas_msg(0))
self.safety.set_controls_allowed(True)
self.safety.set_alternative_experience(ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS)
self._rx(self._user_gas_msg(self.GAS_PRESSED_THRESHOLD + 1))
# Test we allow lateral, but not longitudinal
self.assertTrue(self.safety.get_controls_allowed())
self.assertFalse(self.safety.get_longitudinal_allowed())
# Make sure we can re-gain longitudinal actuation
self._rx(self._user_gas_msg(0))
self.assertTrue(self.safety.get_longitudinal_allowed())
def test_prev_user_brake(self, _user_brake_msg=None, get_brake_pressed_prev=None):
if _user_brake_msg is None:
_user_brake_msg = self._user_brake_msg
get_brake_pressed_prev = self.safety.get_brake_pressed_prev
self.assertFalse(get_brake_pressed_prev())
for pressed in [True, False]:
self._rx(_user_brake_msg(not pressed))
self.assertEqual(not pressed, get_brake_pressed_prev())
self._rx(_user_brake_msg(pressed))
self.assertEqual(pressed, get_brake_pressed_prev())
def test_enable_control_allowed_from_cruise(self):
self._rx(self._pcm_status_msg(False))
self.assertFalse(self.safety.get_controls_allowed())
self._rx(self._pcm_status_msg(True))
self.assertTrue(self.safety.get_controls_allowed())
def test_disable_control_allowed_from_cruise(self):
self.safety.set_controls_allowed(1)
self._rx(self._pcm_status_msg(False))
self.assertFalse(self.safety.get_controls_allowed())
def test_cruise_engaged_prev(self):
for engaged in [True, False]:
self._rx(self._pcm_status_msg(engaged))
self.assertEqual(engaged, self.safety.get_cruise_engaged_prev())
self._rx(self._pcm_status_msg(not engaged))
self.assertEqual(not engaged, self.safety.get_cruise_engaged_prev())
def test_allow_user_brake_at_zero_speed(self, _user_brake_msg=None, get_brake_pressed_prev=None):
if _user_brake_msg is None:
_user_brake_msg = self._user_brake_msg
# Brake was already pressed
self._rx(self._speed_msg(0))
self._rx(_user_brake_msg(1))
self.safety.set_controls_allowed(1)
self._rx(_user_brake_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self.safety.get_longitudinal_allowed())
self._rx(_user_brake_msg(0))
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self.safety.get_longitudinal_allowed())
# rising edge of brake should disengage
self._rx(_user_brake_msg(1))
self.assertFalse(self.safety.get_controls_allowed())
self.assertFalse(self.safety.get_longitudinal_allowed())
self._rx(_user_brake_msg(0)) # reset no brakes
def test_not_allow_user_brake_when_moving(self, _user_brake_msg=None, get_brake_pressed_prev=None):
if _user_brake_msg is None:
_user_brake_msg = self._user_brake_msg
# Brake was already pressed
self._rx(_user_brake_msg(1))
self.safety.set_controls_allowed(1)
self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD))
self._rx(_user_brake_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self.assertTrue(self.safety.get_longitudinal_allowed())
self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD + 1))
self._rx(_user_brake_msg(1))
self.assertFalse(self.safety.get_controls_allowed())
self.assertFalse(self.safety.get_longitudinal_allowed())
self._rx(self._speed_msg(0))
def test_sample_speed(self):
self.assertFalse(self.safety.get_vehicle_moving())
# not moving
self.safety.safety_rx_hook(self._speed_msg(0))
self.assertFalse(self.safety.get_vehicle_moving())
# speed is at threshold
self.safety.safety_rx_hook(self._speed_msg(self.STANDSTILL_THRESHOLD))
self.assertFalse(self.safety.get_vehicle_moving())
# past threshold
self.safety.safety_rx_hook(self._speed_msg(self.STANDSTILL_THRESHOLD + 1))
self.assertTrue(self.safety.get_vehicle_moving())
def test_tx_hook_on_wrong_safety_mode(self):
files = os.listdir(os.path.dirname(os.path.realpath(__file__)))
test_files = [f for f in files if f.startswith("test_") and f.endswith(".py")]
current_test = self.__class__.__name__
all_tx = []
for tf in test_files:
test = importlib.import_module("panda.tests.safety."+tf[:-3])
for attr in dir(test):
if attr.startswith("Test") and attr != current_test:
tx = getattr(getattr(test, attr), "TX_MSGS")
if tx is not None and not attr.endswith('Base'):
# No point in comparing different Tesla safety modes
if 'Tesla' in attr and 'Tesla' in current_test:
continue
if {attr, current_test}.issubset({'TestToyotaSafety', 'TestToyotaAltBrakeSafety', 'TestToyotaStockLongitudinal'}):
continue
if {attr, current_test}.issubset({'TestSubaruSafety', 'TestSubaruGen2Safety'}):
continue
if {attr, current_test}.issubset({'TestVolkswagenPqSafety', 'TestVolkswagenPqStockSafety', 'TestVolkswagenPqLongSafety'}):
continue
GM: longitudinal support for camera-integrated cars (#1014) * Allow brake TX on PT bus * Initial Panda GM cam harness support * Complete rewrite / simplification * Add param for cam harness (default is OBD2) - Forward btw 0 and 2 - Filter LKAS and optionally ACC from cam * Add param for stock ACC - Allows ACC on PT bus when set, not otherwise - Allows ACC from cam when set, not otherwise * Add temporary WIP EPS timing workaround in ifdef * Remove keepalive; regen == braking * Fix MISRA violations * EPS timing opt-in by param * Minor clean up * remove timing code for readability fix and formatting * we only need to send one message if cam harness and stock long * Simplify params * Removed dup brake msg * revised params, split 3-ways * add test * clean up a bit, no need to send ACC yet like this * split out button enable and pcm enable into seperate tests * some formatting some formatting * GM CAM uses PCM cruise for controls_allowed * fix gas safety tests * misra * fix static analysis comment * fix brake pressed * Add OP VOACC override stock cam * Add VOACC safetyparam to init.py * openpilot long safety * do this in another PR * add back * add safety tests * rename * struct for safety * not used * temporary fault fix * adjust max gas * allow PSCMStatus in long mode * stash * fix max gas * fix and clean up tests * clean up longitudinal tests into a common class * fix pylint * subclass * clean that up * rm * blocking * comment * comment * lower case * brake bus Co-authored-by: Shane Smiskol <shane@smiskol.com>
2022-11-01 03:58:18 -04:00
if {attr, current_test}.issubset({'TestGmCameraSafety', 'TestGmCameraLongitudinalSafety'}):
continue
if attr.startswith('TestHyundaiCanfd') and current_test.startswith('TestHyundaiCanfd'):
continue
if {attr, current_test}.issubset({'TestVolkswagenMqbSafety', 'TestVolkswagenMqbStockSafety', 'TestVolkswagenMqbLongSafety'}):
continue
# overlapping TX addrs, but they're not actuating messages for either car
if attr == 'TestHyundaiCanfdHDA2LongEV' and current_test.startswith('TestToyota'):
tx = list(filter(lambda m: m[0] not in [0x160, ], tx))
# Volkswagen MQB longitudinal actuating message overlaps with the Subaru lateral actuating message
if attr == 'TestVolkswagenMqbLongSafety' and current_test.startswith('TestSubaru'):
tx = list(filter(lambda m: m[0] not in [0x122, ], tx))
# Volkswagen MQB and Honda Nidec ACC HUD messages overlap
if attr == 'TestVolkswagenMqbLongSafety' and current_test.startswith('TestHondaNidec'):
tx = list(filter(lambda m: m[0] not in [0x30c, ], tx))
# TODO: Temporary, should be fixed in panda firmware, safety_honda.h
if attr.startswith('TestHonda'):
# exceptions for common msgs across different hondas
2022-06-16 19:02:44 -07:00
tx = list(filter(lambda m: m[0] not in [0x1FA, 0x30C, 0x33D], tx))
all_tx.append(list([m[0], m[1], attr] for m in tx))
# make sure we got all the msgs
self.assertTrue(len(all_tx) >= len(test_files)-1)
for tx_msgs in all_tx:
for addr, bus, test_name in tx_msgs:
msg = make_msg(bus, addr)
self.safety.set_controls_allowed(1)
# TODO: this should be blocked
if current_test in ["TestNissanSafety", "TestNissanLeafSafety"] and [addr, bus] in self.TX_MSGS:
continue
self.assertFalse(self._tx(msg), f"transmit of {addr=:#x} {bus=} from {test_name} during {current_test} was allowed")