modeld_v2: decouple planplus scaling from accel (#1730)

Co-authored-by: Jason Wen <haibin.wen3@gmail.com>
This commit is contained in:
James Vecellio-Grant
2026-02-28 22:03:43 -08:00
committed by GitHub
parent de0790f912
commit d1005f3b69
2 changed files with 25 additions and 17 deletions

View File

@@ -163,13 +163,12 @@ class ModelState(ModelStateBase):
def get_action_from_model(self, model_output: dict[str, np.ndarray], prev_action: log.ModelDataV2.Action,
lat_action_t: float, long_action_t: float, v_ego: float) -> log.ModelDataV2.Action:
plan = model_output['plan'][0]
if 'planplus' in model_output and self.PLANPLUS_CONTROL != 1.0:
plan = plan + (self.PLANPLUS_CONTROL - 1.0) * model_output['planplus'][0]
desired_accel, should_stop = get_accel_from_plan(plan[:, Plan.VELOCITY][:, 0], plan[:, Plan.ACCELERATION][:, 0], self.constants.T_IDXS,
action_t=long_action_t)
desired_accel = smooth_value(desired_accel, prev_action.desiredAcceleration, self.LONG_SMOOTH_SECONDS)
desired_curvature = get_curvature_from_output(model_output, plan, v_ego, lat_action_t, self.mlsim)
curvature_plan = plan + (self.PLANPLUS_CONTROL - 1.0) * model_output['planplus'][0] if 'planplus' in model_output and self.PLANPLUS_CONTROL != 1.0 else plan
desired_curvature = get_curvature_from_output(model_output, curvature_plan, v_ego, lat_action_t, self.mlsim)
if self.generation is not None and self.generation >= 10: # smooth curvature for post FOF models
if v_ego > self.MIN_LAT_CONTROL_SPEED:
desired_curvature = smooth_value(desired_curvature, prev_action.desiredCurvature, self.LAT_SMOOTH_SECONDS)

View File

@@ -15,7 +15,7 @@ class MockStruct:
def test_recovery_power_scaling():
state = MockStruct(
PLANPLUS_CONTROL=1.0,
PLANPLUS_CONTROL=0.75,
LONG_SMOOTH_SECONDS=0.3,
LAT_SMOOTH_SECONDS=0.1,
MIN_LAT_CONTROL_SPEED=0.3,
@@ -25,37 +25,46 @@ def test_recovery_power_scaling():
)
prev_action = log.ModelDataV2.Action()
recorded_vel: list = []
recorded_curv_plans: list = []
def mock_accel(plan_vel, plan_accel, t_idxs, action_t=0.0):
recorded_vel.append(plan_vel.copy())
return 0.0, False
def mock_curvature(output, plan, vego, lat_action_t, mlsim):
recorded_curv_plans.append(plan.copy())
return 0.0
modeld.get_accel_from_plan = mock_accel
modeld.get_curvature_from_output = lambda *args: 0.0
modeld.get_curvature_from_output = mock_curvature
plan = np.random.rand(1, 100, 15).astype(np.float32)
planplus = np.random.rand(1, 100, 15).astype(np.float32)
merged_plan = plan + planplus
model_output: dict = {
'plan': plan.copy(),
'plan': merged_plan.copy(),
'planplus': planplus.copy()
}
test_cases: list = [
# (control, v_ego, expected_factor)
(0.55, 20.0, 1.0),
(1.0, 25.0, .75),
(1.5, 25.1, 0.75),
(2.0, 20.0, 1.0),
(0.75, 19.0, 1.0),
(0.8, 25.1, 0.75),
# (control, v_ego)
(0.55, 20.0),
(1.0, 25.0),
(1.5, 25.1),
(2.0, 20.0),
(0.75, 19.0),
(0.8, 25.1),
]
for control, v_ego, factor in test_cases:
for control, v_ego in test_cases:
state.PLANPLUS_CONTROL = control
recorded_vel.clear()
recorded_curv_plans.clear()
ModelState.get_action_from_model(state, model_output, prev_action, 0.0, 0.0, v_ego)
expected_recovery_power = control * factor
expected_plan_vel = plan[0, :, Plan.VELOCITY][:, 0] + expected_recovery_power * planplus[0, :, Plan.VELOCITY][:, 0]
expected_accel_plan_vel = plan[0, :, Plan.VELOCITY][:, 0] + planplus[0, :, Plan.VELOCITY][:, 0]
np.testing.assert_allclose(recorded_vel[0], expected_accel_plan_vel, rtol=1e-5, atol=1e-6)
np.testing.assert_allclose(recorded_vel[0], expected_plan_vel, rtol=1e-5, atol=1e-6)
# For the below, yes, I know this isn't the same slicing as fillmodlmsg. This is to show that the values are only scaled on curv
expected_curv_plan_vel = plan[0, :, Plan.VELOCITY][:, 0] + control * planplus[0, :, Plan.VELOCITY][:, 0]
np.testing.assert_allclose(recorded_curv_plans[0][:, Plan.VELOCITY][:, 0], expected_curv_plan_vel, rtol=1e-5, atol=1e-6)