* reorg

* one less config file

* lil more

* lil more
This commit is contained in:
Adeeb Shihadeh
2025-05-19 19:02:59 -07:00
committed by GitHub
parent 20c80e3929
commit 2f47af4b2e
40 changed files with 16 additions and 360 deletions

43
scripts/benchmark.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
import time
from contextlib import contextmanager
from panda import Panda, PandaDFU
from panda.tests.hitl.helpers import get_random_can_messages
@contextmanager
def print_time(desc):
start = time.perf_counter()
yield
end = time.perf_counter()
print(f"{end - start:.2f}s - {desc}")
if __name__ == "__main__":
with print_time("Panda()"):
p = Panda()
with print_time("PandaDFU.list()"):
PandaDFU.list()
fxn = [
'reset',
'reconnect',
'up_to_date',
'health',
#'flash',
]
for f in fxn:
with print_time(f"Panda.{f}()"):
getattr(p, f)()
p.set_can_loopback(True)
for n in range(6):
msgs = get_random_can_messages(int(10**n))
with print_time(f"Panda.can_send_many() - {len(msgs)} msgs"):
p.can_send_many(msgs)
with print_time("Panda.can_recv()"):
m = p.can_recv()

View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
import os
import time
import random
import argparse
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
counter = 0
nonzero_bus_errors = 0
zero_bus_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
raise Exception("Connect white/grey and black panda to run this test!")
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
black_panda = None
other_panda = None
# find out which one is black
if pandas[0].is_black() and not pandas[1].is_black():
black_panda = pandas[0]
other_panda = pandas[1]
elif not pandas[0].is_black() and pandas[1].is_black():
black_panda = pandas[1]
other_panda = pandas[0]
else:
raise Exception("Connect white/grey and black panda to run this test!")
# disable safety modes
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
while True:
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
counter += 1
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors)
# Toggle relay
black_panda.set_safety_mode(CarParams.SafetyModel.silent)
time.sleep(1)
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
time.sleep(1)
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
global nonzero_bus_errors, zero_bus_errors, content_errors
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
# set OBD on black panda
black_panda.set_obd(True if obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
_ = black_panda.can_recv() # can echo
cans_loop = other_panda.can_recv()
else:
_ = other_panda.can_recv() # can echo
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[1] != st):
content_errors += 1
print(" Loop on bus", str(loop[2]))
loop_buses.append(loop[2])
if len(cans_loop) == 0:
print(" No loop")
assert not os.getenv("NOASSERT")
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
if len(loop_buses) == 0:
zero_bus_errors += 1
else:
nonzero_bus_errors += 1
assert not os.getenv("NOASSERT")
else:
print(" TEST PASSED")
time.sleep(sleep_duration)
print("\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
import os
import time
import random
import argparse
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
counter = 0
nonzero_bus_errors = 0
zero_bus_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
raise Exception("Connect white/grey and black panda to run this test!")
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
black_panda = None
other_panda = None
# find out which one is black
if pandas[0].is_black() and not pandas[1].is_black():
black_panda = pandas[0]
other_panda = pandas[1]
elif not pandas[0].is_black() and pandas[1].is_black():
black_panda = pandas[1]
other_panda = pandas[0]
else:
raise Exception("Connect white/grey and black panda to run this test!")
# disable safety modes
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
start_time = time.time()
temp_start_time = start_time
while True:
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration)
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration)
counter += 1
runtime = time.time() - start_time
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors,
"Content errors:", content_errors, "Runtime: ", runtime)
if (time.time() - temp_start_time) > 3600 * 6:
# Toggle relay
black_panda.set_safety_mode(CarParams.SafetyModel.silent)
time.sleep(1)
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
time.sleep(1)
temp_start_time = time.time()
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
global nonzero_bus_errors, zero_bus_errors, content_errors
if direction:
print("***************** TESTING (BLACK --> OTHER) *****************")
else:
print("***************** TESTING (OTHER --> BLACK) *****************")
for send_bus, obd, recv_buses in test_array:
black_panda.send_heartbeat()
other_panda.send_heartbeat()
print("\ntest can: ", send_bus, " OBD: ", obd)
# set OBD on black panda
black_panda.set_obd(True if obd else None)
# clear and flush
if direction:
black_panda.can_clear(send_bus)
else:
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
if direction:
other_panda.can_clear(recv_bus)
else:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
if direction:
black_panda.can_send(at, st, send_bus)
else:
other_panda.can_send(at, st, send_bus)
time.sleep(0.1)
# check for receive
if direction:
_ = black_panda.can_recv() # cans echo
cans_loop = other_panda.can_recv()
else:
_ = other_panda.can_recv() # cans echo
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[1] != st):
content_errors += 1
print(" Loop on bus", str(loop[2]))
loop_buses.append(loop[2])
if len(cans_loop) == 0:
print(" No loop")
assert os.getenv("NOASSERT")
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
if len(loop_buses) == 0:
zero_bus_errors += 1
else:
nonzero_bus_errors += 1
assert os.getenv("NOASSERT")
else:
print(" TEST PASSED")
time.sleep(sleep_duration)
print("\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

136
scripts/black_white_relay_test.py Executable file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
# Relay test with loopback between black panda (+ harness and power) and white/grey panda
# Tests the relay switching multiple times / second by looking at the buses on which loop occurs.
import os
import time
import random
import argparse
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
counter = 0
open_errors = 0
closed_errors = 0
content_errors = 0
def run_test(sleep_duration):
global counter, open_errors, closed_errors
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
raise Exception("Connect white/grey and black panda to run this test!")
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
# find out which one is black
type0 = pandas[0].get_type()
type1 = pandas[1].get_type()
black_panda = None
other_panda = None
if type0 == "\x03" and type1 != "\x03":
black_panda = pandas[0]
other_panda = pandas[1]
elif type0 != "\x03" and type1 == "\x03":
black_panda = pandas[1]
other_panda = pandas[0]
else:
raise Exception("Connect white/grey and black panda to run this test!")
# disable safety modes
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
# test health packet
print("black panda health", black_panda.health())
print("other panda health", other_panda.health())
# test black -> other
while True:
# Switch on relay
black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
time.sleep(0.05)
if not test_buses(black_panda, other_panda, (0, False, [0])):
open_errors += 1
raise Exception("Open error")
# Switch off relay
black_panda.set_safety_mode(CarParams.SafetyModel.silent)
time.sleep(0.05)
if not test_buses(black_panda, other_panda, (0, False, [0, 2])):
closed_errors += 1
raise Exception("Close error")
counter += 1
print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors)
def test_buses(black_panda, other_panda, test_obj):
global content_errors
send_bus, obd, recv_buses = test_obj
black_panda.send_heartbeat()
other_panda.send_heartbeat()
# Set OBD on send panda
other_panda.set_obd(True if obd else None)
# clear and flush
other_panda.can_clear(send_bus)
for recv_bus in recv_buses:
black_panda.can_clear(recv_bus)
black_panda.can_recv()
other_panda.can_recv()
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
other_panda.can_send(at, st, send_bus)
time.sleep(0.05)
# check for receive
_ = other_panda.can_recv() # can echo
cans_loop = black_panda.can_recv()
loop_buses = []
for loop in cans_loop:
if (loop[0] != at) or (loop[1] != st):
content_errors += 1
loop_buses.append(loop[2])
# test loop buses
recv_buses.sort()
loop_buses.sort()
if(recv_buses != loop_buses):
return False
else:
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

