mirror of
https://github.com/infiniteCable2/panda.git
synced 2026-02-18 17:23:52 +08:00
* Squashed commits, no cleanup * Few fixes * No init = garbage * Only receive with new canpacket * Add send with canpacket * Revert "Add send with canpacket" This reverts commit 7d06686ddd6d447c714b5289d31af24403d36931. * Packet must be aligned to word, or bad performance * Cleaner * Fix tests * Tests... * MISRA 10.4 * More MISRA * libpandasafety_py * cffi * even more tests... * typo * ... * ... * ... * Slight cleanup * MISRA 6.1 * MISRA 17.7 * Bug in bxcan + even style * MISRA 10.1 * Revert "MISRA 10.1" This reverts commit 404ae7fcc39556f80f528de9015702e69f4ea0a5. * ... * MISRA 10.1 and 10.4 suppress until next PR * MISRA 20.1 * ... * test_honda * ... * ... * test_toyota * test_volkswagen_mqb * test_volkswagen_pq * Sketchy thing... * Revert "Sketchy thing..." This reverts commit 3b2e5715bdc1954f7b7b3b7469ba3d0eaa06bdf9. * remove comment * bxcan extended address bug * Concept, experimental dynamic usb packet size * increase each buffer to 10240 bytes * raise python bulk read/write limits * ... * Move packet size to start * Experimental send, stream-like * New receive test, stream-like * cleanup * cleanup + rebase fixes * MISRA * Extra receive method, stream-like, commented out * type change * Revert back to buffer for send, stream commented * forgot ZLP * lower buffer, add rx failsafe * ... remove ZLP * return ZLP back * Add tx checks to panda fw * TX stream with counter * fix counter overflow * 13 free slots should be enough * limit tx usb packet * ... * Revert max_bulk_msg doubling * python lib improve speed * Stream with counter for RX, dirty, needs cleanup * Increase chunk length to 4096 bytes * cleanup fdcan.h * cleanup __init__.py * MISRA 12.1 * MISRA 10.8 * remove non-streaming usb functions * more main.c cleanup * MISRA 15.6 * MISRA 15.5 * MISRA 18.4 and suppress objectIndex * handling usb pakcets > 63bytes, naming and cleanup * Cleanup old from tests and update CANPacket_t struct * Switch to 4 bit DLC instead of 6 bit length * ops) * ... 
* pylint * receive python buffer increase * USB increase receive packet len * tweak buffers * No need for so high limits * MISRA 20.1 workaround * performance tweaks * cleanup, dlc to data_len_code naming * main.c naming * comments and cleanup for main.c usb * clean py lib * pylint * do not discard good rx messages on stream fail * cleanups * naming * remove bitstruct lib and lower tx limit * bitstruct lefovers * fix bug in VW test * remove adjusting data size and assert on wrong len * ... * test new memcpy before merging * Revert "test new memcpy before merging" This reverts commit 399465a264835061adabdd785718c4b6fc18c267. * macros for to/fromuint8_t array * MISRA hates me! * tests.c include macros instead * move CANPacket to can_definitions.h * vw_pq python test fix * new memcpy test, REMOVE * check without alignment * revert macros for uint8 arrays * Revert "revert macros for uint8 arrays" This reverts commit 581a9db735a42d0d68200bd270d87a8fd34e43fe. * check assert * Revert "check assert" This reverts commit 9e970d029a50597a1718b2bb0260196c050fd77f. * one more variation * Revert "one more variation" This reverts commit f6c0528b7ac7e125750dc0d9445c7ce97f6954b5. * what about read performance * Revert "what about read performance" This reverts commit d2610f90958a816fe7f1822157a84f85e97d9249. * check struct alignment to word * check for aligned memcpy again * cleanup * add CANPacket structure diagram * update CANPacket and add USB packet struct * bugfix + refactoring of EP1 * move dlc_to_len to header * missed include * typo... * MISRA * fk * lower MAX_CAN_MSGS_PER_BULK_TRANSFER * bump CAN_PACKET_VERSION to 2 * bump python lib CAN_PACKET_VERSION to 2 * rename parse_can_buffer to unpack_can_buffer * CANPacket_t const fields * Revert "CANPacket_t const fields" This reverts commit cf91c035b7706a14e317550c5f0501ae3fce7c70. 
* test.c relative path * cleanup * move macros to safety_declarations * Refactor pack/unpack funcs and add unittest * usb_protocol.h * oops * Update .github/workflows/test.yaml Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * remove print from unittest Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
442 lines
16 KiB
Python
442 lines
16 KiB
Python
import os
|
|
import abc
|
|
import unittest
|
|
import importlib
|
|
import numpy as np
|
|
from typing import Optional, List, Dict
|
|
from opendbc.can.packer import CANPacker # pylint: disable=import-error
|
|
from panda import LEN_TO_DLC
|
|
from panda.tests.safety import libpandasafety_py
|
|
|
|
# NOTE(review): not referenced anywhere in this module; presumably the number
# of bad message counters used by counter checks in the car-specific test
# files that import it -- confirm against those callers.
MAX_WRONG_COUNTERS = 5
|
|
|
|
class UNSAFE_MODE:
    """Values passed to the safety library's ``set_unsafe_mode``.

    The non-default values are distinct powers of two, so they are written
    as bit shifts here.
    NOTE(review): presumably they can be OR-ed together as a bitmask; the
    tests in this file only ever pass a single value -- confirm against the
    firmware before combining them.
    """

    # No unsafe behaviour enabled.
    DEFAULT = 0
    # Used by the tests that expect controls to stay allowed when gas is pressed.
    DISABLE_DISENGAGE_ON_GAS = 1 << 0
    DISABLE_STOCK_AEB = 1 << 1
    # Bit 2 (value 4) is not defined in this class.
    RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX = 1 << 3
