Files
sunnypilot/selfdrive/boardd/tests/test_boardd_loopback.py
Shane Smiskol cdcf06e9e2 boardd: ability to switch between ELM safety params (#27656)
* indecisive

* rename to generic FW query

* remove code and update comment

* we need this to start off, unless we set multiplexing immediately

* draft

* draft 2

* try that

* can't do this either, boardd might read Enabled after removing, but before setting new Request param

* this should work

* use one less param

* fix params

* match behavior (set all pandas to safety param of 1, disabling multiplexing for fingerprinting

* clean up (some tests may temp break)

* fix param name and sort

* time it

* yes it does matter

* add to hyundai's bus 5 query

* remove hyundai for now

* this should work

* clean up

* clean up

* flip argument around, clean up

* fix test_startup

* some clean up

* rm line

* comment makes more sense

* required typing

* clean up common type

* comments

* Update selfdrive/car/car_helpers.py

* line

* whoops, need to set before vin!

* fix debug

* annoying

* more debugging

* bug fix (needs both keys always)

* debuGG

debuGG

* Revert "debuGG"

This reverts commit 55b2f429324c0b92d5cfb2cabf8b20db1e166248.

* Revert "more debugging"

This reverts commit 02934c3403ad5270f03093508b704c151d1ccb2a.

* Revert "annoying"

This reverts commit 8b4e5e09989f9a0217e3ec1c0ba68735929b7366.

* clean that up

* bumpback

* bumpback

* every second write param

* flip

* stuff

* move up?

* fix timing out in CI

* rm
old-commit-hash: 42449b482d
2023-03-23 00:14:31 -07:00

96 lines
2.9 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import random
import time
import unittest
from collections import defaultdict
import cereal.messaging as messaging
from cereal import car
from common.params import Params
from common.spinner import Spinner
from common.timeout import Timeout
from selfdrive.boardd.boardd import can_list_to_can_capnp
from selfdrive.car import make_can_msg
from system.hardware import TICI
from selfdrive.test.helpers import phone_only, with_processes
class TestBoardd(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.environ['STARTED'] = '1'
os.environ['BOARDD_LOOPBACK'] = '1'
cls.spinner = Spinner()
@classmethod
def tearDownClass(cls):
cls.spinner.close()
@phone_only
@with_processes(['pandad'])
def test_loopback(self):
# wait for boardd to init
time.sleep(2)
with Timeout(60, "boardd didn't start"):
sm = messaging.SubMaster(['pandaStates'])
while sm.rcv_frame['pandaStates'] < 1 and len(sm['pandaStates']) == 0:
sm.update(1000)
num_pandas = len(sm['pandaStates'])
if TICI:
self.assertGreater(num_pandas, 1, "connect another panda for multipanda tests")
# boardd blocks on CarVin and CarParams
cp = car.CarParams.new_message()
safety_config = car.CarParams.SafetyConfig.new_message()
safety_config.safetyModel = car.CarParams.SafetyModel.allOutput
cp.safetyConfigs = [safety_config]*num_pandas
params = Params()
params.put_bool("FirmwareQueryDone", True)
params.put_bool("ControlsReady", True)
params.put("CarParams", cp.to_bytes())
sendcan = messaging.pub_sock('sendcan')
can = messaging.sub_sock('can', conflate=False, timeout=100)
time.sleep(0.2)
n = 200
for i in range(n):
self.spinner.update(f"boardd loopback {i}/{n}")
sent_msgs = defaultdict(set)
for _ in range(random.randrange(10)):
to_send = []
for __ in range(random.randrange(100)):
bus = random.choice([b for b in range(3*num_pandas) if b % 4 != 3])
addr = random.randrange(1, 1<<29)
dat = bytes(random.getrandbits(8) for _ in range(random.randrange(1, 9)))
sent_msgs[bus].add((addr, dat))
to_send.append(make_can_msg(addr, dat, bus))
sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))
for _ in range(100 * 2):
recvd = messaging.drain_sock(can, wait_for_one=True)
for msg in recvd:
for m in msg.can:
if m.src >= 128:
key = (m.address, m.dat)
assert key in sent_msgs[m.src-128], f"got unexpected msg: {m.src=} {m.address=} {m.dat=}"
sent_msgs[m.src-128].discard(key)
if all(len(v) == 0 for v in sent_msgs.values()):
break
# if a set isn't empty, messages got dropped
for bus in sent_msgs.keys():
assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages"
if __name__ == "__main__":
unittest.main()