49
scripts/bulk_write_test.py Executable file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import os
import time
import threading
from typing import Any
from opendbc.car.structs import CarParams
from panda import Panda
JUNGLE = "JUNGLE" in os.environ
if JUNGLE:
from panda import PandaJungle
# The TX buffers on pandas is 0x100 in length.
NUM_MESSAGES_PER_BUS = 10000
def flood_tx(panda):
print('Sending!')
msg = b"\xaa" * 4
packet = [[0xaa, msg, 0], [0xaa, msg, 1], [0xaa, msg, 2]] * NUM_MESSAGES_PER_BUS
panda.can_send_many(packet, timeout=10000)
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")
if __name__ == "__main__":
serials = Panda.list()
if JUNGLE:
sender = Panda()
receiver = PandaJungle()
else:
if len(serials) != 2:
raise Exception("Connect two pandas to perform this test!")
sender = Panda(serials[0])
receiver = Panda(serials[1]) # type: ignore
receiver.set_safety_mode(CarParams.SafetyModel.allOutput)
sender.set_safety_mode(CarParams.SafetyModel.allOutput)
# Start transmisson
threading.Thread(target=flood_tx, args=(sender,)).start()
# Receive as much as we can in a few second time period
rx: list[Any] = []
old_len = 0
start_time = time.time()
while time.time() - start_time < 3 or len(rx) > old_len:
old_len = len(rx)
print(old_len)
rx.extend(receiver.can_recv())
print(f"Received {len(rx)} messages")

29
scripts/can_health.py Executable file
View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import time
import re
from panda import Panda
RED = '\033[91m'
GREEN = '\033[92m'
def colorize_errors(value):
if isinstance(value, str):
if re.search(r'(?i)No error', value):
return f'{GREEN}{value}\033[0m'
elif re.search(r'(?i)(?<!No error\s)(err|error)', value):
return f'{RED}{value}\033[0m'
return str(value)
if __name__ == "__main__":
panda = Panda()
while True:
print(chr(27) + "[2J") # clear screen
print("Connected to " + ("internal panda" if panda.is_internal() else "External panda") + f" id: {panda.get_serial()[0]}: {panda.get_version()}")
for bus in range(3):
print(f"\nBus {bus}:")
health = panda.can_health(bus)
for key, value in health.items():
print(f"{key}: {colorize_errors(value)} ", end=" ")
print()
time.sleep(1)

