Files
panda-meb/tests/hitl/4_can_loopback.py
Adeeb Shihadeh 92ed48ae5c HITL tests: add timeouts (#1352)
* set timeout

* closest

* more timeout

* fan test is slow

* retry

* break

* bump
2023-06-04 01:00:17 -07:00

246 lines
7.4 KiB
Python

import os
import time
import pytest
import random
import threading
from flaky import flaky
from collections import defaultdict
from panda import Panda
from panda.tests.hitl.conftest import PandaGroup, PARTIAL_TESTS
from panda.tests.hitl.helpers import time_many_sends, clear_can_buffers
@flaky(max_runs=3, min_passes=1)
@pytest.mark.execution_timeout(35)
def test_send_recv(p, panda_jungle):
def test(p_send, p_recv):
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
p_send.can_send_many([(0x1ba, 0, b"message", 0)] * 2)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
for bus in (0, 1, 2):
for speed in (10, 20, 50, 100, 125, 250, 500, 1000):
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
time.sleep(0.1)
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
saturation_pct = (comp_kbps / speed) * 100.0
assert 80 < saturation_pct < 100
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, {:6.2f}%".format(bus, speed, comp_kbps, saturation_pct))
# Run tests in both directions
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
test(p, panda_jungle)
test(panda_jungle, p)
@flaky(max_runs=6, min_passes=1)
@pytest.mark.execution_timeout(30)
def test_latency(p, panda_jungle):
def test(p_send, p_recv):
p_send.set_can_loopback(False)
p_recv.set_can_loopback(False)
p_send.set_can_speed_kbps(0, 500)
p_recv.set_can_speed_kbps(0, 500)
time.sleep(0.05)
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)] * 10)
time.sleep(0.05)
p_recv.can_recv()
p_send.can_recv()
for bus in (0, 1, 2):
for speed in (10, 20, 50, 100, 125, 250, 500, 1000):
p_send.set_can_speed_kbps(bus, speed)
p_recv.set_can_speed_kbps(bus, speed)
time.sleep(0.1)
# clear can buffers
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
latencies = []
comp_kbps_list = []
saturation_pcts = []
num_messages = 100
for _ in range(num_messages):
st = time.monotonic()
p_send.can_send(0x1ab, b"message", bus)
r = []
while len(r) < 1 and (time.monotonic() - st) < 5:
r = p_recv.can_recv()
et = time.monotonic()
r_echo = []
while len(r_echo) < 1 and (time.monotonic() - st) < 10:
r_echo = p_send.can_recv()
if len(r) == 0 or len(r_echo) == 0:
print("r: {}, r_echo: {}".format(r, r_echo))
assert len(r) == 1
assert len(r_echo) == 1
et = (et - st) * 1000.0
comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / et
latency = et - ((1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / speed)
assert latency < 5.0
saturation_pct = (comp_kbps / speed) * 100.0
latencies.append(latency)
comp_kbps_list.append(comp_kbps)
saturation_pcts.append(saturation_pct)
average_latency = sum(latencies) / num_messages
assert average_latency < 1.0
average_comp_kbps = sum(comp_kbps_list) / num_messages
average_saturation_pct = sum(saturation_pcts) / num_messages
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))
# Run tests in both directions
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
test(p, panda_jungle)
test(panda_jungle, p)
@pytest.mark.panda_expect_can_error
@pytest.mark.test_panda_types(PandaGroup.GEN2)
def test_gen2_loopback(p, panda_jungle):
def test(p_send, p_recv, address=None):
for bus in range(4):
obd = False
if bus == 3:
obd = True
bus = 1
# Clear buses
clear_can_buffers(p_send)
clear_can_buffers(p_recv)
# Send a random string
addr = address if address else random.randint(1, 2000)
string = b"test" + os.urandom(4)
p_send.set_obd(obd)
p_recv.set_obd(obd)
time.sleep(0.2)
p_send.can_send(addr, string, bus)
time.sleep(0.2)
content = p_recv.can_recv()
# Check amount of messages
assert len(content) == 1
# Check content
assert content[0][0] == addr and content[0][2] == string
# Check bus
assert content[0][3] == bus
print("Bus:", bus, "address:", addr, "OBD:", obd, "OK")
# Run tests in both directions
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
test(p, panda_jungle)
test(panda_jungle, p)
# Test extended frame address with ELM327 mode
p.set_safety_mode(Panda.SAFETY_ELM327)
test(p, panda_jungle, 0x18DB33F1)
test(panda_jungle, p, 0x18DB33F1)
def test_bulk_write(p, panda_jungle):
# TODO: doesn't work in partial test mode
if PARTIAL_TESTS:
return
# The TX buffers on pandas is 0x100 in length.
NUM_MESSAGES_PER_BUS = 10000
def flood_tx(panda):
print('Sending!')
msg = b"\xaa" * 8
packet = []
# start with many messages on a single bus (higher contention for single TX ring buffer)
packet += [[0xaa, None, msg, 0]] * NUM_MESSAGES_PER_BUS
# end with many messages on multiple buses
packet += [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS
# Disable timeout
panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
panda.can_send_many(packet, timeout=0)
print(f"Done sending {4 * NUM_MESSAGES_PER_BUS} messages!", time.monotonic())
print(panda.health())
# Start transmisson
threading.Thread(target=flood_tx, args=(p,)).start()
# Receive as much as we can in a few second time period
rx = []
old_len = 0
start_time = time.monotonic()
while time.monotonic() - start_time < 5 or len(rx) > old_len:
old_len = len(rx)
rx.extend(panda_jungle.can_recv())
print(f"Received {len(rx)} messages", time.monotonic())
# All messages should have been received
if len(rx) != 4 * NUM_MESSAGES_PER_BUS:
raise Exception("Did not receive all messages!")
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
def test_message_integrity(p):
clear_can_buffers(p)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_can_loopback(True)
n = 250
for i in range(n):
sent_msgs = defaultdict(set)
for _ in range(random.randrange(10)):
to_send = []
for __ in range(random.randrange(100)):
bus = random.randrange(3)
addr = random.randrange(1, 1<<29)
dat = bytes([random.getrandbits(8) for _ in range(random.randrange(1, 9))])
sent_msgs[bus].add((addr, dat))
to_send.append([addr, None, dat, bus])
p.can_send_many(to_send, timeout=0)
start_time = time.monotonic()
while time.monotonic() - start_time < 2 and any(len(sent_msgs[bus]) for bus in range(3)):
recvd = p.can_recv()
for msg in recvd:
if msg[3] >= 128:
k = (msg[0], bytes(msg[2]))
assert k in sent_msgs[msg[3]-128], f"message {k} was never sent on bus {bus}"
sent_msgs[msg[3]-128].discard(k)
# if a set isn't empty, messages got dropped
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages"
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
print("Got all messages intact")