FwQueryConfig: add platform code support and tests (#28475)
* add config options * you know what, platform codes don't need to be related to fuzzy fingerprinting at all * better comment? * add comment * add test from other PR * add platform code function to hyundai * comment and add eps! * clean up config test * fix test * add error message to test * until we have multiple ways a brand can set up fuzzy FP, let's leave func name fuzzy-specific * rename in comment too * hyundai tests * simpler test! * check all ecus with platform codes * add types-python-dateutil = "^2.8.19.13" * these aren't used any more * Update selfdrive/car/hyundai/tests/test_hyundai.py * not to imply this is active yet old-commit-hash: f5e032b67f6e37d2c12dcd90168a2b33b3b78831
This commit is contained in:
@@ -3,7 +3,7 @@ import capnp
|
||||
import copy
|
||||
from dataclasses import dataclass, field
|
||||
import struct
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Callable, Dict, List, Optional, Set, Tuple
|
||||
|
||||
import panda.python.uds as uds
|
||||
|
||||
@@ -75,6 +75,12 @@ class FwQueryConfig:
|
||||
# Ecus added for data collection, not to be fingerprinted on
|
||||
extra_ecus: List[Tuple[capnp.lib.capnp._EnumModule, int, Optional[int]]] = field(default_factory=list)
|
||||
|
||||
# Brand-specific fuzzy fingerprinting config options:
|
||||
# A function to get unique, platform-specific identification codes for a set of versions
|
||||
fuzzy_get_platform_codes: Optional[Callable[[List[bytes]], Set[bytes]]] = None
|
||||
# List of ECUs expected to have platform codes
|
||||
platform_code_ecus: List[capnp.lib.capnp._EnumModule] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self):
|
||||
for i in range(len(self.requests)):
|
||||
if self.requests[i].auxiliary:
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
import unittest
|
||||
|
||||
from cereal import car
|
||||
from selfdrive.car.hyundai.values import CANFD_CAR, FW_QUERY_CONFIG, FW_VERSIONS, CAN_GEARS, LEGACY_SAFETY_MODE_CAR, CHECKSUM, CAMERA_SCC_CAR
|
||||
from selfdrive.car.hyundai.values import CAMERA_SCC_CAR, CANFD_CAR, CAN_GEARS, CAR, CHECKSUM, FW_QUERY_CONFIG, \
|
||||
FW_VERSIONS, LEGACY_SAFETY_MODE_CAR
|
||||
|
||||
Ecu = car.CarParams.Ecu
|
||||
ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
|
||||
@@ -24,6 +25,65 @@ class TestHyundaiFingerprint(unittest.TestCase):
|
||||
ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_in_whitelist])
|
||||
self.assertEqual(len(ecus_not_in_whitelist), 0, f'{car_model}: Car model has ECUs not in auxiliary request whitelists: {ecu_strings}')
|
||||
|
||||
def test_platform_code_ecus_available(self):
|
||||
no_eps_platforms = CANFD_CAR | {CAR.KIA_SORENTO, CAR.KIA_OPTIMA_G4, CAR.KIA_OPTIMA_G4_FL,
|
||||
CAR.SONATA_LF, CAR.TUCSON, CAR.GENESIS_G90, CAR.GENESIS_G80}
|
||||
|
||||
# Asserts ECU keys essential for fuzzy fingerprinting are available on all platforms
|
||||
for car_model, ecus in FW_VERSIONS.items():
|
||||
with self.subTest(car_model=car_model):
|
||||
for fuzzy_ecu in FW_QUERY_CONFIG.platform_code_ecus:
|
||||
if fuzzy_ecu in (Ecu.fwdRadar, Ecu.eps) and car_model == CAR.HYUNDAI_GENESIS:
|
||||
continue
|
||||
if fuzzy_ecu == Ecu.eps and car_model in no_eps_platforms:
|
||||
continue
|
||||
self.assertIn(fuzzy_ecu, [e[0] for e in ecus])
|
||||
|
||||
def test_fuzzy_fw_dates(self):
|
||||
# Some newer platforms have date codes in a different format we don't yet parse,
|
||||
# for now assert date format is consistent for all FW across each platform
|
||||
for car_model, ecus in FW_VERSIONS.items():
|
||||
with self.subTest(car_model=car_model):
|
||||
for ecu, fws in ecus.items():
|
||||
if ecu[0] not in FW_QUERY_CONFIG.platform_code_ecus:
|
||||
continue
|
||||
|
||||
codes = set()
|
||||
for fw in fws:
|
||||
codes |= FW_QUERY_CONFIG.fuzzy_get_platform_codes([fw])
|
||||
|
||||
# Either no dates should be parsed or all dates should be parsed
|
||||
self.assertEqual(len({b'-' in code for code in codes}), 1)
|
||||
|
||||
def test_fuzzy_platform_codes(self):
|
||||
# Asserts basic platform code parsing behavior
|
||||
codes = FW_QUERY_CONFIG.fuzzy_get_platform_codes([b'\xf1\x00DH LKAS 1.1 -150210'])
|
||||
self.assertEqual(codes, {b"DH-1502"})
|
||||
|
||||
# Some cameras and all radars do not have dates
|
||||
codes = FW_QUERY_CONFIG.fuzzy_get_platform_codes([b'\xf1\x00AEhe SCC H-CUP 1.01 1.01 96400-G2000 '])
|
||||
self.assertEqual(codes, {b"AEhe"})
|
||||
|
||||
codes = FW_QUERY_CONFIG.fuzzy_get_platform_codes([b'\xf1\x00CV1_ RDR ----- 1.00 1.01 99110-CV000 '])
|
||||
self.assertEqual(codes, {b"CV1"})
|
||||
|
||||
codes = FW_QUERY_CONFIG.fuzzy_get_platform_codes([
|
||||
b'\xf1\x00DH LKAS 1.1 -150210',
|
||||
b'\xf1\x00AEhe SCC H-CUP 1.01 1.01 96400-G2000 ',
|
||||
b'\xf1\x00CV1_ RDR ----- 1.00 1.01 99110-CV000 ',
|
||||
])
|
||||
self.assertEqual(codes, {b"DH-1502", b"AEhe", b"CV1"})
|
||||
|
||||
# Returned platform codes must inclusively contain start/end dates
|
||||
codes = FW_QUERY_CONFIG.fuzzy_get_platform_codes([
|
||||
b'\xf1\x00LX2 MFC AT USA LHD 1.00 1.07 99211-S8100 220222',
|
||||
b'\xf1\x00LX2 MFC AT USA LHD 1.00 1.08 99211-S8100 211103',
|
||||
b'\xf1\x00ON MFC AT USA LHD 1.00 1.01 99211-S9100 190405',
|
||||
b'\xf1\x00ON MFC AT USA LHD 1.00 1.03 99211-S9100 190720',
|
||||
])
|
||||
self.assertEqual(codes, {b'LX2-2111', b'LX2-2112', b'LX2-2201', b'LX2-2202',
|
||||
b'ON-1904', b'ON-1905', b'ON-1906', b'ON-1907'})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
from dateutil import rrule
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum, IntFlag
|
||||
from typing import Dict, List, Optional, Union
|
||||
from typing import DefaultDict, Dict, List, Optional, Set, Union
|
||||
|
||||
from cereal import car
|
||||
from panda.python import uds
|
||||
@@ -8,6 +12,7 @@ from common.conversions import Conversions as CV
|
||||
from selfdrive.car import dbc_dict
|
||||
from selfdrive.car.docs_definitions import CarFootnote, CarHarness, CarInfo, CarParts, Column
|
||||
from selfdrive.car.fw_query_definitions import FwQueryConfig, Request, p16
|
||||
from system.swaglog import cloudlog
|
||||
|
||||
Ecu = car.CarParams.Ecu
|
||||
|
||||
@@ -342,6 +347,36 @@ FINGERPRINTS = {
|
||||
}],
|
||||
}
|
||||
|
||||
|
||||
def get_platform_codes(fw_versions: List[bytes]) -> Set[bytes]:
|
||||
codes: DefaultDict[bytes, Set[bytes]] = defaultdict(set)
|
||||
for fw in fw_versions:
|
||||
match = PLATFORM_CODE_PATTERN.search(fw)
|
||||
if match is not None:
|
||||
code, date = match.groups()
|
||||
codes[code].add(date)
|
||||
|
||||
# Create platform codes for all dates inclusive if ECU has FW dates
|
||||
final_codes = set()
|
||||
for code, dates in codes.items():
|
||||
# Radar and some cameras don't have FW dates
|
||||
if None in dates:
|
||||
final_codes.add(code)
|
||||
continue
|
||||
|
||||
try:
|
||||
parsed = {datetime.strptime(date.decode()[:4], '%y%m') for date in dates}
|
||||
except ValueError:
|
||||
cloudlog.exception(f'Error parsing date in FW versions: {code!r}, {dates}')
|
||||
final_codes.add(code)
|
||||
continue
|
||||
|
||||
for date in rrule.rrule(rrule.MONTHLY, dtstart=min(parsed), until=max(parsed)):
|
||||
final_codes.add(code + b'-' + date.strftime('%y%m').encode())
|
||||
|
||||
return final_codes
|
||||
|
||||
|
||||
HYUNDAI_VERSION_REQUEST_LONG = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER]) + \
|
||||
p16(0xf100) # Long description
|
||||
|
||||
@@ -355,6 +390,9 @@ HYUNDAI_VERSION_REQUEST_MULTI = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER]
|
||||
|
||||
HYUNDAI_VERSION_RESPONSE = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER + 0x40])
|
||||
|
||||
PLATFORM_CODE_PATTERN = re.compile(b'((?<=' + HYUNDAI_VERSION_REQUEST_LONG[1:] +
|
||||
b')[A-Z]{2}[A-Za-z0-9]{0,2})(?:.*([0-9]{6}))?')
|
||||
|
||||
FW_QUERY_CONFIG = FwQueryConfig(
|
||||
requests=[
|
||||
# TODO: minimize shared whitelists for CAN and cornerRadar for CAN-FD
|
||||
@@ -411,6 +449,9 @@ FW_QUERY_CONFIG = FwQueryConfig(
|
||||
(Ecu.hvac, 0x7b3, None), # HVAC Control Assembly
|
||||
(Ecu.cornerRadar, 0x7b7, None),
|
||||
],
|
||||
fuzzy_get_platform_codes=get_platform_codes,
|
||||
# Camera and radar should exist on all cars
|
||||
platform_code_ecus=[Ecu.fwdRadar, Ecu.fwdCamera, Ecu.eps],
|
||||
)
|
||||
|
||||
FW_VERSIONS = {
|
||||
|
||||
@@ -123,6 +123,21 @@ class TestFwFingerprint(unittest.TestCase):
|
||||
with self.subTest():
|
||||
self.fail(f"Brands do not implement FW_QUERY_CONFIG: {brand_versions - brand_configs}")
|
||||
|
||||
def test_fuzzy_fingerprint_config(self):
|
||||
for brand, config in FW_QUERY_CONFIGS.items():
|
||||
with self.subTest(brand=brand):
|
||||
if config.fuzzy_get_platform_codes is None:
|
||||
self.assertEqual(len(config.platform_code_ecus), 0, "Cannot specify platform code ECUs without full config")
|
||||
else:
|
||||
self.assertGreater(len(config.platform_code_ecus), 0, "Need to specify platform code ECUs")
|
||||
|
||||
# Assert every supported ECU FW version returns one platform code
|
||||
for fw_by_addr in VERSIONS[brand].values():
|
||||
for addr, fws in fw_by_addr.items():
|
||||
if addr[0] in config.platform_code_ecus:
|
||||
for f in fws:
|
||||
self.assertEqual(1, len(config.fuzzy_get_platform_codes([f])), f"Unable to parse FW: {f}")
|
||||
|
||||
def test_fw_request_ecu_whitelist(self):
|
||||
for brand, config in FW_QUERY_CONFIGS.items():
|
||||
with self.subTest(brand=brand):
|
||||
|
||||
Reference in New Issue
Block a user