LateralPlanner should only compute curvature (#20289)

* get curvature from planner

* no need to check active

* remove that

* remove self

* liveParams not needed

* cast

* fix test bug

* fixes

* fix ui.py

* fix radians

* update refs

* update refs

* bump cereal

* bump cereal

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: c23ec9f753
This commit is contained in:
Willem Melching 2021-03-12 06:08:51 +01:00 committed by GitHub
parent c4aa4ef5c7
commit 4bd1929d2b
15 changed files with 147 additions and 126 deletions

View File

@ -28,11 +28,6 @@ class CarInterface(CarInterfaceBase):
ret.steerRateCost = 0.5
ret.steerActuatorDelay = 0.1
ret.lateralTuning.pid.kf = 0.00006
ret.lateralTuning.pid.kiBP, ret.lateralTuning.pid.kpBP = [[0.0], [0.0]]
ret.lateralTuning.pid.kpV, ret.lateralTuning.pid.kiV = [[0.01], [0.005]]
ret.steerMaxBP = [0.] # m/s
ret.steerMaxV = [1.]
if candidate in [CAR.ROGUE, CAR.XTRAIL]:
ret.mass = 1610 + STD_CARGO_KG

View File

@ -33,13 +33,14 @@ class TestCarInterfaces(unittest.TestCase):
self.assertGreater(car_params.mass, 1)
self.assertGreater(car_params.steerRateCost, 1e-3)
tuning = car_params.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(car_params.lateralTuning.pid.kpV))
elif tuning == 'lqr':
self.assertTrue(len(car_params.lateralTuning.lqr.a))
elif tuning == 'indi':
self.assertTrue(len(car_params.lateralTuning.indi.outerLoopGainV))
if car_params.steerControlType != car.CarParams.SteerControlType.angle:
tuning = car_params.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(car_params.lateralTuning.pid.kpV))
elif tuning == 'lqr':
self.assertTrue(len(car_params.lateralTuning.lqr.a))
elif tuning == 'indi':
self.assertTrue(len(car_params.lateralTuning.indi.outerLoopGainV))
# Run car interface
CC = car.CarControl.new_message()

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import os
import math
from cereal import car, log
from common.numpy_fast import clip
from common.realtime import sec_since_boot, config_realtime_process, Priority, Ratekeeper, DT_CTRL
@ -16,6 +17,7 @@ from selfdrive.controls.lib.longcontrol import LongControl, STARTING_TARGET_SPEE
from selfdrive.controls.lib.latcontrol_pid import LatControlPID
from selfdrive.controls.lib.latcontrol_indi import LatControlINDI
from selfdrive.controls.lib.latcontrol_lqr import LatControlLQR
from selfdrive.controls.lib.latcontrol_angle import LatControlAngle
from selfdrive.controls.lib.events import Events, ET
from selfdrive.controls.lib.alertmanager import AlertManager
from selfdrive.controls.lib.vehicle_model import VehicleModel
@ -102,7 +104,9 @@ class Controls:
self.LoC = LongControl(self.CP, self.CI.compute_gb)
self.VM = VehicleModel(self.CP)
if self.CP.lateralTuning.which() == 'pid':
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP)
elif self.CP.lateralTuning.which() == 'indi':
self.LaC = LatControlINDI(self.CP)
@ -363,6 +367,12 @@ class Controls:
def state_control(self, CS):
"""Given the state, this function returns an actuators packet"""
# Update VehicleModel
params = self.sm['liveParameters']
x = max(params.stiffnessFactor, 0.1)
sr = max(params.steerRatio, 0.1)
self.VM.update_params(x, sr)
lat_plan = self.sm['lateralPlan']
long_plan = self.sm['longitudinalPlan']
@ -386,8 +396,9 @@ class Controls:
# Gas/Brake PID loop
actuators.gas, actuators.brake = self.LoC.update(self.active, CS, v_acc_sol, long_plan.vTargetFuture, a_acc_sol, self.CP)
# Steering PID loop and lateral MPC
actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(self.active, CS, self.CP, lat_plan)
actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(self.active, CS, self.CP, self.VM, params, lat_plan)
# Check for difference between desired angle and angle for angle based control
angle_control_saturated = self.CP.steerControlType == car.CarParams.SteerControlType.angle and \
@ -401,12 +412,14 @@ class Controls:
# Send a "steering required alert" if saturation count has reached the limit
if (lac_log.saturated and not CS.steeringPressed) or \
(self.saturated_count > STEER_ANGLE_SATURATION_TIMEOUT):
# Check if we deviated from the path
left_deviation = actuators.steer > 0 and lat_plan.dPathPoints[0] < -0.1
right_deviation = actuators.steer < 0 and lat_plan.dPathPoints[0] > 0.1
if left_deviation or right_deviation:
self.events.add(EventName.steerSaturated)
if len(lat_plan.dPathPoints):
# Check if we deviated from the path
left_deviation = actuators.steer > 0 and lat_plan.dPathPoints[0] < -0.1
right_deviation = actuators.steer < 0 and lat_plan.dPathPoints[0] > 0.1
if left_deviation or right_deviation:
self.events.add(EventName.steerSaturated)
return actuators, v_acc_sol, a_acc_sol, lac_log
@ -468,7 +481,14 @@ class Controls:
force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \
(self.state == State.softDisabling)
steer_angle_rad = (CS.steeringAngleDeg - self.sm['lateralPlan'].angleOffsetDeg) * CV.DEG_TO_RAD
# Curvature & Steering angle
params = self.sm['liveParameters']
lat_plan = self.sm['lateralPlan']
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - params.angleOffsetAverageDeg)
curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo)
angle_steers_des = math.degrees(self.VM.get_steer_from_curvature(-lat_plan.curvature, CS.vEgo))
angle_steers_des += params.angleOffsetDeg
# controlsState
dat = messaging.new_message('controlsState')
@ -486,7 +506,8 @@ class Controls:
controlsState.lateralPlanMonoTime = self.sm.logMonoTime['lateralPlan']
controlsState.enabled = self.enabled
controlsState.active = self.active
controlsState.curvature = self.VM.calc_curvature(steer_angle_rad, CS.vEgo)
controlsState.curvature = curvature
controlsState.steeringAngleDesiredDeg = angle_steers_des
controlsState.state = self.state
controlsState.engageable = not self.events.any(ET.NO_ENTRY)
controlsState.longControlState = self.LoC.long_control_state
@ -495,7 +516,6 @@ class Controls:
controlsState.upAccelCmd = float(self.LoC.pid.p)
controlsState.uiAccelCmd = float(self.LoC.pid.i)
controlsState.ufAccelCmd = float(self.LoC.pid.f)
controlsState.steeringAngleDesiredDeg = float(self.LaC.angle_steers_des)
controlsState.vTargetLead = float(v_acc)
controlsState.aTarget = float(a_acc)
controlsState.cumLagMs = -self.rk.remaining * 1000.
@ -503,7 +523,9 @@ class Controls:
controlsState.forceDecel = bool(force_decel)
controlsState.canErrorCounter = self.can_error_counter
if self.CP.lateralTuning.which() == 'pid':
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
controlsState.lateralControlState.angleState = lac_log
elif self.CP.lateralTuning.which() == 'pid':
controlsState.lateralControlState.pidState = lac_log
elif self.CP.lateralTuning.which() == 'lqr':
controlsState.lateralControlState.lqrState = lac_log