|
|
|
|
def package_can_msg(msg):
    """Build a cffi ``CANPacket_t *`` from an ``(addr, _, dat, bus)`` message.

    Args:
        msg: Four-item sequence ``(addr, <ignored>, dat, bus)``. The second
            item is unpacked and discarded.

    Returns:
        A newly allocated ``CANPacket_t *`` owned by the caller, with the
        ``extended``, ``addr``, ``data_len_code``, ``bus`` and ``data``
        fields filled in.

    Note:
        The payload length is translated through ``LEN_TO_DLC``, so a length
        that table does not contain fails at that lookup.
    """
    addr, _, dat, bus = msg

    packet = libpandasafety_py.ffi.new('CANPacket_t *')
    # Attribute access on a cffi struct pointer writes to the pointed-to struct.
    # Addresses from 0x800 upwards no longer fit in 11 bits, so they are
    # flagged as extended.
    packet.extended = int(addr >= 0x800)
    packet.addr = addr
    packet.data_len_code = LEN_TO_DLC[len(dat)]
    packet.bus = bus
    packet.data = bytes(dat)

    return packet
|
|
|
|
def make_msg(bus, addr, length=8):
    """Return a packed CAN message for ``addr`` on ``bus`` with an all-zero payload.

    Args:
        bus: Bus number stored in the packet.
        addr: CAN address stored in the packet.
        length: Number of zero bytes in the payload (default 8).

    Returns:
        Whatever ``package_can_msg`` returns for the assembled message.
    """
    zero_payload = b'\x00' * length
    # The second slot is ignored by package_can_msg; 0 is just a placeholder.
    return package_can_msg((addr, 0, zero_payload, bus))
