diff --git a/selfdrive/car/ford/tests/print_platform_codes.py b/selfdrive/car/ford/tests/print_platform_codes.py new file mode 100755 index 0000000000..670199980a --- /dev/null +++ b/selfdrive/car/ford/tests/print_platform_codes.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +from collections import defaultdict + +from cereal import car +from openpilot.selfdrive.car.ford.values import get_platform_codes +from openpilot.selfdrive.car.ford.fingerprints import FW_VERSIONS + +Ecu = car.CarParams.Ecu +ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} + + +if __name__ == "__main__": + cars_for_code: defaultdict = defaultdict(lambda: defaultdict(set)) + + for car_model, ecus in FW_VERSIONS.items(): + print(car_model) + for ecu in sorted(ecus, key=lambda x: int(x[0])): + platform_codes = get_platform_codes(ecus[ecu]) + for code in platform_codes: + cars_for_code[ecu][code].add(car_model) + + print(f' (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])}, {ecu[2]}):') + print(f' Codes: {sorted(platform_codes)}') + print() + + print('\nCar models vs. platform codes:') + for ecu, codes in cars_for_code.items(): + print(f' (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])}, {ecu[2]}):') + for code, cars in codes.items(): + print(f' {code!r}: {sorted(map(str, cars))}') diff --git a/selfdrive/car/ford/tests/test_ford.py b/selfdrive/car/ford/tests/test_ford.py index 2ad3f5db1b..5d7b2c3332 100755 --- a/selfdrive/car/ford/tests/test_ford.py +++ b/selfdrive/car/ford/tests/test_ford.py @@ -1,12 +1,15 @@ #!/usr/bin/env python3 +import random import unittest -from parameterized import parameterized from collections.abc import Iterable import capnp +from hypothesis import settings, given, strategies as st +from parameterized import parameterized from cereal import car -from openpilot.selfdrive.car.ford.values import FW_QUERY_CONFIG +from openpilot.selfdrive.car.fw_versions import build_fw_dict +from openpilot.selfdrive.car.ford.values import CAR, FW_QUERY_CONFIG, FW_PATTERN, get_platform_codes from openpilot.selfdrive.car.ford.fingerprints import FW_VERSIONS Ecu = car.CarParams.Ecu @@ -23,7 +26,7 @@ ECU_ADDRESSES = { } -ECU_FW_CORE = { +ECU_PART_NUMBER = { Ecu.eps: [ b"14D003", ], @@ -37,9 +40,6 @@ ECU_FW_CORE = { b"14F397", # Ford Q3 b"14H102", # Ford Q4 ], - Ecu.engine: [ - b"14C204", - ], } @@ -53,29 +53,96 @@ class TestFordFW(unittest.TestCase): @parameterized.expand(FW_VERSIONS.items()) def test_fw_versions(self, car_model: str, fw_versions: dict[tuple[capnp.lib.capnp._EnumModule, int, int | None], Iterable[bytes]]): for (ecu, addr, subaddr), fws in fw_versions.items(): - self.assertIn(ecu, ECU_FW_CORE, "Unexpected ECU") + self.assertIn(ecu, ECU_PART_NUMBER, "Unexpected ECU") self.assertEqual(addr, ECU_ADDRESSES[ecu], "ECU address mismatch") self.assertIsNone(subaddr, "Unexpected ECU subaddress") - # Software part number takes the form: PREFIX-CORE-SUFFIX - # Prefix changes based on the family of part. It includes the model year - # and likely the platform. - # Core identifies the type of the item (e.g. 14D003 = PSCM, 14C204 = PCM). - # Suffix specifies the version of the part. -AA would be followed by -AB. - # Small increments in the suffix are usually compatible. - # Details: https://forscan.org/forum/viewtopic.php?p=70008#p70008 for fw in fws: self.assertEqual(len(fw), 24, "Expected ECU response to be 24 bytes") - # TODO: parse with regex, don't need detailed error message - fw_parts = fw.rstrip(b'\x00').split(b'-') - self.assertEqual(len(fw_parts), 3, "Expected FW to be in format: prefix-core-suffix") + match = FW_PATTERN.match(fw) + self.assertIsNotNone(match, f"Unable to parse FW: {fw!r}") + if match: + part_number = match.group("part_number") + self.assertIn(part_number, ECU_PART_NUMBER[ecu], f"Unexpected part number for {fw!r}") - prefix, core, suffix = fw_parts - self.assertEqual(len(prefix), 4, "Expected FW prefix to be 4 characters") - self.assertIn(len(core), (5, 6), "Expected FW core to be 5-6 characters") - self.assertIn(core, ECU_FW_CORE[ecu], f"Unexpected FW core for {ecu}") - self.assertIn(len(suffix), (2, 3), "Expected FW suffix to be 2-3 characters") + codes = get_platform_codes([fw]) + self.assertEqual(1, len(codes), f"Unable to parse FW: {fw!r}") + + @settings(max_examples=100) + @given(data=st.data()) + def test_platform_codes_fuzzy_fw(self, data): + """Ensure function doesn't raise an exception""" + fw_strategy = st.lists(st.binary()) + fws = data.draw(fw_strategy) + get_platform_codes(fws) + + def test_platform_codes_spot_check(self): + # Asserts basic platform code parsing behavior for a few cases + results = get_platform_codes([ + b"JX6A-14C204-BPL\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"NZ6T-14F397-AC\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"PJ6T-14H102-ABJ\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"LB5A-14C204-EAC\x00\x00\x00\x00\x00\x00\x00\x00\x00", + ]) + self.assertEqual(results, {(b"X6A", b"J"), (b"Z6T", b"N"), (b"J6T", b"P"), (b"B5A", b"L")}) + + def test_fuzzy_match(self): + for platform, fw_by_addr in FW_VERSIONS.items(): + # Ensure there's no overlaps in platform codes + for _ in range(20): + car_fw = [] + for ecu, fw_versions in fw_by_addr.items(): + ecu_name, addr, sub_addr = ecu + fw = random.choice(fw_versions) + car_fw.append({"ecu": ecu_name, "fwVersion": fw, "address": addr, + "subAddress": 0 if sub_addr is None else sub_addr}) + + CP = car.CarParams.new_message(carFw=car_fw) + matches = FW_QUERY_CONFIG.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw), CP.carVin, FW_VERSIONS) + self.assertEqual(matches, {platform}) + + def test_match_fw_fuzzy(self): + offline_fw = { + (Ecu.eps, 0x730, None): [ + b"L1MC-14D003-AJ\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"L1MC-14D003-AL\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + ], + (Ecu.abs, 0x760, None): [ + b"L1MC-2D053-BA\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"L1MC-2D053-BD\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + ], + (Ecu.fwdRadar, 0x764, None): [ + b"LB5T-14D049-AB\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"LB5T-14D049-AD\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + ], + # We consider all model year hints for ECU, even with different platform codes + (Ecu.fwdCamera, 0x706, None): [ + b"LB5T-14F397-AD\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"NC5T-14F397-AF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + ], + } + expected_fingerprint = CAR.FORD_EXPLORER_MK6 + + # ensure that we fuzzy match on all non-exact FW with changed revisions + live_fw = { + (0x730, None): {b"L1MC-14D003-XX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, + (0x760, None): {b"L1MC-2D053-XX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, + (0x764, None): {b"LB5T-14D049-XX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, + (0x706, None): {b"LB5T-14F397-XX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, + } + candidates = FW_QUERY_CONFIG.match_fw_to_car_fuzzy(live_fw, '', {expected_fingerprint: offline_fw}) + self.assertEqual(candidates, {expected_fingerprint}) + + # model year hint in between the range should match + live_fw[(0x706, None)] = {b"MB5T-14F397-XX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"} + candidates = FW_QUERY_CONFIG.match_fw_to_car_fuzzy(live_fw, '', {expected_fingerprint: offline_fw,}) + self.assertEqual(candidates, {expected_fingerprint}) + + # unseen model year hint should not match + live_fw[(0x760, None)] = {b"M1MC-2D053-XX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"} + candidates = FW_QUERY_CONFIG.match_fw_to_car_fuzzy(live_fw, '', {expected_fingerprint: offline_fw}) + self.assertEqual(len(candidates), 0, "Should not match new model year hint") if __name__ == "__main__": diff --git a/selfdrive/car/ford/values.py b/selfdrive/car/ford/values.py index a5494c921c..b1868bfa9b 100644 --- a/selfdrive/car/ford/values.py +++ b/selfdrive/car/ford/values.py @@ -1,4 +1,5 @@ import copy +import re from dataclasses import dataclass, field, replace from enum import Enum, IntFlag @@ -7,7 +8,7 @@ from cereal import car from openpilot.selfdrive.car import AngleRateLimit, CarSpecs, dbc_dict, DbcDict, PlatformConfig, Platforms from openpilot.selfdrive.car.docs_definitions import CarFootnote, CarHarness, CarDocs, CarParts, Column, \ Device -from openpilot.selfdrive.car.fw_query_definitions import FwQueryConfig, Request, StdQueries, p16 +from openpilot.selfdrive.car.fw_query_definitions import FwQueryConfig, LiveFwVersions, OfflineFwVersions, Request, StdQueries, p16 Ecu = car.CarParams.Ecu @@ -143,6 +144,76 @@ class CAR(Platforms): ) +# FW response contains a combined software and part number +# A-Z except no I, O or W +# e.g. NZ6A-14C204-AAA +# 1222-333333-444 +# 1 = Model year hint (approximates model year/generation) +# 2 = Platform hint +# 3 = Part number +# 4 = Software version +FW_ALPHABET = b'A-HJ-NP-VX-Z' +FW_PATTERN = re.compile(b'^(?P[' + FW_ALPHABET + b'])' + + b'(?P[0-9' + FW_ALPHABET + b']{3})-' + + b'(?P[0-9' + FW_ALPHABET + b']{5,6})-' + + b'(?P[' + FW_ALPHABET + b']{2,})\x00*$') + + +def get_platform_codes(fw_versions: list[bytes] | set[bytes]) -> set[tuple[bytes, bytes]]: + codes = set() + for fw in fw_versions: + match = FW_PATTERN.match(fw) + if match is not None: + codes.add((match.group('platform_hint'), match.group('model_year_hint'))) + + return codes + + +def match_fw_to_car_fuzzy(live_fw_versions: LiveFwVersions, vin: str, offline_fw_versions: OfflineFwVersions) -> set[str]: + candidates: set[str] = set() + + for candidate, fws in offline_fw_versions.items(): + # Keep track of ECUs which pass all checks (platform hint, within model year hint range) + valid_found_ecus = set() + valid_expected_ecus = {ecu[1:] for ecu in fws if ecu[0] in PLATFORM_CODE_ECUS} + for ecu, expected_versions in fws.items(): + addr = ecu[1:] + # Only check ECUs expected to have platform codes + if ecu[0] not in PLATFORM_CODE_ECUS: + continue + + # Expected platform codes & model year hints + codes = get_platform_codes(expected_versions) + expected_platform_codes = {code for code, _ in codes} + expected_model_year_hints = {model_year_hint for _, model_year_hint in codes} + + # Found platform codes & model year hints + codes = get_platform_codes(live_fw_versions.get(addr, set())) + found_platform_codes = {code for code, _ in codes} + found_model_year_hints = {model_year_hint for _, model_year_hint in codes} + + # Check platform code matches for any found versions + if not any(found_platform_code in expected_platform_codes for found_platform_code in found_platform_codes): + break + + # Check any model year hint within range in the database. Note that some models have more than one + # platform code per ECU which we don't consider as separate ranges + if not any(min(expected_model_year_hints) <= found_model_year_hint <= max(expected_model_year_hints) for + found_model_year_hint in found_model_year_hints): + break + + valid_found_ecus.add(addr) + + # If all live ECUs pass all checks for candidate, add it as a match + if valid_expected_ecus.issubset(valid_found_ecus): + candidates.add(candidate) + + return candidates + + +# All of these ECUs must be present and are expected to have platform codes we can match +PLATFORM_CODE_ECUS = (Ecu.abs, Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps) + DATA_IDENTIFIER_FORD_ASBUILT = 0xDE00 ASBUILT_BLOCKS: list[tuple[int, list]] = [ @@ -201,6 +272,8 @@ FW_QUERY_CONFIG = FwQueryConfig( (Ecu.shiftByWire, 0x732, None), # Gear Shift Module (Ecu.debug, 0x7d0, None), # Accessory Protocol Interface Module ], + # Custom fuzzy fingerprinting function using platform and model year hints + match_fw_to_car_fuzzy=match_fw_to_car_fuzzy, ) DBC = CAR.create_dbc_map()