mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-03-01 14:23:53 +08:00
Common interface attribute function (#24731)
* replace get_attr_from_cars with get_interface_attr * and not combining the brands * explicit check * minimize diff * values
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
import os
|
||||
from typing import Any, Dict, List
|
||||
from typing import Dict, List
|
||||
|
||||
from cereal import car
|
||||
from common.params import Params
|
||||
from common.basedir import BASEDIR
|
||||
from selfdrive.version import is_comma_remote, is_tested_branch
|
||||
from selfdrive.car.interfaces import get_interface_attr
|
||||
from selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
|
||||
from selfdrive.car.vin import get_vin, VIN_UNKNOWN
|
||||
from selfdrive.car.fw_versions import get_fw_versions, match_fw_to_car
|
||||
@@ -11,7 +13,6 @@ from selfdrive.swaglog import cloudlog
|
||||
import cereal.messaging as messaging
|
||||
from selfdrive.car import gen_empty_fingerprint
|
||||
|
||||
from cereal import car
|
||||
EventName = car.CarEvent.EventName
|
||||
|
||||
|
||||
@@ -59,19 +60,6 @@ def load_interfaces(brand_names):
|
||||
return ret
|
||||
|
||||
|
||||
def get_interface_attr(attr: str) -> Dict[str, Any]:
|
||||
# returns given attribute from each interface
|
||||
brand_names = {}
|
||||
for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]):
|
||||
try:
|
||||
brand_name = car_folder.split('/')[-1]
|
||||
attr_data = getattr(__import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr]), attr, None)
|
||||
brand_names[brand_name] = attr_data
|
||||
except (ImportError, OSError):
|
||||
pass
|
||||
return brand_names
|
||||
|
||||
|
||||
def _get_interface_names() -> Dict[str, List[str]]:
|
||||
# returns a dict of brand name and its respective models
|
||||
brand_names = {}
|
||||
|
||||
@@ -15,9 +15,8 @@ from selfdrive.car.tests.routes import non_tested_cars
|
||||
|
||||
def get_all_footnotes() -> Dict[Enum, int]:
|
||||
all_footnotes = []
|
||||
for _, footnotes in get_interface_attr("Footnote").items():
|
||||
if footnotes is not None:
|
||||
all_footnotes += footnotes
|
||||
for footnotes in get_interface_attr("Footnote", ignore_none=True).values():
|
||||
all_footnotes += footnotes
|
||||
return {fn: idx + 1 for idx, fn in enumerate(all_footnotes)}
|
||||
|
||||
|
||||
@@ -28,21 +27,20 @@ CARS_MD_TEMPLATE = os.path.join(BASEDIR, "selfdrive", "car", "CARS_template.md")
|
||||
|
||||
def get_all_car_info() -> List[CarInfo]:
|
||||
all_car_info: List[CarInfo] = []
|
||||
for models in get_interface_attr("CAR_INFO").values():
|
||||
for model, car_info in models.items():
|
||||
# Hyundai exception: those with radar have openpilot longitudinal
|
||||
fingerprint = {0: {}, 1: {HKG_RADAR_START_ADDR: 8}, 2: {}, 3: {}}
|
||||
CP = interfaces[model][0].get_params(model, fingerprint=fingerprint, disable_radar=True)
|
||||
for model, car_info in get_interface_attr("CAR_INFO", combine_brands=True).items():
|
||||
# Hyundai exception: those with radar have openpilot longitudinal
|
||||
fingerprint = {0: {}, 1: {HKG_RADAR_START_ADDR: 8}, 2: {}, 3: {}}
|
||||
CP = interfaces[model][0].get_params(model, fingerprint=fingerprint, disable_radar=True)
|
||||
|
||||
if CP.dashcamOnly or car_info is None:
|
||||
continue
|
||||
if CP.dashcamOnly or car_info is None:
|
||||
continue
|
||||
|
||||
# A platform can include multiple car models
|
||||
if not isinstance(car_info, list):
|
||||
car_info = (car_info,)
|
||||
# A platform can include multiple car models
|
||||
if not isinstance(car_info, list):
|
||||
car_info = (car_info,)
|
||||
|
||||
for _car_info in car_info:
|
||||
all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES))
|
||||
for _car_info in car_info:
|
||||
all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES))
|
||||
|
||||
# Sort cars by make and model + year
|
||||
sorted_cars: List[CarInfo] = natsorted(all_car_info, key=lambda car: (car.make + car.model).lower())
|
||||
|
||||
@@ -1,44 +1,12 @@
|
||||
import os
|
||||
from common.basedir import BASEDIR
|
||||
from selfdrive.car.interfaces import get_interface_attr
|
||||
|
||||
|
||||
def get_attr_from_cars(attr, result=dict, combine_brands=True):
|
||||
# read all the folders in selfdrive/car and return a dict where:
|
||||
# - keys are all the car models
|
||||
# - values are attr values from all car folders
|
||||
result = result()
|
||||
|
||||
for car_folder in [x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]:
|
||||
try:
|
||||
car_name = car_folder.split('/')[-1]
|
||||
values = __import__(f'selfdrive.car.{car_name}.values', fromlist=[attr])
|
||||
if hasattr(values, attr):
|
||||
attr_values = getattr(values, attr)
|
||||
else:
|
||||
continue
|
||||
|
||||
if isinstance(attr_values, dict):
|
||||
for f, v in attr_values.items():
|
||||
if combine_brands:
|
||||
result[f] = v
|
||||
else:
|
||||
if car_name not in result:
|
||||
result[car_name] = {}
|
||||
result[car_name][f] = v
|
||||
elif isinstance(attr_values, list):
|
||||
result += attr_values
|
||||
|
||||
except (ImportError, OSError):
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
|
||||
FW_VERSIONS = get_attr_from_cars('FW_VERSIONS')
|
||||
_FINGERPRINTS = get_attr_from_cars('FINGERPRINTS')
|
||||
FW_VERSIONS = get_interface_attr('FW_VERSIONS', combine_brands=True, ignore_none=True)
|
||||
_FINGERPRINTS = get_interface_attr('FINGERPRINTS', combine_brands=True, ignore_none=True)
|
||||
|
||||
_DEBUG_ADDRESS = {1880: 8} # reserved for debug purposes
|
||||
|
||||
|
||||
def is_valid_for_fingerprint(msg, car_fingerprint):
|
||||
adr = msg.address
|
||||
# ignore addresses that are more than 11 bits
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
import struct
|
||||
import traceback
|
||||
from typing import Any, List
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
|
||||
from typing import Any, List
|
||||
from tqdm import tqdm
|
||||
|
||||
import panda.python.uds as uds
|
||||
from cereal import car
|
||||
from selfdrive.car.fingerprints import FW_VERSIONS, get_attr_from_cars
|
||||
from selfdrive.car.interfaces import get_interface_attr
|
||||
from selfdrive.car.fingerprints import FW_VERSIONS
|
||||
from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
|
||||
from selfdrive.car.toyota.values import CAR as TOYOTA
|
||||
from selfdrive.swaglog import cloudlog
|
||||
@@ -303,7 +303,7 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr
|
||||
addrs = []
|
||||
parallel_addrs = []
|
||||
|
||||
versions = get_attr_from_cars('FW_VERSIONS', combine_brands=False)
|
||||
versions = get_interface_attr('FW_VERSIONS', ignore_none=True)
|
||||
if extra is not None:
|
||||
versions.update(extra)
|
||||
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
import os
|
||||
import time
|
||||
from abc import abstractmethod, ABC
|
||||
from typing import Dict, Tuple, List
|
||||
from typing import Any, Dict, Tuple, List
|
||||
|
||||
from cereal import car
|
||||
from common.basedir import BASEDIR
|
||||
from common.conversions import Conversions as CV
|
||||
from common.kalman.simple_kalman import KF1D
|
||||
from common.realtime import DT_CTRL
|
||||
from selfdrive.car import gen_empty_fingerprint
|
||||
from common.conversions import Conversions as CV
|
||||
from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX
|
||||
from selfdrive.controls.lib.events import Events
|
||||
from selfdrive.controls.lib.vehicle_model import VehicleModel
|
||||
@@ -22,7 +23,6 @@ ACCEL_MIN = -3.5
|
||||
|
||||
# generic car and radar interfaces
|
||||
|
||||
|
||||
class CarInterfaceBase(ABC):
|
||||
def __init__(self, CP, CarController, CarState):
|
||||
self.CP = CP
|
||||
@@ -293,3 +293,31 @@ class CarStateBase(ABC):
|
||||
@staticmethod
|
||||
def get_loopback_can_parser(CP):
|
||||
return None
|
||||
|
||||
|
||||
# interface-specific helpers
|
||||
|
||||
def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> Dict[str, Any]:
|
||||
# read all the folders in selfdrive/car and return a dict where:
|
||||
# - keys are all the car models or brand names
|
||||
# - values are attr values from all car folders
|
||||
result = {}
|
||||
for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]):
|
||||
try:
|
||||
brand_name = car_folder.split('/')[-1]
|
||||
brand_values = __import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr])
|
||||
if hasattr(brand_values, attr) or not ignore_none:
|
||||
attr_data = getattr(brand_values, attr, None)
|
||||
else:
|
||||
continue
|
||||
|
||||
if combine_brands:
|
||||
if isinstance(attr_data, dict):
|
||||
for f, v in attr_data.items():
|
||||
result[f] = v
|
||||
else:
|
||||
result[brand_name] = attr_data
|
||||
except (ImportError, OSError):
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
@@ -18,7 +18,7 @@ class TestCarDocs(unittest.TestCase):
|
||||
"Run selfdrive/car/docs.py to generate new supported cars documentation")
|
||||
|
||||
def test_missing_car_info(self):
|
||||
all_car_info_platforms = [p for i in get_interface_attr("CAR_INFO").values() for p in i]
|
||||
all_car_info_platforms = get_interface_attr("CAR_INFO", combine_brands=True).keys()
|
||||
for platform in sorted(interfaces.keys()):
|
||||
if platform not in all_car_info_platforms:
|
||||
self.fail("Platform: {} doesn't exist in CarInfo".format(platform))
|
||||
|
||||
Reference in New Issue
Block a user