46
scripts/can_printer.py Executable file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
import os
import time
from collections import defaultdict
import binascii
from opendbc.car.structs import CarParams
from panda import Panda
# fake
def sec_since_boot():
return time.time()
def can_printer():
p = Panda()
print(f"Connected to id: {p.get_serial()[0]}: {p.get_version()}")
time.sleep(1)
p.can_clear(0xFFFF)
p.set_safety_mode(CarParams.SafetyModel.allOutput)
start = sec_since_boot()
lp = sec_since_boot()
all_msgs = defaultdict(list)
canbus = os.getenv("CAN")
if canbus == "3":
p.set_obd(True)
canbus = "1"
while True:
can_recv = p.can_recv()
for addr, dat, bus in can_recv:
if canbus is None or str(bus) == canbus:
all_msgs[(addr, bus)].append((dat))
if sec_since_boot() - lp > 0.1:
dd = chr(27) + "[2J"
dd += "%5.2f\n" % (sec_since_boot() - start)
for (addr, bus), dat_log in sorted(all_msgs.items()):
dd += "%d: %s(%6d): %s\n" % (bus, "%04X(%4d)" % (addr, addr), len(dat_log), binascii.hexlify(dat_log[-1]).decode())
print(dd)
lp = sec_since_boot()
if __name__ == "__main__":
can_printer()

97
scripts/check_fw_size.py Executable file
View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
import subprocess
from collections import defaultdict
def check_space(file, mcu):
MCUS = {
"H7": {
".flash": 1024*1024, # FLASH
".dtcmram": 128*1024, # DTCMRAM
".itcmram": 64*1024, # ITCMRAM
".axisram": 320*1024, # AXI SRAM
".sram12": 32*1024, # SRAM1(16kb) + SRAM2(16kb)
".sram4": 16*1024, # SRAM4
".backup_sram": 4*1024, # SRAM4
},
"F4": {
".flash": 1024*1024, # FLASH
".dtcmram": 256*1024, # RAM
".ram_d1": 64*1024, # RAM2
},
}
IGNORE_LIST = [
".ARM.attributes",
".comment",
".debug_line",
".debug_info",
".debug_abbrev",
".debug_aranges",
".debug_str",
".debug_ranges",
".debug_loc",
".debug_frame",
".debug_line_str",
".debug_rnglists",
".debug_loclists",
]
FLASH = [
".isr_vector",
".text",
".rodata",
".data"
]
RAM = [
".data",
".bss",
"._user_heap_stack" # _user_heap_stack considered free?
]
result = {}
calcs = defaultdict(int)
output = str(subprocess.check_output(f"arm-none-eabi-size -x --format=sysv {file}", shell=True), 'utf-8')
for row in output.split('\n'):
pop = False
line = row.split()
if len(line) == 3 and line[0].startswith('.'):
if line[0] in IGNORE_LIST:
continue
result[line[0]] = [line[1], line[2]]
if line[0] in FLASH:
calcs[".flash"] += int(line[1], 16)
pop = True
if line[0] in RAM:
calcs[".dtcmram"] += int(line[1], 16)
pop = True
if pop:
result.pop(line[0])
if len(result):
for line in result:
calcs[line] += int(result[line][0], 16)
print(f"=======SUMMARY FOR {mcu} FILE {file}=======")
for line in calcs:
if line in MCUS[mcu]:
used_percent = (100 - (MCUS[mcu][line] - calcs[line]) / MCUS[mcu][line] * 100)
print(f"SECTION: {line} size: {MCUS[mcu][line]} USED: {calcs[line]}({used_percent:.2f}%) FREE: {MCUS[mcu][line] - calcs[line]}")
else:
print(line, calcs[line])
print()
if __name__ == "__main__":
# red panda
check_space("../board/obj/bootstub.panda_h7.elf", "H7")
check_space("../board/obj/panda_h7.elf", "H7")
# black panda
check_space("../board/obj/bootstub.panda.elf", "F4")
check_space("../board/obj/panda.elf", "F4")
# jungle v1
check_space("../board/jungle/obj/bootstub.panda_jungle.elf", "F4")
check_space("../board/jungle/obj/panda_jungle.elf", "F4")
# jungle v2
check_space("../board/jungle/obj/bootstub.panda_jungle_h7.elf", "H7")
check_space("../board/jungle/obj/panda_jungle_h7.elf", "H7")

21
scripts/ci_shell.sh Executable file
View File

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
set -e
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
OP_ROOT="$DIR/../../"
PANDA_ROOT="$DIR/../"
if [ -z "$BUILD" ]; then
docker pull docker.io/commaai/panda:latest
else
docker build --cache-from docker.io/commaai/panda:latest -t docker.io/commaai/panda:latest -f $PANDA_ROOT/Dockerfile $PANDA_ROOT
fi
docker run \
-it \
--rm \
--volume $OP_ROOT:$OP_ROOT \
--workdir $PWD \
--env PYTHONPATH=$OP_ROOT \
docker.io/commaai/panda:latest \
/bin/bash

