selfdrive/car: generic CAN send/receive callbacks (#33215)

* start

* hmm API doesn't seem great

* better

* rm line

* sendcan -> can_send callable for best static type coverage, list -> tuple msg

TODO: logcan

* more sendcan -> can_send references

* remove pandad's capnp from selfdrive/car

* fix and remove cereal from test_can_fingerprint

* test_fw_fingerprint: remove pandad, less cereal

* comment done too

* better comment

* fix test_models test

* niceeee it works

* move to can_definitions

* can't come up with a better name :(

* I think we can remove SimpleNamespace soon

* fix test_can_fingerprint.py

* maintain previous behavior

* Revert "maintain previous behavior"

This reverts commit f848fd32132391692c6191a305bb38f74091ec91.

* can test comment

* no need for get_one_can now!

* big clean up: no SimpleNamespace

* now self explanatory!

* not needed

* use empty can again since this is now real

* cmt
old-commit-hash: 9880b1393c
This commit is contained in:
Shane Smiskol 2024-08-08 01:49:25 -05:00 committed by GitHub
parent 8131716c9e
commit a7db075f08
17 changed files with 130 additions and 106 deletions

View File

@ -29,17 +29,16 @@ ignore_imports =
openpilot.selfdrive.car.body.carcontroller -> openpilot.selfdrive.controls.lib.pid
openpilot.selfdrive.car.tests.test_docs -> openpilot.common.basedir
openpilot.selfdrive.car.docs -> openpilot.common.basedir
# car interface will not filter the speed
openpilot.selfdrive.car.interfaces -> openpilot.common.simple_kalman
openpilot.selfdrive.car.gm.interface -> openpilot.common.basedir
openpilot.selfdrive.car.interfaces -> openpilot.common.basedir
# these two will still live in openpilot, but require some modification
# params will need to move to new openpilot files that just call car files
openpilot.selfdrive.car.fw_versions -> openpilot.common.params
openpilot.selfdrive.car.ecu_addrs -> openpilot.common.params
# requires generic CAN send/receive functions
openpilot.selfdrive.car.ecu_addrs -> openpilot.selfdrive.pandad
openpilot.selfdrive.car.isotp_parallel_query -> openpilot.selfdrive.pandad
openpilot.selfdrive.car.tests.test_fw_fingerprint -> openpilot.selfdrive.pandad
# these are okay
openpilot.selfdrive.car.card -> openpilot.common.swaglog

View File

@ -1,7 +1,15 @@
from typing import NamedTuple
from collections.abc import Callable
from typing import NamedTuple, Protocol
class CanData(NamedTuple):
address: int
dat: bytes
src: int
CanSendCallable = Callable[[list[CanData]], None]
class CanRecvCallable(Protocol):
def __call__(self, wait_for_one: bool = False) -> list[list[CanData]]: ...

View File

@ -1,15 +1,14 @@
import os
import time
from collections.abc import Callable
from cereal import car
from openpilot.selfdrive.car import carlog
from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable
from openpilot.selfdrive.car.interfaces import get_interface_attr
from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN
from openpilot.selfdrive.car.fw_versions import get_fw_versions_ordered, get_present_ecus, match_fw_to_car
from openpilot.selfdrive.car.fw_versions import ObdCallback, get_fw_versions_ordered, get_present_ecus, match_fw_to_car
from openpilot.selfdrive.car.mock.values import CAR as MOCK
import cereal.messaging as messaging
from openpilot.selfdrive.car import gen_empty_fingerprint
FRAME_FINGERPRINT = 100 # 1s
@ -41,7 +40,7 @@ interface_names = _get_interface_names()
interfaces = load_interfaces(interface_names)
def can_fingerprint(can_recv: Callable) -> tuple[str | None, dict[int, dict]]:
def can_fingerprint(can_recv: CanRecvCallable) -> tuple[str | None, dict[int, dict]]:
finger = gen_empty_fingerprint()
candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
frame = 0
@ -49,10 +48,10 @@ def can_fingerprint(can_recv: Callable) -> tuple[str | None, dict[int, dict]]:
done = False
while not done:
# can_recv() may return zero or multiple packets, so we increment frame for each one we receive
can_packets = can_recv()
# can_recv(wait_for_one=True) may return zero or multiple packets, so we increment frame for each one we receive
can_packets = can_recv(wait_for_one=True)
for can_packet in can_packets:
for can in can_packet.can:
for can in can_packet:
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can.src < 128:
@ -83,7 +82,7 @@ def can_fingerprint(can_recv: Callable) -> tuple[str | None, dict[int, dict]]:
# **** for use live only ****
def fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params_raw):
def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int, cached_params_raw: bytes | None):
fixed_fingerprint = os.environ.get('FINGERPRINT', "")
skip_fw_query = os.environ.get('SKIP_FW_QUERY', False)
disable_fw_cache = os.environ.get('DISABLE_FW_CACHE', False)
@ -109,9 +108,9 @@ def fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params
# NOTE: this takes ~0.1s and is relied on to allow sendcan subscriber to connect in time
set_obd_multiplexing(True)
# VIN query only reliably works through OBDII
vin_rx_addr, vin_rx_bus, vin = get_vin(logcan, sendcan, (0, 1))
ecu_rx_addrs = get_present_ecus(logcan, sendcan, set_obd_multiplexing, num_pandas=num_pandas)
car_fw = get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing, vin, ecu_rx_addrs, num_pandas=num_pandas)
vin_rx_addr, vin_rx_bus, vin = get_vin(can_recv, can_send, (0, 1))
ecu_rx_addrs = get_present_ecus(can_recv, can_send, set_obd_multiplexing, num_pandas=num_pandas)
car_fw = get_fw_versions_ordered(can_recv, can_send, set_obd_multiplexing, vin, ecu_rx_addrs, num_pandas=num_pandas)
cached = False
exact_fw_match, fw_candidates = match_fw_to_car(car_fw, vin)
@ -132,8 +131,8 @@ def fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params
# CAN fingerprint
# drain CAN socket so we get the latest messages
messaging.drain_sock_raw(logcan)
car_fingerprint, finger = can_fingerprint(lambda: messaging.drain_sock(logcan, wait_for_one=True))
can_recv()
car_fingerprint, finger = can_fingerprint(can_recv)
exact_match = True
source = car.CarParams.FingerprintSource.can
@ -160,8 +159,9 @@ def get_car_interface(CP):
return CarInterface(CP, CarController, CarState)
def get_car(logcan, sendcan, set_obd_multiplexing, experimental_long_allowed, num_pandas=1, cached_params=None):
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params)
def get_car(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, experimental_long_allowed: bool,
num_pandas: int = 1, cached_params: bytes | None = None):
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(can_recv, can_send, set_obd_multiplexing, num_pandas, cached_params)
if candidate is None:
carlog.error({"event": "car doesn't match any fingerprints", "fingerprints": repr(fingerprints)})

