# functions common among cars from collections import namedtuple from dataclasses import dataclass from enum import IntFlag, ReprEnum, EnumType from dataclasses import replace import capnp from cereal import car from openpilot.common.numpy_fast import clip, interp from openpilot.common.utils import Freezable from openpilot.selfdrive.car.docs_definitions import CarDocs # kg of standard extra cargo to count for drive, gas, etc... STD_CARGO_KG = 136. ButtonType = car.CarState.ButtonEvent.Type EventName = car.CarEvent.EventName AngleRateLimit = namedtuple('AngleRateLimit', ['speed_bp', 'angle_v']) def apply_hysteresis(val: float, val_steady: float, hyst_gap: float) -> float: if val > val_steady + hyst_gap: val_steady = val - hyst_gap elif val < val_steady - hyst_gap: val_steady = val + hyst_gap return val_steady def create_button_events(cur_btn: int, prev_btn: int, buttons_dict: dict[int, capnp.lib.capnp._EnumModule], unpressed_btn: int = 0) -> list[capnp.lib.capnp._DynamicStructBuilder]: events: list[capnp.lib.capnp._DynamicStructBuilder] = [] if cur_btn == prev_btn: return events # Add events for button presses, multiple when a button switches without going to unpressed for pressed, btn in ((False, prev_btn), (True, cur_btn)): if btn != unpressed_btn: events.append(car.CarState.ButtonEvent(pressed=pressed, type=buttons_dict.get(btn, ButtonType.unknown))) return events def gen_empty_fingerprint(): return {i: {} for i in range(8)} # these params were derived for the Civic and used to calculate params for other cars class VehicleDynamicsParams: MASS = 1326. + STD_CARGO_KG WHEELBASE = 2.70 CENTER_TO_FRONT = WHEELBASE * 0.4 CENTER_TO_REAR = WHEELBASE - CENTER_TO_FRONT ROTATIONAL_INERTIA = 2500 TIRE_STIFFNESS_FRONT = 192150 TIRE_STIFFNESS_REAR = 202500 # TODO: get actual value, for now starting with reasonable value for # civic and scaling by mass and wheelbase def scale_rot_inertia(mass, wheelbase): return VehicleDynamicsParams.ROTATIONAL_INERTIA * mass * wheelbase ** 2 / (VehicleDynamicsParams.MASS * VehicleDynamicsParams.WHEELBASE ** 2) # TODO: start from empirically derived lateral slip stiffness for the civic and scale by # mass and CG position, so all cars will have approximately similar dyn behaviors def scale_tire_stiffness(mass, wheelbase, center_to_front, tire_stiffness_factor): center_to_rear = wheelbase - center_to_front tire_stiffness_front = (VehicleDynamicsParams.TIRE_STIFFNESS_FRONT * tire_stiffness_factor) * mass / VehicleDynamicsParams.MASS * \ (center_to_rear / wheelbase) / (VehicleDynamicsParams.CENTER_TO_REAR / VehicleDynamicsParams.WHEELBASE) tire_stiffness_rear = (VehicleDynamicsParams.TIRE_STIFFNESS_REAR * tire_stiffness_factor) * mass / VehicleDynamicsParams.MASS * \ (center_to_front / wheelbase) / (VehicleDynamicsParams.CENTER_TO_FRONT / VehicleDynamicsParams.WHEELBASE) return tire_stiffness_front, tire_stiffness_rear DbcDict = dict[str, str] def dbc_dict(pt_dbc, radar_dbc, chassis_dbc=None, body_dbc=None) -> DbcDict: return {'pt': pt_dbc, 'radar': radar_dbc, 'chassis': chassis_dbc, 'body': body_dbc} def apply_driver_steer_torque_limits(apply_torque, apply_torque_last, driver_torque, LIMITS): # limits due to driver torque driver_max_torque = LIMITS.STEER_MAX + (LIMITS.STEER_DRIVER_ALLOWANCE + driver_torque * LIMITS.STEER_DRIVER_FACTOR) * LIMITS.STEER_DRIVER_MULTIPLIER driver_min_torque = -LIMITS.STEER_MAX + (-LIMITS.STEER_DRIVER_ALLOWANCE + driver_torque * LIMITS.STEER_DRIVER_FACTOR) * LIMITS.STEER_DRIVER_MULTIPLIER max_steer_allowed = max(min(LIMITS.STEER_MAX, driver_max_torque), 0) min_steer_allowed = min(max(-LIMITS.STEER_MAX, driver_min_torque), 0) apply_torque = clip(apply_torque, min_steer_allowed, max_steer_allowed) # slow rate if steer torque increases in magnitude if apply_torque_last > 0: apply_torque = clip(apply_torque, max(apply_torque_last - LIMITS.STEER_DELTA_DOWN, -LIMITS.STEER_DELTA_UP), apply_torque_last + LIMITS.STEER_DELTA_UP) else: apply_torque = clip(apply_torque, apply_torque_last - LIMITS.STEER_DELTA_UP, min(apply_torque_last + LIMITS.STEER_DELTA_DOWN, LIMITS.STEER_DELTA_UP)) return int(round(float(apply_torque))) def apply_dist_to_meas_limits(val, val_last, val_meas, STEER_DELTA_UP, STEER_DELTA_DOWN, STEER_ERROR_MAX, STEER_MAX): # limits due to comparison of commanded val VS measured val (torque/angle/curvature) max_lim = min(max(val_meas + STEER_ERROR_MAX, STEER_ERROR_MAX), STEER_MAX) min_lim = max(min(val_meas - STEER_ERROR_MAX, -STEER_ERROR_MAX), -STEER_MAX) val = clip(val, min_lim, max_lim) # slow rate if val increases in magnitude if val_last > 0: val = clip(val, max(val_last - STEER_DELTA_DOWN, -STEER_DELTA_UP), val_last + STEER_DELTA_UP) else: val = clip(val, val_last - STEER_DELTA_UP, min(val_last + STEER_DELTA_DOWN, STEER_DELTA_UP)) return float(val) def apply_meas_steer_torque_limits(apply_torque, apply_torque_last, motor_torque, LIMITS): return int(round(apply_dist_to_meas_limits(apply_torque, apply_torque_last, motor_torque, LIMITS.STEER_DELTA_UP, LIMITS.STEER_DELTA_DOWN, LIMITS.STEER_ERROR_MAX, LIMITS.STEER_MAX))) def apply_std_steer_angle_limits(apply_angle, apply_angle_last, v_ego, LIMITS): # pick angle rate limits based on wind up/down steer_up = apply_angle_last * apply_angle >= 0. and abs(apply_angle) > abs(apply_angle_last) rate_limits = LIMITS.ANGLE_RATE_LIMIT_UP if steer_up else LIMITS.ANGLE_RATE_LIMIT_DOWN angle_rate_lim = interp(v_ego, rate_limits.speed_bp, rate_limits.angle_v) return clip(apply_angle, apply_angle_last - angle_rate_lim, apply_angle_last + angle_rate_lim) def common_fault_avoidance(fault_condition: bool, request: bool, above_limit_frames: int, max_above_limit_frames: int, max_mismatching_frames: int = 1): """ Several cars have the ability to work around their EPS limits by cutting the request bit of their LKAS message after a certain number of frames above the limit. """ # Count up to max_above_limit_frames, at which point we need to cut the request for above_limit_frames to avoid a fault if request and fault_condition: above_limit_frames += 1 else: above_limit_frames = 0 # Once we cut the request bit, count additionally to max_mismatching_frames before setting the request bit high again. # Some brands do not respect our workaround without multiple messages on the bus, for example if above_limit_frames > max_above_limit_frames: request = False if above_limit_frames >= max_above_limit_frames + max_mismatching_frames: above_limit_frames = 0 return above_limit_frames, request def crc8_pedal(data): crc = 0xFF # standard init value poly = 0xD5 # standard crc8: x8+x7+x6+x4+x2+1 size = len(data) for i in range(size - 1, -1, -1): crc ^= data[i] for _ in range(8): if ((crc & 0x80) != 0): crc = ((crc << 1) ^ poly) & 0xFF else: crc <<= 1 return crc def create_gas_interceptor_command(packer, gas_amount, idx): # Common gas pedal msg generator enable = gas_amount > 0.001 values = { "ENABLE": enable, "COUNTER_PEDAL": idx & 0xF, } if enable: values["GAS_COMMAND"] = gas_amount * 255. values["GAS_COMMAND2"] = gas_amount * 255. dat = packer.make_can_msg("GAS_COMMAND", 0, values)[2] checksum = crc8_pedal(dat[:-1]) values["CHECKSUM_PEDAL"] = checksum return packer.make_can_msg("GAS_COMMAND", 0, values) def make_can_msg(addr, dat, bus): return [addr, 0, dat, bus] def get_safety_config(safety_model, safety_param = None): ret = car.CarParams.SafetyConfig.new_message() ret.safetyModel = safety_model if safety_param is not None: ret.safetyParam = safety_param return ret class CanBusBase: offset: int def __init__(self, CP, fingerprint: dict[int, dict[int, int]] | None) -> None: if CP is None: assert fingerprint is not None num = max([k for k, v in fingerprint.items() if len(v)], default=0) // 4 + 1 else: num = len(CP.safetyConfigs) self.offset = 4 * (num - 1) class CanSignalRateCalculator: """ Calculates the instantaneous rate of a CAN signal by using the counter variable and the known frequency of the CAN message that contains it. """ def __init__(self, frequency): self.frequency = frequency self.previous_counter = 0 self.previous_value = 0 self.rate = 0 def update(self, current_value, current_counter): if current_counter != self.previous_counter: self.rate = (current_value - self.previous_value) * self.frequency self.previous_counter = current_counter self.previous_value = current_value return self.rate @dataclass(frozen=True, kw_only=True) class CarSpecs: mass: float # kg, curb weight wheelbase: float # meters steerRatio: float centerToFrontRatio: float = 0.5 minSteerSpeed: float = 0.0 # m/s minEnableSpeed: float = -1.0 # m/s tireStiffnessFactor: float = 1.0 def override(self, **kwargs): return replace(self, **kwargs) @dataclass(order=True) class PlatformConfig(Freezable): car_docs: list[CarDocs] specs: CarSpecs dbc_dict: DbcDict flags: int = 0 platform_str: str | None = None def __hash__(self) -> int: return hash(self.platform_str) def override(self, **kwargs): return replace(self, **kwargs) def init(self): pass def __post_init__(self): self.init() class PlatformsType(EnumType): def __new__(metacls, cls, bases, classdict, *, boundary=None, _simple=False, **kwds): for key in classdict._member_names.keys(): cfg: PlatformConfig = classdict[key] cfg.platform_str = key cfg.freeze() return super().__new__(metacls, cls, bases, classdict, boundary=boundary, _simple=_simple, **kwds) class Platforms(str, ReprEnum, metaclass=PlatformsType): config: PlatformConfig def __new__(cls, platform_config: PlatformConfig): member = str.__new__(cls, platform_config.platform_str) member.config = platform_config member._value_ = platform_config.platform_str return member def __repr__(self): return f"<{self.__class__.__name__}.{self.name}>" @classmethod def create_dbc_map(cls) -> dict[str, DbcDict]: return {p: p.config.dbc_dict for p in cls} @classmethod def with_flags(cls, flags: IntFlag) -> set['Platforms']: return {p for p in cls if p.config.flags & flags}