62
scripts/debug_console.py Executable file
View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
import os
import sys
import time
import select
import codecs
from panda import Panda
setcolor = ["\033[1;32;40m", "\033[1;31;40m"]
unsetcolor = "\033[00m"
port_number = int(os.getenv("PORT", "0"))
claim = os.getenv("CLAIM") is not None
no_color = os.getenv("NO_COLOR") is not None
no_reconnect = os.getenv("NO_RECONNECT") is not None
if __name__ == "__main__":
while True:
try:
serials = Panda.list()
if os.getenv("SERIAL"):
serials = [x for x in serials if x == os.getenv("SERIAL")]
pandas = [Panda(x, claim=claim) for x in serials]
decoders = [codecs.getincrementaldecoder('utf-8')() for _ in pandas]
if not len(pandas):
print("no pandas found")
if no_reconnect:
sys.exit(0)
time.sleep(1)
continue
if os.getenv("BAUD") is not None:
for panda in pandas:
panda.set_uart_baud(port_number, int(os.getenv("BAUD"))) # type: ignore
while True:
for i, panda in enumerate(pandas):
while True:
ret = panda.serial_read(port_number)
if len(ret) > 0:
decoded = decoders[i].decode(ret)
if no_color:
sys.stdout.write(decoded)
else:
sys.stdout.write(setcolor[i] + decoded + unsetcolor)
sys.stdout.flush()
else:
break
if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
ln = sys.stdin.readline()
if claim:
panda.serial_write(port_number, ln)
time.sleep(0.01)
except KeyboardInterrupt:
break
except Exception:
print("panda disconnected!")
time.sleep(0.5)

View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
import matplotlib.pyplot as plt # pylint: disable=import-error
HASHING_PRIME = 23
REGISTER_MAP_SIZE = 0x3FF
BYTES_PER_REG = 4
# From ST32F413 datasheet
REGISTER_ADDRESS_REGIONS = [
(0x40000000, 0x40007FFF),
(0x40010000, 0x400107FF),
(0x40011000, 0x400123FF),
(0x40012C00, 0x40014BFF),
(0x40015000, 0x400153FF),
(0x40015800, 0x40015BFF),
(0x40016000, 0x400167FF),
(0x40020000, 0x40021FFF),
(0x40023000, 0x400233FF),
(0x40023800, 0x40023FFF),
(0x40026000, 0x400267FF),
(0x50000000, 0x5003FFFF),
(0x50060000, 0x500603FF),
(0x50060800, 0x50060BFF),
(0x50060800, 0x50060BFF),
(0xE0000000, 0xE00FFFFF)
]
def _hash(reg_addr):
return (((reg_addr >> 16) ^ ((((reg_addr + 1) & 0xFFFF) * HASHING_PRIME) & 0xFFFF)) & REGISTER_MAP_SIZE)
# Calculate hash for each address
hashes = []
double_hashes = []
for (start_addr, stop_addr) in REGISTER_ADDRESS_REGIONS:
for addr in range(start_addr, stop_addr + 1, BYTES_PER_REG):
h = _hash(addr)
hashes.append(h)
double_hashes.append(_hash(h))
# Make histograms
plt.subplot(2, 1, 1)
plt.hist(hashes, bins=REGISTER_MAP_SIZE)
plt.title("Number of collisions per _hash")
plt.xlabel("Address")
plt.subplot(2, 1, 2)
plt.hist(double_hashes, bins=REGISTER_MAP_SIZE)
plt.title("Number of collisions per double _hash")
plt.xlabel("Address")
plt.show()

17
scripts/echo.py Executable file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python3
from opendbc.car.structs import CarParams
from panda import Panda
# This script is intended to be used in conjunction with the echo_loopback_test.py test script from panda jungle.
# It sends a reversed response back for every message received containing b"test".
if __name__ == "__main__":
p = Panda()
p.set_safety_mode(CarParams.SafetyModel.allOutput)
p.set_power_save(False)
while True:
incoming = p.can_recv()
for message in incoming:
address, data, bus = message
if b'test' in data:
p.can_send(address, data[::-1], bus)

14
scripts/fan/fan_test.py Executable file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python
import time
from panda import Panda
if __name__ == "__main__":
p = Panda()
power = 0
while True:
p.set_fan_power(power)
time.sleep(5)
print("Power: ", power, "RPM:", str(p.get_fan_rpm()), "Expected:", int(6500 * power / 100))
power += 10
power %= 110