|
|
|
|
class CANPackerPanda(CANPacker):
    """``CANPacker`` variant that returns messages already packed by ``package_can_msg``."""

    def make_can_msg_panda(self, name_or_addr, bus, values, counter=-1, fix_checksum=None):
        """Pack a message with ``make_can_msg`` and convert it for the safety hooks.

        Args:
            name_or_addr: Message name or address, forwarded to ``make_can_msg``.
            bus: Bus number, forwarded to ``make_can_msg``.
            values: Signal values, forwarded to ``make_can_msg``.
            counter: Counter value forwarded to ``make_can_msg``. The default
                of -1 is the same value this method always passed before.
            fix_checksum: Optional callable that takes the packed message and
                returns a corrected message; applied before conversion.

        Returns:
            The result of ``package_can_msg`` for the (optionally fixed) message.
        """
        # Forward the caller's counter. It used to be pinned to -1 here, which
        # silently discarded whatever the caller passed.
        msg = self.make_can_msg(name_or_addr, bus, values, counter=counter)
        if fix_checksum is not None:
            msg = fix_checksum(msg)
        return package_can_msg(msg)
|
|
|
|
class PandaSafetyTestBase(unittest.TestCase):
    """Common base for the safety test classes.

    Provides thin wrappers around the rx/tx hooks of ``self.safety``, which
    concrete subclasses are expected to set up.
    """

    @classmethod
    def setUpClass(cls):
        """Skip this base class itself; any class with another name runs normally."""
        if cls.__name__ != "PandaSafetyTestBase":
            return
        cls.safety = None
        raise unittest.SkipTest

    def _rx(self, msg):
        """Pass ``msg`` to the safety rx hook and return the hook's result."""
        rx_hook = self.safety.safety_rx_hook
        return rx_hook(msg)

    def _tx(self, msg):
        """Pass ``msg`` to the safety tx hook and return the hook's result."""
        tx_hook = self.safety.safety_tx_hook
        return tx_hook(msg)
|
|
|
|
class InterceptorSafetyTest(PandaSafetyTestBase):
    """Shared tests for safety modes that handle a gas interceptor.

    Subclasses implement ``_interceptor_msg`` and may override
    ``INTERCEPTOR_THRESHOLD``. The tests receive interceptor readings on
    address 0x201 and transmit interceptor commands on address 0x200.
    """

    # Largest interceptor reading for which controls are expected to stay allowed.
    INTERCEPTOR_THRESHOLD = 0

    @classmethod
    def setUpClass(cls):
        """Skip this abstract class itself; subclasses run normally."""
        if cls.__name__ == "InterceptorSafetyTest":
            cls.safety = None
            raise unittest.SkipTest

    @abc.abstractmethod
    def _interceptor_msg(self, gas, addr):
        """Return a packed interceptor message carrying ``gas`` on ``addr``."""
        pass

    def test_prev_gas_interceptor(self):
        """The previous-interceptor state follows the last received reading."""
        self._rx(self._interceptor_msg(0x0, 0x201))
        self.assertFalse(self.safety.get_gas_interceptor_prev())
        self._rx(self._interceptor_msg(0x1000, 0x201))
        self.assertTrue(self.safety.get_gas_interceptor_prev())
        # Return to a released pedal and clear the detection flag.
        self._rx(self._interceptor_msg(0x0, 0x201))
        self.safety.set_gas_interceptor_detected(False)

    def test_disengage_on_gas_interceptor(self):
        """Controls stay allowed only for readings at or below the threshold."""
        for g in range(0, 0x1000):
            # Start every iteration from a released pedal with controls allowed.
            self._rx(self._interceptor_msg(0, 0x201))
            self.safety.set_controls_allowed(True)
            self._rx(self._interceptor_msg(g, 0x201))
            remain_enabled = g <= self.INTERCEPTOR_THRESHOLD
            self.assertEqual(remain_enabled, self.safety.get_controls_allowed())
            self._rx(self._interceptor_msg(0, 0x201))
            self.safety.set_gas_interceptor_detected(False)

    def test_unsafe_mode_no_disengage_on_gas_interceptor(self):
        """With DISABLE_DISENGAGE_ON_GAS set, no reading revokes controls."""
        self.safety.set_controls_allowed(True)
        self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
        try:
            for g in range(0, 0x1000):
                self._rx(self._interceptor_msg(g, 0x201))
                self.assertTrue(self.safety.get_controls_allowed())
                self._rx(self._interceptor_msg(0, 0x201))
                self.safety.set_gas_interceptor_detected(False)
        finally:
            # Restore the default even when an assertion above fails, so the
            # unsafe flag is never left set on the safety object.
            self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)

    def test_allow_engage_with_gas_interceptor_pressed(self):
        """A pedal that was already pressed does not revoke controls."""
        self._rx(self._interceptor_msg(0x1000, 0x201))
        self.safety.set_controls_allowed(1)
        self._rx(self._interceptor_msg(0x1000, 0x201))
        self.assertTrue(self.safety.get_controls_allowed())
        self._rx(self._interceptor_msg(0, 0x201))

    def test_gas_interceptor_safety_check(self):
        """Non-zero gas commands are only accepted while controls are allowed."""
        for gas in np.arange(0, 4000, 100):
            for controls_allowed in [True, False]:
                self.safety.set_controls_allowed(controls_allowed)
                if controls_allowed:
                    send = True
                else:
                    # Without controls only a zero gas command may go out.
                    send = gas == 0
                self.assertEqual(send, self._tx(self._interceptor_msg(gas, 0x200)))
