feat: Squash all pre-brand features into pre

This commit is contained in:
Rick Lan
2026-01-08 12:31:31 +08:00
parent ef7cd06332
commit 29beffdd30
27 changed files with 635 additions and 36 deletions

View File

@@ -156,4 +156,10 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"dp_dev_tethering", {PERSISTENT, BOOL, "0"}},
{"dp_ui_mici", {PERSISTENT, BOOL, "0"}},
{"dp_lat_offset_cm", {PERSISTENT, INT, "0"}},
{"dp_toyota_door_auto_lock_unlock", {PERSISTENT, BOOL, "0"}},
{"dp_toyota_tss1_sng", {PERSISTENT, BOOL, "0"}},
{"dp_toyota_stock_lon", {PERSISTENT, BOOL, "0"}},
{"dp_vag_a0_sng", {PERSISTENT, BOOL, "0"}},
{"dp_vag_pq_steering_patch", {PERSISTENT, BOOL, "0"}},
{"dp_vag_avoid_eps_lockout", {PERSISTENT, BOOL, "0"}},
};

View File

@@ -8,6 +8,24 @@ SETTINGS = [
"title": "Toyota / Lexus",
"condition": "brand == 'toyota'",
"settings": [
{
"key": "dp_toyota_door_auto_lock_unlock",
"type": "toggle_item",
"title": lambda: tr("Door Auto Lock/Unlock"),
"description": lambda: tr("Enable openpilot to auto-lock doors above 20 km/h and auto-unlock when shifting to Park."),
},
{
"key": "dp_toyota_tss1_sng",
"type": "toggle_item",
"title": lambda: tr("Enable TSS1 SnG Mod"),
"description": ""
},
{
"key": "dp_toyota_stock_lon",
"type": "toggle_item",
"title": lambda: tr("Use Stock Longitudinal Control"),
"description": ""
},
],
},
@@ -15,6 +33,24 @@ SETTINGS = [
"title": "VAG",
"condition": "brand == 'volkswagen'",
"settings": [
{
"key": "dp_vag_a0_sng",
"type": "toggle_item",
"title": lambda: tr("MQB A0 SnG Mod"),
"description": ""
},
{
"key": "dp_vag_pq_steering_patch",
"type": "toggle_item",
"title": lambda: tr("PQ Steering Patch"),
"description": "",
},
{
"key": "dp_vag_avoid_eps_lockout",
"type": "toggle_item",
"title": lambda: tr("Avoid EPS Lockout"),
"description": ""
},
],
},

View File

@@ -91,6 +91,9 @@ class Bus(StrEnum):
party = auto()
ap_party = auto()
sdsu = auto()
zss = auto()
def rate_limit(new_value, last_value, dw_step, up_step):
return float(np.clip(new_value, last_value + dw_step, last_value + up_step))

View File

@@ -91,13 +91,25 @@ class CarInterface(CarInterfaceBase):
ret.longitudinalTuning.kiV = [1.2, 0.8, 0.5]
# Disable control if EPS mod detected
eps_modified = False
for fw in car_fw:
if fw.ecu == "eps" and b"," in fw.fwVersion:
ret.dashcamOnly = True
# ret.dashcamOnly = True
eps_modified = True
if candidate == CAR.HONDA_CIVIC:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560], [0, 2560]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[1.1], [0.33]]
if eps_modified:
# stock request input values: 0x0000, 0x00DE, 0x014D, 0x01EF, 0x0290, 0x0377, 0x0454, 0x0610, 0x06EE
# stock request output values: 0x0000, 0x0917, 0x0DC5, 0x1017, 0x119F, 0x140B, 0x1680, 0x1680, 0x1680
# modified request output values: 0x0000, 0x0917, 0x0DC5, 0x1017, 0x119F, 0x140B, 0x1680, 0x2880, 0x3180
# stock filter output values: 0x009F, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108
# modified filter output values: 0x009F, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0108, 0x0400, 0x0480
# note: max request allowed is 4096, but request is capped at 3840 in firmware, so modifications result in 2x max
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560, 8000], [0, 2560, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.3], [0.1]]
else:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560], [0, 2560]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[1.1], [0.33]]
elif candidate in (CAR.HONDA_CIVIC_BOSCH, CAR.HONDA_CIVIC_BOSCH_DIESEL):
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 4096], [0, 4096]] # TODO: determine if there is a dead zone at the top end
@@ -112,7 +124,10 @@ class CarInterface(CarInterfaceBase):
elif candidate == CAR.HONDA_ACCORD:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 4096], [0, 4096]] # TODO: determine if there is a dead zone at the top end
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.6], [0.18]]
if eps_modified:
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.3], [0.09]]
else:
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.6], [0.18]]
elif candidate == CAR.ACURA_ILX:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 3840], [0, 3840]] # TODO: determine if there is a dead zone at the top end
@@ -124,8 +139,15 @@ class CarInterface(CarInterfaceBase):
ret.wheelSpeedFactor = 1.025
elif candidate == CAR.HONDA_CRV_5G:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 3840], [0, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.64], [0.192]]
if eps_modified:
# stock request input values: 0x0000, 0x00DB, 0x01BB, 0x0296, 0x0377, 0x0454, 0x0532, 0x0610, 0x067F
# stock request output values: 0x0000, 0x0500, 0x0A15, 0x0E6D, 0x1100, 0x1200, 0x129A, 0x134D, 0x1400
# modified request output values: 0x0000, 0x0500, 0x0A15, 0x0E6D, 0x1100, 0x1200, 0x1ACD, 0x239A, 0x2800
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 2560, 10000], [0, 2560, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.21], [0.07]]
else:
ret.lateralParams.torqueBP, ret.lateralParams.torqueV = [[0, 3840], [0, 3840]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.64], [0.192]]
ret.wheelSpeedFactor = 1.025
elif candidate == CAR.HONDA_CRV_HYBRID:

View File

@@ -152,6 +152,13 @@ class CarInterface(CarInterfaceBase):
if candidate in (CAR.KIA_OPTIMA_H,):
ret.dashcamOnly = True
# w/ SMDPS, allow steering to 0
if 0x2AA in fingerprint[0]:
ret.minSteerSpeed = 0.
print("----------------------------------------------")
print("dragonpilot: SMDPS detected!")
print("----------------------------------------------")
return ret
@staticmethod

View File

@@ -22,4 +22,10 @@ CarParamsT = capnp.lib.capnp._StructModule
class DPFlags:
LatALKA = 1
ExtRadar = 2
ToyotaLockCtrl = 2 ** 2
ToyotaTSS1SnG = 2 ** 3
ToyotaStockLon = 2 ** 4
VagA0SnG = 2 ** 5
VAGPQSteeringPatch = 2 ** 6
VagAvoidEPSLockout = 2 ** 7
pass