88
scripts/fan/fan_tuning.py Executable file
View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
import json
import time
import threading
from panda import Panda
def drain_serial(p):
ret = []
while True:
d = p.serial_read(0)
if len(d) == 0:
break
ret.append(d)
return ret
fan_cmd = 0.
def logger(event):
# requires a build with DEBUG_FAN
with Panda(claim=False) as p, open('/tmp/fan_log', 'w') as f:
power = None
target_rpm = None
stall_count = None
rpm_fast = None
t = time.monotonic()
drain_serial(p)
while not event.is_set():
p.set_fan_power(fan_cmd)
for l in drain_serial(p)[::-1]:
ns = l.decode('utf8').strip().split(' ')
if len(ns) == 4:
target_rpm, rpm_fast, power, stall_count = (int(n, 16) for n in ns)
break
dat = {
't': time.monotonic() - t,
'cmd_power': fan_cmd,
'pwm_power': power,
'target_rpm': target_rpm,
'rpm_fast': rpm_fast,
'rpm': p.get_fan_rpm(),
'stall_counter': stall_count,
'total_stall_count': p.health()['fan_stall_count'],
}
f.write(json.dumps(dat) + '\n')
time.sleep(1/16.)
p.set_fan_power(0)
def get_overshoot_rpm(p, power):
global fan_cmd
# make sure the fan is stopped completely
fan_cmd = 0.
while p.get_fan_rpm() > 100:
time.sleep(0.1)
time.sleep(3)
# set it to 30% power to mimic going onroad
fan_cmd = power
max_rpm = 0
max_power = 0
for _ in range(70):
max_rpm = max(max_rpm, p.get_fan_rpm())
max_power = max(max_power, p.health()['fan_power'])
time.sleep(0.1)
# tolerate 10% overshoot
expected_rpm = Panda.MAX_FAN_RPMs[bytes(p.get_type())] * power / 100
overshoot = (max_rpm / expected_rpm) - 1
return overshoot, max_rpm, max_power
if __name__ == "__main__":
event = threading.Event()
threading.Thread(target=logger, args=(event, )).start()
try:
p = Panda()
for power in range(10, 101, 10):
overshoot, max_rpm, max_power = get_overshoot_rpm(p, power)
print(f"Fan power {power}%: overshoot {overshoot:.2%}, Max RPM {max_rpm}, Max power {max_power}%")
finally:
event.set()

7
scripts/get_version.py Executable file
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python3
from panda import Panda
if __name__ == "__main__":
for p in Panda.list():
pp = Panda(p)
print(f"{pp.get_serial()[0]}: {pp.get_version()}")

18
scripts/health_test.py Executable file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
import time
from panda import Panda
if __name__ == "__main__":
i = 0
pi = 0
panda = Panda()
while True:
st = time.monotonic()
while time.monotonic() - st < 1:
panda.health()
i += 1
print(i, panda.health(), "\n")
print(f"Speed: {i - pi}Hz")
pi = i

14
scripts/ir_test.py Executable file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
import time
from panda import Panda
power = 0
if __name__ == "__main__":
p = Panda()
while True:
p.set_ir_power(power)
print("Power: ", str(power))
time.sleep(1)
power += 10
power %= 100

95
scripts/loopback_test.py Executable file
View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
import os
import time
import random
import argparse
from itertools import permutations
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
def run_test(sleep_duration):
pandas = Panda.list()
print(pandas)
if len(pandas) < 2:
raise Exception("Minimum two pandas are needed for test")
run_test_w_pandas(pandas, sleep_duration)
def run_test_w_pandas(pandas, sleep_duration):
h = [Panda(x) for x in pandas]
print("H", h)
for hh in h:
hh.set_safety_mode(CarParams.SafetyModel.allOutput)
# test both directions
for ho in permutations(list(range(len(h))), r=2):
print("***************** TESTING", ho)
panda0, panda1 = h[ho[0]], h[ho[1]]
# **** test health packet ****
print("health", ho[0], h[ho[0]].health())
# **** test can line loopback ****
for bus, obd in [(0, False), (1, False), (2, False), (1, True), (2, True)]:
print("\ntest can", bus)
# flush
cans_echo = panda0.can_recv()
cans_loop = panda1.can_recv()
panda0.set_obd(None)
panda1.set_obd(None)
if obd is True:
panda0.set_obd(bus)
panda1.set_obd(bus)
bus = 3
# send the characters
at = random.randint(1, 2000)
st = get_test_string()[0:8]
panda0.can_send(at, st, bus)
time.sleep(0.1)
# check for receive
cans_echo = panda0.can_recv()
cans_loop = panda1.can_recv()
print("Bus", bus, "echo", cans_echo, "loop", cans_loop)
assert len(cans_echo) == 1
assert len(cans_loop) == 1
assert cans_echo[0][0] == at
assert cans_loop[0][0] == at
assert cans_echo[0][1] == st
assert cans_loop[0][1] == st
assert cans_echo[0][2] == 0x80 | bus
if cans_loop[0][2] != bus:
print("EXPECTED %d GOT %d" % (bus, cans_loop[0][2]))
assert cans_loop[0][2] == bus
print("CAN pass", bus, ho)
time.sleep(sleep_duration)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, help="Number of test iterations to run")
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
args = parser.parse_args()
if args.n is None:
while True:
run_test(sleep_duration=args.sleep)
else:
for _ in range(args.n):
run_test(sleep_duration=args.sleep)

22
scripts/make_release.sh Executable file
View File

