mirror of https://github.com/commaai/openpilot.git
74 lines
2.5 KiB
Python
Executable File
74 lines
2.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import math
|
|
|
|
from cereal import messaging, car
|
|
from openpilot.common.numpy_fast import clip
|
|
from openpilot.common.realtime import DT_CTRL, Ratekeeper
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.swaglog import cloudlog
|
|
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
|
|
|
|
MAX_LAT_ACCEL = 2.5
|
|
|
|
|
|
def joystickd_thread():
|
|
params = Params()
|
|
cloudlog.info("joystickd is waiting for CarParams")
|
|
CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
|
|
VM = VehicleModel(CP)
|
|
|
|
sm = messaging.SubMaster(['carState', 'onroadEvents', 'liveParameters', 'selfdriveState', 'testJoystick'], frequency=1. / DT_CTRL)
|
|
pm = messaging.PubMaster(['carControl', 'controlsState'])
|
|
|
|
rk = Ratekeeper(100, print_delay_threshold=None)
|
|
while 1:
|
|
sm.update(0)
|
|
|
|
cc_msg = messaging.new_message('carControl')
|
|
cc_msg.valid = True
|
|
CC = cc_msg.carControl
|
|
CC.enabled = sm['selfdriveState'].enabled
|
|
CC.latActive = sm['selfdriveState'].active and not sm['carState'].steerFaultTemporary and not sm['carState'].steerFaultPermanent
|
|
CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in sm['onroadEvents']) and CP.openpilotLongitudinalControl
|
|
CC.cruiseControl.cancel = sm['carState'].cruiseState.enabled and (not CC.enabled or not CP.pcmCruise)
|
|
CC.hudControl.leadDistanceBars = 2
|
|
|
|
actuators = CC.actuators
|
|
|
|
# reset joystick if it hasn't been received in a while
|
|
should_reset_joystick = sm.recv_frame['testJoystick'] == 0 or (sm.frame - sm.recv_frame['testJoystick'])*DT_CTRL > 0.2
|
|
|
|
if not should_reset_joystick:
|
|
joystick_axes = sm['testJoystick'].axes
|
|
else:
|
|
joystick_axes = [0.0, 0.0]
|
|
|
|
if CC.longActive:
|
|
actuators.accel = 4.0 * clip(joystick_axes[0], -1, 1)
|
|
|
|
if CC.latActive:
|
|
max_curvature = MAX_LAT_ACCEL / max(sm['carState'].vEgo ** 2, 5)
|
|
max_angle = math.degrees(VM.get_steer_from_curvature(max_curvature, sm['carState'].vEgo, sm['liveParameters'].roll))
|
|
|
|
actuators.steer = clip(joystick_axes[1], -1, 1)
|
|
actuators.steeringAngleDeg, actuators.curvature = actuators.steer * max_angle, actuators.steer * -max_curvature
|
|
|
|
pm.send('carControl', cc_msg)
|
|
|
|
cs_msg = messaging.new_message('controlsState')
|
|
cs_msg.valid = True
|
|
controlsState = cs_msg.controlsState
|
|
controlsState.lateralControlState.init('debugState')
|
|
pm.send('controlsState', cs_msg)
|
|
|
|
rk.keep_time()
|
|
|
|
|
|
def main():
|
|
joystickd_thread()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|