ruff: enable preview rules (#2621)

* preview

* fmt

* fix formatting!

* Revert "fix formatting!"

This reverts commit 559d291cfa27c3500e5fa3105ffdea809bdd1b27.

* some of these are good, but can enable in steps

* and

* ruff check --fix

* preview

* format
This commit is contained in:
Shane Smiskol
2025-08-04 15:38:04 -07:00
committed by GitHub
parent 7ba3c14462
commit 1302e1bb25
46 changed files with 137 additions and 13 deletions

View File

@@ -11,6 +11,7 @@ from kbhit import KBHit
from opendbc.car.structs import CarControl
from opendbc.car.panda_runner import PandaRunner
class Keyboard:
def __init__(self):
self.kb = KBHit()
@@ -37,6 +38,7 @@ class Keyboard:
return False
return True
class Joystick:
def __init__(self, gamepad=False):
# TODO: find a way to get this from API, perhaps "inputs" doesn't support it
@@ -72,10 +74,12 @@ class Joystick:
return False
return True
def joystick_thread(joystick):
while True:
joystick.update()
def main(joystick):
threading.Thread(target=joystick_thread, args=(joystick,), daemon=True).start()
with PandaRunner() as p:

View File

@@ -6,6 +6,7 @@ from select import select
STDIN_FD = sys.stdin.fileno()
class KBHit:
def __init__(self) -> None:
self.set_kbhit_terminal()

View File

@@ -16,7 +16,6 @@ from opendbc.car.tesla.teslacan import tesla_checksum
from opendbc.car.body.bodycan import body_checksum
class SignalType:
DEFAULT = 0
COUNTER = 1
@@ -47,7 +46,6 @@ class Signal:
calc_checksum: 'Callable[[int, Signal, bytearray], int] | None' = None
@dataclass
class Msg:
name: str
@@ -70,6 +68,7 @@ SGM_RE = re.compile(r"^SG_ (\w+) (\w+) *: (\d+)\|(\d+)@(\d)([+-]) \(([0-9.+\-eE]
VAL_RE = re.compile(r"^VAL_ (\w+) (\w+) (.*);")
VAL_SPLIT_RE = re.compile(r'["]+')
@dataclass
class DBC:
name: str

View File

@@ -11,7 +11,6 @@ MAX_BAD_COUNTER = 5
CAN_INVALID_CNT = 5
def get_raw_value(dat: bytes | bytearray, sig: Signal) -> int:
ret = 0
i = sig.msb // 8
@@ -118,6 +117,7 @@ class VLDict(dict):
self.parser._add_message(key)
return super().__getitem__(key)
class CANParser:
def __init__(self, dbc_name: str, messages: list[tuple[str | int, int]], bus: int):
self.dbc_name: str = dbc_name

View File

@@ -38,6 +38,7 @@ def _benchmark(checks, n):
avg_nanos = et / len(can_msgs)
print('[%d] %.1fms to pack, %.1fms to parse %s messages, avg: %dns' % (n, pack_dt/1e6, et/1e6, len(can_msgs), avg_nanos))
if __name__ == "__main__":
# python -m cProfile -s cumulative benchmark.py
_benchmark([('ACC_CONTROL', 10)], 1)

View File

@@ -228,6 +228,7 @@ class TestCanParserPacker:
packer = CANPacker(dbc_file)
i = 0
def send_msg(blank=False):
nonlocal i
i += 1

View File

@@ -77,6 +77,7 @@ def scale_tire_stiffness(mass, wheelbase, center_to_front, tire_stiffness_factor
DbcDict = dict[StrEnum, str]
class Bus(StrEnum):
pt = auto()
cam = auto()

View File

@@ -4,6 +4,7 @@ import struct
from enum import IntEnum, Enum
from dataclasses import dataclass
@dataclass
class ExchangeStationIdsReturn:
id_length: int
@@ -11,26 +12,31 @@ class ExchangeStationIdsReturn:
available: int
protected: int
@dataclass
class GetDaqListSizeReturn:
list_size: int
first_pid: int
@dataclass
class GetSessionStatusReturn:
status: int
info: int | None
@dataclass
class DiagnosticServiceReturn:
length: int
type: int
@dataclass
class ActionServiceReturn:
length: int
type: int
class COMMAND_CODE(IntEnum):
CONNECT = 0x01
SET_MTA = 0x02
@@ -61,6 +67,7 @@ class COMMAND_CODE(IntEnum):
PROGRAM_6 = 0x22
DNLOAD_6 = 0x23
COMMAND_RETURN_CODES = {
0x00: "acknowledge / no error",
0x01: "DAQ processor overload",
@@ -82,16 +89,20 @@ COMMAND_RETURN_CODES = {
0x36: "resource/function not available",
}
class BYTE_ORDER(Enum):
LITTLE_ENDIAN = '<'
BIG_ENDIAN = '>'
class CommandTimeoutError(Exception):
pass
class CommandCounterError(Exception):
pass
class CommandResponseError(Exception):
def __init__(self, message, return_code):
super().__init__()
@@ -101,6 +112,7 @@ class CommandResponseError(Exception):
def __str__(self):
return self.message
class CcpClient:
def __init__(self, panda, tx_addr: int, rx_addr: int, bus: int=0, byte_order: BYTE_ORDER=BYTE_ORDER.BIG_ENDIAN, debug=False):
self.tx_addr = tx_addr

View File

@@ -5,6 +5,7 @@ from opendbc.car.chrysler.values import RAM_CARS
GearShifter = structs.CarState.GearShifter
VisualAlert = structs.CarControl.HUDControl.VisualAlert
def create_lkas_hud(packer, CP, lkas_active, hud_alert, hud_count, car_model, auto_high_beam):
# LKAS_HUD - Controls what lane-keeping icon is displayed

View File

@@ -9,6 +9,7 @@ RADAR_MSGS_D = list(range(0x2a2, 0x2b4+2, 2)) # d_ messages
LAST_MSG = max(RADAR_MSGS_C + RADAR_MSGS_D)
NUMBER_MSGS = len(RADAR_MSGS_C) + len(RADAR_MSGS_D)
def _create_radar_can_parser(car_fingerprint):
if Bus.radar not in DBC[car_fingerprint]:
return None
@@ -28,6 +29,7 @@ def _create_radar_can_parser(car_fingerprint):
return CANParser(DBC[car_fingerprint][Bus.radar], messages, 1)
def _address_to_track(address):
if address in RADAR_MSGS_C:
return (address - RADAR_MSGS_C[0]) // 2
@@ -35,6 +37,7 @@ def _address_to_track(address):
return (address - RADAR_MSGS_D[0]) // 2
raise ValueError("radar received unexpected address %d" % address)
class RadarInterface(RadarInterfaceBase):
def __init__(self, CP):
super().__init__(CP)

View File

@@ -18,6 +18,7 @@ class ChryslerFlags(IntFlag):
# Detected flags
HIGHER_MIN_STEERING_SPEED = 1
@dataclass
class ChryslerCarDocs(CarDocs):
package: str = "Adaptive Cruise Control (ACC)"

View File

@@ -1,5 +1,6 @@
import numpy as np
class Conversions:
# Speed
MPH_TO_KPH = 1.609344

View File

@@ -1,6 +1,7 @@
import numpy as np
from numbers import Number
class PIDController:
def __init__(self, k_p, k_i, k_f=0., k_d=0., pos_limit=1e308, neg_limit=-1e308, rate=100):
self._k_p = k_p

View File

@@ -86,6 +86,7 @@ class FordCarDocs(CarDocs):
if CP.carFingerprint in (CAR.FORD_F_150_MK14, CAR.FORD_F_150_LIGHTNING_MK1, CAR.FORD_EXPEDITION_MK4):
self.setup_video = "https://www.youtube.com/watch?v=MewJc9LYp9M"
@dataclass
class FordPlatformConfig(PlatformConfig):
dbc_dict: DbcDict = field(default_factory=lambda: {
@@ -113,6 +114,7 @@ class FordCANFDPlatformConfig(FordPlatformConfig):
super().init()
self.flags |= FordFlags.CANFD
@dataclass
class FordF150LightningPlatform(FordCANFDPlatformConfig):
def init(self):

View File

@@ -177,4 +177,3 @@ class CarState(CarStateBase):
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], 2),
Bus.loopback: CANParser(DBC[CP.carFingerprint][Bus.pt], loopback_messages, 128),
}

View File

@@ -106,6 +106,7 @@ class GMASCMPlatformConfig(GMPlatformConfig):
# ASCM is supported, but due to a janky install and hardware configuration, we are not showing in the car docs
self.car_docs = []
@dataclass
class GMSDGMPlatformConfig(GMPlatformConfig):
def init(self):
@@ -202,12 +203,14 @@ class CruiseButtons:
MAIN = 5
CANCEL = 6
class AccState:
OFF = 0
ACTIVE = 1
FAULTED = 3
STANDSTILL = 4
class CanBus:
POWERTRAIN = 0
OBSTACLE = 1

View File

@@ -10,6 +10,7 @@ Ecu = structs.CarParams.Ecu
VisualAlert = structs.CarControl.HUDControl.VisualAlert
GearShifter = structs.CarState.GearShifter
class CarControllerParams:
# Allow small margin below -3.5 m/s^2 from ISO 15622:2018 since we
# perform the closed loop control, and might need some
@@ -75,6 +76,7 @@ class HondaFlags(IntFlag):
HAS_EPB = 512
ALLOW_MANUAL_TRANS = 1024
# Car button codes
class CruiseButtons:
RES_ACCEL = 4

View File

@@ -3,6 +3,7 @@ from opendbc.car.hyundai.values import CAR, HyundaiFlags
hyundai_checksum = crcmod.mkCrcFun(0x11D, initCrc=0xFD, rev=False, xorOut=0xdf)
def create_lkas11(packer, frame, CP, apply_torque, steer_req,
torque_fault, lkas11, sys_warning, sys_state, enabled,
left_lane, right_lane,
@@ -123,6 +124,7 @@ def create_lfahda_mfc(packer, enabled):
}
return packer.make_can_msg("LFAHDA_MFC", 0, values)
def create_acc_commands(packer, enabled, accel, upper_jerk, idx, hud_control, set_speed, stopping, long_override, use_fca, CP):
commands = []
@@ -185,6 +187,7 @@ def create_acc_commands(packer, enabled, accel, upper_jerk, idx, hud_control, se
return commands
def create_acc_opt(packer, CP):
commands = []
@@ -206,6 +209,7 @@ def create_acc_opt(packer, CP):
return commands
def create_frt_radar_opt(packer):
frt_radar11_values = {
"CF_FCA_Equip_Front_Radar": 1,

View File

@@ -66,6 +66,7 @@ def create_steering_messages(packer, CP, CAN, enabled, lat_active, apply_torque)
return ret
def create_suppress_lfa(packer, CAN, lfa_block_msg, lka_steering_alt):
suppress_msg = "CAM_0x362" if lka_steering_alt else "CAM_0x2a4"
msg_bytes = 32 if lka_steering_alt else 24
@@ -78,6 +79,7 @@ def create_suppress_lfa(packer, CAN, lfa_block_msg, lka_steering_alt):
values["RIGHT_LANE_LINE"] = 0
return packer.make_can_msg(suppress_msg, CAN.ACAN, values)
def create_buttons(packer, CP, CAN, cnt, btn):
values = {
"COUNTER": cnt,
@@ -88,6 +90,7 @@ def create_buttons(packer, CP, CAN, cnt, btn):
bus = CAN.ECAN if CP.flags & HyundaiFlags.CANFD_LKA_STEERING else CAN.CAM
return packer.make_can_msg("CRUISE_BUTTONS", bus, values)
def create_acc_cancel(packer, CP, CAN, cruise_info_copy):
# TODO: why do we copy different values here?
if CP.flags & HyundaiFlags.CANFD_CAMERA_SCC.value:
@@ -118,6 +121,7 @@ def create_acc_cancel(packer, CP, CAN, cruise_info_copy):
})
return packer.make_can_msg("SCC_CONTROL", CAN.ECAN, values)
def create_lfahda_cluster(packer, CAN, enabled):
values = {
"HDA_ICON": 1 if enabled else 0,

View File

@@ -10,6 +10,7 @@ RADAR_MSG_COUNT = 32
# POC for parsing corner radars: https://github.com/commaai/openpilot/pull/24221/
def get_radar_can_parser(CP):
if Bus.radar not in DBC[CP.carFingerprint]:
return None

View File

@@ -387,6 +387,7 @@ INTERFACE_ATTR_FILE = {
# interface-specific helpers
def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> dict[str | StrEnum, Any]:
# read all the folders in opendbc/car and return a dict where:
# - keys are all the car models or brand names

View File

@@ -3,6 +3,7 @@ import time
DEBUG = False
def msg(x):
if DEBUG:
print("S:", binascii.hexlify(x))
@@ -10,7 +11,10 @@ def msg(x):
ret = bytes([len(x)]) + x
return ret.ljust(8, b"\x00")
kmsgs = []
def recv(panda, cnt, addr, nbus):
global kmsgs
ret = []
@@ -27,6 +31,7 @@ def recv(panda, cnt, addr, nbus):
kmsgs = nmsgs[-256:]
return ret
def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr):
msg = recv(panda, 1, addr, bus)[0]
@@ -60,6 +65,7 @@ def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr):
# **** import below this line ****
def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None, rate=None):
if recvaddr is None:
recvaddr = addr + 8
@@ -102,6 +108,7 @@ def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None, rate=None):
panda.can_send(addr, dat, bus)
time.sleep(rate)
def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None):
if sendaddr is None:
sendaddr = addr - 8

View File

@@ -23,12 +23,12 @@ def create_steering_control(packer, CP, frame, apply_torque, lkas):
tmp = steering_angle + 2048
ahi = tmp >> 10
amd = (tmp & 0x3FF) >> 2
amd = (amd >> 4) | (( amd & 0xF) << 4)
amd = (amd >> 4) | ((amd & 0xF) << 4)
alo = (tmp & 0x3) << 2
ctr = frame % 16
# bytes: [ 1 ] [ 2 ] [ 3 ] [ 4 ]
csum = 249 - ctr - hi - lo - (lnv << 3) - er1 - (ldw << 7) - ( er2 << 4) - (b1 << 5)
csum = 249 - ctr - hi - lo - (lnv << 3) - er1 - (ldw << 7) - (er2 << 4) - (b1 << 5)
# bytes [ 5 ] [ 6 ] [ 7 ]
csum = csum - ahi - amd - alo - b2
@@ -50,7 +50,7 @@ def create_steering_control(packer, CP, frame, apply_torque, lkas):
"LKAS_REQUEST": apply_torque,
"CTR": ctr,
"ERR_BIT_1": er1,
"LINE_NOT_VISIBLE" : lnv,
"LINE_NOT_VISIBLE": lnv,
"LDW": ldw,
"BIT_1": b1,
"ERR_BIT_2": er2,

View File

@@ -6,6 +6,7 @@ from opendbc.car.car_helpers import get_car
from opendbc.car.can_definitions import CanData
from opendbc.car.structs import CarParams, CarControl
class PandaRunner(AbstractContextManager):
def __enter__(self):
self.p = Panda()
@@ -52,6 +53,7 @@ class PandaRunner(AbstractContextManager):
self.p.can_send_many(can_sends, timeout=25)
self.p.send_heartbeat()
if __name__ == "__main__":
with PandaRunner() as p:
print(p.read())

View File

@@ -8,6 +8,7 @@ from opendbc.car.rivian.values import DBC
RADAR_START_ADDR = 0x500
RADAR_MSG_COUNT = 32
def get_radar_can_parser(CP):
messages = [(f"RADAR_TRACK_{addr:x}", 20) for addr in range(RADAR_START_ADDR, RADAR_START_ADDR + RADAR_MSG_COUNT)]
return CANParser(DBC[CP.carFingerprint][Bus.radar], messages, 1)

View File

@@ -3,6 +3,7 @@ import struct
from Crypto.Hash import CMAC
from Crypto.Cipher import AES
def add_mac(key, trip_cnt, reset_cnt, msg_cnt, msg):
# TODO: clean up conversion to and from hex
@@ -32,6 +33,7 @@ def add_mac(key, trip_cnt, reset_cnt, msg_cnt, msg):
return (addr, payload, bus)
def build_sync_mac(key, trip_cnt, reset_cnt, id_=0xf):
id_ = struct.pack('>H', id_) # 16
trip_cnt = struct.pack('>H', trip_cnt) # 16

View File

@@ -25,6 +25,7 @@ def create_steering_control_angle(packer, apply_torque, steer_req):
def create_steering_status(packer):
return packer.make_can_msg("ES_LKAS_State", 0, {})
def create_es_distance(packer, frame, es_distance_msg, bus, pcm_cancel_cmd, long_enabled = False, brake_cmd = False, cruise_throttle = 0):
values = {s: es_distance_msg[s] for s in [
"CHECKSUM",
@@ -129,6 +130,7 @@ def create_es_lkas_state(packer, frame, es_lkas_state_msg, enabled, visual_alert
return packer.make_can_msg("ES_LKAS_State", CanBus.main, values)
def create_es_dashstatus(packer, frame, dashstatus_msg, enabled, long_enabled, long_active, lead_visible):
values = {s: dashstatus_msg[s] for s in [
"CHECKSUM",
@@ -177,6 +179,7 @@ def create_es_dashstatus(packer, frame, dashstatus_msg, enabled, long_enabled, l
return packer.make_can_msg("ES_DashStatus", CanBus.main, values)
def create_es_brake(packer, frame, es_brake_msg, long_enabled, long_active, brake_value):
values = {s: es_brake_msg[s] for s in [
"CHECKSUM",
@@ -203,6 +206,7 @@ def create_es_brake(packer, frame, es_brake_msg, long_enabled, long_active, brak
return packer.make_can_msg("ES_Brake", CanBus.main, values)
def create_es_status(packer, frame, es_status_msg, long_enabled, long_active, cruise_rpm):
values = {s: es_status_msg[s] for s in [
"CHECKSUM",

View File

@@ -20,6 +20,7 @@ def _create_radar_can_parser(car_fingerprint):
return CANParser(DBC[car_fingerprint][Bus.radar], messages, 1)
class RadarInterface(RadarInterfaceBase):
def __init__(self, CP):
super().__init__(CP)

View File

@@ -399,7 +399,7 @@ class CAR(Platforms):
# (addr, cars, bus, 1/freq*100, vl)
STATIC_DSU_MSGS = [
(0x128, (CAR.TOYOTA_PRIUS, CAR.TOYOTA_RAV4H, CAR.LEXUS_RX, CAR.LEXUS_NX, CAR.TOYOTA_RAV4, CAR.TOYOTA_COROLLA, CAR.TOYOTA_AVALON), \
(0x128, (CAR.TOYOTA_PRIUS, CAR.TOYOTA_RAV4H, CAR.LEXUS_RX, CAR.LEXUS_NX, CAR.TOYOTA_RAV4, CAR.TOYOTA_COROLLA, CAR.TOYOTA_AVALON),
1, 3, b'\xf4\x01\x90\x83\x00\x37'),
(0x128, (CAR.TOYOTA_HIGHLANDER, CAR.TOYOTA_SIENNA, CAR.LEXUS_CTH, CAR.LEXUS_ES), 1, 3, b'\x03\x00\x20\x00\x00\x52'),
(0x141, (CAR.TOYOTA_PRIUS, CAR.TOYOTA_RAV4H, CAR.LEXUS_RX, CAR.LEXUS_NX, CAR.TOYOTA_RAV4, CAR.TOYOTA_COROLLA, CAR.TOYOTA_HIGHLANDER, CAR.TOYOTA_AVALON,

View File

@@ -36,12 +36,14 @@ class SERVICE_TYPE(IntEnum):
TRANSFER_DATA = 0x36
REQUEST_TRANSFER_EXIT = 0x37
class SESSION_TYPE(IntEnum):
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
class RESET_TYPE(IntEnum):
HARD = 1
KEY_OFF_ON = 2
@@ -49,31 +51,37 @@ class RESET_TYPE(IntEnum):
ENABLE_RAPID_POWER_SHUTDOWN = 4
DISABLE_RAPID_POWER_SHUTDOWN = 5
class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2
class CONTROL_TYPE(IntEnum):
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
class MESSAGE_TYPE(IntEnum):
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
class TIMING_PARAMETER_TYPE(IntEnum):
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
SET_TO_GIVEN_VALUES = 4
class DTC_SETTING_TYPE(IntEnum):
ON = 1
OFF = 2
class RESPONSE_EVENT_TYPE(IntEnum):
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
@@ -84,11 +92,13 @@ class RESPONSE_EVENT_TYPE(IntEnum):
CLEAR_RESPONSE_ON_EVENT = 6
ON_COMPARISON_OF_VALUES = 7
class LINK_CONTROL_TYPE(IntEnum):
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
class BAUD_RATE_TYPE(IntEnum):
PC9600 = 1
PC19200 = 2
@@ -100,6 +110,7 @@ class BAUD_RATE_TYPE(IntEnum):
CAN500000 = 18
CAN1000000 = 19
class DATA_IDENTIFIER_TYPE(IntEnum):
BOOT_SOFTWARE_IDENTIFICATION = 0xF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0xF181
@@ -133,33 +144,39 @@ class DATA_IDENTIFIER_TYPE(IntEnum):
ODX_FILE = 0xF19E
ENTITY = 0xF19F
class TRANSMISSION_MODE_TYPE(IntEnum):
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
STOP_SENDING = 4
class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
class ISOTP_FRAME_TYPE(IntEnum):
SINGLE = 0
FIRST = 1
CONSECUTIVE = 2
FLOW = 3
class DynamicSourceDefinition(NamedTuple):
data_identifier: int
position: int
memory_size: int
memory_address: int
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
class DTC_REPORT_TYPE(IntEnum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
@@ -183,6 +200,7 @@ class DTC_REPORT_TYPE(IntEnum):
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
class DTC_STATUS_MASK_TYPE(IntEnum):
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
@@ -194,31 +212,37 @@ class DTC_STATUS_MASK_TYPE(IntEnum):
WARNING_INDICATOR_REQUESTED = 0x80
ALL = 0xFF
class DTC_SEVERITY_MASK_TYPE(IntEnum):
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
class CONTROL_PARAMETER_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
SHORT_TERM_ADJUSTMENT = 3
class ROUTINE_CONTROL_TYPE(IntEnum):
START = 1
STOP = 2
REQUEST_RESULTS = 3
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
class MessageTimeoutError(Exception):
pass
class NegativeResponseError(Exception):
def __init__(self, message, service_id, error_code):
super().__init__()
@@ -229,15 +253,19 @@ class NegativeResponseError(Exception):
def __str__(self):
return self.message
class InvalidServiceIdError(Exception):
pass
class InvalidSubFunctionError(Exception):
pass
class InvalidSubAddressError(Exception):
pass
_negative_response_codes = {
0x00: 'positive response',
0x10: 'general reject',
@@ -282,6 +310,7 @@ _negative_response_codes = {
0x93: 'voltage too low',
}
def get_dtc_num_as_str(dtc_num_bytes):
# ISO 15031-6
designator = {
@@ -294,6 +323,7 @@ def get_dtc_num_as_str(dtc_num_bytes):
n = bytes([dtc_num_bytes[0] & 0x3F]) + dtc_num_bytes[1:]
return d + n.hex()
def get_dtc_status_names(status):
result = list()
for m in DTC_STATUS_MASK_TYPE:
@@ -303,6 +333,7 @@ def get_dtc_status_names(status):
result.append(m.name)
return result
class CanClient:
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], list[tuple[int, bytes, int]]],
tx_addr: int, rx_addr: int, bus: int, sub_addr: int | None = None, rx_sub_addr: int | None = None):
@@ -383,6 +414,7 @@ class CanClient:
if i % 10 == 9:
self._recv_buffer()
class IsoTpMessage:
def __init__(self, can_client: CanClient, timeout: float = 1, single_frame_mode: bool = False, separation_time: float = 0):
self._can_client = can_client

View File

@@ -275,4 +275,3 @@ class CarState(CarStateBase):
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).pt),
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).cam),
}

View File

@@ -26,8 +26,8 @@ class TestVolkswagenPlatformConfigs:
with subtests.test(platform=platform.value):
assert len(platform.config.wmis) > 0, "WMIs not set"
assert len(platform.config.chassis_codes) > 0, "Chassis codes not set"
assert all(CHASSIS_CODE_PATTERN.match(cc) for cc in \
platform.config.chassis_codes), "Bad chassis codes"
assert all(CHASSIS_CODE_PATTERN.match(cc) for cc in
platform.config.chassis_codes), "Bad chassis codes"
# No two platforms should share chassis codes
for comp in CAR:

View File

@@ -3,6 +3,7 @@ import time
import struct
from enum import IntEnum
class COMMAND_CODE(IntEnum):
CONNECT = 0xFF
DISCONNECT = 0xFE
@@ -61,6 +62,7 @@ class COMMAND_CODE(IntEnum):
PROGRAM_MAX = 0xC9
PROGRAM_VERIFY = 0xC8
ERROR_CODES = {
0x00: "Command processor synchronization",
0x10: "Command was not executed",
@@ -82,10 +84,12 @@ ERROR_CODES = {
0x32: "The slave internal program verify routine detects an error",
}
class CONNECT_MODE(IntEnum):
NORMAL = 0x00,
USER_DEFINED = 0x01,
class GET_ID_REQUEST_TYPE(IntEnum):
ASCII = 0x00,
ASAM_MC2_FILE = 0x01,
@@ -94,12 +98,15 @@ class GET_ID_REQUEST_TYPE(IntEnum):
ASAM_MC2_UPLOAD = 0x04,
# 128-255 user defined
class CommandTimeoutError(Exception):
pass
class CommandCounterError(Exception):
pass
class CommandResponseError(Exception):
def __init__(self, message, return_code):
super().__init__()
@@ -109,6 +116,7 @@ class CommandResponseError(Exception):
def __str__(self):
return self.message
class XcpClient:
def __init__(self, panda, tx_addr: int, rx_addr: int, bus: int=0, timeout: float=0.1, debug=False, pad=True):
self.tx_addr = tx_addr

View File

@@ -60,5 +60,6 @@ def create_all(output_path: str):
#print(filename)
create_dbc(src_dir, filename, output_path)
if __name__ == "__main__":
create_all(opendbc_root)

View File

@@ -31,6 +31,7 @@ BO_ {base_id+1} {base_name}_B: 8 Radar
SG_ Index2 : 63|1@1+ (1,0) [0|1] "" Autopilot
"""
def get_val_definition(base_id):
return f"""
VAL_ {base_id+1} MovingState 3 "RADAR_MOVESTATE_STANDING" 2 "RADAR_MOVESTATE_STOPPED" 1 "RADAR_MOVESTATE_MOVING" 0 "RADAR_MOVESTATE_INDETERMINATE" ;

View File

@@ -37,6 +37,7 @@ void can_set_checksum(CANPacket_t *packet);
setup_safety_helpers(ffi)
class CANPacket:
reserved: int
bus: int
@@ -47,6 +48,7 @@ class CANPacket:
addr: int
data: list[int]
class Panda(PandaSafety, Protocol):
# CAN
def can_set_checksum(self, p: CANPacket) -> None: ...

View File

@@ -1,6 +1,7 @@
# panda safety helpers, from safety_helpers.c
from typing import Protocol
def setup_safety_helpers(ffi):
ffi.cdef("""
void set_controls_allowed(bool c);
@@ -52,6 +53,7 @@ def setup_safety_helpers(ffi):
int get_honda_hw(void);
""")
class PandaSafety(Protocol):
def set_controls_allowed(self, c: bool) -> None: ...
def get_controls_allowed(self) -> bool: ...
@@ -100,5 +102,3 @@ class PandaSafety(Protocol):
def set_honda_alt_brake_msg(self, c: bool) -> None: ...
def set_honda_bosch_long(self, c: bool) -> None: ...
def get_honda_hw(self) -> int: ...

View File

@@ -43,6 +43,7 @@ for p in patterns:
mutations = random.sample(mutations, 2) # can remove this once cppcheck is faster
@pytest.mark.parametrize("fn, rule, transform, should_fail", mutations)
def test_misra_mutation(fn, rule, transform, should_fail):
with tempfile.TemporaryDirectory() as tmp:

View File

@@ -4,12 +4,14 @@ from opendbc.car.toyota.values import ToyotaSafetyFlags
from opendbc.car.structs import CarParams
from opendbc.safety.tests.libsafety import libsafety_py
def to_signed(d, bits):
ret = d
if d >= (1 << (bits - 1)):
ret = d - (1 << bits)
return ret
def is_steering_msg(mode, param, addr):
ret = False
if mode in (CarParams.SafetyModel.hondaNidec, CarParams.SafetyModel.hondaBosch):
@@ -38,6 +40,7 @@ def is_steering_msg(mode, param, addr):
ret = addr == 0x488
return ret
def get_steer_value(mode, param, msg):
# TODO: use CANParser
torque, angle = 0, 0
@@ -77,9 +80,11 @@ def get_steer_value(mode, param, msg):
angle = (((msg.data[0] & 0x7F) << 8) | (msg.data[1])) - 16384 # ceil(1638.35/0.1)
return torque, angle
def package_can_msg(msg):
return libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
def init_segment(safety, msgs, mode, param):
sendcan = (msg for msg in msgs if msg.which() == 'sendcan')
steering_msgs = (can for msg in sendcan for can in msg.sendcan if is_steering_msg(mode, param, can.address))

View File

@@ -7,6 +7,7 @@ from opendbc.car.carlog import carlog
from opendbc.safety.tests.libsafety import libsafety_py
from opendbc.safety.tests.safety_replay.helpers import package_can_msg, init_segment
# replay a drive to check for safety violations
def replay_drive(msgs, safety_mode, param, alternative_experience):
safety = libsafety_py.libsafety
@@ -71,6 +72,7 @@ def replay_drive(msgs, safety_mode, param, alternative_experience):
return tx_controls_blocked == 0 and rx_invalid == 0 and not safety_tick_rx_invalid
if __name__ == "__main__":
from openpilot.tools.lib.logreader import LogReader

View File

@@ -95,6 +95,7 @@ class TestChryslerRamDTSafety(TestChryslerSafety):
values = {"Vehicle_Speed": speed}
return self.packer.make_can_msg_panda("ESP_8", 0, values)
class TestChryslerRamHDSafety(TestChryslerSafety):
TX_MSGS = [[0x275, 0], [0x276, 0], [0x23A, 2]]
RELAY_MALFUNCTION_ADDRS = {0: (0x276, 0x275)}

View File

@@ -10,6 +10,7 @@ from opendbc.safety.tests.common import CANPackerPanda, MAX_WRONG_COUNTERS
HONDA_N_COMMON_TX_MSGS = [[0xE4, 0], [0x194, 0], [0x1FA, 0], [0x30C, 0], [0x33D, 0]]
class Btn:
NONE = 0
MAIN = 1

View File

@@ -190,6 +190,7 @@ class TestHyundaiLegacySafetyHEV(TestHyundaiSafety):
values = {"CR_Vcu_AccPedDep_Pos": gas}
return self.packer.make_can_msg_panda("E_EMS11", 0, values, fix_checksum=checksum)
class TestHyundaiLongitudinalSafety(HyundaiLongitudinalBase, TestHyundaiSafety):
TX_MSGS = [[0x340, 0], [0x4F1, 0], [0x485, 0], [0x420, 0], [0x421, 0], [0x50A, 0], [0x389, 0], [0x4A2, 0], [0x38D, 0], [0x483, 0], [0x7D0, 0]]

View File

@@ -9,6 +9,7 @@ import opendbc.safety.tests.common as common
from opendbc.safety.tests.common import CANPackerPanda
from functools import partial
class SubaruMsg(enum.IntEnum):
Brake_Status = 0x13c
CruiseControl = 0x240
@@ -41,19 +42,23 @@ def lkas_tx_msgs(alt_bus, lkas_msg=SubaruMsg.ES_LKAS):
[SubaruMsg.ES_LKAS_State, SUBARU_MAIN_BUS],
[SubaruMsg.ES_Infotainment, SUBARU_MAIN_BUS]]
def long_tx_msgs(alt_bus):
return [[SubaruMsg.ES_Brake, alt_bus],
[SubaruMsg.ES_Status, alt_bus]]
def gen2_long_additional_tx_msgs():
return [[SubaruMsg.ES_UDS_Request, SUBARU_CAM_BUS],
[SubaruMsg.ES_HighBeamAssist, SUBARU_MAIN_BUS],
[SubaruMsg.ES_STATIC_1, SUBARU_MAIN_BUS],
[SubaruMsg.ES_STATIC_2, SUBARU_MAIN_BUS]]
def fwd_blacklisted_addr(lkas_msg=SubaruMsg.ES_LKAS):
return {SUBARU_CAM_BUS: [lkas_msg, SubaruMsg.ES_DashStatus, SubaruMsg.ES_LKAS_State, SubaruMsg.ES_Infotainment]}
class TestSubaruSafetyBase(common.PandaCarSafetyTest):
FLAGS = 0
RELAY_MALFUNCTION_ADDRS = {SUBARU_MAIN_BUS: (SubaruMsg.ES_LKAS, SubaruMsg.ES_DashStatus, SubaruMsg.ES_LKAS_State,

View File

@@ -187,5 +187,6 @@ class TestVolkswagenPqLongSafety(TestVolkswagenPqSafetyBase, common.Longitudinal
self.assertTrue(self._tx(self._torque_cmd_msg(self.MAX_RATE_UP, steer_req=1, hca_status=enabled_status)),
f"torque cmd rejected with {enabled_status=}")
if __name__ == "__main__":
unittest.main()

View File

@@ -86,6 +86,7 @@ indent-width = 2
target-version="py311"
[tool.ruff.lint]
preview = true
select = [
"E", "F", "W", "PIE", "C4", "ISC", "A", "B",
"NPY", # numpy
@@ -107,6 +108,10 @@ ignore = [
"UP031",
"NPY002", # new numpy random syntax is worse
"C420",
"E111", "E114", "E226", "E241",
"E221", "E265", "E266", "E261",
"E252", "E251", "E231", "E225",
"E262", "C419",
]
flake8-implicit-str-concat.allow-multiline=false