misc hitl test cleanup (#1562)

* misc hitl test cleanup

* adjust that

* update

* tres fix

---------

Co-authored-by: Bruce Wayne <batman@comma.ai>
Co-authored-by: Comma Device <device@comma.ai>
This commit is contained in:
Adeeb Shihadeh 2023-08-06 16:30:21 -07:00 committed by GitHub
parent 3dc3b58e20
commit 77b09a3160
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 64 additions and 127 deletions

2
Jenkinsfile vendored
View File

@ -1,7 +1,6 @@
def docker_run(String step_label, int timeout_mins, String cmd) {
timeout(time: timeout_mins, unit: 'MINUTES') {
sh script: "docker run --rm --privileged \
--env PARTIAL_TESTS=${env.PARTIAL_TESTS} \
--env PYTHONWARNINGS=error \
--volume /dev/bus/usb:/dev/bus/usb \
--volume /var/run/dbus:/var/run/dbus \
@ -62,7 +61,6 @@ pipeline {
agent any
environment {
CI = "1"
//PARTIAL_TESTS = "${env.BRANCH_NAME == 'master' ? ' ' : '1'}"
PYTHONWARNINGS= "error"
DOCKER_IMAGE_TAG = "panda:build-${env.GIT_COMMIT}"

View File

@ -1,45 +0,0 @@
import pytest
import random
from panda import Panda, PandaDFU
from panda.python.spi import SpiDevice
@pytest.mark.expected_logs(1)
def test_dfu(p):
app_mcu_type = p.get_mcu_type()
dfu_serial = p.get_dfu_serial()
p.reset(enter_bootstub=True)
p.reset(enter_bootloader=True)
assert Panda.wait_for_dfu(dfu_serial, timeout=20), "failed to enter DFU"
dfu = PandaDFU(dfu_serial)
assert dfu.get_mcu_type() == app_mcu_type
assert dfu_serial in PandaDFU.list()
dfu._handle.clear_status()
dfu.reset()
p.reconnect()
@pytest.mark.expected_logs(1)
@pytest.mark.test_panda_types((Panda.HW_TYPE_TRES, ))
def test_dfu_with_spam(p):
dfu_serial = p.get_dfu_serial()
# enter DFU
p.reset(enter_bootstub=True)
p.reset(enter_bootloader=True)
assert Panda.wait_for_dfu(dfu_serial, timeout=20), "failed to enter DFU"
# send junk
for _ in range(10):
speed = 1000000 * random.randint(1, 5)
d = SpiDevice(speed=speed)
with d.acquire() as spi:
dat = [random.randint(0, 255) for _ in range(random.randint(1, 100))]
spi.xfer(dat)
# should still show up
assert dfu_serial in PandaDFU.list()

View File

@ -9,11 +9,30 @@ def check_signature(p):
assert not p.bootstub, "Flashed firmware not booting. Stuck in bootstub."
assert p.up_to_date()
@pytest.mark.expected_logs(1)
def test_dfu(p):
app_mcu_type = p.get_mcu_type()
dfu_serial = p.get_dfu_serial()
p.reset(enter_bootstub=True)
p.reset(enter_bootloader=True)
assert Panda.wait_for_dfu(dfu_serial, timeout=19), "failed to enter DFU"
dfu = PandaDFU(dfu_serial)
assert dfu.get_mcu_type() == app_mcu_type
assert dfu_serial in PandaDFU.list()
dfu._handle.clear_status()
dfu.reset()
p.reconnect()
# TODO: make more comprehensive bootstub tests and run on a few production ones + current
# TODO: also test release-signed app
@pytest.mark.execution_timeout(30)
@pytest.mark.expected_logs(1, 2)
def test_a_known_bootstub(p):
def test_known_bootstub(p):
"""
Test that compiled app can work with known production bootstub
"""
@ -60,13 +79,13 @@ def test_a_known_bootstub(p):
@pytest.mark.execution_timeout(25)
@pytest.mark.expected_logs(1)
def test_b_recover(p):
def test_recover(p):
assert p.recover(timeout=30)
check_signature(p)
@pytest.mark.execution_timeout(25)
@pytest.mark.expected_logs(3)
def test_c_flash(p):
def test_flash(p):
# test flash from bootstub
serial = p._serial
assert serial is not None

View File

@ -1,4 +1,3 @@
import sys
import time
import pytest
from flaky import flaky
@ -60,9 +59,6 @@ def test_reliability(p):
et = (time.monotonic() - st) * 1000.0
assert et < 20
sys.stdout.write("P")
sys.stdout.flush()
@flaky(max_runs=6, min_passes=1)
def test_throughput(p):
# enable output mode
@ -90,10 +86,6 @@ def test_gmlan(p):
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)
p.set_can_speed_kbps(1, SPEED_NORMAL)
p.set_can_speed_kbps(2, SPEED_NORMAL)
p.set_can_speed_kbps(3, SPEED_GMLAN)
# set gmlan on CAN2
for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
p.set_gmlan(bus)

View File

@ -7,30 +7,17 @@ from flaky import flaky
from collections import defaultdict
from panda import Panda
from panda.tests.hitl.conftest import PandaGroup, PARTIAL_TESTS
from panda.tests.hitl.helpers import time_many_sends, clear_can_buffers
from panda.tests.hitl.conftest import PandaGroup
from panda.tests.hitl.helpers import time_many_sends, get_random_can_messages, clear_can_buffers
@flaky(max_runs=3, min_passes=1)
@pytest.mark.execution_timeout(35)
def test_send_recv(p, panda_jungle):
def test(p_send, p_recv):
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
p_send.can_send_many([(0x1ba, 0, b"message", 0)] * 2)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
for bus in (0, 1, 2):
for speed in (10, 20, 50, 100, 125, 250, 500, 1000):
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
time.sleep(0.1)
clear_can_buffers(p_send, speed)
clear_can_buffers(p_recv, speed)
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
@ -49,27 +36,10 @@ def test_send_recv(p, panda_jungle):
@pytest.mark.execution_timeout(30)
def test_latency(p, panda_jungle):
def test(p_send, p_recv):
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
p_send.set_can_speed_kbps(0, 500)
p_recv.set_can_speed_kbps(0, 500)
time.sleep(0.05)
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)] * 10)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
for bus in (0, 1, 2):
for speed in (10, 20, 50, 100, 125, 250, 500, 1000):
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
# clear can buffers
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
clear_can_buffers(p_send, speed)
clear_can_buffers(p_recv, speed)
latencies = []
comp_kbps_list = []
@ -166,10 +136,6 @@ def test_gen2_loopback(p, panda_jungle):
test(panda_jungle, p, 0x18DB33F1)
def test_bulk_write(p, panda_jungle):
# TODO: doesn't work in partial test mode
if PARTIAL_TESTS:
return
# The TX buffers on pandas is 0x100 in length.
NUM_MESSAGES_PER_BUS = 10000
@ -204,27 +170,15 @@ def test_bulk_write(p, panda_jungle):
if len(rx) != 4 * NUM_MESSAGES_PER_BUS:
raise Exception("Did not receive all messages!")
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
def test_message_integrity(p):
clear_can_buffers(p)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)
n = 250
for i in range(n):
for i in range(250):
sent_msgs = defaultdict(set)
for _ in range(random.randrange(10)):
to_send = []
for __ in range(random.randrange(100)):
bus = random.randrange(3)
addr = random.randrange(1, 1<<29)
dat = bytes([random.getrandbits(8) for _ in range(random.randrange(1, 9))])
sent_msgs[bus].add((addr, dat))
to_send.append([addr, None, dat, bus])
to_send = get_random_can_messages(random.randrange(100))
for m in to_send:
sent_msgs[m[3]].add((m[0], m[2]))
p.can_send_many(to_send, timeout=0)
start_time = time.monotonic()
@ -233,13 +187,12 @@ def test_message_integrity(p):
for msg in recvd:
if msg[3] >= 128:
k = (msg[0], bytes(msg[2]))
assert k in sent_msgs[msg[3]-128], f"message {k} was never sent on bus {bus}"
bus = msg[3]-128
assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
sent_msgs[msg[3]-128].discard(k)
# if a set isn't empty, messages got dropped
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages"
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
print("Got all messages intact")

