mirror of https://github.com/commaai/panda.git
136 lines
3.4 KiB
Python
Executable File
136 lines
3.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Relay test with loopback between black panda (+ harness and power) and white/grey panda
|
|
# Tests the relay switching multiple times / second by looking at the buses on which loop occurs.
|
|
|
|
|
|
import os
|
|
import time
|
|
import random
|
|
import argparse
|
|
|
|
from panda import Panda
|
|
|
|
def get_test_string():
|
|
return b"test" + os.urandom(10)
|
|
|
|
counter = 0
|
|
open_errors = 0
|
|
closed_errors = 0
|
|
content_errors = 0
|
|
|
|
def run_test(sleep_duration):
|
|
global counter, open_errors, closed_errors
|
|
|
|
pandas = Panda.list()
|
|
print(pandas)
|
|
|
|
# make sure two pandas are connected
|
|
if len(pandas) != 2:
|
|
raise Exception("Connect white/grey and black panda to run this test!")
|
|
|
|
# connect
|
|
pandas[0] = Panda(pandas[0])
|
|
pandas[1] = Panda(pandas[1])
|
|
|
|
# find out which one is black
|
|
type0 = pandas[0].get_type()
|
|
type1 = pandas[1].get_type()
|
|
|
|
black_panda = None
|
|
other_panda = None
|
|
|
|
if type0 == "\x03" and type1 != "\x03":
|
|
black_panda = pandas[0]
|
|
other_panda = pandas[1]
|
|
elif type0 != "\x03" and type1 == "\x03":
|
|
black_panda = pandas[1]
|
|
other_panda = pandas[0]
|
|
else:
|
|
raise Exception("Connect white/grey and black panda to run this test!")
|
|
|
|
# disable safety modes
|
|
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
|
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
|
|
|
# test health packet
|
|
print("black panda health", black_panda.health())
|
|
print("other panda health", other_panda.health())
|
|
|
|
# test black -> other
|
|
while True:
|
|
# Switch on relay
|
|
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
|
time.sleep(0.05)
|
|
|
|
if not test_buses(black_panda, other_panda, (0, False, [0])):
|
|
open_errors += 1
|
|
raise Exception("Open error")
|
|
|
|
# Switch off relay
|
|
black_panda.set_safety_mode(Panda.SAFETY_SILENT)
|
|
time.sleep(0.05)
|
|
|
|
if not test_buses(black_panda, other_panda, (0, False, [0, 2])):
|
|
closed_errors += 1
|
|
raise Exception("Close error")
|
|
|
|
counter += 1
|
|
print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors)
|
|
|
|
def test_buses(black_panda, other_panda, test_obj):
|
|
global content_errors
|
|
send_bus, obd, recv_buses = test_obj
|
|
|
|
black_panda.send_heartbeat()
|
|
other_panda.send_heartbeat()
|
|
|
|
# Set OBD on send panda
|
|
other_panda.set_obd(True if obd else None)
|
|
|
|
# clear and flush
|
|
other_panda.can_clear(send_bus)
|
|
|
|
for recv_bus in recv_buses:
|
|
black_panda.can_clear(recv_bus)
|
|
|
|
black_panda.can_recv()
|
|
other_panda.can_recv()
|
|
|
|
# send the characters
|
|
at = random.randint(1, 2000)
|
|
st = get_test_string()[0:8]
|
|
other_panda.can_send(at, st, send_bus)
|
|
time.sleep(0.05)
|
|
|
|
# check for receive
|
|
_ = other_panda.can_recv() # can echo
|
|
cans_loop = black_panda.can_recv()
|
|
|
|
loop_buses = []
|
|
for loop in cans_loop:
|
|
if (loop[0] != at) or (loop[1] != st):
|
|
content_errors += 1
|
|
loop_buses.append(loop[2])
|
|
|
|
# test loop buses
|
|
recv_buses.sort()
|
|
loop_buses.sort()
|
|
if(recv_buses != loop_buses):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-n", type=int, help="Number of test iterations to run")
|
|
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
|
|
args = parser.parse_args()
|
|
|
|
if args.n is None:
|
|
while True:
|
|
run_test(sleep_duration=args.sleep)
|
|
else:
|
|
for _ in range(args.n):
|
|
run_test(sleep_duration=args.sleep)
|