mirror of https://github.com/commaai/panda.git
71 lines
2.1 KiB
Python
Executable File
71 lines
2.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os
|
|
import usb1
|
|
import time
|
|
import struct
|
|
import itertools
|
|
import threading
|
|
from typing import Any, Union, List
|
|
|
|
from panda import Panda
|
|
|
|
JUNGLE = "JUNGLE" in os.environ
|
|
if JUNGLE:
|
|
from panda import PandaJungle
|
|
|
|
# Generate unique messages
|
|
NUM_MESSAGES_PER_BUS = 10000
|
|
messages = [bytes(struct.pack("Q", i)) for i in range(NUM_MESSAGES_PER_BUS)]
|
|
tx_messages = list(itertools.chain.from_iterable(([[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] for msg in messages)))
|
|
|
|
def flood_tx(panda):
|
|
print('Sending!')
|
|
transferred = 0
|
|
while True:
|
|
try:
|
|
print(f"Sending block {transferred}-{len(tx_messages)}: ", end="")
|
|
panda.can_send_many(tx_messages[transferred:], timeout=10)
|
|
print("OK")
|
|
break
|
|
except usb1.USBErrorTimeout as e:
|
|
transferred += (e.transferred // 16)
|
|
print("timeout, transferred: ", transferred)
|
|
|
|
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")
|
|
|
|
if __name__ == "__main__":
|
|
serials = Panda.list()
|
|
receiver: Union[Panda, PandaJungle]
|
|
if JUNGLE:
|
|
sender = Panda()
|
|
receiver = PandaJungle()
|
|
else:
|
|
if len(serials) != 2:
|
|
raise Exception("Connect two pandas to perform this test!")
|
|
sender = Panda(serials[0])
|
|
receiver = Panda(serials[1])
|
|
receiver.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
|
|
|
sender.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
|
|
|
# Start transmisson
|
|
threading.Thread(target=flood_tx, args=(sender,)).start()
|
|
|
|
# Receive as much as we can, and stop when there hasn't been anything for a second
|
|
rx: List[Any] = []
|
|
old_len = 0
|
|
last_change = time.monotonic()
|
|
while time.monotonic() - last_change < 1:
|
|
if old_len < len(rx):
|
|
last_change = time.monotonic()
|
|
old_len = len(rx)
|
|
|
|
rx.extend(receiver.can_recv())
|
|
print(f"Received {len(rx)} messages")
|
|
|
|
# Check if we received everything
|
|
for bus in range(3):
|
|
received_msgs = {bytes(m[2]) for m in filter(lambda m, b=bus: m[3] == b, rx)} # type: ignore
|
|
dropped_msgs = set(messages).difference(received_msgs)
|
|
print(f"Bus {bus} dropped msgs: {len(list(dropped_msgs))} / {len(messages)}")
|