View File

@ -3,13 +3,33 @@ import pytest
import random
from unittest.mock import patch
from panda import Panda
from panda.python.spi import PandaProtocolMismatch, PandaSpiNackResponse
from panda import Panda, PandaDFU
from panda.python.spi import SpiDevice, PandaProtocolMismatch, PandaSpiNackResponse
pytestmark = [
pytest.mark.test_panda_types((Panda.HW_TYPE_TRES, ))
]
@pytest.mark.skip("doesn't work, bootloader seems to ignore commands once it sees junk")
@pytest.mark.expected_logs(0)
def test_dfu_with_spam(p):
dfu_serial = p.get_dfu_serial()
# enter DFU
p.reset(enter_bootstub=True)
p.reset(enter_bootloader=True)
assert Panda.wait_for_dfu(dfu_serial, timeout=19), "failed to enter DFU"
# send junk
d = SpiDevice()
for _ in range(9):
with d.acquire() as spi:
dat = [random.randint(-1, 255) for _ in range(random.randint(1, 100))]
spi.xfer(dat)
# should still show up
assert dfu_serial in PandaDFU.list()
class TestSpi:
def _ping(self, mocker, panda):
# should work with no retries

View File

@ -49,7 +49,7 @@ def test_fan_cooldown(p):
def test_fan_overshoot(p):
if p.get_type() == Panda.HW_TYPE_DOS:
pytest.skip("fan controller overshoots on fans that need stall recovery")
pytest.skip("panda's fan controller overshoots on the comma three fans that need stall recovery")
# make sure it's stopped completely
p.set_fan_power(0)

View File

@ -28,12 +28,7 @@ class PandaGroup:
TESTED = (Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2, Panda.HW_TYPE_UNO)
if PARTIAL_TESTS:
# minimal set of pandas to get most of our coverage
# * red panda covers GEN2, STM32H7
# * black panda covers STM32F4, and GEN2
PandaGroup.TESTED = (Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_RED_PANDA) # type: ignore
elif HW_TYPES is not None:
if HW_TYPES is not None:
PandaGroup.TESTED = [bytes([int(x), ]) for x in HW_TYPES.strip().split(",")] # type: ignore

View File

@ -1,5 +1,6 @@
import time
import random
from typing import Optional
def get_random_can_messages(n):
@ -51,7 +52,11 @@ def time_many_sends(p, bus, p_recv=None, msg_count=100, two_pandas=False):
return comp_kbps
def clear_can_buffers(panda):
def clear_can_buffers(panda, speed: Optional[int] = None):
if speed is not None:
for bus in range(3):
panda.set_can_speed_kbps(bus, speed)
# clear tx buffers
for i in range(4):
panda.can_clear(i)