This commit is contained in:
mmoo758
2026-03-04 20:10:50 +08:00
parent e067a53e5e
commit ae12334773
2 changed files with 51 additions and 18 deletions

View File

@@ -3,7 +3,7 @@ import os
import argparse
import threading
import numpy as np
from inputs import UnpluggedError, get_gamepad
from inputs import UnpluggedError, get_gamepad, devices
from cereal import messaging
from openpilot.common.params import Params
@@ -42,21 +42,33 @@ class Keyboard:
class Joystick:
def __init__(self):
# This class supports a PlayStation 5 DualSense controller on the comma 3X
# TODO: find a way to get this from API or detect gamepad/PC, perhaps "inputs" doesn't support it
self.cancel_button = 'BTN_NORTH' # BTN_NORTH=X/triangle
if HARDWARE.get_device_type() == 'pc':
accel_axis = 'ABS_Z'
steer_axis = 'ABS_RX'
# TODO: once the longcontrol API is finalized, we can replace this with outputting gas/brake and steering
self.flip_map = {'ABS_RZ': accel_axis}
else:
accel_axis = 'ABS_RX'
steer_axis = 'ABS_Z'
self.flip_map = {'ABS_RY': accel_axis}
self.min_axis_value = {accel_axis: 0., steer_axis: 0.}
self.max_axis_value = {accel_axis: 255., steer_axis: 255.}
gamepad_name = ''
if devices.gamepads:
gp0 = devices.gamepads[0]
gamepad_name = getattr(gp0, 'name', '') or str(gp0)
print(f"Detected: {gamepad_name}\n")
if 'Stadia' in gamepad_name:
print("Google Stadia axes mapping")
steer_axis = 'ABS_X'
accel_axis = 'ABS_GAS'
self.flip_map = {'ABS_BRAKE': accel_axis}
elif HARDWARE.get_device_type() == 'pc':
print("PlayStation 5 DualSense (PC) axes mapping")
accel_axis = 'ABS_RZ'
steer_axis = 'ABS_RX'
self.flip_map = {'ABS_Z': accel_axis}
else:
print("PlayStation 5 DualSense axes mapping")
accel_axis = 'ABS_RY'
steer_axis = 'ABS_Z'
self.flip_map = {'ABS_RX': accel_axis}
self.min_axis_value = {accel_axis: -255 if accel_axis in self.flip_map.values() else 0,
steer_axis: 0}
self.max_axis_value = {accel_axis: 255, steer_axis: 255}
self.axes_values = {accel_axis: 0., steer_axis: 0.}
self.axes_order = [accel_axis, steer_axis]
self.cancel = False
@@ -70,7 +82,7 @@ class Joystick:
event = (joystick_event.code, joystick_event.state)
# flip left trigger to negative accel
# flip left trigger (brake) to negative accel
if event[0] in self.flip_map:
event = (self.flip_map[event[0]], -event[1])
@@ -83,7 +95,7 @@ class Joystick:
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]])
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]])
norm = -float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.]))
norm = float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.]))
norm = norm if abs(norm) > 0.03 else 0. # center can be noisy, deadzone of 3%
self.axes_values[event[0]] = EXPO * norm ** 3 + (1 - EXPO) * norm # less action near center for fine control
else:
@@ -98,11 +110,13 @@ def send_thread(joystick):
while True:
if rk.frame % 20 == 0:
print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items()))
print('\n' + ', '.join(f'{name}: {round(v, 3)} ' for name, v in joystick.axes_values.items()))
print(joystick.cancel_button, ": ", joystick.cancel)
joystick_msg = messaging.new_message('testJoystick')
joystick_msg.valid = True
joystick_msg.testJoystick.axes = [joystick.axes_values[ax] for ax in joystick.axes_order]
joystick_msg.testJoystick.buttons = [1] if joystick.cancel else []
pm.send('testJoystick', joystick_msg)

View File

@@ -9,6 +9,8 @@ from openpilot.common.realtime import DT_CTRL, Ratekeeper
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from opendbc.car.interfaces import get_interface_attr
LongCtrlState = car.CarControl.Actuators.LongControlState
MAX_LAT_ACCEL = 3.0
@@ -19,6 +21,16 @@ def joystickd_thread():
CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
VM = VehicleModel(CP)
car_angle_max = None
try:
car_params = get_interface_attr('CarControllerParams')
if car_params and CP.carFingerprint in car_params:
angle_limits = car_params[CP.carFingerprint].ANGLE_LIMITS
if hasattr(angle_limits, 'steerAngleMax'):
car_angle_max = angle_limits.steerAngleMax
except Exception:
pass # Fall back to the existing max_angle if anything goes wrong
sm = messaging.SubMaster(['carState', 'onroadEvents', 'liveParameters', 'selfdriveState', 'testJoystick'], frequency=1. / DT_CTRL)
pm = messaging.PubMaster(['carControl', 'controlsState'])
@@ -42,6 +54,8 @@ def joystickd_thread():
if not should_reset_joystick:
joystick_axes = sm['testJoystick'].axes
if hasattr(sm['testJoystick'], 'buttons') and sm['testJoystick'].buttons and sm['testJoystick'].buttons[0] == 1:
CC.cruiseControl.cancel = True
else:
joystick_axes = [0.0, 0.0]
@@ -54,7 +68,12 @@ def joystickd_thread():
max_curvature = MAX_LAT_ACCEL / max(sm['carState'].vEgo ** 2, 5)
max_angle = math.degrees(VM.get_steer_from_curvature(max_curvature, sm['carState'].vEgo, sm['liveParameters'].roll))
actuators.torque = float(np.clip(joystick_axes[1], -1, 1))
if car_angle_max is not None:
max_angle = min(max_angle, car_angle_max)
max_angle = min(max_angle, 390) # keep the PSA limit as a hard cap
actuators.torque = -float(np.clip(joystick_axes[1], -1, 1))
actuators.steeringAngleDeg, actuators.curvature = actuators.torque * max_angle, actuators.torque * -max_curvature
pm.send('carControl', cc_msg)