View File

@ -0,0 +1,25 @@
import math
from cereal import log
class LatControlAngle():
def __init__(self, CP):
pass
def reset(self):
pass
def update(self, active, CS, CP, VM, params, lat_plan):
angle_log = log.ControlsState.LateralAngleState.new_message()
if CS.vEgo < 0.3 or not active:
angle_log.active = False
angle_steers_des = float(CS.steeringAngleDeg)
else:
angle_log.active = True
angle_steers_des = math.degrees(VM.get_steer_from_curvature(-lat_plan.curvature, CS.vEgo))
angle_steers_des += params.angleOffsetDeg
angle_log.saturated = False
angle_log.steeringAngleDeg = angle_steers_des
return 0, float(angle_steers_des), angle_log

View File

@ -80,7 +80,7 @@ class LatControlINDI():
return self.sat_count > self.sat_limit
def update(self, active, CS, CP, lat_plan):
def update(self, active, CS, CP, VM, params, lat_plan):
self.speed = CS.vEgo
# Update Kalman filter
y = np.array([[math.radians(CS.steeringAngleDeg)], [math.radians(CS.steeringRateDeg)]])
@ -96,11 +96,10 @@ class LatControlINDI():
self.output_steer = 0.0
self.delayed_output = 0.0
else:
self.angle_steers_des = lat_plan.steeringAngleDeg
self.rate_steers_des = lat_plan.steeringRateDeg
steers_des = VM.get_steer_from_curvature(-lat_plan.curvature, CS.vEgo)
steers_des += math.radians(params.angleOffsetDeg)
steers_des = math.radians(self.angle_steers_des)
rate_des = math.radians(self.rate_steers_des)
rate_des = VM.get_steer_from_curvature(-lat_plan.curvatureRate, CS.vEgo)
# Expected actuator value
alpha = 1. - DT_CTRL / (self.RC + DT_CTRL)
@ -143,4 +142,4 @@ class LatControlINDI():
check_saturation = (CS.vEgo > 10.) and not CS.steeringRateLimited and not CS.steeringPressed
indi_log.saturated = self._check_saturation(self.output_steer, check_saturation, steers_max)
return float(self.output_steer), float(self.angle_steers_des), indi_log
return float(self.output_steer), 0, indi_log