|
|
|
|
|
|
class TorqueSteeringSafetyTest(PandaSafetyTestBase):
    """Shared tests for torque-based steering limits.

    Subclasses implement ``_torque_msg`` and ``_torque_meas_msg`` and
    override the limit constants below. The zeros are placeholders.
    """

    MAX_RATE_UP = 0
    MAX_RATE_DOWN = 0
    MAX_TORQUE = 0
    MAX_RT_DELTA = 0
    RT_INTERVAL = 0
    MAX_TORQUE_ERROR = 0
    TORQUE_MEAS_TOLERANCE = 0

    @classmethod
    def setUpClass(cls):
        """Skip this abstract class itself; subclasses run normally."""
        if cls.__name__ != "TorqueSteeringSafetyTest":
            return
        cls.safety = None
        raise unittest.SkipTest

    @abc.abstractmethod
    def _torque_meas_msg(self, torque):
        """Return a packed message reporting the measured ``torque``."""

    @abc.abstractmethod
    def _torque_msg(self, torque):
        """Return a packed message commanding ``torque``."""

    def _set_prev_torque(self, t):
        """Set the last desired, last real-time and measured torque all to ``t``."""
        safety = self.safety
        safety.set_desired_torque_last(t)
        safety.set_rt_torque_last(t)
        safety.set_torque_meas(t, t)

    def test_steer_safety_check(self):
        """Torque beyond MAX_TORQUE, or any non-zero torque while disabled, is blocked."""
        limit = self.MAX_TORQUE
        for enabled in (0, 1):
            for torque in range(-limit * 2, limit * 2):
                self.safety.set_controls_allowed(enabled)
                self._set_prev_torque(torque)
                blocked = abs(torque) > limit or (not enabled and abs(torque) > 0)
                expect = self.assertFalse if blocked else self.assertTrue
                expect(self._tx(self._torque_msg(torque)))

    def test_torque_absolute_limits(self):
        """Sweep past both torque limits with the previous state one rate step behind."""
        for controls_allowed in (True, False):
            sweep = np.arange(-self.MAX_TORQUE - 1000, self.MAX_TORQUE + 1000, self.MAX_RATE_UP)
            for torque in sweep:
                self.safety.set_controls_allowed(controls_allowed)
                self.safety.set_rt_torque_last(torque)
                self.safety.set_torque_meas(torque, torque)
                self.safety.set_desired_torque_last(torque - self.MAX_RATE_UP)

                # Allowed: anything within the limits. Not allowed: zero only.
                send = (-self.MAX_TORQUE <= torque <= self.MAX_TORQUE) if controls_allowed else (torque == 0)

                self.assertEqual(send, self._tx(self._torque_msg(torque)))

    def test_non_realtime_limit_up(self):
        """A step of MAX_RATE_UP from zero passes; one unit more is blocked."""
        self.safety.set_controls_allowed(True)

        steps = (
            (self.MAX_RATE_UP, self.assertTrue),
            (self.MAX_RATE_UP + 1, self.assertFalse),
        )
        for torque, expect in steps:
            self._set_prev_torque(0)
            expect(self._tx(self._torque_msg(torque)))

    def test_non_realtime_limit_down(self):
        """From MAX_TORQUE, MAX_RATE_DOWN below passes; one unit higher is blocked."""
        self.safety.set_controls_allowed(True)

        torque_meas = self.MAX_TORQUE - self.MAX_TORQUE_ERROR - 50

        steps = (
            (self.MAX_TORQUE - self.MAX_RATE_DOWN, self.assertTrue),
            (self.MAX_TORQUE - self.MAX_RATE_DOWN + 1, self.assertFalse),
        )
        for torque, expect in steps:
            # Re-seed the previous state before each attempt.
            self.safety.set_rt_torque_last(self.MAX_TORQUE)
            self.safety.set_torque_meas(torque_meas, torque_meas)
            self.safety.set_desired_torque_last(self.MAX_TORQUE)
            expect(self._tx(self._torque_msg(torque)))

    def test_exceed_torque_sensor(self):
        """Commands may lead a zero measurement by up to MAX_TORQUE_ERROR, not more."""
        self.safety.set_controls_allowed(True)

        for sign in (-1, 1):
            self._set_prev_torque(0)
            # step needs to be smaller than MAX_TORQUE_ERROR
            for magnitude in np.arange(0, self.MAX_TORQUE_ERROR + 2, 2):
                self.assertTrue(self._tx(self._torque_msg(magnitude * sign)))

            self.assertFalse(self._tx(self._torque_msg(sign * (self.MAX_TORQUE_ERROR + 2))))

    def test_realtime_limit_up(self):
        """The real-time delta limit blocks further rise until the timer passes RT_INTERVAL."""

        def ramp_to_rt_delta(sign):
            # Walk torque and measurement together from 0 up to MAX_RT_DELTA.
            self._set_prev_torque(0)
            for magnitude in np.arange(0, self.MAX_RT_DELTA + 1, 1):
                torque = magnitude * sign
                self.safety.set_torque_meas(torque, torque)
                self.assertTrue(self._tx(self._torque_msg(torque)))

        self.safety.set_controls_allowed(True)

        for sign in (-1, 1):
            self.safety.init_tests()
            ramp_to_rt_delta(sign)
            self.assertFalse(self._tx(self._torque_msg(sign * (self.MAX_RT_DELTA + 1))))

            ramp_to_rt_delta(sign)

            # Increase timer to update rt_torque_last
            self.safety.set_timer(self.RT_INTERVAL + 1)
            self.assertTrue(self._tx(self._torque_msg(sign * self.MAX_RT_DELTA)))
            self.assertTrue(self._tx(self._torque_msg(sign * (self.MAX_RT_DELTA + 1))))

    def test_torque_measurements(self):
        """Min/max torque measurement tracking follows the received samples.

        NOTE(review): the expectations look like a window of the six most
        recent samples; confirm against the firmware.
        """
        trq = 50
        tol = self.TORQUE_MEAS_TOLERANCE

        def check_bounds(min_range, max_range):
            self.assertTrue(self.safety.get_torque_meas_min() in min_range)
            self.assertTrue(self.safety.get_torque_meas_max() in max_range)

        for sample in (trq, -trq, 0, 0, 0, 0):
            self._rx(self._torque_meas_msg(sample))

        # Both extremes are still reflected.
        check_bounds(range(-(trq + tol), -trq + 1), range(trq, trq + tol + 1))

        # One more zero sample: the maximum collapses to ~0, the minimum stays.
        self._rx(self._torque_meas_msg(0))
        check_bounds(range(-(trq + tol), -trq + 1), range(0, tol + 1))

        # Another zero sample: the minimum collapses to ~0 as well.
        self._rx(self._torque_meas_msg(0))
        check_bounds(range(-tol, 0 + 1), range(0, tol + 1))
