mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-03-03 00:13:52 +08:00
* move to openpilot * draft * support internal urls * not used * update seg list * regen segment list * regen seg list * regen * regen * regen * no dirty segments * regen seg list with no fuzzy or fixed fp * regen segments with hda2 exception * regen with added filters * regen without bad dongle * regen * regenerate! * regenerate, only OP enabled * regen * regenerate! * regenerate! * stash * use SegmentName * new route list * add temp comment * remove comment * let's see if this works * comment out other tests to speed up * fix agent * ? * hmm * oh whoops... * add matrix * fix * how many levels do we need? * fix values * matrix can't be in parallel * how about this * try this * Revert matrix Revert "try this" This reverts commit 8d0d91fd70c467c1fbb4e4e9aed511d15b2a30ef. Revert "how about this" This reverts commit a8e4fc53234824e77cbfb1e471277bc033e9dea1. Revert "matrix can't be in parallel" This reverts commit daaa6fcc3c75c74cbb90e97c565099a94123994c. Revert "fix values" This reverts commit df554b6a3371d124a574eb8d26bc51ef5b5b8fde. Revert "how many levels do we need?" This reverts commit 1a17320fa1c5e7220ef60e29981bbb3bb7da16c6. Revert "fix" This reverts commit e7eb6e404358fbd2eac3fea1901a8d30ea92d729. Revert "add matrix" This reverts commit a1b57e5725417d3c2f639f8edfc0c889b84b6753. * use pytest instead! (5 jobs is ~150 mins) * split lines, uncomment * This Sienna seg has a fault SDSU that stopped forwarding/sending msgs * picked a route with no PSCMStatus and no panda errors, recent routes are working * this cadillac was dashcammed (no radar) * opened an issue for this, it's 'expected' right now * small clean up * small clean up * i don't think that worked * is this needed? * add to new PC tests * cache * draft * Revert "draft" This reverts commit 3b7f740dd4883118747300bc3687074c2d3c2116. * probably should be function * draft * clean up * add todo * 600 random segments * debug * does this fix pythonpath issues? fix * try this * mount? * pytest again! 
* no need for PYTHONPATH now * Update Jenkinsfile * ? * convention * clean up * would be even more complex (have to unset ci which is class level) * track * is lfs pulled at all? * ah no it's not
344 lines
12 KiB
Python
Executable File
344 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# pylint: disable=E1101
|
|
import os
|
|
import importlib
|
|
import unittest
|
|
from collections import defaultdict, Counter
|
|
from typing import List, Optional, Tuple
|
|
from parameterized import parameterized_class
|
|
|
|
from cereal import log, car
|
|
from common.basedir import BASEDIR
|
|
from common.realtime import DT_CTRL
|
|
from selfdrive.car.fingerprints import all_known_cars
|
|
from selfdrive.car.car_helpers import interfaces
|
|
from selfdrive.car.gm.values import CAR as GM
|
|
from selfdrive.car.honda.values import CAR as HONDA, HONDA_BOSCH
|
|
from selfdrive.car.hyundai.values import CAR as HYUNDAI
|
|
from selfdrive.car.tests.routes import non_tested_cars, routes, CarTestRoute
|
|
from selfdrive.test.openpilotci import get_url
|
|
from tools.lib.logreader import LogReader
|
|
from tools.lib.route import Route, SegmentName, RouteName
|
|
|
|
from panda.tests.libpanda import libpanda_py
|
|
|
|
# Short alias for the panda hardware type enum from the cereal log schema.
PandaType = log.PandaState.PandaType

# Sharding controls: get_test_cases() keeps only the car models whose index
# (in sorted order) satisfies `index % NUM_JOBS == JOB_ID`.
NUM_JOBS = int(os.getenv("NUM_JOBS", "1"))
JOB_ID = int(os.getenv("JOB_ID", "0"))

# Path (joined onto BASEDIR) of an internal segment list file. When non-empty,
# test cases are built from that file instead of the public route list.
INTERNAL_SEG_LIST = os.getenv("INTERNAL_SEG_LIST", "")