View File

@ -1,4 +1,6 @@
import math
import numpy as np
from selfdrive.controls.lib.drive_helpers import get_steer_max
from common.numpy_fast import clip
from common.realtime import DT_CTRL
@ -28,7 +30,6 @@ class LatControlLQR():
def reset(self):
self.i_lqr = 0.0
self.output_steer = 0.0
self.sat_count = 0.0
def _check_saturation(self, control, check_saturation, limit):
@ -43,39 +44,42 @@ class LatControlLQR():
return self.sat_count > self.sat_limit
def update(self, active, CS, CP, lat_plan):
def update(self, active, CS, CP, VM, params, lat_plan):
lqr_log = log.ControlsState.LateralLQRState.new_message()
steers_max = get_steer_max(CP, CS.vEgo)
torque_scale = (0.45 + CS.vEgo / 60.0)**2 # Scale actuator model with speed
steering_angle = CS.steeringAngleDeg
# Subtract offset. Zero angle should correspond to zero torque
self.angle_steers_des = lat_plan.steeringAngleDeg - lat_plan.angleOffsetDeg
steering_angle -= lat_plan.angleOffsetDeg
steering_angle_no_offset = CS.steeringAngleDeg - params.angleOffsetAverageDeg
desired_angle = math.degrees(VM.get_steer_from_curvature(-lat_plan.curvature, CS.vEgo))
instant_offset = params.angleOffsetDeg - params.angleOffsetAverageDeg
desired_angle += instant_offset # Only add offset that originates from vehicle model errors
# Update Kalman filter
angle_steers_k = float(self.C.dot(self.x_hat))
e = steering_angle - angle_steers_k
e = steering_angle_no_offset - angle_steers_k
self.x_hat = self.A.dot(self.x_hat) + self.B.dot(CS.steeringTorqueEps / torque_scale) + self.L.dot(e)
if CS.vEgo < 0.3 or not active:
lqr_log.active = False
lqr_output = 0.
output_steer = 0.
self.reset()
else:
lqr_log.active = True
# LQR
u_lqr = float(self.angle_steers_des / self.dc_gain - self.K.dot(self.x_hat))
u_lqr = float(desired_angle / self.dc_gain - self.K.dot(self.x_hat))
lqr_output = torque_scale * u_lqr / self.scale
# Integrator
if CS.steeringPressed:
self.i_lqr -= self.i_unwind_rate * float(np.sign(self.i_lqr))
else:
error = self.angle_steers_des - angle_steers_k
error = desired_angle - angle_steers_k
i = self.i_lqr + self.ki * self.i_rate * error
control = lqr_output + i
@ -83,15 +87,15 @@ class LatControlLQR():
(error <= 0 and (control >= -steers_max or i > 0.0)):
self.i_lqr = i
self.output_steer = lqr_output + self.i_lqr
self.output_steer = clip(self.output_steer, -steers_max, steers_max)
output_steer = lqr_output + self.i_lqr
output_steer = clip(output_steer, -steers_max, steers_max)
check_saturation = (CS.vEgo > 10) and not CS.steeringRateLimited and not CS.steeringPressed
saturated = self._check_saturation(self.output_steer, check_saturation, steers_max)
saturated = self._check_saturation(output_steer, check_saturation, steers_max)
lqr_log.steeringAngleDeg = angle_steers_k + lat_plan.angleOffsetDeg
lqr_log.steeringAngleDeg = angle_steers_k + params.angleOffsetAverageDeg
lqr_log.i = self.i_lqr
lqr_log.output = self.output_steer
lqr_log.output = output_steer
lqr_log.lqrOutput = lqr_output
lqr_log.saturated = saturated
return self.output_steer, float(self.angle_steers_des), lqr_log
return output_steer, 0, lqr_log

