Files
dragonpilot/selfdrive/boardd/tests/test_boardd_loopback.py
Adeeb Shihadeh ff89184823 tizi tests in jenkins (#27810)
* add

* mv there for now

* mv

* single panda

* move into parallel

* longer timeout for dfu

---------

Co-authored-by: Comma Device <device@comma.ai>
2023-04-05 11:07:42 -07:00

97 lines
3.1 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import random
import time
import unittest
from collections import defaultdict
import cereal.messaging as messaging
from cereal import car
from common.params import Params
from common.spinner import Spinner
from common.timeout import Timeout
from selfdrive.boardd.boardd import can_list_to_can_capnp
from selfdrive.car import make_can_msg
from system.hardware import TICI
from selfdrive.test.helpers import phone_only, with_processes
class TestBoardd(unittest.TestCase):
  """Loopback test: publish random CAN messages on `sendcan` and verify they come back on `can`."""

  @classmethod
  def setUpClass(cls):
    """Set the STARTED and BOARDD_LOOPBACK environment flags and open the progress spinner."""
    os.environ['STARTED'] = '1'
    os.environ['BOARDD_LOOPBACK'] = '1'
    cls.spinner = Spinner()

  @classmethod
  def tearDownClass(cls):
    """Close the spinner opened in setUpClass."""
    cls.spinner.close()

  @phone_only
  @with_processes(['pandad'])
  def test_loopback(self):
    """Send random CAN traffic and assert every sent message is received back.

    Steps:
      1. Wait (up to 60 s) for a non-empty `pandaStates` message and check the
         number of reported pandas against the expected count.
      2. Write the params and CarParams that boardd blocks on.
      3. For 200 rounds, publish random batches on `sendcan`, then drain `can`
         and tick off every message whose `src` is >= 128 against the set sent
         on bus `src - 128`. Any unexpected or missing message fails the test.
    """
    # wait for boardd to init
    time.sleep(2)

    with Timeout(60, "boardd didn't start"):
      sm = messaging.SubMaster(['pandaStates'])
      while sm.rcv_frame['pandaStates'] < 1 and len(sm['pandaStates']) == 0:
        sm.update(1000)

    num_pandas = len(sm['pandaStates'])
    expected_pandas = 2 if TICI and "SINGLE_PANDA" not in os.environ else 1
    # The message must be an f-string so the actual counts are interpolated;
    # implicit concatenation keeps source indentation out of the message.
    self.assertEqual(
      num_pandas,
      expected_pandas,
      f"connected pandas ({num_pandas}) doesn't match expected panda count ({expected_pandas}). "
      "connect another panda for multipanda tests.",
    )

    # boardd blocks on CarVin and CarParams
    cp = car.CarParams.new_message()

    safety_config = car.CarParams.SafetyConfig.new_message()
    safety_config.safetyModel = car.CarParams.SafetyModel.allOutput
    cp.safetyConfigs = [safety_config]*num_pandas

    params = Params()
    params.put_bool("FirmwareQueryDone", True)
    params.put_bool("ControlsReady", True)
    params.put("CarParams", cp.to_bytes())

    sendcan = messaging.pub_sock('sendcan')
    can = messaging.sub_sock('can', conflate=False, timeout=100)
    time.sleep(0.2)

    # Candidate buses do not change between iterations, so build the list once.
    # Every bus index with b % 4 == 3 is excluded.
    valid_buses = [b for b in range(3*num_pandas) if b % 4 != 3]

    n = 200
    for i in range(n):
      self.spinner.update(f"boardd loopback {i}/{n}")

      # bus -> set of (address, data) pairs still waiting to be seen on `can`
      sent_msgs = defaultdict(set)
      for _ in range(random.randrange(10)):
        to_send = []
        for __ in range(random.randrange(100)):
          bus = random.choice(valid_buses)
          addr = random.randrange(1, 1<<29)
          dat = bytes(random.getrandbits(8) for _ in range(random.randrange(1, 9)))
          sent_msgs[bus].add((addr, dat))
          to_send.append(make_can_msg(addr, dat, bus))
        sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))

      # Poll a bounded number of times; stop early once everything was received.
      for _ in range(100 * 2):
        recvd = messaging.drain_sock(can, wait_for_one=True)
        for msg in recvd:
          for m in msg.can:
            # Only messages with src >= 128 are checked; src - 128 is the bus they were sent on.
            if m.src >= 128:
              key = (m.address, m.dat)
              assert key in sent_msgs[m.src-128], f"got unexpected msg: {m.src=} {m.address=} {m.dat=}"
              sent_msgs[m.src-128].discard(key)

        if all(len(v) == 0 for v in sent_msgs.values()):
          break

      # if a set isn't empty, messages got dropped
      for bus, remaining in sent_msgs.items():
        assert not len(remaining), f"loop {i}: bus {bus} missing {len(remaining)} messages"
# Run the unittest test runner when this file is executed directly as a script.
if __name__ == "__main__":
  unittest.main()