|
|
|
|
|
|
class PandaSafetyTest(PandaSafetyTestBase):
    """Standard tests shared by the car-specific safety-mode test classes.

    Subclasses set the class constants below and implement the abstract
    message builders (``_brake_msg``, ``_speed_msg``, ``_gas_msg``,
    ``_pcm_status_msg``).
    """

    # [addr, bus] pairs this safety mode may transmit.
    TX_MSGS: Optional[List[List[int]]] = None
    # Highest speed at which the vehicle is still reported as not moving.
    STANDSTILL_THRESHOLD: Optional[float] = None
    # Gas values above this count as "pressed".
    GAS_PRESSED_THRESHOLD = 0
    # Address/bus whose reception is expected to flag a relay malfunction.
    RELAY_MALFUNCTION_ADDR: Optional[int] = None
    RELAY_MALFUNCTION_BUS: Optional[int] = None
    FWD_BLACKLISTED_ADDRS: Dict[int, List[int]] = {}  # {bus: [addr]}
    # Source bus -> expected forward bus; buses not listed expect -1.
    FWD_BUS_LOOKUP: Dict[int, int] = {}

    @classmethod
    def setUpClass(cls):
        """Skip this abstract class itself; subclasses run normally."""
        if cls.__name__ == "PandaSafetyTest":
            cls.safety = None
            raise unittest.SkipTest

    @abc.abstractmethod
    def _brake_msg(self, brake):
        """Return a packed message reporting the brake state ``brake``."""
        pass

    @abc.abstractmethod
    def _speed_msg(self, speed):
        """Return a packed message reporting vehicle ``speed``."""
        pass

    @abc.abstractmethod
    def _gas_msg(self, gas):
        """Return a packed message reporting the gas value ``gas``."""
        pass

    @abc.abstractmethod
    def _pcm_status_msg(self, enable):
        """Return a packed message reporting cruise engagement ``enable``."""
        pass

    # ***** standard tests for all safety modes *****

    def test_relay_malfunction(self):
        """Seeing the relay-malfunction address must block both tx and fwd."""
        # each car has an addr that is used to detect relay malfunction
        # if that addr is seen on specified bus, triggers the relay malfunction
        # protection logic: both tx_hook and fwd_hook are expected to return failure
        self.assertFalse(self.safety.get_relay_malfunction())
        self._rx(make_msg(self.RELAY_MALFUNCTION_BUS, self.RELAY_MALFUNCTION_ADDR, 8))
        self.assertTrue(self.safety.get_relay_malfunction())
        for a in range(1, 0x800):
            for b in range(0, 3):
                self.assertEqual(-1, self._tx(make_msg(b, a, 8)))
                self.assertEqual(-1, self.safety.safety_fwd_hook(b, make_msg(b, a, 8)))

    def test_fwd_hook(self):
        """Forwarding follows FWD_BUS_LOOKUP, except for blacklisted addresses."""
        # some safety modes don't forward anything, while others blacklist msgs
        for bus in range(0x0, 0x3):
            for addr in range(0x1, 0x800):
                # assume len 8
                msg = make_msg(bus, addr, 8)
                fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1)
                if bus in self.FWD_BLACKLISTED_ADDRS and addr in self.FWD_BLACKLISTED_ADDRS[bus]:
                    fwd_bus = -1
                self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, msg))

    def test_spam_can_buses(self):
        """Any (addr, bus) pair that is not in TX_MSGS must be rejected by tx."""
        for addr in range(1, 0x800):
            for bus in range(0, 4):
                if all(addr != m[0] or bus != m[1] for m in self.TX_MSGS):
                    self.assertFalse(self._tx(make_msg(bus, addr, 8)))

    def test_default_controls_not_allowed(self):
        """Controls start out not allowed."""
        self.assertFalse(self.safety.get_controls_allowed())

    def test_manually_enable_controls_allowed(self):
        """``set_controls_allowed`` is reflected by ``get_controls_allowed``."""
        self.safety.set_controls_allowed(1)
        self.assertTrue(self.safety.get_controls_allowed())
        self.safety.set_controls_allowed(0)
        self.assertFalse(self.safety.get_controls_allowed())

    def test_prev_gas(self):
        """The previous-gas state follows the last received gas message."""
        self.assertFalse(self.safety.get_gas_pressed_prev())
        for pressed in [self.GAS_PRESSED_THRESHOLD + 1, 0]:
            self._rx(self._gas_msg(pressed))
            self.assertEqual(bool(pressed), self.safety.get_gas_pressed_prev())

    def test_allow_engage_with_gas_pressed(self):
        """Gas that was already pressed before engaging does not revoke controls."""
        self._rx(self._gas_msg(1))
        self.safety.set_controls_allowed(True)
        self._rx(self._gas_msg(1))
        self.assertTrue(self.safety.get_controls_allowed())
        self._rx(self._gas_msg(1))
        self.assertTrue(self.safety.get_controls_allowed())

    def test_disengage_on_gas(self):
        """A gas press after a released reading revokes controls."""
        self._rx(self._gas_msg(0))
        self.safety.set_controls_allowed(True)
        self._rx(self._gas_msg(self.GAS_PRESSED_THRESHOLD + 1))
        self.assertFalse(self.safety.get_controls_allowed())

    def test_unsafe_mode_no_disengage_on_gas(self):
        """With DISABLE_DISENGAGE_ON_GAS set, a gas press keeps controls allowed."""
        self._rx(self._gas_msg(0))
        self.safety.set_controls_allowed(True)
        self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
        try:
            self._rx(self._gas_msg(self.GAS_PRESSED_THRESHOLD + 1))
            self.assertTrue(self.safety.get_controls_allowed())
        finally:
            # Always restore the default so the unsafe flag is not left set on
            # the safety object once this test finishes, pass or fail.
            self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)

    def test_prev_brake(self):
        """The previous-brake state follows the last received brake message."""
        self.assertFalse(self.safety.get_brake_pressed_prev())
        for pressed in [True, False]:
            self._rx(self._brake_msg(not pressed))
            self.assertEqual(not pressed, self.safety.get_brake_pressed_prev())
            self._rx(self._brake_msg(pressed))
            self.assertEqual(pressed, self.safety.get_brake_pressed_prev())

    def test_enable_control_allowed_from_cruise(self):
        """Cruise becoming engaged allows controls."""
        self._rx(self._pcm_status_msg(False))
        self.assertFalse(self.safety.get_controls_allowed())
        self._rx(self._pcm_status_msg(True))
        self.assertTrue(self.safety.get_controls_allowed())

    def test_disable_control_allowed_from_cruise(self):
        """A cruise-disengaged status revokes controls."""
        self.safety.set_controls_allowed(1)
        self._rx(self._pcm_status_msg(False))
        self.assertFalse(self.safety.get_controls_allowed())

    def test_cruise_engaged_prev(self):
        """The previous cruise state follows the last received status message."""
        for engaged in [True, False]:
            self._rx(self._pcm_status_msg(engaged))
            self.assertEqual(engaged, self.safety.get_cruise_engaged_prev())
            self._rx(self._pcm_status_msg(not engaged))
            self.assertEqual(not engaged, self.safety.get_cruise_engaged_prev())

    def test_allow_brake_at_zero_speed(self):
        """At standstill a held brake is tolerated; a new brake press disengages."""
        # Brake was already pressed
        self._rx(self._speed_msg(0))
        self._rx(self._brake_msg(1))
        self.safety.set_controls_allowed(1)
        self._rx(self._brake_msg(1))
        self.assertTrue(self.safety.get_controls_allowed())
        self._rx(self._brake_msg(0))
        self.assertTrue(self.safety.get_controls_allowed())
        # rising edge of brake should disengage
        self._rx(self._brake_msg(1))
        self.assertFalse(self.safety.get_controls_allowed())
        self._rx(self._brake_msg(0))  # reset no brakes

    def test_not_allow_brake_when_moving(self):
        """A held brake disengages once speed exceeds the standstill threshold."""
        # Brake was already pressed
        self._rx(self._brake_msg(1))
        self.safety.set_controls_allowed(1)
        self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD))
        self._rx(self._brake_msg(1))
        self.assertTrue(self.safety.get_controls_allowed())
        self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD + 1))
        self._rx(self._brake_msg(1))
        self.assertFalse(self.safety.get_controls_allowed())
        self._rx(self._speed_msg(0))

    def test_sample_speed(self):
        """The vehicle counts as moving only above STANDSTILL_THRESHOLD."""
        self.assertFalse(self.safety.get_vehicle_moving())

        # not moving
        self.safety.safety_rx_hook(self._speed_msg(0))
        self.assertFalse(self.safety.get_vehicle_moving())

        # speed is at threshold
        self.safety.safety_rx_hook(self._speed_msg(self.STANDSTILL_THRESHOLD))
        self.assertFalse(self.safety.get_vehicle_moving())

        # past threshold
        self.safety.safety_rx_hook(self._speed_msg(self.STANDSTILL_THRESHOLD + 1))
        self.assertTrue(self.safety.get_vehicle_moving())

    def test_tx_hook_on_wrong_safety_mode(self):
        """Messages listed in other test classes' TX_MSGS must be blocked here.

        Collects ``TX_MSGS`` from every ``Test*`` attribute of the sibling
        ``test_*.py`` modules (except the current class) and checks that none
        of those messages passes this mode's tx hook.
        """
        files = os.listdir(os.path.dirname(os.path.realpath(__file__)))
        test_files = [f for f in files if f.startswith("test_") and f.endswith(".py")]

        current_test = self.__class__.__name__

        all_tx = []
        for tf in test_files:
            # tf[:-3] strips the ".py" suffix to get the module name.
            test = importlib.import_module("panda.tests.safety."+tf[:-3])
            for attr in dir(test):
                if attr.startswith("Test") and attr != current_test:
                    tx = getattr(getattr(test, attr), "TX_MSGS")
                    if tx is not None:
                        # TODO: Temporary, should be fixed in panda firmware, safety_honda.h
                        if attr in ['TestHondaBoschLongGiraffeSafety', 'TestHondaNidecSafety']:
                            tx = [m for m in tx if m[0] not in (0x1FA, 0x30C)]
                        all_tx.append(tx)

        # make sure we got all the msgs
        # assertGreaterEqual reports both counts if this ever fails.
        self.assertGreaterEqual(len(all_tx), len(test_files)-1)

        for tx_msgs in all_tx:
            for addr, bus in tx_msgs:
                msg = make_msg(bus, addr)
                self.safety.set_controls_allowed(1)
                # TODO: this should be blocked
                if current_test in ["TestNissanSafety", "TestNissanLeafSafety"] and [addr, bus] in self.TX_MSGS:
                    continue
                self.assertFalse(self._tx(msg), f"{addr=} {bus=} got through")
|