Files
sunnypilot/selfdrive/modeld/parse_model_outputs.py
Harald Schäfer 2c162d9b75 Tomb raider 2 (#35029)
* db56b8fb-9135-4ab6-af18-99b7df7b2245/400

* fixes

* linter unhappy

* 6dbe0991-baa1-49ad-836a-ab370d1f0d92/400

* This one is good: 19387087-1005-475e-9015-9458dd8e7c5f/400

* Better every day: 39ed911c-0937-417f-97d2-58a8bb3caa53/400

* Actually end-to-end

* typo

* smooooooth: 94e23541-eb84-4fef-9f51-6a2d82aff314/360

* Revert "smooooooth: 94e23541-eb84-4fef-9f51-6a2d82aff314/360"

This reverts commit edd4f02386d83d82dd8a188985cde80ed1646b7f.

* 11632ef7-f555-489c-8480-e3bf97d9285e/400

* 08712d27-f6bd-4536-a30e-c729e5f62356/400

* 0a92a35e-1f72-476a-8cb6-c9f103f36822/400

* ee6d2394-2072-420c-a664-b4c0d4ed0b61/400

* no prev curv

* No double work

* fix bug

* smooth

* update prev action

* whitespace

* add little accel

* new ref

* Update plant.py
2025-04-17 23:21:25 -07:00

115 lines
5.0 KiB
Python

import numpy as np
from openpilot.selfdrive.modeld.constants import ModelConstants
def safe_exp(x, out=None):
# -11 is around 10**14, more causes float16 overflow
return np.exp(np.clip(x, -np.inf, 11), out=out)
def sigmoid(x):
return 1. / (1. + safe_exp(-x))
def softmax(x, axis=-1):
x -= np.max(x, axis=axis, keepdims=True)
if x.dtype == np.float32 or x.dtype == np.float64:
safe_exp(x, out=x)
else:
x = safe_exp(x)
x /= np.sum(x, axis=axis, keepdims=True)
return x
class Parser:
def __init__(self, ignore_missing=False):
self.ignore_missing = ignore_missing
def check_missing(self, outs, name):
if name not in outs and not self.ignore_missing:
raise ValueError(f"Missing output {name}")
return name not in outs
def parse_categorical_crossentropy(self, name, outs, out_shape=None):
if self.check_missing(outs, name):
return
raw = outs[name]
if out_shape is not None:
raw = raw.reshape((raw.shape[0],) + out_shape)
outs[name] = softmax(raw, axis=-1)
def parse_binary_crossentropy(self, name, outs):
if self.check_missing(outs, name):
return
raw = outs[name]
outs[name] = sigmoid(raw)
def parse_mdn(self, name, outs, in_N=0, out_N=1, out_shape=None):
if self.check_missing(outs, name):
return
raw = outs[name]
raw = raw.reshape((raw.shape[0], max(in_N, 1), -1))
n_values = (raw.shape[2] - out_N)//2
pred_mu = raw[:,:,:n_values]
pred_std = safe_exp(raw[:,:,n_values: 2*n_values])
if in_N > 1:
weights = np.zeros((raw.shape[0], in_N, out_N), dtype=raw.dtype)
for i in range(out_N):
weights[:,:,i - out_N] = softmax(raw[:,:,i - out_N], axis=-1)
if out_N == 1:
for fidx in range(weights.shape[0]):
idxs = np.argsort(weights[fidx][:,0])[::-1]
weights[fidx] = weights[fidx][idxs]
pred_mu[fidx] = pred_mu[fidx][idxs]
pred_std[fidx] = pred_std[fidx][idxs]
full_shape = tuple([raw.shape[0], in_N] + list(out_shape))
outs[name + '_weights'] = weights
outs[name + '_hypotheses'] = pred_mu.reshape(full_shape)
outs[name + '_stds_hypotheses'] = pred_std.reshape(full_shape)
pred_mu_final = np.zeros((raw.shape[0], out_N, n_values), dtype=raw.dtype)
pred_std_final = np.zeros((raw.shape[0], out_N, n_values), dtype=raw.dtype)
for fidx in range(weights.shape[0]):
for hidx in range(out_N):
idxs = np.argsort(weights[fidx,:,hidx])[::-1]
pred_mu_final[fidx, hidx] = pred_mu[fidx, idxs[0]]
pred_std_final[fidx, hidx] = pred_std[fidx, idxs[0]]
else:
pred_mu_final = pred_mu
pred_std_final = pred_std
if out_N > 1:
final_shape = tuple([raw.shape[0], out_N] + list(out_shape))
else:
final_shape = tuple([raw.shape[0],] + list(out_shape))
outs[name] = pred_mu_final.reshape(final_shape)
outs[name + '_stds'] = pred_std_final.reshape(final_shape)
def parse_vision_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
self.parse_mdn('pose', outs, in_N=0, out_N=0, out_shape=(ModelConstants.POSE_WIDTH,))
self.parse_mdn('wide_from_device_euler', outs, in_N=0, out_N=0, out_shape=(ModelConstants.WIDE_FROM_DEVICE_WIDTH,))
self.parse_mdn('road_transform', outs, in_N=0, out_N=0, out_shape=(ModelConstants.POSE_WIDTH,))
self.parse_mdn('lane_lines', outs, in_N=0, out_N=0, out_shape=(ModelConstants.NUM_LANE_LINES,ModelConstants.IDX_N,ModelConstants.LANE_LINES_WIDTH))
self.parse_mdn('road_edges', outs, in_N=0, out_N=0, out_shape=(ModelConstants.NUM_ROAD_EDGES,ModelConstants.IDX_N,ModelConstants.LANE_LINES_WIDTH))
self.parse_mdn('lead', outs, in_N=ModelConstants.LEAD_MHP_N, out_N=ModelConstants.LEAD_MHP_SELECTION,
out_shape=(ModelConstants.LEAD_TRAJ_LEN,ModelConstants.LEAD_WIDTH))
for k in ['lead_prob', 'lane_lines_prob']:
self.parse_binary_crossentropy(k, outs)
self.parse_categorical_crossentropy('desire_pred', outs, out_shape=(ModelConstants.DESIRE_PRED_LEN,ModelConstants.DESIRE_PRED_WIDTH))
self.parse_binary_crossentropy('meta', outs)
return outs
def parse_policy_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
self.parse_mdn('plan', outs, in_N=ModelConstants.PLAN_MHP_N, out_N=ModelConstants.PLAN_MHP_SELECTION,
out_shape=(ModelConstants.IDX_N,ModelConstants.PLAN_WIDTH))
if 'lat_planner_solution' in outs:
self.parse_mdn('lat_planner_solution', outs, in_N=0, out_N=0, out_shape=(ModelConstants.IDX_N,ModelConstants.LAT_PLANNER_SOLUTION_WIDTH))
if 'desired_curvature' in outs:
self.parse_mdn('desired_curvature', outs, in_N=0, out_N=0, out_shape=(ModelConstants.DESIRED_CURV_WIDTH,))
self.parse_categorical_crossentropy('desire_state', outs, out_shape=(ModelConstants.DESIRE_PRED_WIDTH,))
return outs
def parse_outputs(self, outs: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
outs = self.parse_vision_outputs(outs)
outs = self.parse_policy_outputs(outs)
return outs