# Car models for which test_panda_safety_rx_valid does not tick/assert the
# panda address checks.
ignore_addr_checks_valid = [
    GM.BUICK_REGAL,
    HYUNDAI.GENESIS_G70_2020,
]
|
|
|
|
|
|
def get_test_cases():
    """Build the list of ``(car_model, test_route)`` pairs to parameterize over.

    With INTERNAL_SEG_LIST set, the file it names (relative to BASEDIR) is read
    as alternating lines: a platform line whose first two characters are
    dropped, followed by a segment name. Each pair yields one test case.

    Otherwise every known car model assigned to this job
    (``index % NUM_JOBS == JOB_ID``) contributes one case per route registered
    for it, or a single ``(model, None)`` case when it has no route.
    """
    cases: List[Tuple[str, Optional[CarTestRoute]]] = []

    if len(INTERNAL_SEG_LIST):
        with open(os.path.join(BASEDIR, INTERNAL_SEG_LIST), "r") as seg_file:
            line_iter = iter(seg_file.read().splitlines())

        # Lines come in pairs; the loop consumes the platform line and next()
        # consumes the segment line that follows it.
        for header in line_iter:
            platform = header[2:]  # get rid of comment
            seg_name = SegmentName(next(line_iter))
            test_route = CarTestRoute(seg_name.route_name.canonical_name, platform,
                                      segment=seg_name.segment_num)
            cases.append((platform, test_route))
        return cases

    # Group the public routes by the car model they exercise.
    routes_by_car = defaultdict(set)
    for route in routes:
        routes_by_car[route.car_model].add(route)

    for index, model in enumerate(sorted(all_known_cars())):
        if index % NUM_JOBS != JOB_ID:
            continue
        # Models without a route still get one case, with test_route=None.
        model_routes = routes_by_car.get(model, (None,))
        cases.extend(sorted((model, r) for r in model_routes))

    return cases
