2022-08-03 08:05:47 +08:00
|
|
|
#!/usr/bin/env python3
|
2021-12-12 06:36:00 +08:00
|
|
|
import os
|
|
|
|
import time
|
|
|
|
import random
|
|
|
|
from collections import defaultdict
|
2023-04-13 02:07:58 +08:00
|
|
|
from panda import Panda, calculate_checksum
|
2023-08-04 14:55:13 +08:00
|
|
|
from panda import PandaJungle
|
2021-12-12 06:36:00 +08:00
|
|
|
|
2022-09-10 11:13:06 +08:00
|
|
|
H7_HW_TYPES = [Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2]
|
2021-12-12 06:36:00 +08:00
|
|
|
JUNGLE_SERIAL = os.getenv("JUNGLE")
|
|
|
|
H7_PANDAS_EXCLUDE = [] # type: ignore
|
|
|
|
if os.getenv("H7_PANDAS_EXCLUDE"):
|
|
|
|
H7_PANDAS_EXCLUDE = os.getenv("H7_PANDAS_EXCLUDE").strip().split(" ") # type: ignore
|
|
|
|
|
|
|
|
#TODO: REMOVE, temporary list of CAN FD lengths, one in panda python lib MUST be used
|
2022-03-08 06:25:19 +08:00
|
|
|
DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48]
|
2021-12-12 06:36:00 +08:00
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
def panda_reset():
|
|
|
|
panda_serials = []
|
|
|
|
|
2023-01-23 08:47:59 +08:00
|
|
|
panda_jungle = PandaJungle(JUNGLE_SERIAL)
|
|
|
|
panda_jungle.set_panda_power(False)
|
|
|
|
time.sleep(1)
|
|
|
|
panda_jungle.set_panda_power(True)
|
|
|
|
time.sleep(4)
|
2022-10-13 06:28:20 +08:00
|
|
|
|
|
|
|
for serial in Panda.list():
|
|
|
|
if serial not in H7_PANDAS_EXCLUDE:
|
2023-01-14 08:17:20 +08:00
|
|
|
with Panda(serial=serial) as p:
|
|
|
|
if p.get_type() in H7_HW_TYPES:
|
2023-01-23 08:47:59 +08:00
|
|
|
p.reset()
|
2023-01-14 08:17:20 +08:00
|
|
|
panda_serials.append(serial)
|
2022-10-13 06:28:20 +08:00
|
|
|
|
2023-01-23 08:47:59 +08:00
|
|
|
print("test pandas", panda_serials)
|
|
|
|
assert len(panda_serials) == 2, "Two H7 pandas required"
|
2023-01-14 08:17:20 +08:00
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
return panda_serials
|
|
|
|
|
|
|
|
def panda_init(serial, enable_canfd=False, enable_non_iso=False):
|
2021-12-12 06:36:00 +08:00
|
|
|
p = Panda(serial=serial)
|
2022-03-08 06:25:19 +08:00
|
|
|
p.set_power_save(False)
|
2021-12-12 06:36:00 +08:00
|
|
|
for bus in range(3):
|
2022-03-08 06:25:19 +08:00
|
|
|
if enable_canfd:
|
|
|
|
p.set_can_data_speed_kbps(bus, 2000)
|
2022-10-13 06:28:20 +08:00
|
|
|
if enable_non_iso:
|
|
|
|
p.set_canfd_non_iso(bus, True)
|
2021-12-12 06:36:00 +08:00
|
|
|
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
|
|
|
return p
|
|
|
|
|
|
|
|
def canfd_test(p_send, p_recv):
|
2023-01-23 08:47:59 +08:00
|
|
|
for n in range(100):
|
2021-12-12 06:36:00 +08:00
|
|
|
sent_msgs = defaultdict(set)
|
|
|
|
to_send = []
|
|
|
|
for _ in range(200):
|
|
|
|
bus = random.randrange(3)
|
|
|
|
for dlc in range(len(DLC_TO_LEN)):
|
|
|
|
address = random.randrange(1, 1<<29)
|
2023-04-13 02:07:58 +08:00
|
|
|
data = bytearray(random.getrandbits(8) for _ in range(DLC_TO_LEN[dlc]))
|
|
|
|
if len(data) >= 2:
|
|
|
|
data[0] = calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8"))
|
2021-12-12 06:36:00 +08:00
|
|
|
to_send.append([address, 0, data, bus])
|
2023-04-13 02:07:58 +08:00
|
|
|
sent_msgs[bus].add((address, bytes(data)))
|
2021-12-12 06:36:00 +08:00
|
|
|
|
|
|
|
p_send.can_send_many(to_send, timeout=0)
|
|
|
|
|
2023-01-23 08:47:59 +08:00
|
|
|
start_time = time.monotonic()
|
|
|
|
while (time.monotonic() - start_time < 1) and any(len(x) > 0 for x in sent_msgs.values()):
|
2021-12-12 06:36:00 +08:00
|
|
|
incoming = p_recv.can_recv()
|
|
|
|
for msg in incoming:
|
|
|
|
address, _, data, bus = msg
|
2023-04-13 02:07:58 +08:00
|
|
|
if len(data) >= 2:
|
|
|
|
assert calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8")) == data[0]
|
2021-12-12 06:36:00 +08:00
|
|
|
k = (address, bytes(data))
|
|
|
|
assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
|
|
|
|
sent_msgs[bus].discard(k)
|
|
|
|
|
|
|
|
for bus in range(3):
|
2023-01-23 08:47:59 +08:00
|
|
|
assert not len(sent_msgs[bus]), f"loop {n}: bus {bus} missing {len(sent_msgs[bus])} messages"
|
|
|
|
|
2021-12-12 06:36:00 +08:00
|
|
|
print("Got all messages intact")
|
|
|
|
|
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
def setup_test(enable_non_iso=False):
|
|
|
|
panda_serials = panda_reset()
|
|
|
|
|
|
|
|
p_send = panda_init(panda_serials[0], enable_canfd=False, enable_non_iso=enable_non_iso)
|
|
|
|
p_recv = panda_init(panda_serials[1], enable_canfd=True, enable_non_iso=enable_non_iso)
|
|
|
|
|
|
|
|
# Check that sending panda CAN FD and BRS are turned off
|
|
|
|
for bus in range(3):
|
|
|
|
health = p_send.can_health(bus)
|
|
|
|
assert not health["canfd_enabled"]
|
|
|
|
assert not health["brs_enabled"]
|
|
|
|
assert health["canfd_non_iso"] == enable_non_iso
|
|
|
|
|
|
|
|
# Receiving panda sends dummy CAN FD message that should enable CAN FD on sender side
|
|
|
|
for bus in range(3):
|
|
|
|
p_recv.can_send(0x200, b"dummymessage", bus)
|
|
|
|
p_recv.can_recv()
|
|
|
|
|
|
|
|
# Check if all tested buses on sending panda have swithed to CAN FD with BRS
|
|
|
|
for bus in range(3):
|
|
|
|
health = p_send.can_health(bus)
|
|
|
|
assert health["canfd_enabled"]
|
|
|
|
assert health["brs_enabled"]
|
|
|
|
assert health["canfd_non_iso"] == enable_non_iso
|
2022-03-08 06:25:19 +08:00
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
return p_send, p_recv
|
2022-03-08 06:25:19 +08:00
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
def main():
|
|
|
|
print("[TEST CAN-FD]")
|
|
|
|
p_send, p_recv = setup_test()
|
|
|
|
canfd_test(p_send, p_recv)
|
2022-03-08 06:25:19 +08:00
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
print("[TEST CAN-FD non-ISO]")
|
|
|
|
p_send, p_recv = setup_test(enable_non_iso=True)
|
|
|
|
canfd_test(p_send, p_recv)
|
2022-03-08 06:25:19 +08:00
|
|
|
|
2022-10-13 06:28:20 +08:00
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|