View File

@@ -33,6 +33,8 @@ class CarInterface(CarInterfaceBase):
ret.safetyConfigs = [get_safety_config(structs.CarParams.SafetyModel.subaru)]
if ret.flags & SubaruFlags.GLOBAL_GEN2:
ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.GEN2.value
elif ret.flags & (SubaruFlags.IMPREZA_2018 | SubaruFlags.HYBRID) :
ret.safetyConfigs[0].safetyParam |= SubaruSafetyFlags.IMPREZA_2018.value
ret.steerLimitTimer = 0.4
ret.steerActuatorDelay = 0.1
@@ -50,11 +52,11 @@ class CarInterface(CarInterfaceBase):
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.0025, 0.1], [0.00025, 0.01]]
elif candidate == CAR.SUBARU_IMPREZA:
ret.steerActuatorDelay = 0.4 # end-to-end angle controller
ret.steerActuatorDelay = 0.1 # end-to-end angle controller
ret.lateralTuning.init('pid')
ret.lateralTuning.pid.kf = 0.00005
ret.lateralTuning.pid.kf = 0.00003
ret.lateralTuning.pid.kiBP, ret.lateralTuning.pid.kpBP = [[0., 20.], [0., 20.]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.2, 0.3], [0.02, 0.03]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.12, 0.18], [0.012, 0.018]]
elif candidate == CAR.SUBARU_IMPREZA_2020:
ret.lateralTuning.init('pid')

View File

