Files
onepilot/sunnypilot/modeld/fill_model_msg.py
github-actions[bot] 7fa972be6a sunnypilot v2026.02.09-4080
version: sunnypilot v2025.003.000 (dev)
date: 2026-02-09T02:04:38
master commit: 254f55ac15a40343d7255f2f098de3442e0c4a6f
2026-02-09 02:04:38 +00:00

224 lines
11 KiB
Python

import os
import capnp
import numpy as np
from cereal import log
from openpilot.sunnypilot.modeld.constants import ModelConstants, Plan
from openpilot.sunnypilot.models.helpers import plan_x_idxs_helper
from openpilot.sunnypilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, get_lag_adjusted_curvature, MIN_SPEED
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
ConfidenceClass = log.ModelDataV2.ConfidenceClass
class PublishState:
def __init__(self):
self.disengage_buffer = np.zeros(ModelConstants.CONFIDENCE_BUFFER_LEN*ModelConstants.DISENGAGE_WIDTH, dtype=np.float32)
self.prev_brake_5ms2_probs = np.zeros(ModelConstants.FCW_5MS2_PROBS_WIDTH, dtype=np.float32)
self.prev_brake_3ms2_probs = np.zeros(ModelConstants.FCW_3MS2_PROBS_WIDTH, dtype=np.float32)
def fill_xyzt(builder, t, x, y, z, x_std=None, y_std=None, z_std=None):
builder.t = t
builder.x = x.tolist()
builder.y = y.tolist()
builder.z = z.tolist()
if x_std is not None:
builder.xStd = x_std.tolist()
if y_std is not None:
builder.yStd = y_std.tolist()
if z_std is not None:
builder.zStd = z_std.tolist()
def fill_xyvat(builder, t, x, y, v, a, x_std=None, y_std=None, v_std=None, a_std=None):
builder.t = t
builder.x = x.tolist()
builder.y = y.tolist()
builder.v = v.tolist()
builder.a = a.tolist()
if x_std is not None:
builder.xStd = x_std.tolist()
if y_std is not None:
builder.yStd = y_std.tolist()
if v_std is not None:
builder.vStd = v_std.tolist()
if a_std is not None:
builder.aStd = a_std.tolist()
def fill_xyz_poly(builder, degree, x, y, z):
xyz = np.stack([x, y, z], axis=1)
coeffs = np.polynomial.polynomial.polyfit(ModelConstants.T_IDXS, xyz, deg=degree)
builder.xCoefficients = coeffs[:, 0].tolist()
builder.yCoefficients = coeffs[:, 1].tolist()
builder.zCoefficients = coeffs[:, 2].tolist()
def fill_model_msg(base_msg: capnp._DynamicStructBuilder, extended_msg: capnp._DynamicStructBuilder,
net_output_data: dict[str, np.ndarray], action: log.ModelDataV2.Action, publish_state: PublishState,
vipc_frame_id: int, vipc_frame_id_extra: int, frame_id: int, frame_drop: float,
timestamp_eof: int, model_execution_time: float, valid: bool,
v_ego: float, steer_delay: float, meta_const) -> None:
frame_age = frame_id - vipc_frame_id if frame_id > vipc_frame_id else 0
frame_drop_perc = frame_drop * 100
extended_msg.valid = valid
base_msg.valid = valid
if 'lat_planner_solution' in net_output_data:
x, y, yaw, yawRate = [net_output_data['lat_planner_solution'][0, :, i].tolist() for i in range(4)]
x_sol = np.column_stack([x, y, yaw, yawRate])
v_ego = max(MIN_SPEED, v_ego)
psis = x_sol[0:CONTROL_N, 2].tolist()
curvatures = (x_sol[0:CONTROL_N, 3] / v_ego).tolist()
desired_curvature = get_lag_adjusted_curvature(steer_delay, v_ego, psis, curvatures)
else:
desired_curvature = float(net_output_data['desired_curvature'][0, 0])
driving_model_data = base_msg.drivingModelData
driving_model_data.frameId = vipc_frame_id
driving_model_data.frameIdExtra = vipc_frame_id_extra
driving_model_data.frameDropPerc = frame_drop_perc
driving_model_data.modelExecutionTime = model_execution_time
# Populate drivingModelData.action
driving_model_data_action = driving_model_data.action
driving_model_data_action.desiredAcceleration = action.desiredAcceleration
driving_model_data_action.shouldStop = action.shouldStop
driving_model_data_action.desiredCurvature = desired_curvature
modelV2 = extended_msg.modelV2
modelV2.frameId = vipc_frame_id
modelV2.frameIdExtra = vipc_frame_id_extra
modelV2.frameAge = frame_age
modelV2.frameDropPerc = frame_drop_perc
modelV2.timestampEof = timestamp_eof
modelV2.modelExecutionTime = model_execution_time
# plan
position = modelV2.position
fill_xyzt(position, ModelConstants.T_IDXS, *net_output_data['plan'][0,:,Plan.POSITION].T, *net_output_data['plan_stds'][0,:,Plan.POSITION].T)
velocity = modelV2.velocity
fill_xyzt(velocity, ModelConstants.T_IDXS, *net_output_data['plan'][0,:,Plan.VELOCITY].T)
acceleration = modelV2.acceleration
fill_xyzt(acceleration, ModelConstants.T_IDXS, *net_output_data['plan'][0,:,Plan.ACCELERATION].T)
orientation = modelV2.orientation
fill_xyzt(orientation, ModelConstants.T_IDXS, *net_output_data['plan'][0,:,Plan.T_FROM_CURRENT_EULER].T)
orientation_rate = modelV2.orientationRate
fill_xyzt(orientation_rate, ModelConstants.T_IDXS, *net_output_data['plan'][0,:,Plan.ORIENTATION_RATE].T)
# temporal pose
temporal_pose = modelV2.temporalPoseDEPRECATED
temporal_pose.trans = net_output_data['plan'][0,0,Plan.VELOCITY].tolist()
temporal_pose.transStd = net_output_data['plan_stds'][0,0,Plan.VELOCITY].tolist()
temporal_pose.rot = net_output_data['plan'][0,0,Plan.ORIENTATION_RATE].tolist()
temporal_pose.rotStd = net_output_data['plan_stds'][0,0,Plan.ORIENTATION_RATE].tolist()
# poly path
poly_path = driving_model_data.path
fill_xyz_poly(poly_path, ModelConstants.POLY_PATH_DEGREE, *net_output_data['plan'][0,:,Plan.POSITION].T)
# lateral planning
modelV2_action = modelV2.action
modelV2_action.desiredAcceleration = action.desiredAcceleration
modelV2_action.shouldStop = action.shouldStop
modelV2_action.desiredCurvature = desired_curvature
# times at X_IDXS according to model plan
PLAN_T_IDXS: list[float] = plan_x_idxs_helper(ModelConstants, Plan, net_output_data)
# lane lines
modelV2.init('laneLines', 4)
for i in range(4):
lane_line = modelV2.laneLines[i]
fill_xyzt(lane_line, PLAN_T_IDXS, np.array(ModelConstants.X_IDXS), net_output_data['lane_lines'][0,i,:,0], net_output_data['lane_lines'][0,i,:,1])
modelV2.laneLineStds = net_output_data['lane_lines_stds'][0,:,0,0].tolist()
modelV2.laneLineProbs = net_output_data['lane_lines_prob'][0,1::2].tolist()
lane_line_meta = driving_model_data.laneLineMeta
lane_line_meta.leftY = modelV2.laneLines[1].y[0]
lane_line_meta.leftProb = modelV2.laneLineProbs[1]
lane_line_meta.rightY = modelV2.laneLines[2].y[0]
lane_line_meta.rightProb = modelV2.laneLineProbs[2]
# road edges
modelV2.init('roadEdges', 2)
for i in range(2):
road_edge = modelV2.roadEdges[i]
fill_xyzt(road_edge, PLAN_T_IDXS, np.array(ModelConstants.X_IDXS), net_output_data['road_edges'][0,i,:,0], net_output_data['road_edges'][0,i,:,1])
modelV2.roadEdgeStds = net_output_data['road_edges_stds'][0,:,0,0].tolist()
# leads
modelV2.init('leadsV3', 3)
for i in range(3):
lead = modelV2.leadsV3[i]
fill_xyvat(lead, ModelConstants.LEAD_T_IDXS, *net_output_data['lead'][0,i].T, *net_output_data['lead_stds'][0,i].T)
lead.prob = net_output_data['lead_prob'][0,i].tolist()
lead.probTime = ModelConstants.LEAD_T_OFFSETS[i]
# meta
meta = modelV2.meta
meta.desireState = net_output_data['desire_state'][0].reshape(-1).tolist()
meta.desirePrediction = net_output_data['desire_pred'][0].reshape(-1).tolist()
meta.engagedProb = net_output_data['meta'][0,meta_const.ENGAGED].item()
meta.init('disengagePredictions')
disengage_predictions = meta.disengagePredictions
disengage_predictions.t = ModelConstants.META_T_IDXS
disengage_predictions.brakeDisengageProbs = net_output_data['meta'][0,meta_const.BRAKE_DISENGAGE].tolist()
disengage_predictions.gasDisengageProbs = net_output_data['meta'][0,meta_const.GAS_DISENGAGE].tolist()
disengage_predictions.steerOverrideProbs = net_output_data['meta'][0,meta_const.STEER_OVERRIDE].tolist()
disengage_predictions.brake3MetersPerSecondSquaredProbs = net_output_data['meta'][0,meta_const.HARD_BRAKE_3].tolist()
disengage_predictions.brake4MetersPerSecondSquaredProbs = net_output_data['meta'][0,meta_const.HARD_BRAKE_4].tolist()
disengage_predictions.brake5MetersPerSecondSquaredProbs = net_output_data['meta'][0,meta_const.HARD_BRAKE_5].tolist()
if 'sim_pose' not in net_output_data:
disengage_predictions.gasPressProbs = net_output_data['meta'][0,meta_const.GAS_PRESS].tolist()
disengage_predictions.brakePressProbs = net_output_data['meta'][0,meta_const.BRAKE_PRESS].tolist()
publish_state.prev_brake_5ms2_probs[:-1] = publish_state.prev_brake_5ms2_probs[1:]
publish_state.prev_brake_5ms2_probs[-1] = net_output_data['meta'][0,meta_const.HARD_BRAKE_5][0]
publish_state.prev_brake_3ms2_probs[:-1] = publish_state.prev_brake_3ms2_probs[1:]
publish_state.prev_brake_3ms2_probs[-1] = net_output_data['meta'][0,meta_const.HARD_BRAKE_3][0]
hard_brake_predicted = (publish_state.prev_brake_5ms2_probs > ModelConstants.FCW_THRESHOLDS_5MS2).all() and \
(publish_state.prev_brake_3ms2_probs > ModelConstants.FCW_THRESHOLDS_3MS2).all()
meta.hardBrakePredicted = hard_brake_predicted.item()
# confidence
if vipc_frame_id % (2*ModelConstants.MODEL_FREQ) == 0:
# any disengage prob
brake_disengage_probs = net_output_data['meta'][0,meta_const.BRAKE_DISENGAGE]
gas_disengage_probs = net_output_data['meta'][0,meta_const.GAS_DISENGAGE]
steer_override_probs = net_output_data['meta'][0,meta_const.STEER_OVERRIDE]
any_disengage_probs = 1-((1-brake_disengage_probs)*(1-gas_disengage_probs)*(1-steer_override_probs))
# independent disengage prob for each 2s slice
ind_disengage_probs = np.r_[any_disengage_probs[0], np.diff(any_disengage_probs) / (1 - any_disengage_probs[:-1])]
# rolling buf for 2, 4, 6, 8, 10s
publish_state.disengage_buffer[:-ModelConstants.DISENGAGE_WIDTH] = publish_state.disengage_buffer[ModelConstants.DISENGAGE_WIDTH:]
publish_state.disengage_buffer[-ModelConstants.DISENGAGE_WIDTH:] = ind_disengage_probs
score = 0.
for i in range(ModelConstants.DISENGAGE_WIDTH):
score += publish_state.disengage_buffer[i*ModelConstants.DISENGAGE_WIDTH+ModelConstants.DISENGAGE_WIDTH-1-i].item() / ModelConstants.DISENGAGE_WIDTH
if score < ModelConstants.RYG_GREEN:
modelV2.confidence = ConfidenceClass.green
elif score < ModelConstants.RYG_YELLOW:
modelV2.confidence = ConfidenceClass.yellow
else:
modelV2.confidence = ConfidenceClass.red
# raw prediction if enabled
if SEND_RAW_PRED:
modelV2.rawPredictions = net_output_data['raw_pred'].tobytes()
def fill_pose_msg(msg: capnp._DynamicStructBuilder, net_output_data: dict[str, np.ndarray],
vipc_frame_id: int, vipc_dropped_frames: int, timestamp_eof: int, live_calib_seen: bool) -> None:
msg.valid = live_calib_seen & (vipc_dropped_frames < 1)
cameraOdometry = msg.cameraOdometry
cameraOdometry.frameId = vipc_frame_id
cameraOdometry.timestampEof = timestamp_eof
cameraOdometry.trans = net_output_data['pose'][0,:3].tolist()
cameraOdometry.rot = net_output_data['pose'][0,3:].tolist()
cameraOdometry.wideFromDeviceEuler = net_output_data['wide_from_device_euler'][0,:].tolist()
cameraOdometry.roadTransformTrans = net_output_data['road_transform'][0,:3].tolist()
cameraOdometry.transStd = net_output_data['pose_stds'][0,:3].tolist()
cameraOdometry.rotStd = net_output_data['pose_stds'][0,3:].tolist()
cameraOdometry.wideFromDeviceEulerStd = net_output_data['wide_from_device_euler_stds'][0,:].tolist()
cameraOdometry.roadTransformTransStd = net_output_data['road_transform_stds'][0,:3].tolist()