mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-03-01 23:13:55 +08:00
* init dec * Update sunnypilot/selfdrive/controls/lib/dynamic_experimental_controller.py Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * Update sunnypilot/selfdrive/controls/lib/dynamic_experimental_controller.py Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * fix static test * ff * fix static test * unitee testt * Refactor test_dynamic_controller and fix formatting issues Added a new import for STOP_AND_GO_FRAME and corrected a float initialization for v_ego in MockCarState. Also fixed indentation in the test_standstill_detection method for consistency. * Refactor test indentation for dynamic controller tests Adjust indentation and formatting in test_dynamic_controller.py to ensure consistency and readability. This change does not alter functionality but improves the maintainability of the test code. * Migrated to pytest using claude * Integrate radar parameter into dynamic controller's pytest tests Added a `has_radar` parameter to the test functions in the dynamic controller's pytest file. This allows each function to run both with and without radar inputs, thus enhancing the coverage of our test cases. * Disabling unittest file to allow checks on the pipeline to succeed. Pending to remove this, but leaving it to validate the move to pytest is okay before merging * Replace unittest with pytest for dynamic controller tests Migrated dynamic controller tests from unittest to pytest for improved readability and maintainability. Refactored mock setup using pytest fixtures and monkeypatching while preserving test coverage. * new line... * Refactor and modularize DynamicExperimentalController logic Moved DynamicExperimentalController logic and helper functions to a dedicated module for better readability and maintainability. Simplified longitudinal planner logic by introducing reusable methods to manage MPC mode and longitudinal plan publishing. Adjusted file structure for dynamic controller-related components and updated relevant imports. * Add missing import for messaging in helpers.py The `messaging` module was added to resolve potential issues with undefined references. This change ensures all required imports are present, improving the reliability and maintainability of the code. * Format * Formatting * rebase fix * Refactor MpcSource definition and update references. Moved MpcSource enum into LongitudinalPlanSP for better encapsulation. Updated references in helpers.py to use the new path. This change improves code organization and maintains functionality. * Format * Refactor DEC into a dedicated longitudinal planner class Move Dynamic Experimental Control (DEC) logic to a new `DecLongitudinalPlanner` class for better modularity and maintainability. This simplifies the `LongitudinalPlanner` by delegating DEC-specific behavior and consolidates related methods into a single file. Additionally, redundant code was removed to improve readability and reduce complexity. * **Refactor DEC module structure for better organization** Moved DEC-related files from `dec` to `lib` for improved clarity and consistency within the project structure. Updated all relevant import paths to reflect the new locations. Ensured functionality remains unaffected with these changes. * static test * static * had moved to car_state * cleanup * some more * static method * move around * more cleanup * stuff * into their own * rename * check live param * sync with stock * type hint * unused * smoother trans * window time * fix type hint * pass sm.frame from plannerd * more fixes * more * more explicit * fix test * Revert "fix test" This reverts commit635b15f2bc. * Revert "pass sm.frame from plannerd" This reverts commita8deaa69b8. * use internal frame --------- Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> Co-authored-by: DevTekVE <devtekve@gmail.com> Co-authored-by: Jason Wen <haibin.wen3@gmail.com>
216 lines
9.4 KiB
Python
Executable File
216 lines
9.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import math
|
|
import numpy as np
|
|
from openpilot.common.numpy_fast import clip, interp
|
|
|
|
import cereal.messaging as messaging
|
|
from opendbc.car.interfaces import ACCEL_MIN, ACCEL_MAX
|
|
from openpilot.common.conversions import Conversions as CV
|
|
from openpilot.common.filter_simple import FirstOrderFilter
|
|
from openpilot.common.realtime import DT_MDL
|
|
from openpilot.selfdrive.modeld.constants import ModelConstants
|
|
from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState
|
|
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
|
|
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, get_speed_error
|
|
from openpilot.selfdrive.car.cruise import V_CRUISE_MAX, V_CRUISE_UNSET
|
|
from openpilot.common.swaglog import cloudlog
|
|
|
|
from openpilot.sunnypilot.selfdrive.controls.lib.dec.helpers import DecPlanner
|
|
|
|
LON_MPC_STEP = 0.2 # first step is 0.2s
|
|
A_CRUISE_MIN = -1.2
|
|
A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6]
|
|
A_CRUISE_MAX_BP = [0., 10.0, 25., 40.]
|
|
CONTROL_N_T_IDX = ModelConstants.T_IDXS[:CONTROL_N]
|
|
ALLOW_THROTTLE_THRESHOLD = 0.5
|
|
MIN_ALLOW_THROTTLE_SPEED = 2.5
|
|
|
|
# Lookup table for turns
|
|
_A_TOTAL_MAX_V = [1.7, 3.2]
|
|
_A_TOTAL_MAX_BP = [20., 40.]
|
|
|
|
|
|
def get_max_accel(v_ego):
|
|
return interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_VALS)
|
|
|
|
def get_coast_accel(pitch):
|
|
return np.sin(pitch) * -5.65 - 0.3 # fitted from data using xx/projects/allow_throttle/compute_coast_accel.py
|
|
|
|
|
|
def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
|
|
"""
|
|
This function returns a limited long acceleration allowed, depending on the existing lateral acceleration
|
|
this should avoid accelerating when losing the target in turns
|
|
"""
|
|
# FIXME: This function to calculate lateral accel is incorrect and should use the VehicleModel
|
|
# The lookup table for turns should also be updated if we do this
|
|
a_total_max = interp(v_ego, _A_TOTAL_MAX_BP, _A_TOTAL_MAX_V)
|
|
a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase)
|
|
a_x_allowed = math.sqrt(max(a_total_max ** 2 - a_y ** 2, 0.))
|
|
|
|
return [a_target[0], min(a_target[1], a_x_allowed)]
|
|
|
|
|
|
def get_accel_from_plan(speeds, accels, action_t=DT_MDL, vEgoStopping=0.05):
|
|
if len(speeds) == CONTROL_N:
|
|
v_now = speeds[0]
|
|
a_now = accels[0]
|
|
|
|
v_target = interp(action_t, CONTROL_N_T_IDX, speeds)
|
|
a_target = 2 * (v_target - v_now) / (action_t) - a_now
|
|
v_target_1sec = interp(action_t + 1.0, CONTROL_N_T_IDX, speeds)
|
|
else:
|
|
v_target = 0.0
|
|
v_target_1sec = 0.0
|
|
a_target = 0.0
|
|
should_stop = (v_target < vEgoStopping and
|
|
v_target_1sec < vEgoStopping)
|
|
return a_target, should_stop
|
|
|
|
|
|
class LongitudinalPlanner(DecPlanner):
|
|
def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL):
|
|
self.CP = CP
|
|
self.mpc = LongitudinalMpc(dt=dt)
|
|
DecPlanner.__init__(self, self.CP, self.mpc)
|
|
self.fcw = False
|
|
self.dt = dt
|
|
self.allow_throttle = True
|
|
|
|
self.a_desired = init_a
|
|
self.v_desired_filter = FirstOrderFilter(init_v, 2.0, self.dt)
|
|
self.v_model_error = 0.0
|
|
|
|
self.v_desired_trajectory = np.zeros(CONTROL_N)
|
|
self.a_desired_trajectory = np.zeros(CONTROL_N)
|
|
self.j_desired_trajectory = np.zeros(CONTROL_N)
|
|
self.solverExecutionTime = 0.0
|
|
|
|
@staticmethod
|
|
def parse_model(model_msg, model_error):
|
|
if (len(model_msg.position.x) == ModelConstants.IDX_N and
|
|
len(model_msg.velocity.x) == ModelConstants.IDX_N and
|
|
len(model_msg.acceleration.x) == ModelConstants.IDX_N):
|
|
x = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.position.x) - model_error * T_IDXS_MPC
|
|
v = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.velocity.x) - model_error
|
|
a = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.acceleration.x)
|
|
j = np.zeros(len(T_IDXS_MPC))
|
|
else:
|
|
x = np.zeros(len(T_IDXS_MPC))
|
|
v = np.zeros(len(T_IDXS_MPC))
|
|
a = np.zeros(len(T_IDXS_MPC))
|
|
j = np.zeros(len(T_IDXS_MPC))
|
|
if len(model_msg.meta.disengagePredictions.gasPressProbs) > 1:
|
|
throttle_prob = model_msg.meta.disengagePredictions.gasPressProbs[1]
|
|
else:
|
|
throttle_prob = 1.0
|
|
return x, v, a, j, throttle_prob
|
|
|
|
def update(self, sm):
|
|
DecPlanner.update(self, sm)
|
|
self.mpc.mode = 'blended' if sm['selfdriveState'].experimentalMode else 'acc'
|
|
if dec_mpc_mode := self.get_mpc_mode(sm):
|
|
self.mpc.mode = dec_mpc_mode
|
|
|
|
if len(sm['carControl'].orientationNED) == 3:
|
|
accel_coast = get_coast_accel(sm['carControl'].orientationNED[1])
|
|
else:
|
|
accel_coast = ACCEL_MAX
|
|
|
|
v_ego = sm['carState'].vEgo
|
|
v_cruise_kph = min(sm['carState'].vCruise, V_CRUISE_MAX)
|
|
v_cruise = v_cruise_kph * CV.KPH_TO_MS
|
|
v_cruise_initialized = sm['carState'].vCruise != V_CRUISE_UNSET
|
|
|
|
long_control_off = sm['controlsState'].longControlState == LongCtrlState.off
|
|
force_slow_decel = sm['controlsState'].forceDecel
|
|
|
|
# Reset current state when not engaged, or user is controlling the speed
|
|
reset_state = long_control_off if self.CP.openpilotLongitudinalControl else not sm['selfdriveState'].enabled
|
|
# PCM cruise speed may be updated a few cycles later, check if initialized
|
|
reset_state = reset_state or not v_cruise_initialized
|
|
|
|
# No change cost when user is controlling the speed, or when standstill
|
|
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
|
|
|
|
if self.mpc.mode == 'acc':
|
|
accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
|
|
steer_angle_without_offset = sm['carState'].steeringAngleDeg - sm['liveParameters'].angleOffsetDeg
|
|
accel_limits_turns = limit_accel_in_turns(v_ego, steer_angle_without_offset, accel_limits, self.CP)
|
|
else:
|
|
accel_limits = [ACCEL_MIN, ACCEL_MAX]
|
|
accel_limits_turns = [ACCEL_MIN, ACCEL_MAX]
|
|
|
|
if reset_state:
|
|
self.v_desired_filter.x = v_ego
|
|
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
|
|
self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1])
|
|
|
|
# Prevent divergence, smooth in current v_ego
|
|
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
|
|
# Compute model v_ego error
|
|
self.v_model_error = get_speed_error(sm['modelV2'], v_ego)
|
|
x, v, a, j, throttle_prob = self.parse_model(sm['modelV2'], self.v_model_error)
|
|
# Don't clip at low speeds since throttle_prob doesn't account for creep
|
|
self.allow_throttle = throttle_prob > ALLOW_THROTTLE_THRESHOLD or v_ego <= MIN_ALLOW_THROTTLE_SPEED
|
|
|
|
if not self.allow_throttle:
|
|
clipped_accel_coast = max(accel_coast, accel_limits_turns[0])
|
|
clipped_accel_coast_interp = interp(v_ego, [MIN_ALLOW_THROTTLE_SPEED, MIN_ALLOW_THROTTLE_SPEED*2], [accel_limits_turns[1], clipped_accel_coast])
|
|
accel_limits_turns[1] = min(accel_limits_turns[1], clipped_accel_coast_interp)
|
|
|
|
if force_slow_decel:
|
|
v_cruise = 0.0
|
|
# clip limits, cannot init MPC outside of bounds
|
|
accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05)
|
|
accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05)
|
|
|
|
self.mpc.set_weights(prev_accel_constraint, personality=sm['selfdriveState'].personality)
|
|
self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1])
|
|
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
|
|
self.mpc.update(sm['radarState'], v_cruise, x, v, a, j, personality=sm['selfdriveState'].personality)
|
|
|
|
self.v_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.v_solution)
|
|
self.a_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC, self.mpc.a_solution)
|
|
self.j_desired_trajectory = np.interp(CONTROL_N_T_IDX, T_IDXS_MPC[:-1], self.mpc.j_solution)
|
|
|
|
# TODO counter is only needed because radar is glitchy, remove once radar is gone
|
|
self.fcw = self.mpc.crash_cnt > 2 and not sm['carState'].standstill
|
|
if self.fcw:
|
|
cloudlog.info("FCW triggered")
|
|
|
|
# Interpolate 0.05 seconds and save as starting point for next iteration
|
|
a_prev = self.a_desired
|
|
self.a_desired = float(interp(self.dt, CONTROL_N_T_IDX, self.a_desired_trajectory))
|
|
self.v_desired_filter.x = self.v_desired_filter.x + self.dt * (self.a_desired + a_prev) / 2.0
|
|
|
|
def publish(self, sm, pm):
|
|
plan_send = messaging.new_message('longitudinalPlan')
|
|
|
|
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'selfdriveState'])
|
|
|
|
longitudinalPlan = plan_send.longitudinalPlan
|
|
longitudinalPlan.modelMonoTime = sm.logMonoTime['modelV2']
|
|
longitudinalPlan.processingDelay = (plan_send.logMonoTime / 1e9) - sm.logMonoTime['modelV2']
|
|
longitudinalPlan.solverExecutionTime = self.mpc.solve_time
|
|
|
|
longitudinalPlan.speeds = self.v_desired_trajectory.tolist()
|
|
longitudinalPlan.accels = self.a_desired_trajectory.tolist()
|
|
longitudinalPlan.jerks = self.j_desired_trajectory.tolist()
|
|
|
|
longitudinalPlan.hasLead = sm['radarState'].leadOne.status
|
|
longitudinalPlan.longitudinalPlanSource = self.mpc.source
|
|
longitudinalPlan.fcw = self.fcw
|
|
|
|
action_t = self.CP.longitudinalActuatorDelay + DT_MDL
|
|
a_target, should_stop = get_accel_from_plan(longitudinalPlan.speeds, longitudinalPlan.accels,
|
|
action_t=action_t, vEgoStopping=self.CP.vEgoStopping)
|
|
longitudinalPlan.aTarget = a_target
|
|
longitudinalPlan.shouldStop = should_stop
|
|
longitudinalPlan.allowBrake = True
|
|
longitudinalPlan.allowThrottle = self.allow_throttle
|
|
|
|
pm.send('longitudinalPlan', plan_send)
|
|
self.publish_longitudinal_plan_sp(sm, pm)
|