import os import time import pytest import random import threading from flaky import flaky from collections import defaultdict from panda import Panda from panda.tests.hitl.conftest import PandaGroup from panda.tests.hitl.helpers import time_many_sends, get_random_can_messages, clear_can_buffers @flaky(max_runs=3, min_passes=1) @pytest.mark.timeout(35) def test_send_recv(p, panda_jungle): def test(p_send, p_recv): for bus in (0, 1, 2): for speed in (10, 20, 50, 100, 125, 250, 500, 1000): clear_can_buffers(p_send, speed) clear_can_buffers(p_recv, speed) comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True) saturation_pct = (comp_kbps / speed) * 100.0 assert 80 < saturation_pct < 100 print(f"two pandas bus {bus}, 100 messages at speed {speed:4d}, comp speed is {comp_kbps:7.2f}, {saturation_pct:6.2f}%") # Run tests in both directions p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) test(p, panda_jungle) test(panda_jungle, p) @flaky(max_runs=6, min_passes=1) @pytest.mark.timeout(30) def test_latency(p, panda_jungle): def test(p_send, p_recv): for bus in (0, 1, 2): for speed in (10, 20, 50, 100, 125, 250, 500, 1000): clear_can_buffers(p_send, speed) clear_can_buffers(p_recv, speed) latencies = [] comp_kbps_list = [] saturation_pcts = [] num_messages = 100 for _ in range(num_messages): st = time.monotonic() p_send.can_send(0x1ab, b"message", bus) r = [] while len(r) < 1 and (time.monotonic() - st) < 5: r = p_recv.can_recv() et = time.monotonic() r_echo = [] while len(r_echo) < 1 and (time.monotonic() - st) < 10: r_echo = p_send.can_recv() if len(r) == 0 or len(r_echo) == 0: print(f"r: {r}, r_echo: {r_echo}") assert len(r) == 1 assert len(r_echo) == 1 et = (et - st) * 1000.0 comp_kbps = (1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / et latency = et - ((1 + 11 + 1 + 1 + 1 + 4 + 8 * 8 + 15 + 1 + 1 + 1 + 7) / speed) assert latency < 5.0 saturation_pct = (comp_kbps / speed) * 100.0 latencies.append(latency) comp_kbps_list.append(comp_kbps) saturation_pcts.append(saturation_pct) average_latency = sum(latencies) / num_messages assert average_latency < 1.0 average_comp_kbps = sum(comp_kbps_list) / num_messages average_saturation_pct = sum(saturation_pcts) / num_messages print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}" .format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct)) # Run tests in both directions p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) test(p, panda_jungle) test(panda_jungle, p) @pytest.mark.panda_expect_can_error @pytest.mark.test_panda_types(PandaGroup.GEN2) def test_gen2_loopback(p, panda_jungle): def test(p_send, p_recv, address=None): for bus in range(4): obd = False if bus == 3: obd = True bus = 1 # Clear buses clear_can_buffers(p_send) clear_can_buffers(p_recv) # Send a random string addr = address if address else random.randint(1, 2000) string = b"test" + os.urandom(4) p_send.set_obd(obd) p_recv.set_obd(obd) time.sleep(0.2) p_send.can_send(addr, string, bus) time.sleep(0.2) content = p_recv.can_recv() # Check amount of messages assert len(content) == 1 # Check content assert content[0][0] == addr and content[0][1] == string # Check bus assert content[0][2] == bus print("Bus:", bus, "address:", addr, "OBD:", obd, "OK") # Run tests in both directions p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) test(p, panda_jungle) test(panda_jungle, p) # Test extended frame address with ELM327 mode p.set_safety_mode(Panda.SAFETY_ELM327) test(p, panda_jungle, 0x18DB33F1) test(panda_jungle, p, 0x18DB33F1) # TODO: why it's not being reset by fixtures reinit? p.set_obd(False) panda_jungle.set_obd(False) def test_bulk_write(p, panda_jungle): # The TX buffers on pandas is 0x100 in length. NUM_MESSAGES_PER_BUS = 10000 def flood_tx(panda): print('Sending!') msg = b"\xaa" * 8 packet = [] # start with many messages on a single bus (higher contention for single TX ring buffer) packet += [[0xaa, msg, 0]] * NUM_MESSAGES_PER_BUS # end with many messages on multiple buses packet += [[0xaa, msg, 0], [0xaa, msg, 1], [0xaa, msg, 2]] * NUM_MESSAGES_PER_BUS # Disable timeout panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) panda.can_send_many(packet, timeout=0) print(f"Done sending {4 * NUM_MESSAGES_PER_BUS} messages!", time.monotonic()) print(panda.health()) # Start transmisson threading.Thread(target=flood_tx, args=(p,)).start() # Receive as much as we can in a few second time period rx = [] old_len = 0 start_time = time.monotonic() while time.monotonic() - start_time < 5 or len(rx) > old_len: old_len = len(rx) rx.extend(panda_jungle.can_recv()) print(f"Received {len(rx)} messages", time.monotonic()) # All messages should have been received if len(rx) != 4 * NUM_MESSAGES_PER_BUS: raise Exception("Did not receive all messages!") def test_message_integrity(p): p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_can_loopback(True) for i in range(250): sent_msgs = defaultdict(set) for _ in range(random.randrange(10)): to_send = get_random_can_messages(random.randrange(100)) for m in to_send: sent_msgs[m[2]].add((m[0], m[1])) p.can_send_many(to_send, timeout=0) start_time = time.monotonic() while time.monotonic() - start_time < 2 and any(len(sent_msgs[bus]) for bus in range(3)): recvd = p.can_recv() for msg in recvd: if msg[2] >= 128: k = (msg[0], bytes(msg[1])) bus = msg[2]-128 assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}" sent_msgs[msg[2]-128].discard(k) # if a set isn't empty, messages got dropped for bus in range(3): assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages" print("Got all messages intact")