|
|
|
|
|
|
# Name of the environment variable whose presence skips the long-running
# car model tests (checked by the skipIf decorator on setUpClass).
SKIP_ENV_VAR = "SKIP_LONG_TESTS"
|
|
|
|
|
|
class TestCarModelBase(unittest.TestCase):
    """Replay a logged route segment through a car port and the panda safety code.

    Subclasses provide ``car_model`` and ``test_route``. ``setUpClass`` loads
    the CAN messages of one segment and builds the car's ``CarParams`` once per
    class; every test then replays those messages through the car interface,
    the radar interface and/or the panda safety hooks.
    """

    # Car model (fingerprint) under test; filled in by the parameterized subclass.
    car_model = None
    # CarTestRoute describing the route to replay, or None if the model has none.
    test_route = None
    # When True, logs are fetched via get_url(); otherwise via Route().log_paths().
    ci = True

    @unittest.skipIf(SKIP_ENV_VAR in os.environ, f"Long running test skipped. Unset {SKIP_ENV_VAR} to run")
    @classmethod
    def setUpClass(cls):
        """Load CAN messages for the test route and build the car's CarParams.

        Raises:
            unittest.SkipTest: for the un-parameterized classes, for models
                excluded by the FILTER environment variable, and for models
                listed in ``non_tested_cars`` that have no route.
            Exception: if the model has no route and is not in
                ``non_tested_cars``, or if no candidate segment provided
                enough CAN messages. In the latter case the last log loading
                error, if any, is attached as the cause.
        """
        # Only the generated, parameterized classes carry a car model and route.
        if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'):
            raise unittest.SkipTest

        # FILTER is a comma-separated list of car model prefixes to run.
        if 'FILTER' in os.environ:
            if not cls.car_model.startswith(tuple(os.environ.get('FILTER').split(','))):
                raise unittest.SkipTest

        if cls.test_route is None:
            if cls.car_model in non_tested_cars:
                print(f"Skipping tests for {cls.car_model}: missing route")
                raise unittest.SkipTest
            raise Exception(f"missing test route for {cls.car_model}")

        experimental_long = False
        # Candidate segments, tried in this order unless the route pins one.
        test_segs = (2, 1, 0)
        if cls.test_route.segment is not None:
            test_segs = (cls.test_route.segment,)

        # Remember why the most recent segment failed to load so the final
        # error can report the underlying cause instead of discarding it.
        last_load_error = None
        for seg in test_segs:
            try:
                if len(INTERNAL_SEG_LIST):
                    route_name = RouteName(cls.test_route.route)
                    lr = LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2")
                elif cls.ci:
                    lr = LogReader(get_url(cls.test_route.route, seg))
                else:
                    lr = LogReader(Route(cls.test_route.route).log_paths()[seg])
            except Exception as e:
                # Best effort: a missing/unreadable segment just moves on to the next one.
                last_load_error = e
                continue

            car_fw = []
            can_msgs = []
            fingerprint = defaultdict(dict)
            for msg in lr:
                if msg.which() == "can":
                    # Record address -> payload length per source; only
                    # sources below 64 are considered.
                    for m in msg.can:
                        if m.src < 64:
                            fingerprint[m.src][m.address] = len(m.dat)
                    can_msgs.append(msg)
                elif msg.which() == "carParams":
                    car_fw = msg.carParams.carFw
                    if msg.carParams.openpilotLongitudinalControl:
                        experimental_long = True
                    # Outside CI the model may be taken from the log itself.
                    if cls.car_model is None and not cls.ci:
                        cls.car_model = msg.carParams.carFingerprint

            # Accept the first segment with enough CAN packets
            # (presumably ~50 s worth if one packet is logged per DT_CTRL - confirm).
            if len(can_msgs) > int(50 / DT_CTRL):
                break
        else:
            raise Exception(
                f"Route: {repr(cls.test_route.route)} with segments: {test_segs} "
                f"not found or no CAN msgs found. Is it uploaded?"
            ) from last_load_error

        cls.can_msgs = sorted(can_msgs, key=lambda msg: msg.logMonoTime)

        cls.CarInterface, cls.CarController, cls.CarState = interfaces[cls.car_model]
        cls.CP = cls.CarInterface.get_params(cls.car_model, fingerprint, car_fw, experimental_long, docs=False)
        assert cls.CP
        assert cls.CP.carFingerprint == cls.car_model

    @classmethod
    def tearDownClass(cls):
        """Drop the cached CAN messages once the class's tests have run."""
        del cls.can_msgs

    def setUp(self):
        """Create a fresh car interface and reset the panda safety state."""
        self.CI = self.CarInterface(self.CP, self.CarController, self.CarState)
        assert self.CI

        # TODO: check safetyModel is in release panda build
        self.safety = libpanda_py.libpanda

        # The last safety config is the one installed for the tests.
        cfg = self.CP.safetyConfigs[-1]
        set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam)
        self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
        self.safety.init_tests()

    def test_car_params(self):
        """Check that mass and lateral tuning in CarParams are populated."""
        if self.CP.dashcamOnly:
            self.skipTest("no need to check carParams for dashcamOnly")

        # make sure car params are within a valid range
        self.assertGreater(self.CP.mass, 1)

        # Angle-controlled cars are exempt from the lateral tuning checks.
        if self.CP.steerControlType != car.CarParams.SteerControlType.angle:
            tuning = self.CP.lateralTuning.which()
            if tuning == 'pid':
                self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
            elif tuning == 'torque':
                self.assertGreater(self.CP.lateralTuning.torque.kf, 0)
            elif tuning == 'indi':
                self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV))
            else:
                raise Exception("unknown tuning")

    def test_car_interface(self):
        """Replay the CAN messages through the car interface and require canValid."""
        # TODO: also check for checksum violations from can parser
        can_invalid_cnt = 0
        can_valid = False
        CC = car.CarControl.new_message()

        for i, msg in enumerate(self.can_msgs):
            CS = self.CI.update(CC, (msg.as_builder().to_bytes(),))
            self.CI.apply(CC, msg.logMonoTime)

            if CS.canValid:
                can_valid = True

            # wait max of 2s for low frequency msgs to be seen
            # Invalid frames only count once CAN has been valid or after 200 packets.
            if i > 200 or can_valid:
                can_invalid_cnt += not CS.canValid

        self.assertEqual(can_invalid_cnt, 0)

    def test_radar_interface(self):
        """Replay the CAN messages through the radar interface and require no canError."""
        # Set for the rest of the process; it is not restored afterwards.
        os.environ['NO_RADAR_SLEEP'] = "1"
        RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface
        RI = RadarInterface(self.CP)
        assert RI

        error_cnt = 0
        for i, msg in enumerate(self.can_msgs):
            rr = RI.update((msg.as_builder().to_bytes(),))
            # Errors reported during the first 50 packets are ignored.
            if rr is not None and i > 50:
                error_cnt += car.RadarData.Error.canError in rr.errors
        self.assertEqual(error_cnt, 0)

    def test_panda_safety_rx_valid(self):
        """Run every logged CAN message through the panda RX hook and require acceptance."""
        if self.CP.dashcamOnly:
            self.skipTest("no need to check panda safety for dashcamOnly")

        start_ts = self.can_msgs[0].logMonoTime

        failed_addrs = Counter()
        for can in self.can_msgs:
            # update panda timer
            # NOTE(review): assuming logMonoTime is in nanoseconds, t is in
            # microseconds and the 1e6 threshold below is one second - confirm.
            t = (can.logMonoTime - start_ts) / 1e3
            self.safety.set_timer(int(t))

            # run all msgs through the safety RX hook
            for msg in can.can:
                if msg.src >= 64:
                    continue

                to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
                if self.safety.safety_rx_hook(to_send) != 1:
                    failed_addrs[hex(msg.address)] += 1

            # ensure all msgs defined in the addr checks are valid
            if self.car_model not in ignore_addr_checks_valid:
                self.safety.safety_tick_current_rx_checks()
                if t > 1e6:
                    self.assertTrue(self.safety.addr_checks_valid())
        self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}")

    def test_panda_safety_tx_cases(self, data=None):
        """Asserts we can tx common messages"""
        if self.CP.notCar:
            self.skipTest("Skipping test for notCar")

        def test_car_controller(car_control):
            """Run the controller with ``car_control`` and check every sent message passes the TX hook."""
            now_nanos = 0
            msgs_sent = 0
            CI = self.CarInterface(self.CP, self.CarController, self.CarState)
            for _ in range(round(10.0 / DT_CTRL)):  # make sure we hit the slowest messages
                CI.update(car_control, [])
                _, sendcan = CI.apply(car_control, now_nanos)

                now_nanos += DT_CTRL * 1e9
                msgs_sent += len(sendcan)
                for addr, _, dat, bus in sendcan:
                    to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat)
                    self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus))

            # Make sure we attempted to send messages
            self.assertGreater(msgs_sent, 50)

        # Make sure we can send all messages while inactive
        CC = car.CarControl.new_message()
        test_car_controller(CC)

        # Test cancel + general messages (controls_allowed=False & cruise_engaged=True)
        self.safety.set_cruise_engaged_prev(True)
        CC = car.CarControl.new_message(cruiseControl={'cancel': True})
        test_car_controller(CC)

        # Test resume + general messages (controls_allowed=True & cruise_engaged=True)
        self.safety.set_controls_allowed(True)
        CC = car.CarControl.new_message(cruiseControl={'resume': True})
        test_car_controller(CC)

    def test_panda_safety_carstate(self):
        """
        Assert that panda safety matches openpilot's carState
        """
        if self.CP.dashcamOnly:
            self.skipTest("no need to check panda safety for dashcamOnly")

        CC = car.CarControl.new_message()

        # warm up pass, as initial states may be different
        for can in self.can_msgs[:300]:
            self.CI.update(CC, (can.as_builder().to_bytes(), ))
            for msg in filter(lambda m: m.src in range(64), can.can):
                to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
                self.safety.safety_rx_hook(to_send)

        controls_allowed_prev = False
        CS_prev = car.CarState.new_message()
        # Mismatch counters keyed by the name of the compared field.
        checks = defaultdict(int)
        for idx, can in enumerate(self.can_msgs):
            CS = self.CI.update(CC, (can.as_builder().to_bytes(), ))
            for msg in filter(lambda m: m.src in range(64), can.can):
                to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
                ret = self.safety.safety_rx_hook(to_send)
                self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}")

            # Skip first frame so CS_prev is properly initialized
            if idx == 0:
                CS_prev = CS
                # Button may be left pressed in warm up period
                if not self.CP.pcmCruise:
                    self.safety.set_controls_allowed(0)
                continue

            # TODO: check rest of panda's carstate (steering, ACC main on, etc.)

            checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev()
            if self.CP.carName not in ("hyundai", "body"):
                # TODO: fix standstill mismatches for other makes
                # standstill and vehicle_moving are opposites, so equality is a mismatch.
                checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving()

            # TODO: remove this exception once this mismatch is resolved
            brake_pressed = CS.brakePressed
            if CS.brakePressed and not self.safety.get_brake_pressed_prev():
                if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05:
                    brake_pressed = False
            checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev()
            checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev()

            if self.CP.pcmCruise:
                # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state.
                # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but
                # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages).
                if self.CP.carName == "honda" and self.CP.carFingerprint not in HONDA_BOSCH:
                    # only the rising edges are expected to match
                    if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled:
                        checks['controlsAllowed'] += not self.safety.get_controls_allowed()
                else:
                    checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed()
            else:
                # Check for enable events on rising edge of controls allowed
                button_enable = any(evt.enable for evt in CS.events)
                mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev)
                checks['controlsAllowed'] += mismatch
                controls_allowed_prev = self.safety.get_controls_allowed()
                # Clear controls_allowed after a matched enable so the next
                # rising edge can be observed.
                if button_enable and not mismatch:
                    self.safety.set_controls_allowed(False)

            if self.CP.carName == "honda":
                checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on()

            CS_prev = CS

        failed_checks = {k: v for k, v in checks.items() if v > 0}
        self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}")
|
|
|
|
|
|
@parameterized_class(('car_model', 'test_route'), get_test_cases())
class TestCarModel(TestCarModelBase):
    """Car model tests parameterized over the (car_model, test_route) pairs from get_test_cases()."""
|
|
|
|
|
|
if __name__ == "__main__":
    # Run this module's tests through the unittest command-line runner.
    unittest.main()
|