diff --git a/python/__init__.py b/python/__init__.py index 1b8495f4..5d6f3e9f 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -401,6 +401,10 @@ class Panda: if not enter_bootloader and reconnect: self.reconnect() + @property + def connected(self) -> bool: + return self._handle_open + def reconnect(self): if self._handle_open: self.close() diff --git a/requirements.txt b/requirements.txt index 600be4b2..6b063e91 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ numpy hexdump>=3.3 pycryptodome==3.9.8 tqdm>=4.14.0 -nose pytest parameterized requests diff --git a/tests/hitl/0_dfu.py b/tests/hitl/0_dfu.py index ff697ba5..cff4c02c 100644 --- a/tests/hitl/0_dfu.py +++ b/tests/hitl/0_dfu.py @@ -1,8 +1,5 @@ from panda import Panda, PandaDFU -from .helpers import test_all_pandas, panda_connect_and_init -@test_all_pandas -@panda_connect_and_init def test_dfu(p): app_mcu_type = p.get_mcu_type() dfu_serial = PandaDFU.st_serial_to_dfu_serial(p.get_usb_serial(), p.get_mcu_type()) diff --git a/tests/hitl/1_program.py b/tests/hitl/1_program.py index c129c99b..6db33550 100644 --- a/tests/hitl/1_program.py +++ b/tests/hitl/1_program.py @@ -2,13 +2,14 @@ import os import time from panda import Panda, PandaDFU, McuType, BASEDIR -from .helpers import test_all_pandas, panda_connect_and_init, check_signature +def check_signature(p): + assert not p.bootstub, "Flashed firmware not booting. Stuck in bootstub." + assert p.up_to_date() + # TODO: make more comprehensive bootstub tests and run on a few production ones + current # TODO: also test release-signed app -@test_all_pandas -@panda_connect_and_init def test_a_known_bootstub(p): """ Test that compiled app can work with known production bootstub @@ -53,14 +54,10 @@ def test_a_known_bootstub(p): check_signature(p) assert not p.bootstub -@test_all_pandas -@panda_connect_and_init def test_b_recover(p): assert p.recover(timeout=30) check_signature(p) -@test_all_pandas -@panda_connect_and_init def test_c_flash(p): # test flash from bootstub serial = p._serial diff --git a/tests/hitl/2_health.py b/tests/hitl/2_health.py index 3a7f1e74..f63f777c 100644 --- a/tests/hitl/2_health.py +++ b/tests/hitl/2_health.py @@ -1,32 +1,28 @@ import time +import pytest from panda import Panda from panda_jungle import PandaJungle # pylint: disable=import-error -from .helpers import panda_jungle, test_all_pandas, test_all_gen2_pandas, panda_connect_and_init +from panda.tests.hitl.conftest import PandaGroup -@test_all_pandas -@panda_connect_and_init -def test_ignition(p): +def test_ignition(p, panda_jungle): # Set harness orientation to #2, since the ignition line is on the wrong SBU bus :/ panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_2) p.reset() for ign in (True, False): panda_jungle.set_ignition(ign) - time.sleep(2) + time.sleep(0.1) assert p.health()['ignition_line'] == ign -@test_all_gen2_pandas -@panda_connect_and_init -def test_orientation_detection(p): +@pytest.mark.test_panda_types(PandaGroup.GEN2) +def test_orientation_detection(p, panda_jungle): seen_orientations = [] for i in range(3): panda_jungle.set_harness_orientation(i) - time.sleep(0.25) p.reset() - time.sleep(0.25) detected_harness_orientation = p.health()['car_harness_status'] print(f"Detected orientation: {detected_harness_orientation}") @@ -34,16 +30,12 @@ def test_orientation_detection(p): assert False seen_orientations.append(detected_harness_orientation) -@test_all_pandas -@panda_connect_and_init def test_voltage(p): for _ in range(10): voltage = p.health()['voltage'] assert ((voltage > 11000) and (voltage < 13000)) time.sleep(0.1) -@test_all_pandas -@panda_connect_and_init def test_hw_type(p): """ hw type should be same in bootstub as application @@ -66,10 +58,8 @@ def test_hw_type(p): assert pp.get_mcu_type() == mcu_type, "Bootstub and app MCU type mismatch" assert pp.get_uid() == app_uid - -@test_all_pandas -@panda_connect_and_init -def test_heartbeat(p): +def test_heartbeat(p, panda_jungle): + panda_jungle.set_ignition(True) # TODO: add more cases here once the tests aren't super slow p.set_safety_mode(mode=Panda.SAFETY_HYUNDAI, param=Panda.FLAG_HYUNDAI_LONG) p.send_heartbeat() @@ -79,7 +69,7 @@ def test_heartbeat(p): # shouldn't do anything once we're in a car safety mode p.set_heartbeat_disabled() - time.sleep(6) + time.sleep(6.) h = p.health() assert h['heartbeat_lost'] @@ -87,8 +77,6 @@ def test_heartbeat(p): assert h['safety_param'] == 0 assert h['controls_allowed'] == 0 -@test_all_pandas -@panda_connect_and_init def test_microsecond_timer(p): start_time = p.get_microsecond_timer() time.sleep(1) diff --git a/tests/hitl/3_usb_to_can.py b/tests/hitl/3_usb_to_can.py index 4aca06af..22cfde43 100644 --- a/tests/hitl/3_usb_to_can.py +++ b/tests/hitl/3_usb_to_can.py @@ -1,13 +1,12 @@ import sys import time +import pytest from flaky import flaky -from nose.tools import assert_equal, assert_less, assert_greater from panda import Panda -from .helpers import SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_all_gmlan_pandas, test_all_pandas, panda_connect_and_init +from panda.tests.hitl.conftest import SPEED_NORMAL, SPEED_GMLAN, PandaGroup +from panda.tests.hitl.helpers import time_many_sends -@test_all_pandas -@panda_connect_and_init def test_can_loopback(p): p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_can_loopback(True) @@ -31,8 +30,6 @@ def test_can_loopback(p): assert 0x1aa == sr[0][0] == lb[0][0] assert b"message" == sr[0][2] == lb[0][2] -@test_all_pandas -@panda_connect_and_init def test_reliability(p): MSG_COUNT = 100 @@ -55,20 +52,18 @@ def test_reliability(p): sent_echo = [x for x in r if x[3] == 0x80] loopback_resp = [x for x in r if x[3] == 0] - assert_equal(sorted([x[0] for x in loopback_resp]), addrs) - assert_equal(sorted([x[0] for x in sent_echo]), addrs) - assert_equal(len(r), 200) + assert sorted([x[0] for x in loopback_resp]) == addrs + assert sorted([x[0] for x in sent_echo]) == addrs + assert len(r) == 200 # take sub 20ms et = (time.monotonic() - st) * 1000.0 - assert_less(et, 20) + assert et < 20 sys.stdout.write("P") sys.stdout.flush() -@test_all_pandas @flaky(max_runs=6, min_passes=1) -@panda_connect_and_init def test_throughput(p): # enable output mode p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) @@ -85,13 +80,12 @@ def test_throughput(p): # bit count from https://en.wikipedia.org/wiki/CAN_bus saturation_pct = (comp_kbps / speed) * 100.0 - assert_greater(saturation_pct, 80) - assert_less(saturation_pct, 100) + assert saturation_pct > 80 + assert saturation_pct < 100 print("loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct)) -@test_all_gmlan_pandas -@panda_connect_and_init +@pytest.mark.test_panda_types(PandaGroup.GMLAN) def test_gmlan(p): p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_can_loopback(True) @@ -104,18 +98,17 @@ def test_gmlan(p): for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: p.set_gmlan(bus) comp_kbps_gmlan = time_many_sends(p, 3) - assert_greater(comp_kbps_gmlan, 0.8 * SPEED_GMLAN) - assert_less(comp_kbps_gmlan, 1.0 * SPEED_GMLAN) + assert comp_kbps_gmlan > (0.8 * SPEED_GMLAN) + assert comp_kbps_gmlan < (1.0 * SPEED_GMLAN) p.set_gmlan(None) comp_kbps_normal = time_many_sends(p, bus) - assert_greater(comp_kbps_normal, 0.8 * SPEED_NORMAL) - assert_less(comp_kbps_normal, 1.0 * SPEED_NORMAL) + assert comp_kbps_normal > (0.8 * SPEED_NORMAL) + assert comp_kbps_normal < (1.0 * SPEED_NORMAL) print("%d: %.2f kbps vs %.2f kbps" % (bus, comp_kbps_gmlan, comp_kbps_normal)) -@test_all_gmlan_pandas -@panda_connect_and_init +@pytest.mark.test_panda_types(PandaGroup.GMLAN) def test_gmlan_bad_toggle(p): p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_can_loopback(True) @@ -124,21 +117,19 @@ def test_gmlan_bad_toggle(p): for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: p.set_gmlan(bus) comp_kbps_gmlan = time_many_sends(p, 3) - assert_greater(comp_kbps_gmlan, 0.6 * SPEED_GMLAN) - assert_less(comp_kbps_gmlan, 1.0 * SPEED_GMLAN) + assert comp_kbps_gmlan > (0.6 * SPEED_GMLAN) + assert comp_kbps_gmlan < (1.0 * SPEED_GMLAN) # normal for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: p.set_gmlan(None) comp_kbps_normal = time_many_sends(p, bus) - assert_greater(comp_kbps_normal, 0.6 * SPEED_NORMAL) - assert_less(comp_kbps_normal, 1.0 * SPEED_NORMAL) + assert comp_kbps_normal > (0.6 * SPEED_NORMAL) + assert comp_kbps_normal < (1.0 * SPEED_NORMAL) # this will fail if you have hardware serial connected -@test_all_pandas -@panda_connect_and_init def test_serial_debug(p): _ = p.serial_read(Panda.SERIAL_DEBUG) # junk p.call_control_api(0x01) - assert(p.serial_read(Panda.SERIAL_DEBUG).startswith(b"NO HANDLER")) + assert p.serial_read(Panda.SERIAL_DEBUG).startswith(b"NO HANDLER") diff --git a/tests/hitl/4_can_loopback.py b/tests/hitl/4_can_loopback.py index 103da33c..f0241d2b 100644 --- a/tests/hitl/4_can_loopback.py +++ b/tests/hitl/4_can_loopback.py @@ -1,18 +1,17 @@ import os import time +import pytest import random import threading from flaky import flaky from collections import defaultdict -from nose.tools import assert_equal, assert_less, assert_greater from panda import Panda -from .helpers import panda_jungle, time_many_sends, test_all_pandas, test_all_gen2_pandas, clear_can_buffers, panda_connect_and_init, PARTIAL_TESTS +from panda.tests.hitl.conftest import PandaGroup, PARTIAL_TESTS +from panda.tests.hitl.helpers import time_many_sends, clear_can_buffers -@test_all_pandas @flaky(max_runs=3, min_passes=1) -@panda_connect_and_init -def test_send_recv(p): +def test_send_recv(p, panda_jungle): def test(p_send, p_recv): p_send.set_can_loopback(False) p_recv.set_can_loopback(False) @@ -35,8 +34,7 @@ def test_send_recv(p): comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True) saturation_pct = (comp_kbps / speed) * 100.0 - assert_greater(saturation_pct, 80) - assert_less(saturation_pct, 100) + assert 80 < saturation_pct < 100 print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, {:6.2f}%".format(bus, speed, comp_kbps, saturation_pct)) @@ -46,10 +44,8 @@ def test_send_recv(p): test(panda_jungle, p) -@test_all_pandas @flaky(max_runs=6, min_passes=1) -@panda_connect_and_init -def test_latency(p): +def test_latency(p, panda_jungle): def test(p_send, p_recv): p_send.set_can_loopback(False) p_recv.set_can_loopback(False) @@ -93,14 +89,14 @@ def test_latency(p): if len(r) == 0 or len(r_echo) == 0: print("r: {}, r_echo: {}".format(r, r_echo)) - assert_equal(len(r), 1) - assert_equal(len(r_echo), 1) + assert len(r) == 1 + assert len(r_echo) == 1 et = (et - st) * 1000.0 comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / et latency = et - ((1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / speed) - assert_less(latency, 5.0) + assert latency < 5.0 saturation_pct = (comp_kbps / speed) * 100.0 latencies.append(latency) @@ -108,7 +104,7 @@ def test_latency(p): saturation_pcts.append(saturation_pct) average_latency = sum(latencies) / num_messages - assert_less(average_latency, 1.0) + assert average_latency < 1.0 average_comp_kbps = sum(comp_kbps_list) / num_messages average_saturation_pct = sum(saturation_pcts) / num_messages @@ -121,9 +117,9 @@ def test_latency(p): test(panda_jungle, p) -@test_all_gen2_pandas -@panda_connect_and_init -def test_gen2_loopback(p): +@pytest.mark.panda_expect_can_error +@pytest.mark.test_panda_types(PandaGroup.GEN2) +def test_gen2_loopback(p, panda_jungle): def test(p_send, p_recv, address=None): for bus in range(4): obd = False @@ -167,9 +163,7 @@ def test_gen2_loopback(p): test(p, panda_jungle, 0x18DB33F1) test(panda_jungle, p, 0x18DB33F1) -@test_all_pandas -@panda_connect_and_init -def test_bulk_write(p): +def test_bulk_write(p, panda_jungle): # TODO: doesn't work in partial test mode if PARTIAL_TESTS: return @@ -211,8 +205,6 @@ def test_bulk_write(p): # Set back to silent mode p.set_safety_mode(Panda.SAFETY_SILENT) -@test_all_pandas -@panda_connect_and_init def test_message_integrity(p): clear_can_buffers(p) diff --git a/tests/hitl/5_gps.py b/tests/hitl/5_gps.py index 7fb6329c..3cead333 100644 --- a/tests/hitl/5_gps.py +++ b/tests/hitl/5_gps.py @@ -1,9 +1,11 @@ import time -from panda import PandaSerial -from .helpers import test_all_gps_pandas, panda_connect_and_init +import pytest -@test_all_gps_pandas -@panda_connect_and_init +from panda import PandaSerial +from panda.tests.hitl.conftest import PandaGroup + + +@pytest.mark.test_panda_types(PandaGroup.GPS) def test_gps_version(p): serial = PandaSerial(p, 1, 9600) # Reset and check twice to make sure the enabling works diff --git a/tests/hitl/6_safety.py b/tests/hitl/6_safety.py index 37528382..2237f530 100644 --- a/tests/hitl/6_safety.py +++ b/tests/hitl/6_safety.py @@ -1,11 +1,8 @@ import time -from nose.tools import assert_equal from panda import Panda -from .helpers import test_all_pandas, panda_connect_and_init -@test_all_pandas -@panda_connect_and_init + def test_safety_nooutput(p): p.set_safety_mode(Panda.SAFETY_SILENT) p.set_can_loopback(True) @@ -20,14 +17,13 @@ def test_safety_nooutput(p): assert len([x for x in r if x[3] != 192]) == 0 assert len([x for x in r if x[3] == 192]) == 1 -@test_all_pandas -@panda_connect_and_init + def test_canfd_safety_modes(p): # works on all pandas p.set_safety_mode(Panda.SAFETY_TOYOTA) - assert_equal(p.health()['safety_mode'], Panda.SAFETY_TOYOTA) + assert p.health()['safety_mode'] == Panda.SAFETY_TOYOTA # shouldn't be able to set a CAN-FD safety mode on non CAN-FD panda p.set_safety_mode(Panda.SAFETY_HYUNDAI_CANFD) expected_mode = Panda.SAFETY_HYUNDAI_CANFD if p.get_type() in Panda.H7_DEVICES else Panda.SAFETY_SILENT - assert_equal(p.health()['safety_mode'], expected_mode) + assert p.health()['safety_mode'] == expected_mode diff --git a/tests/hitl/conftest.py b/tests/hitl/conftest.py new file mode 100644 index 00000000..a70f2ca7 --- /dev/null +++ b/tests/hitl/conftest.py @@ -0,0 +1,180 @@ +import concurrent.futures +import os +import time +import pytest + +from panda import Panda +from panda_jungle import PandaJungle # pylint: disable=import-error +from panda.tests.hitl.helpers import clear_can_buffers + + +SPEED_NORMAL = 500 +SPEED_GMLAN = 33.3 +BUS_SPEEDS = [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)] + + +PEDAL_SERIAL = 'none' +JUNGLE_SERIAL = os.getenv("PANDAS_JUNGLE") +PANDAS_EXCLUDE = os.getenv("PANDAS_EXCLUDE", "").strip().split(" ") +PARTIAL_TESTS = os.environ.get("PARTIAL_TESTS", "0") == "1" + +class PandaGroup: + H7 = (Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2) + GEN2 = (Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO) + H7 + GPS = (Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO) + GMLAN = (Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_GREY_PANDA) + + TESTED = (Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2, Panda.HW_TYPE_UNO) + +if PARTIAL_TESTS: + # minimal set of pandas to get most of our coverage + # * red panda covers GEN2, STM32H7 + # * black panda covers STM32F4, GEN2, and GPS + PandaGroup.TESTED = (Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA) # type: ignore + +# Find all pandas connected +_all_pandas = {} +_panda_jungle = None +def init_all_pandas(): + global _panda_jungle + _panda_jungle = PandaJungle(JUNGLE_SERIAL) + _panda_jungle.set_panda_power(True) + + for serial in Panda.list(): + if serial not in PANDAS_EXCLUDE and serial != PEDAL_SERIAL: + with Panda(serial=serial) as p: + ptype = bytes(p.get_type()) + if ptype in PandaGroup.TESTED: + _all_pandas[serial] = ptype + + # ensure we have all tested panda types + missing_types = set(PandaGroup.TESTED) - set(_all_pandas.values()) + assert len(missing_types) == 0, f"Missing panda types: {missing_types}" + + print(f"{len(_all_pandas)} total pandas") +init_all_pandas() +_all_panda_serials = list(_all_pandas.keys()) + + +def init_jungle(): + clear_can_buffers(_panda_jungle) + _panda_jungle.set_panda_power(True) + _panda_jungle.set_can_loopback(False) + _panda_jungle.set_obd(False) + _panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1) + for bus, speed in BUS_SPEEDS: + _panda_jungle.set_can_speed_kbps(bus, speed) + + +def pytest_configure(config): + config.addinivalue_line( + "markers", "test_panda_types(name): mark test to run only on specified panda types" + ) + config.addinivalue_line( + "markers", "panda_expect_can_error: mark test to ignore CAN health errors" + ) + + +def pytest_make_parametrize_id(config, val, argname): + if val in _all_pandas: + # TODO: get nice string instead of int + hw_type = _all_pandas[val][0] + return f"serial={val}, hw_type={hw_type}" + return None + + +@pytest.fixture(name='panda_jungle') +def fixture__panda_jungle(request): + init_jungle() + return _panda_jungle + +@pytest.fixture(name='p') +def func_fixture_panda(request, module_panda): + p = module_panda + + # Check if test is applicable to this panda + mark = request.node.get_closest_marker('test_panda_types') + if mark: + assert len(mark.args) > 0, "Missing allowed panda types in mark" + test_types = mark.args[0] + if _all_pandas[p.get_usb_serial()] not in test_types: + pytest.skip(f"Not applicable, {test_types} pandas only") + + # TODO: reset is slow (2+ seconds) + p.reset() + + # Run test + yield p + + # Teardown + if not p.connected: + p.reconnect() + if p.bootstub: + p.reset() + + # TODO: would be nice to make these common checks in the teardown + # show up as failed tests instead of "errors" + + # Check for faults + assert p.health()['fault_status'] == 0 + + # Check health of each CAN core after test, normal to fail for test_gen2_loopback on OBD bus, so skipping + mark = request.node.get_closest_marker('panda_expect_can_error') + expect_can_error = mark is not None + if not expect_can_error: + for i in range(3): + can_health = p.can_health(i) + assert can_health['bus_off_cnt'] == 0 + assert can_health['receive_error_cnt'] == 0 + assert can_health['transmit_error_cnt'] == 0 + assert can_health['total_rx_lost_cnt'] == 0 + assert can_health['total_tx_lost_cnt'] == 0 + assert can_health['total_error_cnt'] == 0 + assert can_health['total_tx_checksum_error_cnt'] == 0 + +@pytest.fixture(name='module_panda', params=_all_panda_serials, scope='module') +def fixture_panda_setup(request): + """ + Clean up all pandas + jungle and return the panda under test. + """ + panda_serial = request.param + + # Initialize jungle + init_jungle() + + # wait for all pandas to come up + for _ in range(50): + if set(_all_panda_serials).issubset(set(Panda.list())): + break + time.sleep(0.1) + + # Connect to pandas + def cnnct(s): + if s == panda_serial: + p = Panda(serial=s) + p.reset(reconnect=True) + + p.set_can_loopback(False) + p.set_gmlan(None) + p.set_esp_power(False) + p.set_power_save(False) + for bus, speed in BUS_SPEEDS: + p.set_can_speed_kbps(bus, speed) + clear_can_buffers(p) + p.set_power_save(False) + return p + else: + with Panda(serial=s) as p: + p.reset(reconnect=False) + return None + + with concurrent.futures.ThreadPoolExecutor() as exc: + ps = list(exc.map(cnnct, _all_panda_serials, timeout=20)) + pandas = [p for p in ps if p is not None] + + # run test + yield pandas[0] + + # Teardown + for p in pandas: + p.close() diff --git a/tests/hitl/helpers.py b/tests/hitl/helpers.py index 99070f0f..2c36b798 100644 --- a/tests/hitl/helpers.py +++ b/tests/hitl/helpers.py @@ -1,81 +1,5 @@ -import concurrent.futures -import os import time import random -import faulthandler -from functools import wraps, partial -from nose.tools import assert_equal -from parameterized import parameterized - -from panda import Panda -from panda_jungle import PandaJungle # pylint: disable=import-error - -SPEED_NORMAL = 500 -SPEED_GMLAN = 33.3 -BUS_SPEEDS = [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)] -H7_HW_TYPES = [Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2] -GEN2_HW_TYPES = [Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO] + H7_HW_TYPES -GPS_HW_TYPES = [Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO] -PEDAL_SERIAL = 'none' -JUNGLE_SERIAL = os.getenv("PANDAS_JUNGLE") -PANDAS_EXCLUDE = os.getenv("PANDAS_EXCLUDE", "").strip().split(" ") - -PARTIAL_TESTS = os.environ.get("PARTIAL_TESTS", "0") == "1" - -# Enable fault debug -faulthandler.enable(all_threads=False) - -# Connect to Panda Jungle -panda_jungle = PandaJungle(JUNGLE_SERIAL) - -# Find all pandas connected -_all_pandas = [] -def init_all_pandas(): - global _all_pandas - _all_pandas = [] - - # power cycle pandas - panda_jungle.set_panda_power(False) - time.sleep(3) - panda_jungle.set_panda_power(True) - time.sleep(5) - - for serial in Panda.list(): - if serial not in PANDAS_EXCLUDE and serial != PEDAL_SERIAL: - with Panda(serial=serial) as p: - _all_pandas.append((serial, p.get_type())) - print(f"{len(_all_pandas)} total pandas") -init_all_pandas() -_all_panda_serials = [x[0] for x in _all_pandas] - -def parameterized_panda_types(types): - serials = [] - for typ in types: - for s, t in _all_pandas: - if t == typ and s not in serials: - serials.append(s) - break - else: - raise IOError("No unused panda found for type: {}".format(typ)) - return parameterized(serials) - -# Panda providers -TESTED_HW_TYPES = (Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2, Panda.HW_TYPE_UNO) -test_all_pandas = parameterized_panda_types(TESTED_HW_TYPES) -test_all_gen2_pandas = parameterized_panda_types(GEN2_HW_TYPES) -test_all_gps_pandas = parameterized_panda_types(GPS_HW_TYPES) - -# no grey for speedup, should be sufficiently covered by white for these tests -test_all_gmlan_pandas = parameterized_panda_types([Panda.HW_TYPE_WHITE_PANDA, ]) - -if PARTIAL_TESTS: - # minimal set of pandas to get most of our coverage - # * red panda covers STM32H7 - # * black panda covers STM32F4, GEN2, and GPS - partial_pandas = (Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA) - test_all_pandas = parameterized_panda_types(partial_pandas) - test_all_gen2_pandas = parameterized_panda_types(partial_pandas) - test_all_gps_pandas = parameterized_panda_types([Panda.HW_TYPE_BLACK_PANDA, ]) def time_many_sends(p, bus, p_recv=None, msg_count=100, msg_id=None, two_pandas=False): @@ -105,92 +29,16 @@ def time_many_sends(p, bus, p_recv=None, msg_count=100, msg_id=None, two_pandas= resp = [x for x in r if x[3] == bus and x[0] == msg_id] leftovers = [x for x in r if (x[3] != 0x80 | bus and x[3] != bus) or x[0] != msg_id] - assert_equal(len(leftovers), 0) + assert len(leftovers) == 0 - assert_equal(len(resp), msg_count) - assert_equal(len(sent_echo), msg_count) + assert len(resp) == msg_count + assert len(sent_echo) == msg_count end_time = (end_time - start_time) * 1000.0 comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) * msg_count / end_time return comp_kbps -def panda_connect_and_init(fn=None): - if not fn: - return partial(panda_connect_and_init) - - @wraps(fn) - def wrapper(panda_serials, **kwargs): - # Change panda_serials to a list - if panda_serials is not None: - if not isinstance(panda_serials, list): - panda_serials = [panda_serials, ] - - # Initialize jungle - clear_can_buffers(panda_jungle) - panda_jungle.set_panda_power(True) - panda_jungle.set_can_loopback(False) - panda_jungle.set_obd(False) - panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1) - for bus, speed in BUS_SPEEDS: - panda_jungle.set_can_speed_kbps(bus, speed) - - # wait for all pandas to come up - for _ in range(50): - if set(_all_panda_serials).issubset(set(Panda.list())): - break - time.sleep(0.1) - - # Connect to pandas - def cnnct(s): - if s in panda_serials: - p = Panda(serial=s) - p.reset(reconnect=True) - - p.set_can_loopback(False) - p.set_gmlan(None) - p.set_esp_power(False) - p.set_power_save(False) - for bus, speed in BUS_SPEEDS: - p.set_can_speed_kbps(bus, speed) - clear_can_buffers(p) - p.set_power_save(False) - return p - else: - with Panda(serial=s) as p: - p.reset(reconnect=False) - return None - - with concurrent.futures.ThreadPoolExecutor() as exc: - ps = list(exc.map(cnnct, _all_panda_serials, timeout=20)) - pandas = [p for p in ps if p is not None] - - try: - fn(*pandas, *kwargs) - - # Check if the pandas did not throw any faults while running test - for panda in pandas: - if not panda.bootstub: - #panda.reconnect() - assert panda.health()['fault_status'] == 0 - # Check health of each CAN core after test, normal to fail for test_gen2_loopback on OBD bus, so skipping - if fn.__name__ != "test_gen2_loopback": - for i in range(3): - can_health = panda.can_health(i) - assert can_health['bus_off_cnt'] == 0 - assert can_health['receive_error_cnt'] == 0 - assert can_health['transmit_error_cnt'] == 0 - assert can_health['total_rx_lost_cnt'] == 0 - assert can_health['total_tx_lost_cnt'] == 0 - assert can_health['total_error_cnt'] == 0 - assert can_health['total_tx_checksum_error_cnt'] == 0 - finally: - for p in pandas: - try: - p.close() - except Exception: - pass - return wrapper def clear_can_buffers(panda): # clear tx buffers @@ -207,9 +55,3 @@ def clear_can_buffers(panda): if (time.monotonic() - st) > 10: print("Unable to clear can buffers for panda ", panda.get_serial()) assert False - -def check_signature(p): - assert not p.bootstub, "Flashed firmware not booting. Stuck in bootstub." - firmware_sig = Panda.get_signature_from_firmware(p.get_mcu_type().config.app_path) - panda_sig = p.get_signature() - assert_equal(panda_sig, firmware_sig) diff --git a/tests/hitl/test.sh b/tests/hitl/test.sh index d2de5f3b..7382ac59 100755 --- a/tests/hitl/test.sh +++ b/tests/hitl/test.sh @@ -2,4 +2,6 @@ set -e DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)" -nosetests -x -v --with-flaky -s $(ls $DIR/$1*.py) +cd $DIR + +pytest --durations=0 --maxfail=1 *.py