View File

@ -1,6 +1,7 @@
import math
from selfdrive.controls.lib.pid import PIController
from selfdrive.controls.lib.drive_helpers import get_steer_max
from cereal import car
from cereal import log
@ -10,35 +11,35 @@ class LatControlPID():
(CP.lateralTuning.pid.kiBP, CP.lateralTuning.pid.kiV),
k_f=CP.lateralTuning.pid.kf, pos_limit=1.0, neg_limit=-1.0,
sat_limit=CP.steerLimitTimer)
self.angle_steers_des = 0.
def reset(self):
self.pid.reset()
def update(self, active, CS, CP, lat_plan):
def update(self, active, CS, CP, VM, params, lat_plan):
pid_log = log.ControlsState.LateralPIDState.new_message()
pid_log.steeringAngleDeg = float(CS.steeringAngleDeg)
pid_log.steeringRateDeg = float(CS.steeringRateDeg)
angle_steers_des_no_offset = math.degrees(VM.get_steer_from_curvature(-lat_plan.curvature, CS.vEgo))
angle_steers_des = angle_steers_des_no_offset + params.angleOffsetDeg
if CS.vEgo < 0.3 or not active:
output_steer = 0.0
pid_log.active = False
self.pid.reset()
else:
self.angle_steers_des = lat_plan.steeringAngleDeg # get from MPC/LateralPlanner
steers_max = get_steer_max(CP, CS.vEgo)
self.pid.pos_limit = steers_max
self.pid.neg_limit = -steers_max
steer_feedforward = self.angle_steers_des # feedforward desired angle
if CP.steerControlType == car.CarParams.SteerControlType.torque:
# TODO: feedforward something based on lat_plan.rateSteers
steer_feedforward -= lat_plan.angleOffsetDeg # subtract the offset, since it does not contribute to resistive torque
steer_feedforward *= CS.vEgo**2 # proportional to realigning tire momentum (~ lateral accel)
# TODO: feedforward something based on lat_plan.rateSteers
steer_feedforward = angle_steers_des_no_offset # offset does not contribute to resistive torque
steer_feedforward *= CS.vEgo**2 # proportional to realigning tire momentum (~ lateral accel)
deadzone = 0.0
check_saturation = (CS.vEgo > 10) and not CS.steeringRateLimited and not CS.steeringPressed
output_steer = self.pid.update(self.angle_steers_des, CS.steeringAngleDeg, check_saturation=check_saturation, override=CS.steeringPressed,
output_steer = self.pid.update(angle_steers_des, CS.steeringAngleDeg, check_saturation=check_saturation, override=CS.steeringPressed,
feedforward=steer_feedforward, speed=CS.vEgo, deadzone=deadzone)
pid_log.active = True
pid_log.p = self.pid.p
@ -47,4 +48,4 @@ class LatControlPID():
pid_log.output = output_steer
pid_log.saturated = bool(self.pid.saturated)
return output_steer, float(self.angle_steers_des), pid_log
return output_steer, 0, pid_log

View File

@ -24,8 +24,8 @@ typedef struct {
double x[N+1];
double y[N+1];
double psi[N+1];
double tire_angle[N+1];
double tire_angle_rate[N];
double curvature[N+1];
double curvature_rate[N];
double cost;
} log_t;
@ -95,9 +95,9 @@ int run_mpc(state_t * x0, log_t * solution, double v_ego,
solution->x[i] = acadoVariables.x[i*NX];
solution->y[i] = acadoVariables.x[i*NX+1];
solution->psi[i] = acadoVariables.x[i*NX+2];
solution->tire_angle[i] = acadoVariables.x[i*NX+3];
solution->curvature[i] = acadoVariables.x[i*NX+3];
if (i < N){
solution->tire_angle_rate[i] = acadoVariables.u[i];
solution->curvature_rate[i] = acadoVariables.u[i];
}
}
solution->cost = acado_getObjective();

View File

@ -75,23 +75,13 @@ class LateralPlanner():
self.cur_state[0].psi = 0.0
self.cur_state[0].curvature = 0.0
self.angle_steers_des = 0.0
self.angle_steers_des_mpc = 0.0
self.angle_steers_des_time = 0.0
self.desired_curvature = 0.0
self.desired_curvature_rate = 0.0
def update(self, sm, CP, VM):
def update(self, sm, CP):
v_ego = sm['carState'].vEgo
active = sm['controlsState'].active
steering_wheel_angle_offset_deg = sm['liveParameters'].angleOffsetDeg
steering_wheel_angle_deg = sm['carState'].steeringAngleDeg
# Update vehicle model
x = max(sm['liveParameters'].stiffnessFactor, 0.1)
sr = max(sm['liveParameters'].steerRatio, 0.1)
VM.update_params(x, sr)
curvature_factor = VM.curvature_factor(v_ego)
measured_curvature = -curvature_factor * math.radians(steering_wheel_angle_deg - steering_wheel_angle_offset_deg) / VM.sR
measured_curvature = sm['controlsState'].curvature
md = sm['modelV2']
self.LP.parse_model(sm['modelV2'])
@ -166,8 +156,8 @@ class LateralPlanner():
self.LP.lll_prob *= self.lane_change_ll_prob
self.LP.rll_prob *= self.lane_change_ll_prob
d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz)
y_pts = np.interp(v_ego * self.t_idxs[:MPC_N+1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:,1])
heading_pts = np.interp(v_ego * self.t_idxs[:MPC_N+1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw)
y_pts = np.interp(v_ego * self.t_idxs[:MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:,1])
heading_pts = np.interp(v_ego * self.t_idxs[:MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw)
self.y_pts = y_pts
assert len(y_pts) == MPC_N + 1
@ -181,31 +171,22 @@ class LateralPlanner():
self.cur_state.x = 0.0
self.cur_state.y = 0.0
self.cur_state.psi = 0.0
self.cur_state.curvature = interp(DT_MDL, self.t_idxs[:MPC_N+1], self.mpc_solution.curvature)
self.cur_state.curvature = interp(DT_MDL, self.t_idxs[:MPC_N + 1], self.mpc_solution.curvature)
# TODO this needs more thought, use .2s extra for now to estimate other delays
delay = CP.steerActuatorDelay + .2
current_curvature = self.mpc_solution.curvature[0]
psi = interp(delay, self.t_idxs[:MPC_N+1], self.mpc_solution.psi)
psi = interp(delay, self.t_idxs[:MPC_N + 1], self.mpc_solution.psi)
next_curvature_rate = self.mpc_solution.curvature_rate[0]
# MPC can plan to turn the wheel and turn back before t_delay. This means
# in high delay cases some corrections never even get commanded. So just use
# psi to calculate a simple linearization of desired curvature
curvature_diff_from_psi = psi/(max(v_ego, 1e-1) * delay) - current_curvature
next_curvature = current_curvature + 2*curvature_diff_from_psi
curvature_diff_from_psi = psi / (max(v_ego, 1e-1) * delay) - current_curvature
next_curvature = current_curvature + 2 * curvature_diff_from_psi
# reset to current steer angle if not active or overriding
if active:
curvature_desired = next_curvature
desired_curvature_rate = next_curvature_rate
else:
curvature_desired = measured_curvature
desired_curvature_rate = 0.0
# negative sign, controls uses different convention
self.desired_steering_wheel_angle_deg = -float(math.degrees(curvature_desired * VM.sR)/curvature_factor) + steering_wheel_angle_offset_deg
self.desired_steering_wheel_angle_rate_deg = -float(math.degrees(desired_curvature_rate * VM.sR)/curvature_factor)
self.desired_curvature = next_curvature
self.desired_curvature_rate = next_curvature_rate
# Check for infeasable MPC solution
mpc_nans = any(math.isnan(x) for x in self.mpc_solution.curvature)
@ -226,16 +207,16 @@ class LateralPlanner():
def publish(self, sm, pm):
plan_solution_valid = self.solution_invalid_cnt < 2
plan_send = messaging.new_message('lateralPlan')
plan_send.valid = sm.all_alive_and_valid(service_list=['carState', 'controlsState', 'liveParameters', 'modelV2'])
plan_send.valid = sm.all_alive_and_valid(service_list=['carState', 'controlsState', 'modelV2'])
plan_send.lateralPlan.laneWidth = float(self.LP.lane_width)
plan_send.lateralPlan.dPathPoints = [float(x) for x in self.y_pts]
plan_send.lateralPlan.lProb = float(self.LP.lll_prob)
plan_send.lateralPlan.rProb = float(self.LP.rll_prob)
plan_send.lateralPlan.dProb = float(self.LP.d_prob)
plan_send.lateralPlan.steeringAngleDeg = float(self.desired_steering_wheel_angle_deg)
plan_send.lateralPlan.steeringRateDeg = float(self.desired_steering_wheel_angle_rate_deg)
plan_send.lateralPlan.angleOffsetDeg = float(sm['liveParameters'].angleOffsetAverageDeg)
plan_send.lateralPlan.curvature = float(self.desired_curvature)
plan_send.lateralPlan.curvatureRate = float(self.desired_curvature_rate)
plan_send.lateralPlan.mpcSolutionValid = bool(plan_solution_valid)
plan_send.lateralPlan.desire = self.desire
@ -246,9 +227,9 @@ class LateralPlanner():
if LOG_MPC:
dat = messaging.new_message('liveMpc')
dat.liveMpc.x = list(self.mpc_solution[0].x)
dat.liveMpc.y = list(self.mpc_solution[0].y)
dat.liveMpc.psi = list(self.mpc_solution[0].psi)
dat.liveMpc.tire_angle = list(self.mpc_solution[0].tire_angle)
dat.liveMpc.cost = self.mpc_solution[0].cost
dat.liveMpc.x = list(self.mpc_solution.x)
dat.liveMpc.y = list(self.mpc_solution.y)
dat.liveMpc.psi = list(self.mpc_solution.psi)
dat.liveMpc.curvature = list(self.mpc_solution.curvature)
dat.liveMpc.cost = self.mpc_solution.cost
pm.send('liveMpc', dat)

View File

@ -107,7 +107,7 @@ class Planner():
self.v_acc_future = min([self.mpc1.v_mpc_future, self.mpc2.v_mpc_future, v_cruise_setpoint])
def update(self, sm, CP, VM, PP):
def update(self, sm, CP):
"""Gets called when new radarState is available"""
cur_time = sec_since_boot()
v_ego = sm['carState'].vEgo

View File

@ -4,7 +4,6 @@ from common.params import Params
from common.realtime import Priority, config_realtime_process
from selfdrive.swaglog import cloudlog
from selfdrive.controls.lib.longitudinal_planner import Planner
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.controls.lib.lateral_planner import LateralPlanner
import cereal.messaging as messaging
@ -17,32 +16,25 @@ def plannerd_thread(sm=None, pm=None):
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
cloudlog.info("plannerd got CarParams: %s", CP.carName)
PL = Planner(CP)
PP = LateralPlanner(CP)
VM = VehicleModel(CP)
longitudinal_planner = Planner(CP)
lateral_planner = LateralPlanner(CP)
if sm is None:
sm = messaging.SubMaster(['carState', 'controlsState', 'radarState', 'modelV2', 'liveParameters'],
sm = messaging.SubMaster(['carState', 'controlsState', 'radarState', 'modelV2'],
poll=['radarState', 'modelV2'])
if pm is None:
pm = messaging.PubMaster(['longitudinalPlan', 'liveLongitudinalMpc', 'lateralPlan', 'liveMpc'])
sm['liveParameters'].valid = True
sm['liveParameters'].sensorValid = True
sm['liveParameters'].steerRatio = CP.steerRatio
sm['liveParameters'].stiffnessFactor = 1.0
while True:
sm.update()
if sm.updated['modelV2']:
PP.update(sm, CP, VM)
PP.publish(sm, pm)
lateral_planner.update(sm, CP)
lateral_planner.publish(sm, pm)
if sm.updated['radarState']:
PL.update(sm, CP, VM, PP)
PL.publish(sm, pm)
longitudinal_planner.update(sm, CP)
longitudinal_planner.publish(sm, pm)
def main(sm=None, pm=None):

View File

@ -249,7 +249,7 @@ CONFIGS = [
proc_name="plannerd",
pub_sub={
"modelV2": ["lateralPlan"], "radarState": ["longitudinalPlan"],
"carState": [], "controlsState": [], "liveParameters": [],
"carState": [], "controlsState": [],
},
ignore=["logMonoTime", "valid", "longitudinalPlan.processingDelay"],
init_callback=get_car_params,

View File

@ -1 +1 @@
2fac648a63f45ddee16838dabae4490dee13e2a9
31aa88edbb3badbbf2d21c7ffd0ba38f9bb1ae2d

View File

@ -85,13 +85,14 @@ class TestCarModel(unittest.TestCase):
self.assertGreater(self.CP.mass, 1)
self.assertGreater(self.CP.steerRateCost, 1e-3)
tuning = self.CP.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
elif tuning == 'lqr':
self.assertTrue(len(self.CP.lateralTuning.lqr.a))
elif tuning == 'indi':
self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV))
if self.CP.steerControlType != car.CarParams.SteerControlType.angle:
tuning = self.CP.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
elif tuning == 'lqr':
self.assertTrue(len(self.CP.lateralTuning.lqr.a))
elif tuning == 'indi':
self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV))
self.assertTrue(self.CP.enableCamera)

View File

@ -143,7 +143,7 @@ def ui_thread(addr, frame_address):
plot_arr[:-1] = plot_arr[1:]
plot_arr[-1, name_to_arr_idx['angle_steers']] = sm['carState'].steeringAngleDeg
plot_arr[-1, name_to_arr_idx['angle_steers_des']] = sm['carControl'].actuators.steeringAngleDeg
plot_arr[-1, name_to_arr_idx['angle_steers_des']] = sm['controlsState'].steeringAngleDesiredDeg
plot_arr[-1, name_to_arr_idx['angle_steers_k']] = angle_steers_k
plot_arr[-1, name_to_arr_idx['gas']] = sm['carState'].gas
plot_arr[-1, name_to_arr_idx['computer_gas']] = sm['carControl'].actuators.gas