Files
sunnypilot/selfdrive/locationd/calibrationd.py
Adeeb Shihadeh e0004d0981 Reduce paramsd and calibrationd CPU usage (#2119)
* reduce paramsd cpu

* reduce calibrationd cpu usage

* calibration_helpers was mostly unused

* more calibration cleanup

* update refs

* fix thresholds in CPU test
2020-09-10 12:16:29 -07:00

174 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python3
'''
This process finds calibration values. More info on what these calibration values
are can be found here https://github.com/commaai/openpilot/tree/master/common/transformations
While the roll calibration is a real value that can be estimated, here we assume it's zero,
and the image input into the neural network is not corrected for roll.
'''
import os
import copy
import json
import numpy as np
import cereal.messaging as messaging
from selfdrive.config import Conversions as CV
from selfdrive.swaglog import cloudlog
from common.params import Params, put_nonblocking
from common.transformations.model import model_height
from common.transformations.camera import get_view_frame_from_road_frame
from common.transformations.orientation import rot_from_euler, euler_from_rot
MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS
MAX_VEL_ANGLE_STD = np.radians(0.25)
MAX_YAW_RATE_FILTER = np.radians(2) # per second
# This is all 20Hz, blocks needed for efficiency
BLOCK_SIZE = 100
INPUTS_NEEDED = 5 # Minimum blocks needed for valid calibration
INPUTS_WANTED = 50 # We want a little bit more than we need for stability
RPY_INIT = np.array([0,0,0])
# These values are needed to accomodate biggest modelframe
PITCH_LIMITS = np.array([-0.09074112085129739, 0.14907572052989657])
YAW_LIMITS = np.array([-0.06912048084718224, 0.06912048084718235])
DEBUG = os.getenv("DEBUG") is not None
class Calibration:
UNCALIBRATED = 0
CALIBRATED = 1
INVALID = 2
def is_calibration_valid(rpy):
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1])
def sanity_clip(rpy):
if np.isnan(rpy).any():
rpy = RPY_INIT
return np.array([rpy[0],
np.clip(rpy[1], PITCH_LIMITS[0] - .005, PITCH_LIMITS[1] + .005),
np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)])
class Calibrator():
def __init__(self, param_put=False):
self.param_put = param_put
self.rpy = copy.copy(RPY_INIT)
self.rpys = np.zeros((INPUTS_WANTED, 3))
self.idx = 0
self.block_idx = 0
self.valid_blocks = 0
self.cal_status = Calibration.UNCALIBRATED
self.just_calibrated = False
self.v_ego = 0
# Read saved calibration
calibration_params = Params().get("CalibrationParams")
if param_put and calibration_params:
try:
calibration_params = json.loads(calibration_params)
self.rpy = calibration_params["calib_radians"]
if not np.isfinite(self.rpy).all():
self.rpy = copy.copy(RPY_INIT)
self.rpys = np.tile(self.rpy, (INPUTS_WANTED, 1))
self.valid_blocks = calibration_params['valid_blocks']
if not np.isfinite(self.valid_blocks) or self.valid_blocks < 0:
self.valid_blocks = 0
self.update_status()
except Exception:
cloudlog.exception("CalibrationParams file found but error encountered")
def update_status(self):
start_status = self.cal_status
if self.valid_blocks < INPUTS_NEEDED:
self.cal_status = Calibration.UNCALIBRATED
else:
self.cal_status = Calibration.CALIBRATED if is_calibration_valid(self.rpy) else Calibration.INVALID
self.just_calibrated = start_status == Calibration.UNCALIBRATED and self.cal_status != Calibration.UNCALIBRATED
def handle_v_ego(self, v_ego):
self.v_ego = v_ego
def handle_cam_odom(self, trans, rot, trans_std, rot_std):
straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER))
certain_if_calib = ((np.arctan2(trans_std[1], trans[0]) < MAX_VEL_ANGLE_STD) or
(self.valid_blocks < INPUTS_NEEDED))
if straight_and_fast and certain_if_calib:
observed_rpy = np.array([0,
-np.arctan2(trans[2], trans[0]),
np.arctan2(trans[1], trans[0])])
new_rpy = euler_from_rot(rot_from_euler(self.rpy).dot(rot_from_euler(observed_rpy)))
new_rpy = sanity_clip(new_rpy)
self.rpys[self.block_idx] = (self.idx*self.rpys[self.block_idx] + (BLOCK_SIZE - self.idx) * new_rpy) / float(BLOCK_SIZE)
self.idx = (self.idx + 1) % BLOCK_SIZE
if self.idx == 0:
self.block_idx += 1
self.valid_blocks = max(self.block_idx, self.valid_blocks)
self.block_idx = self.block_idx % INPUTS_WANTED
if self.valid_blocks > 0:
self.rpy = np.mean(self.rpys[:self.valid_blocks], axis=0)
self.update_status()
# TODO: this should use the liveCalibration struct from cereal
if self.param_put and ((self.idx == 0 and self.block_idx == 0) or self.just_calibrated):
cal_params = {"calib_radians": list(self.rpy),
"valid_blocks": self.valid_blocks}
put_nonblocking("CalibrationParams", json.dumps(cal_params).encode('utf8'))
return new_rpy
else:
return None
def send_data(self, pm):
if self.valid_blocks > 0:
max_rpy_calib = np.array(np.max(self.rpys[:self.valid_blocks], axis=0))
min_rpy_calib = np.array(np.min(self.rpys[:self.valid_blocks], axis=0))
calib_spread = np.abs(max_rpy_calib - min_rpy_calib)
else:
calib_spread = np.zeros(3)
extrinsic_matrix = get_view_frame_from_road_frame(0, self.rpy[1], self.rpy[2], model_height)
cal_send = messaging.new_message('liveCalibration')
cal_send.liveCalibration.validBlocks = self.valid_blocks
cal_send.liveCalibration.calStatus = self.cal_status
cal_send.liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100)
cal_send.liveCalibration.extrinsicMatrix = [float(x) for x in extrinsic_matrix.flatten()]
cal_send.liveCalibration.rpyCalib = [float(x) for x in self.rpy]
cal_send.liveCalibration.rpyCalibSpread = [float(x) for x in calib_spread]
pm.send('liveCalibration', cal_send)
def calibrationd_thread(sm=None, pm=None):
if sm is None:
sm = messaging.SubMaster(['cameraOdometry', 'carState'], poll=['cameraOdometry'])
if pm is None:
pm = messaging.PubMaster(['liveCalibration'])
calibrator = Calibrator(param_put=True)
while 1:
sm.update(100)
if sm.updated['cameraOdometry']:
calibrator.handle_v_ego(sm['carState'].vEgo)
new_rpy = calibrator.handle_cam_odom(sm['cameraOdometry'].trans,
sm['cameraOdometry'].rot,
sm['cameraOdometry'].transStd,
sm['cameraOdometry'].rotStd)
if DEBUG and new_rpy is not None:
print('got new rpy', new_rpy)
# 4Hz driven by cameraOdometry
if sm.frame % 5 == 0:
calibrator.send_data(pm)
def main(sm=None, pm=None):
calibrationd_thread(sm, pm)
if __name__ == "__main__":
main()