View File

@ -14,6 +14,8 @@ from openpilot.common.swaglog import cloudlog, ForwardingHandler
from openpilot.selfdrive.pandad import can_capnp_to_list, can_list_to_can_capnp
from openpilot.selfdrive.car import DT_CTRL, carlog
from openpilot.selfdrive.car.can_definitions import CanData, CanRecvCallable, CanSendCallable
from openpilot.selfdrive.car.fw_versions import ObdCallback
from openpilot.selfdrive.car.car_helpers import get_car
from openpilot.selfdrive.car.interfaces import CarInterfaceBase
from openpilot.selfdrive.controls.lib.events import Events
@ -26,7 +28,7 @@ EventName = car.CarEvent.EventName
carlog.addHandler(ForwardingHandler(cloudlog))
def obd_callback(params: Params):
def obd_callback(params: Params) -> ObdCallback:
def set_obd_multiplexing(obd_multiplexing: bool):
if params.get_bool("ObdMultiplexingEnabled") != obd_multiplexing:
cloudlog.warning(f"Setting OBD multiplexing to {obd_multiplexing}")
@ -37,10 +39,28 @@ def obd_callback(params: Params):
return set_obd_multiplexing
def can_comm_callbacks(logcan: messaging.SubSocket, sendcan: messaging.PubSocket) -> tuple[CanRecvCallable, CanSendCallable]:
def can_recv(wait_for_one: bool = False) -> list[list[CanData]]:
"""
wait_for_one: wait the normal logcan socket timeout for a CAN packet, may return empty list if nothing comes
Returns: CAN packets comprised of CanData objects for easy access
"""
ret = []
for can in messaging.drain_sock(logcan, wait_for_one=wait_for_one):
ret.append([CanData(msg.address, msg.dat, msg.src) for msg in can.can])
return ret
def can_send(msgs: list[CanData]) -> None:
sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan'))
return can_recv, can_send
class Car:
CI: CarInterfaceBase
def __init__(self, CI=None):
def __init__(self, CI=None) -> None:
self.can_sock = messaging.sub_sock('can', timeout=20)
self.sm = messaging.SubMaster(['pandaStates', 'carControl', 'onroadEvents'])
self.pm = messaging.PubMaster(['sendcan', 'carState', 'carParams', 'carOutput'])
@ -55,6 +75,8 @@ class Car:
self.params = Params()
self.can_callbacks = can_comm_callbacks(self.can_sock, self.pm.sock['sendcan'])
if CI is None:
# wait for one pandaState and one CAN packet
print("Waiting for CAN messages...")
@ -66,7 +88,7 @@ class Car:
experimental_long_allowed = self.params.get_bool("ExperimentalLongitudinalEnabled")
num_pandas = len(messaging.recv_one_retry(self.sm.sock['pandaStates']).pandaStates)
cached_params = self.params.get("CarParamsCache")
self.CI = get_car(self.can_sock, self.pm.sock['sendcan'], obd_callback(self.params), experimental_long_allowed, num_pandas, cached_params)
self.CI = get_car(*self.can_callbacks, obd_callback(self.params), experimental_long_allowed, num_pandas, cached_params)
self.CP = self.CI.CP
# continue onto next fingerprinting step in pandad
@ -169,7 +191,7 @@ class Car:
if not self.initialized_prev:
# Initialize CarInterface, once controls are ready
# TODO: this can make us miss at least a few cycles when doing an ECU knockout
self.CI.init(self.CP, self.can_sock, self.pm.sock['sendcan'])
self.CI.init(self.CP, *self.can_callbacks)
# signal pandad to switch to car safety mode
self.params.put_bool_nonblocking("ControlsReady", True)