@@ -0,0 +1,22 @@
#!/usr/bin/env bash
set -e
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
export CERT=/home/batman/xx/pandaextra/certs/release
if [ ! -f "$CERT" ]; then
echo "No release cert found, cannot build release."
echo "You probably aren't looking to do this anyway."
exit
fi
export RELEASE=1
export BUILDER=DEV
cd $DIR/../board
scons -u -c
rm obj/*
scons -u
cd obj
RELEASE_NAME=$(awk '{print $1}' version)
zip -j ../../release/panda-$RELEASE_NAME.zip version panda.bin.signed bootstub.panda.bin panda_h7.bin.signed bootstub.panda_h7.bin

71
scripts/message_drop_test.py Executable file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
import os
import usb1
import time
import struct
import itertools
import threading
from typing import Any
from opendbc.car.structs import CarParams
from panda import Panda
JUNGLE = "JUNGLE" in os.environ
if JUNGLE:
from panda import PandaJungle
# Generate unique messages
NUM_MESSAGES_PER_BUS = 10000
messages = [bytes(struct.pack("Q", i)) for i in range(NUM_MESSAGES_PER_BUS)]
tx_messages = list(itertools.chain.from_iterable([[0xaa, msg, 0], [0xaa, msg, 1], [0xaa, msg, 2]] for msg in messages))
def flood_tx(panda):
print('Sending!')
transferred = 0
while True:
try:
print(f"Sending block {transferred}-{len(tx_messages)}: ", end="")
panda.can_send_many(tx_messages[transferred:], timeout=10)
print("OK")
break
except usb1.USBErrorTimeout as e:
transferred += (e.transferred // 16)
print("timeout, transferred: ", transferred)
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")
if __name__ == "__main__":
serials = Panda.list()
receiver: Panda | PandaJungle
if JUNGLE:
sender = Panda()
receiver = PandaJungle()
else:
if len(serials) != 2:
raise Exception("Connect two pandas to perform this test!")
sender = Panda(serials[0])
receiver = Panda(serials[1])
receiver.set_safety_mode(CarParams.SafetyModel.allOutput)
sender.set_safety_mode(CarParams.SafetyModel.allOutput)
# Start transmisson
threading.Thread(target=flood_tx, args=(sender,)).start()
# Receive as much as we can, and stop when there hasn't been anything for a second
rx: list[Any] = []
old_len = 0
last_change = time.monotonic()
while time.monotonic() - last_change < 1:
if old_len < len(rx):
last_change = time.monotonic()
old_len = len(rx)
rx.extend(receiver.can_recv())
print(f"Received {len(rx)} messages")
# Check if we received everything
for bus in range(3):
received_msgs = {bytes(m[1]) for m in filter(lambda m, b=bus: m[2] == b, rx)} # type: ignore
dropped_msgs = set(messages).difference(received_msgs)
print(f"Bus {bus} dropped msgs: {len(list(dropped_msgs))} / {len(messages)}")

32
scripts/read_flash_spi.py Executable file
View File

@@ -0,0 +1,32 @@
#!/usr/bin/env python3
from panda import Panda, PandaDFU
if __name__ == "__main__":
try:
from openpilot.system.hardware import HARDWARE
HARDWARE.recover_internal_panda()
Panda.wait_for_dfu(None, 5)
except Exception:
pass
p = PandaDFU(None)
cfg = p.get_mcu_type().config
def readmem(addr, length, fn):
print(f"reading {hex(addr)} {hex(length)} bytes to {fn}")
max_size = 255
with open(fn, "wb") as f:
to_read = length
while to_read > 0:
l = min(to_read, max_size)
dat = p._handle.read(addr, l)
assert len(dat) == l
f.write(dat)
to_read -= len(dat)
addr += len(dat)
addr = cfg.bootstub_address
for i, sector_size in enumerate(cfg.sector_sizes):
readmem(addr, sector_size, f"sector_{i}.bin")
addr += sector_size

6
scripts/read_st_flash.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
rm -f /tmp/dump_bootstub
rm -f /tmp/dump_main
dfu-util -a 0 -s 0x08000000 -U /tmp/dump_bootstub
dfu-util -a 0 -s 0x08004000 -U /tmp/dump_main

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import time
from panda import Panda, PandaDFU
class GPIO:
STM_RST_N = 124
STM_BOOT0 = 134
HUB_RST_N = 30
def gpio_init(pin, output):
with open(f"/sys/class/gpio/gpio{pin}/direction", 'wb') as f:
f.write(b"out" if output else b"in")
def gpio_set(pin, high):
with open(f"/sys/class/gpio/gpio{pin}/value", 'wb') as f:
f.write(b"1" if high else b"0")
if __name__ == "__main__":
for pin in (GPIO.STM_RST_N, GPIO.STM_BOOT0, GPIO.HUB_RST_N):
gpio_init(pin, True)
# reset USB hub
gpio_set(GPIO.HUB_RST_N, 0)
time.sleep(0.5)
gpio_set(GPIO.HUB_RST_N, 1)
# flash bootstub
print("resetting into DFU")
gpio_set(GPIO.STM_RST_N, 1)
gpio_set(GPIO.STM_BOOT0, 1)
time.sleep(1)
gpio_set(GPIO.STM_RST_N, 0)
gpio_set(GPIO.STM_BOOT0, 0)
time.sleep(1)
print("flashing bootstub")
PandaDFU(None).recover()
gpio_set(GPIO.STM_RST_N, 1)
time.sleep(0.5)
gpio_set(GPIO.STM_RST_N, 0)
time.sleep(1)
print("flashing app")
p = Panda()
assert p.bootstub
p.flash()

17
scripts/relay_test.py Executable file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
import time
from opendbc.car.structs import CarParams
from panda import Panda
p = Panda()
while True:
p.set_safety_mode(CarParams.SafetyModel.toyota)
p.send_heartbeat()
print("ON")
time.sleep(1)
p.set_safety_mode(CarParams.SafetyModel.noOutput)
p.send_heartbeat()
print("OFF")
time.sleep(1)

27
scripts/restore_flash_spi.py Executable file
View File

@@ -0,0 +1,27 @@
#!/usr/bin/env python3
from panda import Panda, PandaDFU, STBootloaderSPIHandle
if __name__ == "__main__":
try:
from openpilot.system.hardware import HARDWARE
HARDWARE.recover_internal_panda()
Panda.wait_for_dfu(None, 5)
except Exception:
pass
p = PandaDFU(None)
assert isinstance(p._handle, STBootloaderSPIHandle)
cfg = p.get_mcu_type().config
print("restoring from backup...")
addr = cfg.bootstub_address
for i, sector_size in enumerate(cfg.sector_sizes):
print(f"- sector #{i}")
p._handle.erase_sector(i)
with open(f"sector_{i}.bin", "rb") as f:
dat = f.read()
assert len(dat) == sector_size
p._handle.program(addr, dat)
addr += len(dat)
p.reset()

74
scripts/setup_device_ci.sh Executable file
View File

@@ -0,0 +1,74 @@
#!/usr/bin/env bash
set -e
if [ -z "$SOURCE_DIR" ]; then
echo "SOURCE_DIR must be set"
exit 1
fi
if [ -z "$GIT_COMMIT" ]; then
echo "GIT_COMMIT must be set"
exit 1
fi
if [ -z "$TEST_DIR" ]; then
echo "TEST_DIR must be set"
exit 1
fi
CONTINUE_PATH="/data/continue.sh"
tee $CONTINUE_PATH << EOF
#!/usr/bin/env bash
sudo abctl --set_success
# patch sshd config
sudo mount -o rw,remount /
sudo sed -i "s,/data/params/d/GithubSshKeys,/usr/comma/setup_keys," /etc/ssh/sshd_config
sudo systemctl daemon-reload
sudo systemctl restart ssh
sudo systemctl disable ssh-param-watcher.path
sudo systemctl disable ssh-param-watcher.service
sudo mount -o ro,remount /
while true; do
if ! sudo systemctl is-active -q ssh; then
sudo systemctl start ssh
fi
sleep 5s
done
sleep infinity
EOF
chmod +x $CONTINUE_PATH
# set up environment
if [ ! -d "$SOURCE_DIR" ]; then
git clone https://github.com/commaai/panda.git $SOURCE_DIR
fi
# setup device/SOM state
SOM_ST_IO=49
echo $SOM_ST_IO > /sys/class/gpio/export || true
echo out > /sys/class/gpio/gpio${SOM_ST_IO}/direction
echo 1 > /sys/class/gpio/gpio${SOM_ST_IO}/value
# checkout panda commit
cd $SOURCE_DIR
rm -f .git/index.lock
git reset --hard
git fetch --no-tags --no-recurse-submodules -j4 --verbose --depth 1 origin $GIT_COMMIT
find . -maxdepth 1 -not -path './.git' -not -name '.' -not -name '..' -exec rm -rf '{}' \;
git reset --hard $GIT_COMMIT
git checkout $GIT_COMMIT
git clean -xdff
echo "git checkout done, t=$SECONDS"
du -hs $SOURCE_DIR $SOURCE_DIR/.git
rsync -a --delete $SOURCE_DIR $TEST_DIR
echo "$TEST_DIR synced with $GIT_COMMIT, t=$SECONDS"

7
scripts/som_debug.sh Executable file
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -e
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
cd $DIR
PYTHONUNBUFFERED=1 NO_COLOR=1 CLAIM=1 PORT=4 ./debug_console.py

21
scripts/spam_can.py Executable file
View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
import os
import random
from opendbc.car.structs import CarParams
from panda import Panda
def get_test_string():
return b"test" + os.urandom(10)
if __name__ == "__main__":
p = Panda()
p.set_safety_mode(CarParams.SafetyModel.allOutput)
print("Spamming all buses...")
while True:
at = random.randint(1, 2000)
st = get_test_string()[0:8]
bus = random.randint(0, 2)
p.can_send(at, st, bus)
# print("Sent message on bus: ", bus)

33
scripts/standalone_test.py Executable file
View File

@@ -0,0 +1,33 @@
#!/usr/bin/env python3
import struct
import time
from opendbc.car.structs import CarParams
from panda import Panda
if __name__ == "__main__":
p = Panda()
print(p.get_serial())
print(p.health())
t1 = time.time()
for _ in range(100):
p.get_serial()
t2 = time.time()
print("100 requests took %.2f ms" % ((t2 - t1) * 1000))
p.set_safety_mode(CarParams.SafetyModel.allOutput)
a = 0
while True:
# flood
msg = b"\xaa" * 4 + struct.pack("I", a)
p.can_send(0xaa, msg, 0)
p.can_send(0xaa, msg, 1)
p.can_send(0xaa, msg, 4)
time.sleep(0.01)
dat = p.can_recv()
if len(dat) > 0:
print(dat)
a += 1

153
scripts/test_canfd.py Executable file
View File

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
import os
import time
import random
from collections import defaultdict
from opendbc.car.structs import CarParams
from panda import Panda, calculate_checksum, DLC_TO_LEN
from panda import PandaJungle
from panda.tests.hitl.helpers import time_many_sends
H7_HW_TYPES = [Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2]
JUNGLE_SERIAL = os.getenv("JUNGLE")
H7_PANDAS_EXCLUDE = [] # type: ignore
if os.getenv("H7_PANDAS_EXCLUDE"):
H7_PANDAS_EXCLUDE = os.getenv("H7_PANDAS_EXCLUDE").strip().split(" ") # type: ignore
def panda_reset():
panda_serials = []
panda_jungle = PandaJungle(JUNGLE_SERIAL)
panda_jungle.set_can_silent(True)
panda_jungle.set_panda_power(False)
time.sleep(1)
panda_jungle.set_panda_power(True)
time.sleep(4)
for serial in Panda.list():
if serial not in H7_PANDAS_EXCLUDE:
with Panda(serial=serial) as p:
if p.get_type() in H7_HW_TYPES:
p.reset()
panda_serials.append(serial)
print("test pandas", panda_serials)
assert len(panda_serials) == 2, "Two H7 pandas required"
return panda_serials
def panda_init(serial, enable_canfd=False, enable_non_iso=False):
p = Panda(serial=serial)
p.set_power_save(False)
for bus in range(3):
p.set_can_speed_kbps(0, 500)
if enable_canfd:
p.set_can_data_speed_kbps(bus, 2000)
if enable_non_iso:
p.set_canfd_non_iso(bus, True)
p.set_safety_mode(CarParams.SafetyModel.allOutput)
return p
def test_canfd_throughput(p, p_recv=None):
two_pandas = p_recv is not None
p.set_safety_mode(CarParams.SafetyModel.allOutput)
if two_pandas:
p_recv.set_safety_mode(CarParams.SafetyModel.allOutput)
# enable output mode
else:
p.set_can_loopback(True)
tests = [
[500, 1000, 2000], # speeds
[93, 87, 78], # saturation thresholds
]
for i in range(len(tests[0])):
# set bus 0 data speed to speed
p.set_can_data_speed_kbps(0, tests[0][i])
if p_recv is not None:
p_recv.set_can_data_speed_kbps(0, tests[0][i])
time.sleep(0.05)
comp_kbps = time_many_sends(p, 0, p_recv=p_recv, msg_count=400, two_pandas=two_pandas, msg_len=64)
# bit count from https://en.wikipedia.org/wiki/CAN_bus
saturation_pct = (comp_kbps / tests[0][i]) * 100.0
assert saturation_pct > tests[1][i]
assert saturation_pct < 100
def canfd_test(p_send, p_recv):
for n in range(100):
sent_msgs = defaultdict(set)
to_send = []
for _ in range(200):
bus = random.randrange(3)
for dlc in range(len(DLC_TO_LEN)):
address = random.randrange(1, 1<<29)
data = bytearray(random.getrandbits(8) for _ in range(DLC_TO_LEN[dlc]))
if len(data) >= 2:
data[0] = calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8"))
to_send.append([address, data, bus])
sent_msgs[bus].add((address, bytes(data)))
p_send.can_send_many(to_send, timeout=0)
start_time = time.monotonic()
while (time.monotonic() - start_time < 1) and any(len(x) > 0 for x in sent_msgs.values()):
incoming = p_recv.can_recv()
for msg in incoming:
address, data, bus = msg
if len(data) >= 2:
assert calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8")) == data[0]
k = (address, bytes(data))
assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
sent_msgs[bus].discard(k)
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {n}: bus {bus} missing {len(sent_msgs[bus])} messages"
def setup_test(enable_non_iso=False):
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=False, enable_non_iso=enable_non_iso)
p_recv = panda_init(panda_serials[1], enable_canfd=True, enable_non_iso=enable_non_iso)
# Check that sending panda CAN FD and BRS are turned off
for bus in range(3):
health = p_send.can_health(bus)
assert not health["canfd_enabled"]
assert not health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
# Receiving panda sends dummy CAN FD message that should enable CAN FD on sender side
for bus in range(3):
p_recv.can_send(0x200, b"dummymessage", bus)
p_recv.can_recv()
p_send.can_recv()
# Check if all tested buses on sending panda have swithed to CAN FD with BRS
for bus in range(3):
health = p_send.can_health(bus)
assert health["canfd_enabled"]
assert health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
return p_send, p_recv
def main():
print("[TEST CAN-FD]")
p_send, p_recv = setup_test()
canfd_test(p_send, p_recv)
print("[TEST CAN-FD non-ISO]")
p_send, p_recv = setup_test(enable_non_iso=True)
canfd_test(p_send, p_recv)
print("[TEST CAN-FD THROUGHPUT]")
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=True)
p_recv = panda_init(panda_serials[1], enable_canfd=True)
test_canfd_throughput(p_send, p_recv)
if __name__ == "__main__":
main()