panda/tests/canfd/test_canfd.py

153 lines
4.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import time
import random
from collections import defaultdict
from panda import Panda, calculate_checksum, DLC_TO_LEN
from panda import PandaJungle
from panda.tests.hitl.helpers import time_many_sends
H7_HW_TYPES = [Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2]
JUNGLE_SERIAL = os.getenv("JUNGLE")
H7_PANDAS_EXCLUDE = [] # type: ignore
if os.getenv("H7_PANDAS_EXCLUDE"):
H7_PANDAS_EXCLUDE = os.getenv("H7_PANDAS_EXCLUDE").strip().split(" ") # type: ignore
def panda_reset():
panda_serials = []
panda_jungle = PandaJungle(JUNGLE_SERIAL)
panda_jungle.set_can_silent(True)
panda_jungle.set_panda_power(False)
time.sleep(1)
panda_jungle.set_panda_power(True)
time.sleep(4)
for serial in Panda.list():
if serial not in H7_PANDAS_EXCLUDE:
with Panda(serial=serial) as p:
if p.get_type() in H7_HW_TYPES:
p.reset()
panda_serials.append(serial)
print("test pandas", panda_serials)
assert len(panda_serials) == 2, "Two H7 pandas required"
return panda_serials
def panda_init(serial, enable_canfd=False, enable_non_iso=False):
p = Panda(serial=serial)
p.set_power_save(False)
for bus in range(3):
p.set_can_speed_kbps(0, 500)
if enable_canfd:
p.set_can_data_speed_kbps(bus, 2000)
if enable_non_iso:
p.set_canfd_non_iso(bus, True)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
return p
def test_canfd_throughput(p, p_recv=None):
two_pandas = p_recv is not None
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
if two_pandas:
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
# enable output mode
else:
p.set_can_loopback(True)
tests = [
[500, 1000, 2000], # speeds
[93, 87, 78], # saturation thresholds
]
for i in range(len(tests[0])):
# set bus 0 data speed to speed
p.set_can_data_speed_kbps(0, tests[0][i])
if p_recv is not None:
p_recv.set_can_data_speed_kbps(0, tests[0][i])
time.sleep(0.05)
comp_kbps = time_many_sends(p, 0, p_recv=p_recv, msg_count=400, two_pandas=two_pandas, msg_len=64)
# bit count from https://en.wikipedia.org/wiki/CAN_bus
saturation_pct = (comp_kbps / tests[0][i]) * 100.0
assert saturation_pct > tests[1][i]
assert saturation_pct < 100
def canfd_test(p_send, p_recv):
for n in range(100):
sent_msgs = defaultdict(set)
to_send = []
for _ in range(200):
bus = random.randrange(3)
for dlc in range(len(DLC_TO_LEN)):
address = random.randrange(1, 1<<29)
data = bytearray(random.getrandbits(8) for _ in range(DLC_TO_LEN[dlc]))
if len(data) >= 2:
data[0] = calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8"))
to_send.append([address, data, bus])
sent_msgs[bus].add((address, bytes(data)))
p_send.can_send_many(to_send, timeout=0)
start_time = time.monotonic()
while (time.monotonic() - start_time < 1) and any(len(x) > 0 for x in sent_msgs.values()):
incoming = p_recv.can_recv()
for msg in incoming:
address, data, bus = msg
if len(data) >= 2:
assert calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8")) == data[0]
k = (address, bytes(data))
assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
sent_msgs[bus].discard(k)
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {n}: bus {bus} missing {len(sent_msgs[bus])} messages"
def setup_test(enable_non_iso=False):
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=False, enable_non_iso=enable_non_iso)
p_recv = panda_init(panda_serials[1], enable_canfd=True, enable_non_iso=enable_non_iso)
# Check that sending panda CAN FD and BRS are turned off
for bus in range(3):
health = p_send.can_health(bus)
assert not health["canfd_enabled"]
assert not health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
# Receiving panda sends dummy CAN FD message that should enable CAN FD on sender side
for bus in range(3):
p_recv.can_send(0x200, b"dummymessage", bus)
p_recv.can_recv()
p_send.can_recv()
# Check if all tested buses on sending panda have swithed to CAN FD with BRS
for bus in range(3):
health = p_send.can_health(bus)
assert health["canfd_enabled"]
assert health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
return p_send, p_recv
def main():
print("[TEST CAN-FD]")
p_send, p_recv = setup_test()
canfd_test(p_send, p_recv)
print("[TEST CAN-FD non-ISO]")
p_send, p_recv = setup_test(enable_non_iso=True)
canfd_test(p_send, p_recv)
print("[TEST CAN-FD THROUGHPUT]")
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=True)
p_recv = panda_init(panda_serials[1], enable_canfd=True)
test_canfd_throughput(p_send, p_recv)
if __name__ == "__main__":
main()