@@ -26,6 +26,12 @@ class CarControllerParams:
elif CP.carFingerprint == CAR.SUBARU_IMPREZA_2020:
self.STEER_DELTA_UP = 35
self.STEER_MAX = 1439
self.STEER_DELTA_UP = 35
self.STEER_DELTA_DOWN = 70
elif CP.carFingerprint == CAR.SUBARU_IMPREZA:
self.STEER_MAX = 3071
self.STEER_DELTA_UP = 60
self.STEER_DELTA_DOWN = 60
else:
self.STEER_MAX = 2047
@@ -57,7 +63,7 @@ class SubaruSafetyFlags(IntFlag):
GEN2 = 1
LONG = 2
PREGLOBAL_REVERSED_DRIVER_TORQUE = 4
IMPREZA_2018 = 8
class SubaruFlags(IntFlag):
# Detected flags
@@ -74,6 +80,8 @@ class SubaruFlags(IntFlag):
HYBRID = 32
LKAS_ANGLE = 64
# rick
IMPREZA_2018 = 2 ** 10
GLOBAL_ES_ADDR = 0x787
GEN2_ES_BUTTONS_DID = b'\x11\x30'
@@ -143,7 +151,8 @@ class CAR(Platforms):
SubaruCarDocs("Subaru Crosstrek 2018-19", video="https://youtu.be/Agww7oE1k-s?t=26"),
SubaruCarDocs("Subaru XV 2018-19", video="https://youtu.be/Agww7oE1k-s?t=26"),
],
CarSpecs(mass=1568, wheelbase=2.67, steerRatio=15),
CarSpecs(mass=1568, wheelbase=2.67, steerRatio=13.5),
flags=SubaruFlags.IMPREZA_2018,
)
SUBARU_IMPREZA_2020 = SubaruPlatformConfig(
[

View File

@@ -268,6 +268,7 @@ routes = [
CarTestRoute("6719965b0e1d1737/2023-02-09--22-44-05", TOYOTA.TOYOTA_CHR_TSS2), # hybrid
CarTestRoute("6719965b0e1d1737/2023-08-29--06-40-05", TOYOTA.TOYOTA_CHR_TSS2), # hybrid, openpilot longitudinal, radar disabled
CarTestRoute("14623aae37e549f3/2021-10-24--01-20-49", TOYOTA.TOYOTA_PRIUS_V),
CarTestRoute("ea8fbe72b96a185c|2023-02-22--09-20-34", TOYOTA.TOYOTA_CHR_TSS2), # openpilot longitudinal, with Radar Filter
CarTestRoute("202c40641158a6e5/2021-09-21--09-43-24", VOLKSWAGEN.VOLKSWAGEN_ARTEON_MK1),
CarTestRoute("2c68dda277d887ac/2021-05-11--15-22-20", VOLKSWAGEN.VOLKSWAGEN_ATLAS_MK1),

View File

@@ -2,6 +2,7 @@ import math
import numpy as np
from opendbc.car import Bus, make_tester_present_msg, rate_limit, structs, ACCELERATION_DUE_TO_GRAVITY, DT_CTRL
from opendbc.car.lateral import apply_meas_steer_torque_limits, apply_std_steer_angle_limits, common_fault_avoidance
from opendbc.car.can_definitions import CanData
from opendbc.car.carlog import carlog
from opendbc.car.common.filter_simple import FirstOrderFilter, HighPassFilter
from opendbc.car.common.pid import PIDController
@@ -34,6 +35,17 @@ MAX_STEER_RATE_FRAMES = 18 # tx control frames needed before torque can be cut
# EPS allows user torque above threshold for 50 frames before permanently faulting
MAX_USER_TORQUE = 500
# Lock / unlock door commands - Credit goes to AlexandreSato!
from opendbc.car.common.conversions import Conversions as CV
LOCK_SPEED = 20 * CV.KPH_TO_MS
LOCK_UNLOCK_CAN_ID = 0x750
UNLOCK_CMD = b'\x40\x05\x30\x11\x00\x40\x00\x00'
LOCK_CMD = b'\x40\x05\x30\x11\x00\x80\x00\x00'
from cereal import car
PARK = car.CarState.GearShifter.park
DRIVE = car.CarState.GearShifter.drive
def get_long_tune(CP, params):
if CP.carFingerprint in TSS2_CAR:
@@ -78,6 +90,8 @@ class CarController(CarControllerBase):
self.secoc_acc_message_counter = 0
self.secoc_prev_reset_counter = 0
self.doors_locked = False
def update(self, CC, CS, now_nanos):
actuators = CC.actuators
stopping = actuators.longControlState == LongCtrlState.stopping
@@ -190,6 +204,9 @@ class CarController(CarControllerBase):
if not should_resume and CS.out.cruiseState.standstill:
self.standstill_req = True
if (self.CP.flags & ToyotaFlags.TSS1_SNG.value) and CS.out.standstill and not self.last_standstill:
self.standstill_req = False
self.last_standstill = CS.out.standstill
# handle UI messages
@@ -261,6 +278,10 @@ class CarController(CarControllerBase):
elif net_acceleration_request_min > 0.3:
self.permit_braking = False
# rick - do not do delay compensation for non-TSS2 vehicles (e.g. car with sDSU?), assign the value back to actuators.accel
# from Jason, see https://github.com/sunnypilot/opendbc/compare/dd2016f77f8467ca2f7934db1b8c6d73164b3df7...f90b75b1531d0ef949c1c7fb8c175059448a2a97#diff-dc03b1fc7156134429efc0cdced75bc227d0ceb8bbd0c55763022fb9db6794d9
if not self.CP.carFingerprint in TSS2_CAR:
pcm_accel_cmd = actuators.accel
pcm_accel_cmd = float(np.clip(pcm_accel_cmd, self.params.ACCEL_MIN, self.params.ACCEL_MAX))
main_accel_cmd = 0. if self.CP.flags & ToyotaFlags.SECOC.value else pcm_accel_cmd
@@ -306,11 +327,11 @@ class CarController(CarControllerBase):
hud_control.rightLaneVisible, hud_control.leftLaneDepart,
hud_control.rightLaneDepart, lat_active, CS.lkas_hud))
if (self.frame % 100 == 0 or send_ui) and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if (self.frame % 100 == 0 or send_ui) and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value and not self.CP.flags & ToyotaFlags.RADAR_FILTER.value:
can_sends.append(toyotacan.create_fcw_command(self.packer, fcw_alert))
# keep radar disabled
if self.frame % 20 == 0 and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if self.frame % 20 == 0 and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value and not self.CP.flags & ToyotaFlags.RADAR_FILTER.value:
can_sends.append(make_tester_present_msg(0x750, 0, 0xF))
new_actuators = actuators.as_builder()
@@ -319,5 +340,13 @@ class CarController(CarControllerBase):
new_actuators.steeringAngleDeg = self.last_angle
new_actuators.accel = self.accel
if self.CP.flags & ToyotaFlags.LOCK_CTRL.value:
if not self.doors_locked and CS.out.gearShifter == DRIVE and CS.out.vEgo >= LOCK_SPEED:
can_sends.append(CanData(LOCK_UNLOCK_CAN_ID, LOCK_CMD, 0))
self.doors_locked = True
elif self.doors_locked and CS.out.gearShifter == PARK:
can_sends.append(CanData(LOCK_UNLOCK_CAN_ID, UNLOCK_CMD, 0))
self.doors_locked = False
self.frame += 1
return new_actuators, can_sends

View File

@@ -53,12 +53,32 @@ class CarState(CarStateBase):
self.gvc = 0.0
self.secoc_synchronization = None
# radar filter (mainly for CHR/Camry)
# the idea is to place a Panda in between Radar and camera/body (engine room) to block 0x343 (longitudinal)
# depends on the firmware, we should be able to read most CAN directly from cp (not cp_cam, its empty)
self.dp_radar_filter = bool(self.CP.flags & ToyotaFlags.RADAR_FILTER.value)
from opendbc.car.toyota.sdsu import SDSU
self.sdsu = SDSU(CP.flags)
# rick - dsu_bypass from cydia2020: https://github.com/cydia2020/toyota-dsu-reroute-harness/
# the idea is to "re-route" the DSU to Panda CAN2 (Which connects to ADAS Camera)
# * when comma device is not available, the DSU message can still communicate with ADAS Camera, and over to car.
# * when comma device is active, CAN message of DSU and ADAS camera will the be blocked by Panda, only forward some CAN messages over to car (from CAN0).
self.dp_dsu_bypass = self.CP.flags & ToyotaFlags.DSU_BYPASS.value
from opendbc.car.toyota.zss import ZSS
self.zss = ZSS(CP.flags)
def update(self, can_parsers) -> structs.CarState:
cp = can_parsers[Bus.pt]
cp_cam = can_parsers[Bus.cam]
ret = structs.CarState()
cp_acc = cp_cam if self.CP.carFingerprint in (TSS2_CAR - RADAR_ACC_CAR) else cp
if self.dp_dsu_bypass:
cp_acc = cp_cam
else:
cp_acc = cp_cam if self.CP.carFingerprint in (TSS2_CAR - RADAR_ACC_CAR) else cp
if not self.CP.flags & ToyotaFlags.SECOC.value:
self.gvc = cp.vl["VSC1S07"]["GVC"]
@@ -78,7 +98,7 @@ class CarState(CarStateBase):
else:
ret.gasPressed = cp.vl["PCM_CRUISE"]["GAS_RELEASED"] == 0 # TODO: these also have GAS_PEDAL, come back and unify
can_gear = int(cp.vl["GEAR_PACKET"]["GEAR"])
if not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value or self.dp_radar_filter:
ret.stockAeb = bool(cp_acc.vl["PRE_COLLISION"]["PRECOLLISION_ACTIVE"] and cp_acc.vl["PRE_COLLISION"]["FORCE"] < -1e-5)
self.parse_wheel_speeds(ret,
@@ -117,6 +137,10 @@ class CarState(CarStateBase):
# we could use the override bit from dbc, but it's triggered at too high torque values
ret.steeringPressed = abs(ret.steeringTorque) > STEER_THRESHOLD
if self.zss.enabled:
self.zss.set_values(can_parsers[Bus.zss])
ret.steeringAngleDeg = self.zss.get_steering_angle_deg(cp.vl["PCM_CRUISE"]["CRUISE_ACTIVE"], ret.steeringAngleDeg)
# Check EPS LKA/LTA fault status
ret.steerFaultTemporary = cp.vl["EPS_STATUS"]["LKA_STATE"] in TEMP_STEER_FAULTS
ret.steerFaultPermanent = cp.vl["EPS_STATUS"]["LKA_STATE"] in PERM_STEER_FAULTS
@@ -147,8 +171,10 @@ class CarState(CarStateBase):
conversion_factor = CV.KPH_TO_MS if is_metric else CV.MPH_TO_MS
ret.cruiseState.speedCluster = cluster_set_speed * conversion_factor
if self.CP.carFingerprint in TSS2_CAR and not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:
self.acc_type = cp_acc.vl["ACC_CONTROL"]["ACC_TYPE"]
if (self.CP.carFingerprint in TSS2_CAR and not self.CP.flags & ToyotaFlags.DISABLE_RADAR.value) or \
self.dp_dsu_bypass:
if not (self.CP.flags & ToyotaFlags.SDSU.value):
self.acc_type = cp_acc.vl["ACC_CONTROL"]["ACC_TYPE"]
ret.stockFcw = bool(cp_acc.vl["PCS_HUD"]["FCW"])
# some TSS2 cars have low speed lockout permanently set, so ignore on those cars
@@ -191,7 +217,15 @@ class CarState(CarStateBase):
buttonEvents.extend(create_button_events(1, 0, {1: ButtonType.lkas}) +
create_button_events(0, 1, {1: ButtonType.lkas}))
if self.CP.carFingerprint not in (RADAR_ACC_CAR | SECOC_CAR):
if self.sdsu.enabled:
# The follow distance button signal as forwarded by the sdsu
self.sdsu.update_states(can_parsers[Bus.sdsu])
prev_distance_button = self.distance_button
self.distance_button = self.sdsu.dist_btn
buttonEvents += create_button_events(self.distance_button, prev_distance_button, {1: ButtonType.gapAdjustCruise})
elif self.CP.carFingerprint not in (RADAR_ACC_CAR | SECOC_CAR):
# distance button is wired to the ACC module (camera or radar)
prev_distance_button = self.distance_button
self.distance_button = cp_acc.vl["ACC_CONTROL"]["DISTANCE"]
@@ -211,7 +245,15 @@ class CarState(CarStateBase):
("BLINKERS_STATE", float('nan')),
]
return {
parsers = {
Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], pt_messages, 0),
Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], 2),
}
if CP.flags & ToyotaFlags.SDSU:
parsers[Bus.sdsu] = CANParser("toyota_sdsu", [("SDSU", 100)], 0)
if CP.flags & ToyotaFlags.ZSS:
parsers[Bus.zss] = CANParser("toyota_zss", [("SECONDARY_STEER_ANGLE", float('nan'))], 0)
return parsers

View File

@@ -59,13 +59,32 @@ class CarInterface(CarInterfaceBase):
if Ecu.hybrid in found_ecus:
ret.flags |= ToyotaFlags.HYBRID.value
# 0x343 should not be present on bus 2 on cars other than TSS2_CAR unless we are re-routing DSU
dsu_bypass = False
if (0x343 in fingerprint[2] or 0x4CB in fingerprint[2]) and candidate not in TSS2_CAR:
print("----------------------------------------------")
print("dragonpilot: DSU_BYPASS detected!")
print("----------------------------------------------")
# rick - disable for now, breaks TOYOTA_AVALON_2019 model tests.
# dsu_bypass = True
# ret.flags |= ToyotaFlags.DSU_BYPASS.value
if 0x23 in fingerprint[0]:
print("----------------------------------------------")
print("dragonpilot: ZSS detected!")
print("----------------------------------------------")
ret.flags |= ToyotaFlags.ZSS.value
if candidate == CAR.TOYOTA_PRIUS:
stop_and_go = True
# Only give steer angle deadzone to for bad angle sensor prius
for fw in car_fw:
if fw.ecu == "eps" and not fw.fwVersion == b'8965B47060\x00\x00\x00\x00\x00\x00':
ret.steerActuatorDelay = 0.25
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning, steering_angle_deadzone_deg=0.2)
if ret.flags & ToyotaFlags.ZSS.value:
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning)
else:
ret.steerActuatorDelay = 0.25
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning, steering_angle_deadzone_deg=0.2)
elif candidate in (CAR.LEXUS_RX, CAR.LEXUS_RX_TSS2):
stop_and_go = True
@@ -115,6 +134,27 @@ class CarInterface(CarInterfaceBase):
if alpha_long and candidate in RADAR_ACC_CAR:
ret.flags |= ToyotaFlags.DISABLE_RADAR.value
# RADAR_ACC_CAR = CHR TSS2 / RAV4 TSS2
# NO_DSU_CAR = CAMRY / CHR
if 0x2FF in fingerprint[0] or 0x2AA in fingerprint[0]:
print("----------------------------------------------")
print("dragonpilot: RADAR_FILTER detected!")
print("----------------------------------------------")
ret.alphaLongitudinalAvailable = False
ret.flags |= ToyotaFlags.RADAR_FILTER.value | ToyotaFlags.DISABLE_RADAR.value
sdsu_active = False
if not (candidate in (RADAR_ACC_CAR | NO_DSU_CAR)) and 0x2FF in fingerprint[0]:
print("----------------------------------------------")
print("dragonpilot: SDSU detected!")
print("----------------------------------------------")
sdsu_active = True
stop_and_go = True
ret.flags |= ToyotaFlags.SDSU.value
ret.alphaLongitudinalAvailable = False
# openpilot longitudinal enabled by default:
# - cars w/ DSU disconnected
# - TSS2 cars with camera sending ACC_CONTROL where we can block it
@@ -122,7 +162,13 @@ class CarInterface(CarInterfaceBase):
# - TSS2 radar ACC cars (disables radar)
ret.openpilotLongitudinalControl = (candidate in (TSS2_CAR - RADAR_ACC_CAR) or
bool(ret.flags & ToyotaFlags.DISABLE_RADAR.value))
bool(ret.flags & ToyotaFlags.DISABLE_RADAR.value) or \
dsu_bypass or \
sdsu_active)
if dp_params & structs.DPFlags.ToyotaStockLon:
ret.openpilotLongitudinalControl = False
ret.alphaLongitudinalAvailable = False
ret.autoResumeSng = ret.openpilotLongitudinalControl and candidate in NO_STOP_TIMER_CAR
@@ -144,12 +190,19 @@ class CarInterface(CarInterfaceBase):
if ret.flags & ToyotaFlags.HYBRID.value:
ret.longitudinalActuatorDelay = 0.05
if dp_params & structs.DPFlags.ToyotaLockCtrl:
ret.flags |= ToyotaFlags.LOCK_CTRL.value
ret.safetyConfigs[0].safetyParam |= ToyotaSafetyFlags.LOCK_CTRL.value
if dp_params & structs.DPFlags.ToyotaTSS1SnG:
ret.flags |= ToyotaFlags.TSS1_SNG.value
return ret
@staticmethod
def init(CP, can_recv, can_send, communication_control=None):
# disable radar if alpha longitudinal toggled on radar-ACC car
if CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if not CP.flags & ToyotaFlags.RADAR_FILTER.value and CP.flags & ToyotaFlags.DISABLE_RADAR.value:
if communication_control is None:
communication_control = bytes([uds.SERVICE_TYPE.COMMUNICATION_CONTROL, uds.CONTROL_TYPE.ENABLE_RX_DISABLE_TX, uds.MESSAGE_TYPE.NORMAL])
disable_ecu(can_recv, can_send, bus=0, addr=0x750, sub_addr=0xf, com_cont_req=communication_control)

View File

@@ -0,0 +1,18 @@
# Copyright (c) 2025, Rick Lan
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, and/or sublicense, for non-commercial purposes only, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
# Commercial use (e.g. use in a product, service, or activity intended to generate revenue) is prohibited without explicit written permission from the copyright holder.
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from opendbc.car.toyota.values import ToyotaFlags
from opendbc.can.parser import CANParser
class SDSU:
def __init__(self, flags: int):
self.enabled = flags & ToyotaFlags.SDSU.value
self.dist_btn = 0
def update_states(self, cp: CANParser):
self.dist_btn = cp.vl["SDSU"]["FD_BUTTON"]

View File

@@ -57,11 +57,14 @@ class ToyotaSafetyFlags(IntFlag):
LTA = (4 << 8)
SECOC = (8 << 8)
UNSUPPORTED_DSU = (16 << 8) # dp - use DSU_CRUISE (0x365) for ACC main instead of PCM_CRUISE_2 (0x1D3)
LOCK_CTRL = (32 << 8)
class ToyotaFlags(IntFlag):
# Detected flags
HYBRID = 1
# use legacy id
SDSU = 2
DISABLE_RADAR = 4
# Static flags
@@ -80,6 +83,11 @@ class ToyotaFlags(IntFlag):
# these cars are speculated to allow stop and go when the DSU is unplugged
SNG_WITHOUT_DSU_DEPRECATED = 512
LOCK_CTRL = 2 ** 13
TSS1_SNG = 2 ** 14
RADAR_FILTER = 2 ** 15
DSU_BYPASS = 2 ** 16
ZSS = 2 ** 17
def dbc_dict(pt, radar):
return {Bus.pt: pt, Bus.radar: radar}

View File

@@ -0,0 +1,81 @@
from opendbc.car.toyota.values import ToyotaFlags
from opendbc.can.parser import CANParser
ANGLE_DIFF_THRESHOLD = 4.0
THRESHOLD_COUNT = 10
class ZSS:
def __init__(self, flags: int):
self.enabled = flags & ToyotaFlags.ZSS.value
# self._alka_enabled = flags & ToyotaFlags.ALKA.value
self._offset_compute_once = True
# self._alka_active_prev = False
self._cruise_active_prev = False
self._offset = 0.
self._threshold_count = 0
self._zss_value = 0.
self._print_allow = True
def set_values(self, cp: CANParser):
self._zss_value = cp.vl["SECONDARY_STEER_ANGLE"]["ZORRO_STEER"]
def get_enabled(self):
return self.enabled
def get_steering_angle_deg(self, cruise_active, stock_steering_angle_deg):
# off, fall back to stock
if not self.enabled:
return stock_steering_angle_deg
# when lka control is off, use stock
# alka_active = self._is_alka_active(main_on)
if not cruise_active: # and not alka_active:
return stock_steering_angle_deg
# lka just activated
# if not self._offset_compute_once and ((alka_active and not self._alka_active_prev) or (cruise_active and not self._cruise_active_prev)):
if not self._offset_compute_once and (cruise_active and not self._cruise_active_prev):
self._threshold_count = 0
self._offset_compute_once = True
self._print_allow = True
# self._alka_active_prev = alka_active
self._cruise_active_prev = cruise_active
# compute offset when required
if self._offset_compute_once:
self._offset_compute_once = not self._compute_offset(stock_steering_angle_deg)
# error checking
if self._threshold_count >= THRESHOLD_COUNT:
if self._print_allow:
print("ZSS: Too many large diff, fallback to stock.")
self._print_allow = False
return stock_steering_angle_deg
if self._offset_compute_once:
print("ZSS: Compute offset required, fallback to stock.")
return stock_steering_angle_deg
zss_steering_angle_deg = self._zss_value - self._offset
angle_diff = abs(stock_steering_angle_deg - zss_steering_angle_deg)
if angle_diff > ANGLE_DIFF_THRESHOLD:
print(f"ZSS: Diff too large ({angle_diff}), fallback to stock. ")
# if self._is_alka_active(main_on) or cruise_active:
if cruise_active:
self._threshold_count += 1
return stock_steering_angle_deg
return zss_steering_angle_deg
# def _is_alka_active(self, main_on):
# return self._alka_enabled and main_on != 0
def _compute_offset(self, steering_angle_deg):
if abs(steering_angle_deg) > 1e-3 and abs(self._zss_value) > 1e-3:
self._offset = self._zss_value - steering_angle_deg
print(f"ZSS: offset computed: {self._offset}")
return True
return False

View File

@@ -31,6 +31,9 @@ class CarController(CarControllerBase):
self.eps_timer_soft_disable_alert = False
self.hca_frame_timer_running = 0
self.hca_frame_same_torque = 0
self._dp_vag_a0_sng = bool(self.CP.flags & VolkswagenFlags.A0SnG)
self.dp_vag_pq_steering_patch = 7 if CP.flags & VolkswagenFlags.PQSteeringPatch else 5
self.dp_avoid_eps_lockout = CP.flags & VolkswagenFlags.AVOID_EPS_LOCKOUT
def update(self, CC, CS, now_nanos):
actuators = CC.actuators
@@ -50,7 +53,13 @@ class CarController(CarControllerBase):
# of HCA disabled; this is done whenever output happens to be zero.
if CC.latActive:
new_torque = int(round(actuators.torque * self.CCP.STEER_MAX))
if self.dp_avoid_eps_lockout:
#根據速度縮放new_torque扭力上限
torque_scale = np.interp(CS.out.vEgo, [0.4, 3.5, 4.0], [0.8, 0.95, 1.0])
scaled_steer_max = self.CCP.STEER_MAX * torque_scale
new_torque = int(round(actuators.torque * scaled_steer_max))
else:
new_torque = int(round(actuators.torque * self.CCP.STEER_MAX))
apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.CCP)
self.hca_frame_timer_running += self.CCP.STEER_STEP
if self.apply_torque_last == apply_torque:
@@ -70,7 +79,7 @@ class CarController(CarControllerBase):
self.eps_timer_soft_disable_alert = self.hca_frame_timer_running > self.CCP.STEER_TIME_ALERT / DT_CTRL
self.apply_torque_last = apply_torque
can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_torque, hca_enabled))
can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_torque, hca_enabled, self.dp_vag_pq_steering_patch))
if self.CP.flags & VolkswagenFlags.STOCK_HCA_PRESENT:
# Pacify VW Emergency Assist driver inactivity detection by changing its view of driver steering input torque
@@ -119,11 +128,22 @@ class CarController(CarControllerBase):
lead_distance, hud_control.leadDistanceBars))
# **** Stock ACC Button Controls **************************************** #
if self._dp_vag_a0_sng:
if self.CP.pcmCruise and CS.gra_stock_values["COUNTER"] != self.gra_acc_counter_last:
standing_resume_spam = CS.out.standstill
spam_window = self.frame % 50 < 15
gra_send_ready = self.CP.pcmCruise and CS.gra_stock_values["COUNTER"] != self.gra_acc_counter_last
if gra_send_ready and (CC.cruiseControl.cancel or CC.cruiseControl.resume):
can_sends.append(self.CCS.create_acc_buttons_control(self.packer_pt, self.CAN.ext, CS.gra_stock_values,
cancel=CC.cruiseControl.cancel, resume=CC.cruiseControl.resume))
send_cancel = CC.cruiseControl.cancel
send_resume = CC.cruiseControl.resume or (standing_resume_spam and spam_window)
if send_cancel or send_resume:
can_sends.append(self.CCS.create_acc_buttons_control(self.packer_pt, self.CAN.ext, CS.gra_stock_values,
cancel=send_cancel, resume=send_resume))
else:
gra_send_ready = self.CP.pcmCruise and CS.gra_stock_values["COUNTER"] != self.gra_acc_counter_last
if gra_send_ready and (CC.cruiseControl.cancel or CC.cruiseControl.resume):
can_sends.append(self.CCS.create_acc_buttons_control(self.packer_pt, self.CAN.ext, CS.gra_stock_values,
cancel=CC.cruiseControl.cancel, resume=CC.cruiseControl.resume))
new_actuators = actuators.as_builder()
new_actuators.torque = self.apply_torque_last / self.CCP.STEER_MAX

View File

@@ -35,7 +35,7 @@ class CarInterface(CarInterfaceBase):
# It is documented in a four-part blog series:
# https://blog.willemmelching.nl/carhacking/2022/01/02/vw-part1/
# Panda ALLOW_DEBUG firmware required.
ret.dashcamOnly = True
# ret.dashcamOnly = True
elif ret.flags & VolkswagenFlags.MLB:
# Set global MLB parameters
@@ -106,4 +106,13 @@ class CarInterface(CarInterfaceBase):
safety_configs.insert(0, get_safety_config(structs.CarParams.SafetyModel.noOutput))
ret.safetyConfigs = safety_configs
if dp_params & structs.DPFlags.VagA0SnG:
ret.flags |= VolkswagenFlags.A0SnG.value
if ret.flags & VolkswagenFlags.PQ and dp_params & structs.DPFlags.VAGPQSteeringPatch:
ret.flags |= VolkswagenFlags.PQSteeringPatch.value
if dp_params & structs.DPFlags.VagAvoidEPSLockout:
ret.flags |= VolkswagenFlags.AVOID_EPS_LOCKOUT.value
return ret

View File

@@ -2,7 +2,7 @@ from opendbc.car.volkswagen.mqbcan import (volkswagen_mqb_meb_checksum, xor_chec
create_lka_hud_control as mqb_create_lka_hud_control)
# TODO: Parameterize the hca control type (5 vs 7) and consolidate with MQB (and PQ?)
def create_steering_control(packer, bus, apply_steer, lkas_enabled):
def create_steering_control(packer, bus, apply_steer, lkas_enabled, dp_vag_pq_steering_patch):
values = {
"HCA_01_Status_HCA": 7 if lkas_enabled else 3,
"HCA_01_LM_Offset": abs(apply_steer),

View File

@@ -1,7 +1,7 @@
from opendbc.car.crc import CRC8H2F
def create_steering_control(packer, bus, apply_torque, lkas_enabled):
def create_steering_control(packer, bus, apply_torque, lkas_enabled, dp_vag_pq_steering_patch):
values = {
"HCA_01_Status_HCA": 5 if lkas_enabled else 3,
"HCA_01_LM_Offset": abs(apply_torque),

View File

@@ -1,8 +1,8 @@
def create_steering_control(packer, bus, apply_torque, lkas_enabled):
def create_steering_control(packer, bus, apply_torque, lkas_enabled, dp_vag_pq_steering_patch = 5):
values = {
"LM_Offset": abs(apply_torque),
"LM_OffSign": 1 if apply_torque < 0 else 0,
"HCA_Status": 5 if (lkas_enabled and apply_torque != 0) else 3,
"HCA_Status": dp_vag_pq_steering_patch if (lkas_enabled and apply_torque != 0) else 3,
"Vib_Freq": 16,
}

View File

@@ -190,6 +190,9 @@ class VolkswagenFlags(IntFlag):
PQ = 2
MLB = 8
A0SnG = 2 ** 10
PQSteeringPatch = 2 ** 11
AVOID_EPS_LOCKOUT = 2 ** 12
@dataclass
class VolkswagenMLBPlatformConfig(PlatformConfig):

View File

@@ -0,0 +1,8 @@
BO_ 767 SDSU: 8 XXX
SG_ FD_BUTTON : 7|1@0+ (1,0) [0|1] "" XXX
SG_ STATE : 23|4@0+ (1,0) [0|15] "" XXX
CM_ "BO_ SDSU: The sDSU is a modified DSU for use in TSS-P Toyotas. Learn more: https://github.com/RetroPilot/ocelot/tree/main/firmware/smart_dsu";
CM_ SG_ 767 FD_BUTTON "The follow distance button signal as forwarded by the sdsu";
VAL_ 767 STATE 7 "STATE_AEB_CTRL" 6 "FAULT_INVALID" 5 "FAULT_TIMEOUT" 4 "FAULT_STARTUP" 3 "FAULT_SCE" 2 "FAULT_SEND" 1 "FAULT_BAD_CHECKSUM" 0 "NO_FAULT" ;

View File

@@ -0,0 +1,4 @@
BO_ 35 SECONDARY_STEER_ANGLE: 8 XXX
SG_ ZORRO_STEER : 7|24@0- (0.004901594652,0) [-500|500] "" XXX
CM_ "BO_ SECONDARY_STEER_ANGLE: ZSS is a high-precision steering angle sensor that can replace the lower resolution sensor in most TSS1 Toyotas. Learn more: https://github.com/commaai/openpilot/wiki/Toyota-Lexus#zorro-steering-sensor-zss";

View File

@@ -72,6 +72,8 @@
static bool subaru_gen2 = false;
static bool subaru_longitudinal = false;
// rick
static bool subaru_impreza_2018 = false;
static uint32_t subaru_get_checksum(const CANPacket_t *msg) {
return (uint8_t)msg->data[0];
@@ -140,6 +142,8 @@ static void subaru_rx_hook(const CANPacket_t *msg) {
static bool subaru_tx_hook(const CANPacket_t *msg) {
const TorqueSteeringLimits SUBARU_STEERING_LIMITS = SUBARU_STEERING_LIMITS_GENERATOR(2047, 50, 70);
const TorqueSteeringLimits SUBARU_GEN2_STEERING_LIMITS = SUBARU_STEERING_LIMITS_GENERATOR(1000, 40, 40);
// rick
const TorqueSteeringLimits SUBARU_IMPREZA_2018_STEERING_LIMITS = SUBARU_STEERING_LIMITS_GENERATOR(3071, 60, 60);
const LongitudinalLimits SUBARU_LONG_LIMITS = {
.min_gas = 808, // appears to be engine braking
@@ -161,7 +165,11 @@ static bool subaru_tx_hook(const CANPacket_t *msg) {
bool steer_req = (msg->data[3] >> 5) & 1U;
const TorqueSteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS : SUBARU_STEERING_LIMITS;
// const TorqueSteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS : SUBARU_STEERING_LIMITS;
// rick
const TorqueSteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS :
(subaru_impreza_2018 ? SUBARU_IMPREZA_2018_STEERING_LIMITS: SUBARU_STEERING_LIMITS);
violation |= steer_torque_cmd_checks(desired_torque, steer_req, limits);
}
@@ -244,6 +252,10 @@ static safety_config subaru_init(uint16_t param) {
subaru_gen2 = GET_FLAG(param, SUBARU_PARAM_GEN2);
// rick
const uint16_t SUBARU_PARAM_IMPREZA_2018 = 8;
subaru_impreza_2018 = GET_FLAG(param, SUBARU_PARAM_IMPREZA_2018);
#ifdef ALLOW_DEBUG
const uint16_t SUBARU_PARAM_LONGITUDINAL = 2;
subaru_longitudinal = GET_FLAG(param, SUBARU_PARAM_LONGITUDINAL);

View File

@@ -65,6 +65,9 @@ static int toyota_dbc_eps_torque_factor = 100; // conversion factor for STEER_
// dp - use DSU_CRUISE (0x365) for ACC main instead of PCM_CRUISE_2 (0x1D3)
static bool toyota_unsupported_dsu = false;
// lock ctrl
static bool toyota_lock_ctrl = false;
static uint32_t toyota_compute_checksum(const CANPacket_t *msg) {
int len = GET_LEN(msg);
uint8_t checksum = (uint8_t)(msg->addr) + (uint8_t)((unsigned int)(msg->addr) >> 8U) + (uint8_t)(len);
@@ -335,7 +338,8 @@ static bool toyota_tx_hook(const CANPacket_t *msg) {
}
// UDS: Only tester present ("\x0F\x02\x3E\x00\x00\x00\x00\x00") allowed on diagnostics address
if (msg->addr == 0x750U) {
// dp - lock_ctrl bypasses this restriction to allow any UDS message
if ((msg->addr == 0x750U) && !toyota_lock_ctrl) {
// this address is sub-addressed. only allow tester present to radar (0xF)
bool invalid_uds_msg = (GET_BYTES(msg, 0, 4) != 0x003E020FU) || (GET_BYTES(msg, 4, 4) != 0x0U);
if (invalid_uds_msg) {
@@ -382,11 +386,16 @@ static safety_config toyota_init(uint16_t param) {
const uint32_t TOYOTA_PARAM_UNSUPPORTED_DSU = 16UL << TOYOTA_PARAM_OFFSET;
toyota_unsupported_dsu = GET_FLAG(param, TOYOTA_PARAM_UNSUPPORTED_DSU);
// lock ctrl
const uint32_t TOYOTA_PARAM_LOCK_CTRL = 32UL << TOYOTA_PARAM_OFFSET;
toyota_lock_ctrl = GET_FLAG(param, TOYOTA_PARAM_LOCK_CTRL);
toyota_alt_brake = GET_FLAG(param, TOYOTA_PARAM_ALT_BRAKE);
toyota_stock_longitudinal = GET_FLAG(param, TOYOTA_PARAM_STOCK_LONGITUDINAL);
toyota_lta = GET_FLAG(param, TOYOTA_PARAM_LTA);
toyota_dbc_eps_torque_factor = param & TOYOTA_EPS_FACTOR;
// upstream TX selection + dp features via tx_ext (lock_ctrl, long_filter)
safety_config ret;
if (toyota_secoc) {
if (toyota_stock_longitudinal) {
@@ -433,6 +442,19 @@ static safety_config toyota_init(uint16_t param) {
return ret;
}
// dp - tx_ext hook for dragonpilot-specific TX messages (lock_ctrl)
static TxExtResult toyota_tx_ext_hook(const CANPacket_t *msg) {
TxExtResult result = {.allowed = false, .check_relay = false};
int len = GET_LEN(msg);
// dp - lock_ctrl: allow any UDS message on 0x750 (stock long already has tester present only)
if ((msg->addr == 0x750U) && toyota_lock_ctrl && (msg->bus == 0U) && (len == 8)) {
result.allowed = true;
}
return result;
}
// dp - rx_ext hook for optional messages (placeholder)
static void toyota_rx_ext_hook(const CANPacket_t *msg) {
if (alka_allowed && ((alternative_experience & ALT_EXP_ALKA) != 0)) {
@@ -457,6 +479,7 @@ const safety_hooks toyota_hooks = {
.rx = toyota_rx_hook,
.rx_ext = toyota_rx_ext_hook,
.tx = toyota_tx_hook,
.tx_ext = toyota_tx_ext_hook,
.get_checksum = toyota_get_checksum,
.compute_checksum = toyota_compute_checksum,
.get_quality_flag_valid = toyota_get_quality_flag_valid,

View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""
Tests for tx_ext hook functionality.
tx_ext allows additional TX messages beyond the base whitelist, with per-message
check_relay control. This is used for dragonpilot-specific features:
- lock_ctrl: allows any UDS message on 0x750 (not just tester present)
"""
import unittest
from opendbc.car.toyota.values import ToyotaSafetyFlags
from opendbc.car.structs import CarParams
from opendbc.safety.tests.libsafety import libsafety_py
from opendbc.safety.tests.common import CANPackerSafety
class TestTxExtBase(unittest.TestCase):
"""Base test class for tx_ext functionality."""
TX_MSGS = [] # Required by test_tx_hook_on_wrong_safety_mode
safety: libsafety_py.LibSafety
packer: CANPackerSafety
@classmethod
def setUpClass(cls):
if cls.__name__ == "TestTxExtBase":
raise unittest.SkipTest("Base class")
def _tx(self, msg):
return self.safety.safety_tx_hook(msg)
def _rx(self, msg):
return self.safety.safety_rx_hook(msg)
class TestTxExtToyotaLockCtrl(TestTxExtBase):
"""Test lock_ctrl feature for Toyota.
lock_ctrl allows any UDS message on 0x750, not just tester present.
This is used for door lock control via diagnostic messages.
"""
EPS_SCALE = 73
def setUp(self):
self.packer = CANPackerSafety("toyota_nodsu_pt_generated")
self.safety = libsafety_py.libsafety
def _uds_msg(self, data: bytes):
"""Create a UDS message on 0x750."""
return libsafety_py.make_CANPacket(0x750, 0, data)
def _tester_present_msg(self):
"""Valid tester present message (always allowed without lock_ctrl)."""
return self._uds_msg(b"\x0F\x02\x3E\x00\x00\x00\x00\x00")
def _lock_ctrl_msg(self):
"""Example lock control message (only allowed with lock_ctrl)."""
return self._uds_msg(b"\x0F\x03\x30\x01\x00\x00\x00\x00")
def _random_uds_msg(self):
"""Random UDS message (only allowed with lock_ctrl)."""
return self._uds_msg(b"\x0F\x05\xAA\xBB\xCC\xDD\x00\x00")
def test_lock_ctrl_disabled_only_tester_present(self):
"""Without lock_ctrl, only tester present is allowed on 0x750."""
# OP long mode (0x750 in whitelist), no lock_ctrl
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, self.EPS_SCALE)
self.safety.init_tests()
# Tester present should be allowed
self.assertTrue(self._tx(self._tester_present_msg()))
# Other UDS messages should be blocked
self.assertFalse(self._tx(self._lock_ctrl_msg()))
self.assertFalse(self._tx(self._random_uds_msg()))
def test_lock_ctrl_enabled_allows_any_uds(self):
"""With lock_ctrl, any UDS message is allowed on 0x750."""
# OP long mode with lock_ctrl
param = self.EPS_SCALE | ToyotaSafetyFlags.LOCK_CTRL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
# All UDS messages should be allowed
self.assertTrue(self._tx(self._tester_present_msg()))
self.assertTrue(self._tx(self._lock_ctrl_msg()))
self.assertTrue(self._tx(self._random_uds_msg()))
def test_lock_ctrl_stock_long_allows_uds(self):
"""With lock_ctrl + stock_long, 0x750 is allowed via tx_ext."""
# Stock long mode (0x750 NOT in base whitelist) with lock_ctrl
param = self.EPS_SCALE | ToyotaSafetyFlags.STOCK_LONGITUDINAL | ToyotaSafetyFlags.LOCK_CTRL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
# All UDS messages should be allowed via tx_ext
self.assertTrue(self._tx(self._tester_present_msg()))
self.assertTrue(self._tx(self._lock_ctrl_msg()))
self.assertTrue(self._tx(self._random_uds_msg()))
def test_lock_ctrl_stock_long_without_flag_blocks_uds(self):
"""Without lock_ctrl, stock_long blocks all 0x750 messages."""
# Stock long mode without lock_ctrl
param = self.EPS_SCALE | ToyotaSafetyFlags.STOCK_LONGITUDINAL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
# 0x750 is not in stock long whitelist, should be blocked
self.assertFalse(self._tx(self._tester_present_msg()))
self.assertFalse(self._tx(self._lock_ctrl_msg()))
class TestTxExtRelayCheck(TestTxExtBase):
"""Test check_relay behavior for tx_ext messages.
Messages allowed via tx_ext can have check_relay=true or false.
- lock_ctrl 0x750: check_relay=false (no relay check)
"""
EPS_SCALE = 73
def setUp(self):
self.packer = CANPackerSafety("toyota_nodsu_pt_generated")
self.safety = libsafety_py.libsafety
def _uds_msg_raw(self):
"""Create raw UDS message for relay testing."""
return libsafety_py.make_CANPacket(0x750, 0, b"\x00" * 8)
def _wait_for_relay_timeout(self):
"""Wait for relay transition timeout (1 second = 100 ticks at 100Hz)."""
for _ in range(200):
self.safety.safety_tick_current_safety_config()
def test_lock_ctrl_no_relay_malfunction(self):
"""lock_ctrl 0x750 should not trigger relay malfunction when received."""
param = self.EPS_SCALE | ToyotaSafetyFlags.STOCK_LONGITUDINAL | ToyotaSafetyFlags.LOCK_CTRL
self.safety.set_safety_hooks(CarParams.SafetyModel.toyota, param)
self.safety.init_tests()
self._wait_for_relay_timeout()
# Receiving 0x750 should NOT trigger relay malfunction
# (check_relay=false for lock_ctrl)
self.assertFalse(self.safety.get_relay_malfunction())
self._rx(self._uds_msg_raw())
self.assertFalse(self.safety.get_relay_malfunction())
class TestTxExtDisabledBrands(unittest.TestCase):
"""Test that tx_ext returns not allowed for brands without tx_ext hook."""
TX_MSGS = [] # Required by test_tx_hook_on_wrong_safety_mode
def test_hyundai_no_tx_ext(self):
"""Hyundai should not have any tx_ext allowances."""
safety = libsafety_py.libsafety
safety.set_safety_hooks(CarParams.SafetyModel.hyundai, 0)
safety.init_tests()
# Random message should not be allowed via tx_ext
msg = libsafety_py.make_CANPacket(0x750, 0, b"\x00" * 8)
self.assertFalse(safety.safety_tx_hook(msg))
def test_honda_no_tx_ext(self):
"""Honda should not have any tx_ext allowances."""
safety = libsafety_py.libsafety
safety.set_safety_hooks(CarParams.SafetyModel.hondaNidec, 0)
safety.init_tests()
# Random message should not be allowed via tx_ext
msg = libsafety_py.make_CANPacket(0x750, 0, b"\x00" * 8)
self.assertFalse(safety.safety_tx_hook(msg))
if __name__ == "__main__":
unittest.main()

View File

@@ -105,6 +105,24 @@ class Car:
if self.params.get_bool("dp_lat_alka"):
dp_params |= structs.DPFlags.LatALKA
if self.params.get_bool("dp_toyota_door_auto_lock_unlock"):
dp_params |= structs.DPFlags.ToyotaLockCtrl
if self.params.get_bool("dp_toyota_tss1_sng"):
dp_params |= structs.DPFlags.ToyotaTSS1SnG
if self.params.get_bool("dp_toyota_stock_lon"):
dp_params |= structs.DPFlags.ToyotaStockLon
if self.params.get_bool("dp_vag_a0_sng"):
dp_params |= structs.DPFlags.VagA0SnG
if self.params.get_bool("dp_vag_pq_steering_patch"):
dp_params |= structs.DPFlags.VAGPQSteeringPatch
if self.params.get_bool("dp_vag_avoid_eps_lockout"):
dp_params |= structs.DPFlags.VagAvoidEPSLockout
dp_fingerprint = str(self.params.get("dp_dev_model_selected") or "")
self.CI = get_car(*self.can_callbacks, obd_callback(self.params), alpha_long_allowed, is_release, num_pandas, dp_params, cached_params, dp_fingerprint=dp_fingerprint)
self.RI = interfaces[self.CI.CP.carFingerprint].RadarInterface(self.CI.CP)