Files
sunnypilot/selfdrive/pandad/tests/test_pandad_loopback.py
Shane Smiskol 4f019b5f60 move selfdrive/car to opendbc (#32630)
* move most of /car

* move some car tests

move some car tests

* fix selfdrive/car/tests

* fix selfdrive/controls tests

* fix the rest of the selfdrive tests

* bump opendbc

* fix all tests

* few more non-test references

* remove opcar and move docs to car

fix these debugging scripts

fix docs

* bump opendbc and panda

forgot panda
old-commit-hash: e735a7f379
2024-08-17 00:54:51 -07:00

115 lines
4.1 KiB
Python

import os
import copy
import random
import time
import pytest
from collections import defaultdict
from pprint import pprint
import cereal.messaging as messaging
from cereal import car, log
from opendbc.car.can_definitions import CanData
from openpilot.common.retry import retry
from openpilot.common.params import Params
from openpilot.common.timeout import Timeout
from openpilot.selfdrive.pandad import can_list_to_can_capnp
from openpilot.system.hardware import TICI
from openpilot.selfdrive.test.helpers import phone_only, with_processes
@retry(attempts=3)
def setup_pandad(num_pandas):
params = Params()
params.clear_all()
params.put_bool("IsOnroad", False)
sm = messaging.SubMaster(['pandaStates'])
with Timeout(90, "pandad didn't start"):
while sm.recv_frame['pandaStates'] < 1 or len(sm['pandaStates']) == 0 or \
any(ps.pandaType == log.PandaState.PandaType.unknown for ps in sm['pandaStates']):
sm.update(1000)
found_pandas = len(sm['pandaStates'])
assert num_pandas == found_pandas, "connected pandas ({found_pandas}) doesn't match expected panda count ({num_pandas}). \
connect another panda for multipanda tests."
# pandad safety setting relies on these params
cp = car.CarParams.new_message()
safety_config = car.CarParams.SafetyConfig.new_message()
safety_config.safetyModel = car.CarParams.SafetyModel.allOutput
cp.safetyConfigs = [safety_config]*num_pandas
params.put_bool("IsOnroad", True)
params.put_bool("FirmwareQueryDone", True)
params.put_bool("ControlsReady", True)
params.put("CarParams", cp.to_bytes())
with Timeout(90, "pandad didn't set safety mode"):
while any(ps.safetyModel != car.CarParams.SafetyModel.allOutput for ps in sm['pandaStates']):
sm.update(1000)
def send_random_can_messages(sendcan, count, num_pandas=1):
sent_msgs = defaultdict(set)
for _ in range(count):
to_send = []
for __ in range(random.randrange(20)):
bus = random.choice([b for b in range(3*num_pandas) if b % 4 != 3])
addr = random.randrange(1, 1<<29)
dat = bytes(random.getrandbits(8) for _ in range(random.randrange(1, 9)))
if (addr, dat) in sent_msgs[bus]:
continue
sent_msgs[bus].add((addr, dat))
to_send.append(CanData(addr, dat, bus))
sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))
return sent_msgs
@pytest.mark.tici
class TestBoarddLoopback:
@classmethod
def setup_class(cls):
os.environ['STARTED'] = '1'
os.environ['BOARDD_LOOPBACK'] = '1'
@phone_only
@with_processes(['pandad'])
def test_loopback(self):
num_pandas = 2 if TICI and "SINGLE_PANDA" not in os.environ else 1
setup_pandad(num_pandas)
sendcan = messaging.pub_sock('sendcan')
can = messaging.sub_sock('can', conflate=False, timeout=100)
sm = messaging.SubMaster(['pandaStates'])
time.sleep(1)
n = 200
for i in range(n):
print(f"pandad loopback {i}/{n}")
sent_msgs = send_random_can_messages(sendcan, random.randrange(20, 100), num_pandas)
sent_loopback = copy.deepcopy(sent_msgs)
sent_loopback.update({k+128: copy.deepcopy(v) for k, v in sent_msgs.items()})
sent_total = {k: len(v) for k, v in sent_loopback.items()}
for _ in range(100 * 5):
sm.update(0)
recvd = messaging.drain_sock(can, wait_for_one=True)
for msg in recvd:
for m in msg.can:
key = (m.address, m.dat)
assert key in sent_loopback[m.src], f"got unexpected msg: {m.src=} {m.address=} {m.dat=}"
sent_loopback[m.src].discard(key)
if all(len(v) == 0 for v in sent_loopback.values()):
break
# if a set isn't empty, messages got dropped
pprint(sent_msgs)
pprint(sent_loopback)
print({k: len(x) for k, x in sent_loopback.items()})
print(sum([len(x) for x in sent_loopback.values()]))
pprint(sm['pandaStates']) # may drop messages due to RX buffer overflow
for bus in sent_loopback.keys():
assert not len(sent_loopback[bus]), f"loop {i}: bus {bus} missing {len(sent_loopback[bus])} out of {sent_total[bus]} messages"