View File

@ -8,7 +8,7 @@ EXT_DIAG_RESPONSE = b'\x50\x03'
COM_CONT_RESPONSE = b''
def disable_ecu(logcan, sendcan, bus=0, addr=0x7d0, sub_addr=None, com_cont_req=b'\x28\x83\x01', timeout=0.1, retry=10, debug=False):
def disable_ecu(can_recv, can_send, bus=0, addr=0x7d0, sub_addr=None, com_cont_req=b'\x28\x83\x01', timeout=0.1, retry=10, debug=False):
"""Silence an ECU by disabling sending and receiving messages using UDS 0x28.
The ECU will stay silent as long as openpilot keeps sending Tester Present.
@ -18,12 +18,12 @@ def disable_ecu(logcan, sendcan, bus=0, addr=0x7d0, sub_addr=None, com_cont_req=
for i in range(retry):
try:
query = IsoTpParallelQuery(sendcan, logcan, bus, [(addr, sub_addr)], [EXT_DIAG_REQUEST], [EXT_DIAG_RESPONSE], debug=debug)
query = IsoTpParallelQuery(can_send, can_recv, bus, [(addr, sub_addr)], [EXT_DIAG_REQUEST], [EXT_DIAG_RESPONSE], debug=debug)
for _, _ in query.get_data(timeout).items():
carlog.warning("communication control disable tx/rx ...")
query = IsoTpParallelQuery(sendcan, logcan, bus, [(addr, sub_addr)], [com_cont_req], [COM_CONT_RESPONSE], debug=debug)
query = IsoTpParallelQuery(can_send, can_recv, bus, [(addr, sub_addr)], [com_cont_req], [COM_CONT_RESPONSE], debug=debug)
query.get_data(0)
carlog.warning("ecu disabled")

View File

@ -2,11 +2,10 @@
import capnp
import time
import cereal.messaging as messaging
from panda.python.uds import SERVICE_TYPE
from openpilot.selfdrive.car import make_tester_present_msg, carlog
from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable
from openpilot.selfdrive.car.fw_query_definitions import EcuAddrBusType
from openpilot.selfdrive.pandad import can_list_to_can_capnp
def _is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int = None) -> bool:
@ -23,26 +22,26 @@ def _is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subad
return False
def _get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> set[EcuAddrBusType]:
def _get_all_ecu_addrs(can_recv: CanRecvCallable, can_send: CanSendCallable, bus: int, timeout: float = 1, debug: bool = True) -> set[EcuAddrBusType]:
addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)]
queries: set[EcuAddrBusType] = {(addr, None, bus) for addr in addr_list}
responses = queries
return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug)
return get_ecu_addrs(can_recv, can_send, queries, responses, timeout=timeout, debug=debug)
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: set[EcuAddrBusType],
def get_ecu_addrs(can_recv: CanRecvCallable, can_send: CanSendCallable, queries: set[EcuAddrBusType],
responses: set[EcuAddrBusType], timeout: float = 1, debug: bool = False) -> set[EcuAddrBusType]:
ecu_responses: set[EcuAddrBusType] = set() # set((addr, subaddr, bus),)
try:
msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries]
messaging.drain_sock_raw(logcan)
sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan'))
can_recv()
can_send(msgs)
start_time = time.monotonic()
while time.monotonic() - start_time < timeout:
can_packets = messaging.drain_sock(logcan, wait_for_one=True)
can_packets = can_recv(wait_for_one=True)
for packet in can_packets:
for msg in packet.can:
for msg in packet:
if not len(msg.dat):
carlog.warning("ECU addr scan: skipping empty remote frame")
continue
@ -61,6 +60,7 @@ def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, que
if __name__ == "__main__":
import argparse
import cereal.messaging as messaging
from openpilot.common.params import Params
from openpilot.selfdrive.car.card import obd_callback

View File

@ -11,6 +11,7 @@ from cereal import car
from openpilot.selfdrive.car import carlog
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions
from openpilot.selfdrive.car.interfaces import get_interface_attr
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
@ -171,7 +172,7 @@ def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], vi
return True, set()
def get_present_ecus(logcan, sendcan, set_obd_multiplexing: ObdCallback, num_pandas: int = 1) -> set[EcuAddrBusType]:
def get_present_ecus(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int = 1) -> set[EcuAddrBusType]:
# queries are split by OBD multiplexing mode
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []}
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []}
@ -205,7 +206,7 @@ def get_present_ecus(logcan, sendcan, set_obd_multiplexing: ObdCallback, num_pan
for obd_multiplexing in queries:
set_obd_multiplexing(obd_multiplexing)
for query in queries[obd_multiplexing]:
ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1))
ecu_responses.update(get_ecu_addrs(can_recv, can_send, set(query), responses, timeout=0.1))
return ecu_responses
@ -227,8 +228,9 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[Ad
return brand_matches
def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback, vin: str, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1,
num_pandas: int = 1, debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str,
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False,
progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
all_car_fw = []
@ -239,7 +241,8 @@ def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback,
if not len(brand_matches[brand]):
continue
car_fw = get_fw_versions(logcan, sendcan, set_obd_multiplexing, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress)
car_fw = get_fw_versions(can_recv, can_send, set_obd_multiplexing, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug,
progress=progress)
all_car_fw.extend(car_fw)
# If there is a match using this brand's FW alone, finish querying early
@ -250,8 +253,9 @@ def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback,
return all_car_fw
def get_fw_versions(logcan, sendcan, set_obd_multiplexing: ObdCallback, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1,
num_pandas: int = 1, debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None,
extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False,
progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
versions = VERSIONS.copy()
if query_brand is not None:
@ -301,7 +305,7 @@ def get_fw_versions(logcan, sendcan, set_obd_multiplexing: ObdCallback, query_br
(len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)]
if query_addrs:
query = IsoTpParallelQuery(sendcan, logcan, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug)
query = IsoTpParallelQuery(can_send, can_recv, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug)
for (tx_addr, sub_addr), version in query.get_data(timeout).items():
f = car.CarParams.CarFw.new_message()

View File

@ -217,9 +217,9 @@ class CarInterface(CarInterfaceBase):
return ret
@staticmethod
def init(CP, logcan, sendcan):
def init(CP, can_recv, can_send):
if CP.carFingerprint in (HONDA_BOSCH - HONDA_BOSCH_RADARLESS) and CP.openpilotLongitudinalControl:
disable_ecu(logcan, sendcan, bus=1, addr=0x18DAB0F1, com_cont_req=b'\x28\x83\x03')
disable_ecu(can_recv, can_send, bus=1, addr=0x18DAB0F1, com_cont_req=b'\x28\x83\x03')
# returns a car.CarState
def _update(self, c):

View File

@ -139,16 +139,16 @@ class CarInterface(CarInterfaceBase):
return ret
@staticmethod
def init(CP, logcan, sendcan):
def init(CP, can_recv, can_send):
if CP.openpilotLongitudinalControl and not (CP.flags & HyundaiFlags.CANFD_CAMERA_SCC.value):
addr, bus = 0x7d0, 0
if CP.flags & HyundaiFlags.CANFD_HDA2.value:
addr, bus = 0x730, CanBus(CP).ECAN
disable_ecu(logcan, sendcan, bus=bus, addr=addr, com_cont_req=b'\x28\x83\x01')
disable_ecu(can_recv, can_send, bus=bus, addr=addr, com_cont_req=b'\x28\x83\x01')
# for blinkers
if CP.flags & HyundaiFlags.ENABLE_BLINKERS:
disable_ecu(logcan, sendcan, bus=CanBus(CP).ECAN, addr=0x7B1, com_cont_req=b'\x28\x83\x01')
disable_ecu(can_recv, can_send, bus=CanBus(CP).ECAN, addr=0x7B1, com_cont_req=b'\x28\x83\x01')
def _update(self, c):
ret = self.CS.update(self.cp, self.cp_cam)

View File

@ -12,7 +12,7 @@ from cereal import car
from openpilot.common.basedir import BASEDIR
from openpilot.common.simple_kalman import KF1D, get_kalman_gain
from openpilot.selfdrive.car import DT_CTRL, apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness, get_friction, STD_CARGO_KG
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.can_definitions import CanData, CanRecvCallable, CanSendCallable
from openpilot.selfdrive.car.conversions import Conversions as CV
from openpilot.selfdrive.car.helpers import clip
from openpilot.selfdrive.car.values import PLATFORMS
@ -155,7 +155,7 @@ class CarInterfaceBase(ABC):
raise NotImplementedError
@staticmethod
def init(CP, logcan, sendcan):
def init(CP: car.CarParams, can_recv: CanRecvCallable, can_send: CanSendCallable):
pass
@staticmethod

View File

@ -2,20 +2,18 @@ import time
from collections import defaultdict
from functools import partial
import cereal.messaging as messaging
from openpilot.selfdrive.car import carlog
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.can_definitions import CanData, CanRecvCallable, CanSendCallable
from openpilot.selfdrive.car.fw_query_definitions import AddrType
from openpilot.selfdrive.pandad import can_list_to_can_capnp
from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr
class IsoTpParallelQuery:
def __init__(self, sendcan: messaging.PubSocket, logcan: messaging.SubSocket, bus: int, addrs: list[int] | list[AddrType],
def __init__(self, can_send: CanSendCallable, can_recv: CanRecvCallable, bus: int, addrs: list[int] | list[AddrType],
request: list[bytes], response: list[bytes], response_offset: int = 0x8,
functional_addrs: list[int] = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
self.sendcan = sendcan
self.logcan = logcan
self.can_send = can_send
self.can_recv = can_recv
self.bus = bus
self.request = request
self.response = response
@ -32,17 +30,17 @@ class IsoTpParallelQuery:
def rx(self) -> None:
"""Drain can socket and sort messages into buffers based on address"""
can_packets = messaging.drain_sock(self.logcan, wait_for_one=True)
can_packets = self.can_recv(wait_for_one=True)
for packet in can_packets:
for msg in packet.can:
for msg in packet:
if msg.src == self.bus and msg.address in self.msg_addrs.values():
self.msg_buffer[msg.address].append(CanData(msg.address, msg.dat, msg.src))
def _can_tx(self, tx_addr, dat, bus):
def _can_tx(self, tx_addr: int, dat: bytes, bus: int):
"""Helper function to send single message"""
msg = [tx_addr, dat, bus]
self.sendcan.send(can_list_to_can_capnp([msg], msgtype='sendcan'))
msg = CanData(tx_addr, dat, bus)
self.can_send([msg])
def _can_rx(self, addr, sub_addr=None):
"""Helper function to retrieve message with specified address and subadress from buffer"""
@ -63,8 +61,8 @@ class IsoTpParallelQuery:
self.msg_buffer[addr] = keep_msgs
return msgs
def _drain_rx(self):
messaging.drain_sock_raw(self.logcan)
def _drain_rx(self) -> None:
self.can_recv()
self.msg_buffer = defaultdict(list)
def _create_isotp_msg(self, tx_addr: int, sub_addr: int | None, rx_addr: int):

View File

@ -106,6 +106,6 @@ class CarInterface(CarInterfaceBase):
return ret
@staticmethod
def init(CP, logcan, sendcan):
def init(CP, can_recv, can_send):
if CP.flags & SubaruFlags.DISABLE_EYESIGHT:
disable_ecu(logcan, sendcan, bus=2, addr=GLOBAL_ES_ADDR, com_cont_req=b'\x28\x03\x01')
disable_ecu(can_recv, can_send, bus=2, addr=GLOBAL_ES_ADDR, com_cont_req=b'\x28\x03\x01')

View File

@ -1,6 +1,6 @@
from parameterized import parameterized
from cereal import log, messaging
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.car_helpers import FRAME_FINGERPRINT, can_fingerprint
from openpilot.selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS
@ -11,13 +11,11 @@ class TestCanFingerprint:
"""Tests online fingerprinting function on offline fingerprints"""
for fingerprint in fingerprints: # can have multiple fingerprints for each platform
can = messaging.new_message('can', 1)
can.can = [log.CanData(address=address, dat=b'\x00' * length, src=src)
can = [CanData(address=address, dat=b'\x00' * length, src=src)
for address, length in fingerprint.items() for src in (0, 1)]
fingerprint_iter = iter([can])
empty_can = messaging.new_message('can', 0)
car_fingerprint, finger = can_fingerprint(lambda: [next(fingerprint_iter, empty_can)]) # noqa: B023
car_fingerprint, finger = can_fingerprint(lambda **kwargs: [next(fingerprint_iter, [])]) # noqa: B023
assert car_fingerprint == car_model
assert finger[0] == fingerprint
@ -32,30 +30,27 @@ class TestCanFingerprint:
cases = []
# case 1 - one match, make sure we keep going for 100 frames
can = messaging.new_message('can', 1)
can.can = [log.CanData(address=address, dat=b'\x00' * length, src=src)
can = [CanData(address=address, dat=b'\x00' * length, src=src)
for address, length in fingerprint.items() for src in (0, 1)]
cases.append((FRAME_FINGERPRINT, car_model, can))
# case 2 - no matches, make sure we keep going for 100 frames
can = messaging.new_message('can', 1)
can.can = [log.CanData(address=1, dat=b'\x00' * 1, src=src) for src in (0, 1)] # uncommon address
can = [CanData(address=1, dat=b'\x00' * 1, src=src) for src in (0, 1)] # uncommon address
cases.append((FRAME_FINGERPRINT, None, can))
# case 3 - multiple matches, make sure we keep going for 200 frames to try to eliminate some
can = messaging.new_message('can', 1)
can.can = [log.CanData(address=2016, dat=b'\x00' * 8, src=src) for src in (0, 1)] # common address
can = [CanData(address=2016, dat=b'\x00' * 8, src=src) for src in (0, 1)] # common address
cases.append((FRAME_FINGERPRINT * 2, None, can))
for expected_frames, car_model, can in cases:
with subtests.test(expected_frames=expected_frames, car_model=car_model):
frames = 0
def test():
def can_recv(**kwargs):
nonlocal frames
frames += 1
return [can] # noqa: B023
car_fingerprint, _ = can_fingerprint(test)
car_fingerprint, _ = can_fingerprint(can_recv)
assert car_fingerprint == car_model
assert frames == expected_frames + 2 # TODO: fix extra frames

View File

@ -11,7 +11,6 @@ from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
from openpilot.selfdrive.car.fw_versions import ESSENTIAL_ECUS, FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \
match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_fw_versions_ordered, get_present_ecus
from openpilot.selfdrive.car.vin import get_vin
from openpilot.selfdrive.pandad import can_list_to_can_capnp
CarFw = car.CarParams.CarFw
Ecu = car.CarParams.Ecu
@ -19,15 +18,6 @@ Ecu = car.CarParams.Ecu
ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
class FakeSocket:
def receive(self, non_blocking=False):
return (can_list_to_can_capnp([CanData(random.randint(0x600, 0x800), b'\x00' * 8, 0)])
if random.uniform(0, 1) > 0.5 else None)
def send(self, msg):
pass
class TestFwFingerprint:
def assertFingerprints(self, candidates, expected):
candidates = list(candidates)
@ -214,6 +204,15 @@ class TestFwFingerprintTiming:
current_obd_multiplexing: bool
total_time: float
@staticmethod
def fake_can_send(msgs):
pass
@staticmethod
def fake_can_recv(wait_for_one: bool = False) -> list[list[CanData]]:
return ([[CanData(random.randint(0x600, 0x800), b'\x00' * 8, 0)]]
if random.uniform(0, 1) > 0.5 else [])
def fake_set_obd_multiplexing(self, obd_multiplexing):
"""The 10Hz blocking params loop adds on average 50ms to the query time for each OBD multiplexing change"""
if obd_multiplexing != self.current_obd_multiplexing:
@ -225,7 +224,6 @@ class TestFwFingerprintTiming:
return {}
def _benchmark_brand(self, brand, num_pandas, mocker):
fake_socket = FakeSocket()
self.total_time = 0
mocker.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data)
for _ in range(self.N):
@ -233,7 +231,7 @@ class TestFwFingerprintTiming:
self.current_obd_multiplexing = True
t = time.perf_counter()
get_fw_versions(fake_socket, fake_socket, self.fake_set_obd_multiplexing, brand, num_pandas=num_pandas)
get_fw_versions(self.fake_can_recv, self.fake_can_send, self.fake_set_obd_multiplexing, brand, num_pandas=num_pandas)
self.total_time += time.perf_counter() - t
return self.total_time / self.N
@ -251,12 +249,11 @@ class TestFwFingerprintTiming:
self.total_time += timeout
return set()
fake_socket = FakeSocket()
self.total_time = 0.0
mocker.patch("openpilot.selfdrive.car.fw_versions.get_ecu_addrs", fake_get_ecu_addrs)
for _ in range(self.N):
self.current_obd_multiplexing = True
get_present_ecus(fake_socket, fake_socket, self.fake_set_obd_multiplexing, num_pandas=2)
get_present_ecus(self.fake_can_recv, self.fake_can_send, self.fake_set_obd_multiplexing, num_pandas=2)
self._assert_timing(self.total_time / self.N, present_ecu_ref_time)
print(f'get_present_ecus, query time={self.total_time / self.N} seconds')
@ -265,7 +262,7 @@ class TestFwFingerprintTiming:
self.total_time = 0.0
mocker.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data)
for _ in range(self.N):
get_vin(fake_socket, fake_socket, (0, 1), **args)
get_vin(self.fake_can_recv, self.fake_can_send, (0, 1), **args)
self._assert_timing(self.total_time / self.N, vin_ref_times[name])
print(f'get_vin {name} case, query time={self.total_time / self.N} seconds')
@ -324,8 +321,7 @@ class TestFwFingerprintTiming:
raise
mocker.patch("openpilot.selfdrive.car.carlog.exception", fake_carlog_exception)
fake_socket = FakeSocket()
get_fw_versions_ordered(fake_socket, fake_socket, lambda obd: None, '0' * 17, set())
get_fw_versions_ordered(self.fake_can_recv, self.fake_can_send, lambda obd: None, '0' * 17, set())
for brand in FW_QUERY_CONFIGS.keys():
with subtests.test(brand=brand):
get_fw_versions(fake_socket, fake_socket, lambda obd: None, brand)
get_fw_versions(self.fake_can_recv, self.fake_can_send, lambda obd: None, brand)

View File

@ -138,11 +138,11 @@ class CarInterface(CarInterfaceBase):
return ret
@staticmethod
def init(CP, logcan, sendcan):
def init(CP, can_recv, can_send):
# disable radar if alpha longitudinal toggled on radar-ACC car
if CP.flags & ToyotaFlags.DISABLE_RADAR.value:
communication_control = bytes([uds.SERVICE_TYPE.COMMUNICATION_CONTROL, uds.CONTROL_TYPE.ENABLE_RX_DISABLE_TX, uds.MESSAGE_TYPE.NORMAL])
disable_ecu(logcan, sendcan, bus=0, addr=0x750, sub_addr=0xf, com_cont_req=communication_control)
disable_ecu(can_recv, can_send, bus=0, addr=0x750, sub_addr=0xf, com_cont_req=communication_control)
# returns a car.CarState
def _update(self, c):

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import re
import cereal.messaging as messaging
from panda.python.uds import get_rx_addr_for_tx_addr, FUNCTIONAL_ADDRS
from openpilot.selfdrive.car import carlog
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
@ -15,7 +14,7 @@ def is_valid_vin(vin: str):
return re.fullmatch(VIN_RE, vin) is not None
def get_vin(logcan, sendcan, buses, timeout=0.1, retry=2, debug=False):
def get_vin(can_recv, can_send, buses, timeout=0.1, retry=2, debug=False):
for i in range(retry):
for bus in buses:
for request, response, valid_buses, vin_addrs, functional_addrs, rx_offset in (
@ -35,7 +34,7 @@ def get_vin(logcan, sendcan, buses, timeout=0.1, retry=2, debug=False):
tx_addrs = [a for a in range(0x700, 0x800) if a != 0x7DF] + list(range(0x18DA00F1, 0x18DB00F1, 0x100))
try:
query = IsoTpParallelQuery(sendcan, logcan, bus, tx_addrs, [request, ], [response, ], response_offset=rx_offset,
query = IsoTpParallelQuery(can_send, can_recv, bus, tx_addrs, [request, ], [response, ], response_offset=rx_offset,
functional_addrs=functional_addrs, debug=debug)
results = query.get_data(timeout)
@ -63,6 +62,7 @@ def get_vin(logcan, sendcan, buses, timeout=0.1, retry=2, debug=False):
if __name__ == "__main__":
import argparse
import time
import cereal.messaging as messaging
parser = argparse.ArgumentParser(description='Get VIN of the car')
parser.add_argument('--debug', action='store_true')

View File

@ -22,6 +22,7 @@ from openpilot.common.prefix import OpenpilotPrefix
from openpilot.common.timeout import Timeout
from openpilot.common.realtime import DT_CTRL
from panda.python import ALTERNATIVE_EXPERIENCE
from openpilot.selfdrive.car.card import can_comm_callbacks
from openpilot.selfdrive.car.car_helpers import get_car, interfaces
from openpilot.system.manager.process_config import managed_processes
from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
@ -356,7 +357,8 @@ def get_car_params_callback(rc, pm, msgs, fingerprint):
for m in canmsgs[:300]:
can.send(m.as_builder().to_bytes())
CP = get_car(can, sendcan, lambda obd: None, Params().get_bool("ExperimentalLongitudinalEnabled"), cached_params=cached_params).CP
can_callbacks = can_comm_callbacks(can, sendcan)
CP = get_car(*can_callbacks, lambda obd: None, Params().get_bool("ExperimentalLongitudinalEnabled"), cached_params=cached_params).CP
if not params.get_bool("DisengageOnAccelerator"):
CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS