Controls: Support for Torque Lateral Control v0 Tune (#1693)

* init

* no

* more

* tree it

* final

* comment

* only with enforce torque

* only with enforce torque

* missed

* no

* lint

* Apply suggestion from @sunnyhaibin

* sunnylink metadata sync

* Apply suggestion from @sunnyhaibin

---------

Co-authored-by: nayan <nayan8teen@gmail.com>
This commit is contained in:
Jason Wen
2026-02-17 13:17:03 -05:00
committed by GitHub
parent f81e7245fe
commit e8f65bc652
9 changed files with 357 additions and 2 deletions

View File

@@ -268,6 +268,7 @@ inline static std::unordered_map<std::string, ParamKeyAttributes> keys = {
{"EnforceTorqueControl", {PERSISTENT | BACKUP, BOOL}},
{"LiveTorqueParamsToggle", {PERSISTENT | BACKUP , BOOL}},
{"LiveTorqueParamsRelaxedToggle", {PERSISTENT | BACKUP , BOOL}},
{"TorqueControlTune", {PERSISTENT | BACKUP, FLOAT}},
{"TorqueParamsOverrideEnabled", {PERSISTENT | BACKUP, BOOL, "0"}},
{"TorqueParamsOverrideFriction", {PERSISTENT | BACKUP, FLOAT, "0.1"}},
{"TorqueParamsOverrideLatAccelFactor", {PERSISTENT | BACKUP, FLOAT, "2.5"}},

View File

@@ -64,6 +64,8 @@ class Controls(ControlsExt):
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CP_SP, self.CI, DT_CTRL)
self.LaC = ControlsExt.initialize_lateral_control(self, self.LaC, self.CI, DT_CTRL)
def update(self):
self.sm.update(15)
if self.sm.updated["liveCalibration"]:

View File

@@ -4,15 +4,24 @@ Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import json
import math
import os
from collections.abc import Callable
import pyray as rl
from openpilot.common.basedir import BASEDIR
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.multilang import tr
from openpilot.system.ui.sunnypilot.widgets.list_view import toggle_item_sp, option_item_sp
from openpilot.system.ui.sunnypilot.lib.utils import NoElideButtonAction
from openpilot.system.ui.sunnypilot.widgets.list_view import ListItemSP, toggle_item_sp, option_item_sp
from openpilot.system.ui.sunnypilot.widgets.tree_dialog import TreeOptionDialog, TreeFolder, TreeNode
from openpilot.system.ui.widgets import Widget, DialogResult
from openpilot.system.ui.widgets.network import NavButton
from openpilot.system.ui.widgets.scroller_tici import Scroller
from openpilot.system.ui.widgets import Widget
TORQUE_VERSIONS_PATH = os.path.join(BASEDIR, "sunnypilot", "selfdrive", "controls", "lib", "latcontrol_torque_versions.json")
class TorqueSettingsLayout(Widget):
@@ -20,10 +29,23 @@ class TorqueSettingsLayout(Widget):
super().__init__()
self._back_button = NavButton(tr("Back"))
self._back_button.set_click_callback(back_btn_callback)
self._torque_version_dialog: TreeOptionDialog | None = None
self.cached_torque_versions = {}
self._load_versions()
items = self._initialize_items()
self._scroller = Scroller(items, line_separator=True, spacing=0)
def _load_versions(self):
with open(TORQUE_VERSIONS_PATH) as f:
self.cached_torque_versions = json.load(f)
def _initialize_items(self):
self._torque_control_versions = ListItemSP(
title=tr("Torque Control Tune Version"),
description="Select the version of Torque Control Tune to use.",
action_item=NoElideButtonAction(tr("SELECT")),
callback=self._show_torque_version_dialog,
)
self._self_tune_toggle = toggle_item_sp(
param="LiveTorqueParamsToggle",
title=lambda: tr("Self-Tune"),
@@ -73,6 +95,7 @@ class TorqueSettingsLayout(Widget):
)
items = [
self._torque_control_versions,
self._self_tune_toggle,
self._relaxed_tune_toggle,
self._custom_tune_toggle,
@@ -103,6 +126,7 @@ class TorqueSettingsLayout(Widget):
title_text = tr("Real-Time & Offline") if ui_state.params.get("TorqueParamsOverrideEnabled") else tr("Offline Only")
self._torque_lat_accel_factor.set_title(lambda: tr("Lateral Acceleration Factor") + " (" + title_text + ")")
self._torque_friction.set_title(lambda: tr("Friction") + " (" + title_text + ")")
self._torque_control_versions.action_item.set_value(self._get_current_torque_version_label())
def _render(self, rect):
self._back_button.set_position(self._rect.x, self._rect.y + 20)
@@ -113,3 +137,54 @@ class TorqueSettingsLayout(Widget):
def show_event(self):
self._scroller.show_event()
def _get_current_torque_version_label(self):
current_val_bytes = ui_state.params.get("TorqueControlTune")
if current_val_bytes is None:
return tr("Default")
try:
current_val = float(current_val_bytes)
for label, info in self.cached_torque_versions.items():
if math.isclose(float(info["version"]), current_val, rel_tol=1e-5):
return label
except (ValueError, KeyError):
pass
return tr("Default")
def _show_torque_version_dialog(self):
options_map = {}
for label, info in self.cached_torque_versions.items():
try:
options_map[label] = float(info["version"])
except (ValueError, KeyError):
pass
# Sort options by label in descending order
sorted_labels = sorted(options_map.keys(), key=lambda k: options_map[k], reverse=True)
nodes = [TreeNode(tr("Default"))]
for label in sorted_labels:
nodes.append(TreeNode(label))
folders = [TreeFolder("", nodes)]
current_label = self._get_current_torque_version_label()
def handle_selection(result: int):
if result == DialogResult.CONFIRM and self._torque_version_dialog:
selected_ref = self._torque_version_dialog.selection_ref
if selected_ref == tr("Default"):
ui_state.params.remove("TorqueControlTune")
elif selected_ref in options_map:
ui_state.params.put("TorqueControlTune", options_map[selected_ref])
self._torque_version_dialog = None
self._torque_version_dialog = TreeOptionDialog(
tr("Select Torque Control Tune Version"),
folders,
current_ref=current_label,
option_font_weight=FontWeight.UNIFONT,
)
gui_app.set_modal_overlay(self._torque_version_dialog, callback=handle_selection)

View File

@@ -16,6 +16,7 @@ from openpilot.sunnypilot import PARAMS_UPDATE_PERIOD
from openpilot.sunnypilot.livedelay.helpers import get_lat_delay
from openpilot.sunnypilot.modeld.modeld_base import ModelStateBase
from openpilot.sunnypilot.selfdrive.controls.lib.blinker_pause_lateral import BlinkerPauseLateral
from openpilot.sunnypilot.selfdrive.controls.lib.latcontrol_torque_v0 import LatControlTorque as LatControlTorqueV0
class ControlsExt(ModelStateBase):
@@ -33,6 +34,17 @@ class ControlsExt(ModelStateBase):
self.sm_services_ext = ['radarState', 'selfdriveStateSP']
self.pm_services_ext = ['carControlSP']
def initialize_lateral_control(self, lac, CI, dt):
enforce_torque_control = self.params.get_bool("EnforceTorqueControl")
torque_versions = self.params.get("TorqueControlTune")
if not enforce_torque_control:
return lac
if torque_versions == 0.0: # v0
return LatControlTorqueV0(self.CP, self.CP_SP, CI, dt)
else:
return lac
def get_params_sp(self, sm: messaging.SubMaster) -> None:
if time.monotonic() - self._param_update_time > PARAMS_UPDATE_PERIOD:
self.blinker_pause_lateral.get_params()

View File

@@ -0,0 +1,128 @@
import math
import numpy as np
from collections import deque
from cereal import log
from opendbc.car.lateral import get_friction
from openpilot.common.constants import ACCELERATION_DUE_TO_GRAVITY
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.common.pid import PIDController
from openpilot.sunnypilot.selfdrive.controls.lib.latcontrol_torque_ext import LatControlTorqueExt
# At higher speeds (25+mph) we can assume:
# Lateral acceleration achieved by a specific car correlates to
# torque applied to the steering rack. It does not correlate to
# wheel slip, or to speed.
# This controller applies torque to achieve desired lateral
# accelerations. To compensate for the low speed effects the
# proportional gain is increased at low speeds by the PID controller.
# Additionally, there is friction in the steering wheel that needs
# to be overcome to move it at all, this is compensated for too.
KP = 1.0
KI = 0.3
KD = 0.0
INTERP_SPEEDS = [1, 1.5, 2.0, 3.0, 5, 7.5, 10, 15, 30]
KP_INTERP = [250, 120, 65, 30, 11.5, 5.5, 3.5, 2.0, KP]
LP_FILTER_CUTOFF_HZ = 1.2
LAT_ACCEL_REQUEST_BUFFER_SECONDS = 1.0
FRICTION_THRESHOLD = 0.3
VERSION = 0
class LatControlTorque(LatControl):
def __init__(self, CP, CP_SP, CI, dt):
super().__init__(CP, CP_SP, CI, dt)
self.torque_params = CP.lateralTuning.torque.as_builder()
self.torque_from_lateral_accel = CI.torque_from_lateral_accel()
self.lateral_accel_from_torque = CI.lateral_accel_from_torque()
self.pid = PIDController([INTERP_SPEEDS, KP_INTERP], KI, KD, rate=1/self.dt)
self.update_limits()
self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg
self.lat_accel_request_buffer_len = int(LAT_ACCEL_REQUEST_BUFFER_SECONDS / self.dt)
self.lat_accel_request_buffer = deque([0.] * self.lat_accel_request_buffer_len , maxlen=self.lat_accel_request_buffer_len)
self.previous_measurement = 0.0
self.measurement_rate_filter = FirstOrderFilter(0.0, 1 / (2 * np.pi * LP_FILTER_CUTOFF_HZ), self.dt)
self.extension = LatControlTorqueExt(self, CP, CP_SP, CI)
def update_live_torque_params(self, latAccelFactor, latAccelOffset, friction):
self.torque_params.latAccelFactor = latAccelFactor
self.torque_params.latAccelOffset = latAccelOffset
self.torque_params.friction = friction
self.update_limits()
def update_limits(self):
self.pid.set_limits(self.lateral_accel_from_torque(self.steer_max, self.torque_params),
self.lateral_accel_from_torque(-self.steer_max, self.torque_params))
def update(self, active, CS, VM, params, steer_limited_by_safety, desired_curvature, calibrated_pose, curvature_limited, lat_delay):
# Override torque params from extension
if self.extension.update_override_torque_params(self.torque_params):
self.update_limits()
pid_log = log.ControlsState.LateralTorqueState.new_message()
pid_log.version = VERSION
if not active:
output_torque = 0.0
pid_log.active = False
else:
measured_curvature = -VM.calc_curvature(math.radians(CS.steeringAngleDeg - params.angleOffsetDeg), CS.vEgo, params.roll)
roll_compensation = params.roll * ACCELERATION_DUE_TO_GRAVITY
curvature_deadzone = abs(VM.calc_curvature(math.radians(self.steering_angle_deadzone_deg), CS.vEgo, 0.0))
lateral_accel_deadzone = curvature_deadzone * CS.vEgo ** 2
delay_frames = int(np.clip(lat_delay / self.dt, 1, self.lat_accel_request_buffer_len))
expected_lateral_accel = self.lat_accel_request_buffer[-delay_frames]
# TODO factor out lateral jerk from error to later replace it with delay independent alternative
future_desired_lateral_accel = desired_curvature * CS.vEgo ** 2
self.lat_accel_request_buffer.append(future_desired_lateral_accel)
gravity_adjusted_future_lateral_accel = future_desired_lateral_accel - roll_compensation
desired_lateral_jerk = (future_desired_lateral_accel - expected_lateral_accel) / lat_delay
measurement = measured_curvature * CS.vEgo ** 2
measurement_rate = self.measurement_rate_filter.update((measurement - self.previous_measurement) / self.dt)
self.previous_measurement = measurement
setpoint = lat_delay * desired_lateral_jerk + expected_lateral_accel
error = setpoint - measurement
# do error correction in lateral acceleration space, convert at end to handle non-linear torque responses correctly
pid_log.error = float(error)
ff = gravity_adjusted_future_lateral_accel
# latAccelOffset corrects roll compensation bias from device roll misalignment relative to car roll
ff -= self.torque_params.latAccelOffset
# TODO jerk is weighted by lat_delay for legacy reasons, but should be made independent of it
ff += get_friction(error, lateral_accel_deadzone, FRICTION_THRESHOLD, self.torque_params)
freeze_integrator = steer_limited_by_safety or CS.steeringPressed or CS.vEgo < 5
output_lataccel = self.pid.update(pid_log.error,
-measurement_rate,
feedforward=ff,
speed=CS.vEgo,
freeze_integrator=freeze_integrator)
output_torque = self.torque_from_lateral_accel(output_lataccel, self.torque_params)
# Lateral acceleration torque controller extension updates
# Overrides pid_log.error and output_torque
pid_log, output_torque = self.extension.update(CS, VM, self.pid, params, ff, pid_log, setpoint, measurement, calibrated_pose, roll_compensation,
future_desired_lateral_accel, measurement, lateral_accel_deadzone, gravity_adjusted_future_lateral_accel,
desired_curvature, measured_curvature, steer_limited_by_safety, output_torque)
pid_log.active = True
pid_log.p = float(self.pid.p)
pid_log.i = float(self.pid.i)
pid_log.d = float(self.pid.d)
pid_log.f = float(self.pid.f)
pid_log.output = float(-output_torque) # TODO: log lat accel?
pid_log.actualLateralAccel = float(measurement)
pid_log.desiredLateralAccel = float(setpoint)
pid_log.desiredLateralJerk = float(desired_lateral_jerk)
pid_log.saturated = bool(self._check_saturation(self.steer_max - abs(output_torque) < 1e-3, CS, steer_limited_by_safety, curvature_limited))
# TODO left is positive in this convention
return -output_torque, 0.0, pid_log

View File

@@ -0,0 +1,8 @@
{
"v1.0": {
"version": "1.0"
},
"v0.0": {
"version": "0.0"
}
}

View File

@@ -1247,6 +1247,24 @@
"title": "[TIZI/TICI only] Steering Arc",
"description": "Display steering arc on the driving screen when lateral control is enabled."
},
"TorqueControlTune": {
"title": "Torque Control Tune Version",
"description": "Select the version of Torque Control Tune to use.",
"options": [
{
"value": "",
"label": "Default"
},
{
"value": 1.0,
"label": "v1.0"
},
{
"value": 0.0,
"label": "v0.0"
}
]
},
"TorqueParamsOverrideEnabled": {
"title": "Manual Real-Time Tuning",
"description": ""

View File

@@ -200,3 +200,85 @@ def test_known_params_metadata():
assert acc_long["min"] == 1
assert acc_long["max"] == 10
assert acc_long["step"] == 1
def test_torque_control_tune_versions_in_sync():
"""
Test that TorqueControlTune options in params_metadata.json match versions in latcontrol_torque_versions.json.
Why:
The TorqueControlTune dropdown in the UI should always reflect the available torque tune versions.
If versions are added/removed from latcontrol_torque_versions.json, the metadata must be updated accordingly.
Expected:
- TorqueControlTune should have a 'Default' option with empty string value
- All versions from latcontrol_torque_versions.json should be present in the options
- The version values and labels should match between both files
"""
from openpilot.common.basedir import BASEDIR
versions_json_path = os.path.join(BASEDIR, "sunnypilot", "selfdrive", "controls", "lib", "latcontrol_torque_versions.json")
sync_script_path = "python3 sunnypilot/sunnylink/tools/sync_torque_versions.py"
# Load both files
with open(METADATA_PATH) as f:
metadata = json.load(f)
with open(versions_json_path) as f:
versions = json.load(f)
# Get TorqueControlTune metadata
torque_tune = metadata.get("TorqueControlTune")
if torque_tune is None:
pytest.fail(f"TorqueControlTune not found in params_metadata.json. Please run '{sync_script_path}' to sync.")
if "options" not in torque_tune:
pytest.fail(f"TorqueControlTune must have options. Please run '{sync_script_path}' to sync.")
options = torque_tune["options"]
if not isinstance(options, list):
pytest.fail(f"TorqueControlTune options must be a list. Please run '{sync_script_path}' to sync.")
if len(options) == 0:
pytest.fail(f"TorqueControlTune must have at least one option. Please run '{sync_script_path}' to sync.")
# Check that Default option exists
default_option = next((opt for opt in options if opt.get("value") == ""), None)
if default_option is None:
pytest.fail(f"TorqueControlTune must have a 'Default' option with empty string value. Please run '{sync_script_path}' to sync.")
if default_option.get("label") != "Default":
pytest.fail(f"Default option must have label 'Default'. Please run '{sync_script_path}' to sync.")
# Build expected options from versions.json
expected_version_keys = set(versions.keys())
actual_version_keys = set()
for option in options:
if option.get("value") == "":
continue # Skip the default option
label = option.get("label")
value = option.get("value")
# Check that this option corresponds to a version
if label not in versions:
pytest.fail(f"Option label '{label}' not found in latcontrol_torque_versions.json. Please run '{sync_script_path}' to sync.")
# Check that the value matches the version number
expected_value = float(versions[label]["version"])
if value != expected_value:
pytest.fail(f"Option '{label}' has value {value}, expected {expected_value}. Please run '{sync_script_path}' to sync.")
actual_version_keys.add(label)
# Check that all versions are represented
missing_versions = expected_version_keys - actual_version_keys
if missing_versions:
pytest.fail(f"The following versions are missing from TorqueControlTune options: {missing_versions}. " +
f"Please run '{sync_script_path}' to sync.")
extra_versions = actual_version_keys - expected_version_keys
if extra_versions:
pytest.fail("The following versions in TorqueControlTune options are not in latcontrol_torque_versions.json: " +
f"{extra_versions}. Please run '{sync_script_path}' to sync.")

View File

@@ -8,9 +8,11 @@ See the LICENSE.md file in the root directory for more details.
import json
import os
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
METADATA_PATH = os.path.join(os.path.dirname(__file__), "../params_metadata.json")
TORQUE_VERSIONS_JSON = os.path.join(BASEDIR, "sunnypilot", "selfdrive", "controls", "lib", "latcontrol_torque_versions.json")
def main():
@@ -51,6 +53,33 @@ def main():
print(f"Updated {METADATA_PATH}")
# update torque versions param
update_torque_versions_param()
def update_torque_versions_param():
with open(TORQUE_VERSIONS_JSON) as f:
current_versions = json.load(f)
try:
with open(METADATA_PATH) as f:
params_metadata = json.load(f)
options = [{"value": "", "label": "Default"}]
for version_key, version_data in current_versions.items():
version_value = float(version_data["version"])
options.append({"value": version_value, "label": str(version_key)})
if "TorqueControlTune" in params_metadata:
params_metadata["TorqueControlTune"]["options"] = options
with open(METADATA_PATH, 'w') as f:
json.dump(params_metadata, f, indent=2)
f.write('\n')
print(f"Updated TorqueControlTune options in params_metadata.json with {len(options)} options: \n{options}")
except Exception as e:
print(f"Failed to update TorqueControlTune versions in params_metadata.json: {e}")
if __name__ == "__main__":
main()