ods777/selfdrive/controls/controlsd.py

1125 lines
49 KiB
Python

#!/usr/bin/env python3
import os
import math
import random
import time
import threading
from typing import SupportsFloat
import cereal.messaging as messaging
import openpilot.system.sentry as sentry
from cereal import car, custom, log
from msgq.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV
from openpilot.common.git import get_short_branch
from openpilot.common.numpy_fast import clip
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.car.car_helpers import get_car_interface, get_startup_event
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, clip_curvature
from openpilot.selfdrive.controls.lib.events import Events, ET
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.system.hardware import HARDWARE
from openpilot.selfdrive.frogpilot.controls.lib.frogpilot_variables import CRUISING_SPEED, FrogPilotVariables
from openpilot.selfdrive.frogpilot.controls.lib.speed_limit_controller import SpeedLimitController
SOFT_DISABLE_TIME = 3 # seconds
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1
CAMERA_OFFSET = 0.04
REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
TESTING_CLOSET = "TESTING_CLOSET" in os.environ
IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}
ThermalStatus = log.DeviceState.ThermalStatus
State = log.ControlsState.OpenpilotState
PandaType = log.PandaState.PandaType
Desire = log.Desire
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
EventName = car.CarEvent.EventName
ButtonType = car.CarState.ButtonEvent.Type
FrogPilotButtonType = custom.FrogPilotCarState.ButtonEvent.Type
GearShifter = car.CarState.GearShifter
SafetyModel = car.CarParams.SafetyModel
IGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput)
CSID_MAP = {"1": EventName.roadCameraError, "2": EventName.wideRoadCameraError, "0": EventName.driverCameraError}
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
class Controls:
def __init__(self, CI=None):
# FrogPilot variables
self.frogpilot_toggles = FrogPilotVariables.toggles
self.params_tracking = Params("/persist/tracking")
self.current_total_distance = self.params_tracking.get_float("FrogPilotKilometers")
self.current_total_drives = self.params_tracking.get_int("FrogPilotDrives")
self.current_total_time = self.params_tracking.get_float("FrogPilotMinutes")
self.always_on_lateral_active = False
self.drive_added = False
self.fcw_random_event_triggered = False
self.holiday_theme_alerted = False
self.onroad_distance_pressed = False
self.openpilot_crashed_triggered = False
self.previously_enabled = False
self.random_event_triggered = False
self.speed_check = False
self.speed_limit_changed = False
self.display_timer = 0
self.drive_distance = 0
self.drive_time = 0
self.max_acceleration = 0
self.previous_speed_limit = 0
self.previous_v_cruise = 0
self.random_event_timer = 0
self.speed_limit_timer = 0
self.params = Params()
self.params_memory = Params("/dev/shm/params")
if CI is None:
cloudlog.info("controlsd is waiting for CarParams")
with car.CarParams.from_bytes(self.params.get("CarParams", block=True)) as msg:
# TODO: this shouldn't need to be a builder
self.CP = msg.as_builder()
cloudlog.info("controlsd got CarParams")
# Uses car interface helper functions, altering state won't be considered by card for actuation
self.CI = get_car_interface(self.CP)
else:
self.CI, self.CP = CI, CI.CP
# Ensure the current branch is cached, otherwise the first iteration of controlsd lags
self.branch = get_short_branch()
self.block_user = self.branch == "FrogPilot-Development" and self.params.get("DongleId").decode('utf-8') != "FrogsGoMoo"
# Setup sockets
self.pm = messaging.PubMaster(['controlsState', 'carControl', 'onroadEvents', 'frogpilotCarControl'])
self.sensor_packets = ["accelerometer", "gyroscope"]
self.camera_packets = ["roadCameraState", "driverCameraState", "wideRoadCameraState"]
self.log_sock = messaging.sub_sock('androidLog')
# TODO: de-couple controlsd with card/conflate on carState without introducing controls mismatches
self.car_state_sock = messaging.sub_sock('carState', timeout=20)
ignore = self.sensor_packets + ['testJoystick']
if SIMULATION:
ignore += ['driverCameraState', 'managerState']
if REPLAY:
# no vipc in replay will make them ignored anyways
ignore += ['roadCameraState', 'wideRoadCameraState']
if self.frogpilot_toggles.radarless_model:
ignore += ['radarState']
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'testJoystick', 'frogpilotCarState', 'frogpilotPlan'] + self.camera_packets + self.sensor_packets,
ignore_alive=ignore, ignore_avg_freq=ignore+['radarState', 'testJoystick'], ignore_valid=['testJoystick', ],
frequency=int(1/DT_CTRL))
self.joystick_mode = self.params.get_bool("JoystickDebugMode")
# read params
self.is_metric = self.params.get_bool("IsMetric")
self.is_ldw_enabled = self.params.get_bool("IsLdwEnabled")
# detect sound card presence and ensure successful init
sounds_available = HARDWARE.get_sound_card_online()
car_recognized = self.CP.carName != 'mock'
# cleanup old params
if not self.CP.experimentalLongitudinalAvailable:
self.params.remove("ExperimentalLongitudinalEnabled")
if not self.CP.openpilotLongitudinalControl:
self.params.remove("ExperimentalMode")
self.CS_prev = car.CarState.new_message()
self.AM = AlertManager()
self.events = Events()
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CI)
self.initialized = False
self.state = State.disabled
self.enabled = False
self.active = False
self.soft_disable_timer = 0
self.mismatch_counter = 0
self.cruise_mismatch_counter = 0
self.last_blinker_frame = 0
self.last_steering_pressed_frame = 0
self.distance_traveled = 0
self.last_functional_fan_frame = 0
self.events_prev = []
self.current_alert_types = [ET.PERMANENT]
self.logged_comm_issue = None
self.not_running_prev = None
self.steer_limited = False
self.desired_curvature = 0.0
self.experimental_mode = False
self.personality = self.read_personality_param()
self.v_cruise_helper = VCruiseHelper(self.CP)
self.recalibrating_seen = False
self.can_log_mono_time = 0
self.startup_event = get_startup_event(car_recognized, not self.CP.passive, len(self.CP.carFw) > 0, self.block_user)
if not sounds_available:
self.events.add(EventName.soundsUnavailable, static=True)
if not car_recognized:
self.events.add(EventName.carUnrecognized, static=True)
if len(self.CP.carFw) > 0:
set_offroad_alert("Offroad_CarUnrecognized", True)
else:
set_offroad_alert("Offroad_NoFirmware", True)
elif self.CP.passive:
self.events.add(EventName.dashcamMode, static=True)
# controlsd is driven by carState, expected at 100Hz
self.rk = Ratekeeper(100, print_delay_threshold=None)
def set_initial_state(self):
if REPLAY:
controls_state = self.params.get("ReplayControlsState")
if controls_state is not None:
with log.ControlsState.from_bytes(controls_state) as controls_state:
self.v_cruise_helper.v_cruise_kph = controls_state.vCruise
if any(ps.controlsAllowed for ps in self.sm['pandaStates']):
self.state = State.enabled
def update_events(self, CS):
"""Compute onroadEvents from carState"""
self.events.clear()
# Add joystick event, static on cars, dynamic on nonCars
if self.joystick_mode:
self.events.add(EventName.joystickDebug)
self.startup_event = None
# Add startup event
if self.startup_event is not None:
self.events.add(self.startup_event)
self.startup_event = None
# Don't add any more events if not initialized
if not self.initialized:
self.events.add(EventName.controlsInitializing)
return
# no more events while in dashcam mode
if self.CP.passive:
return
# Block resume if cruise never previously enabled
resume_pressed = any(be.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for be in CS.buttonEvents)
if not self.CP.pcmCruise and not self.v_cruise_helper.v_cruise_initialized and resume_pressed:
self.events.add(EventName.resumeBlocked)
if not self.CP.notCar:
self.events.add_from_msg(self.sm['driverMonitoringState'].events)
# Add car events, ignore if CAN isn't valid
if CS.canValid:
self.events.add_from_msg(CS.events)
# Create events for temperature, disk space, and memory
if self.sm['deviceState'].thermalStatus >= ThermalStatus.red:
if not self.frogpilot_toggles.increase_thermal_limits or self.sm['deviceState'].thermalStatus == ThermalStatus.danger:
self.events.add(EventName.overheat)
if self.sm['deviceState'].freeSpacePercent < 7 and not SIMULATION:
# under 7% of space free no enable allowed
self.events.add(EventName.outOfSpace)
if self.sm['deviceState'].memoryUsagePercent > 90 and not SIMULATION:
self.events.add(EventName.lowMemory)
# TODO: enable this once loggerd CPU usage is more reasonable
#cpus = list(self.sm['deviceState'].cpuUsagePercent)
#if max(cpus, default=0) > 95 and not SIMULATION:
# self.events.add(EventName.highCpuUsage)
# Alert if fan isn't spinning for 5 seconds
if self.sm['peripheralState'].pandaType != log.PandaState.PandaType.unknown:
if self.sm['peripheralState'].fanSpeedRpm < 500 and self.sm['deviceState'].fanSpeedPercentDesired > 50:
# allow enough time for the fan controller in the panda to recover from stalls
if (self.sm.frame - self.last_functional_fan_frame) * DT_CTRL > 15.0:
self.events.add(EventName.fanMalfunction)
else:
self.last_functional_fan_frame = self.sm.frame
# Handle calibration status
cal_status = self.sm['liveCalibration'].calStatus
if cal_status != log.LiveCalibrationData.Status.calibrated:
if cal_status == log.LiveCalibrationData.Status.uncalibrated:
self.events.add(EventName.calibrationIncomplete)
elif cal_status == log.LiveCalibrationData.Status.recalibrating:
if not self.recalibrating_seen:
set_offroad_alert("Offroad_Recalibration", True)
self.recalibrating_seen = True
self.events.add(EventName.calibrationRecalibrating)
else:
self.events.add(EventName.calibrationInvalid)
# Handle lane change
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
if self.frogpilot_toggles.loud_blindspot_alert:
self.events.add(EventName.laneChangeBlockedLoud)
else:
self.events.add(EventName.laneChangeBlocked)
else:
if direction == LaneChangeDirection.left:
if self.sm['frogpilotPlan'].laneWidthLeft >= self.frogpilot_toggles.lane_detection_width:
self.events.add(EventName.preLaneChangeLeft)
else:
self.events.add(EventName.noLaneAvailable)
else:
if self.sm['frogpilotPlan'].laneWidthRight >= self.frogpilot_toggles.lane_detection_width:
self.events.add(EventName.preLaneChangeRight)
else:
self.events.add(EventName.noLaneAvailable)
elif self.sm['modelV2'].meta.laneChangeState in (LaneChangeState.laneChangeStarting,
LaneChangeState.laneChangeFinishing):
self.events.add(EventName.laneChange)
for i, pandaState in enumerate(self.sm['pandaStates']):
# All pandas must match the list of safetyConfigs, and if outside this list, must be silent or noOutput
if i < len(self.CP.safetyConfigs):
safety_mismatch = pandaState.safetyModel != self.CP.safetyConfigs[i].safetyModel or \
pandaState.safetyParam != self.CP.safetyConfigs[i].safetyParam or \
pandaState.alternativeExperience != self.CP.alternativeExperience
else:
safety_mismatch = pandaState.safetyModel not in IGNORED_SAFETY_MODES
# safety mismatch allows some time for pandad to set the safety mode and publish it back from panda
if (safety_mismatch and self.sm.frame*DT_CTRL > 10.) or pandaState.safetyRxChecksInvalid or self.mismatch_counter >= 200:
self.events.add(EventName.controlsMismatch)
if log.PandaState.FaultType.relayMalfunction in pandaState.faults:
self.events.add(EventName.relayMalfunction)
# Handle HW and system malfunctions
# Order is very intentional here. Be careful when modifying this.
# All events here should at least have NO_ENTRY and SOFT_DISABLE.
num_events = len(self.events)
not_running = {p.name for p in self.sm['managerState'].processes if not p.running and p.shouldBeRunning}
if self.sm.recv_frame['managerState'] and (not_running - IGNORE_PROCESSES):
self.events.add(EventName.processNotRunning)
if not_running != self.not_running_prev:
cloudlog.event("process_not_running", not_running=not_running, error=True)
self.not_running_prev = not_running
else:
if not SIMULATION and not self.rk.lagging:
if not self.sm.all_alive(self.camera_packets):
self.events.add(EventName.cameraMalfunction)
elif not self.sm.all_freq_ok(self.camera_packets):
self.events.add(EventName.cameraFrameRate)
if not REPLAY and self.rk.lagging:
self.events.add(EventName.controlsdLagging)
if not self.frogpilot_toggles.radarless_model:
if len(self.sm['radarState'].radarErrors) or ((not self.rk.lagging or REPLAY) and not self.sm.all_checks(['radarState'])):
self.events.add(EventName.radarFault)
if not self.sm.valid['pandaStates']:
self.events.add(EventName.usbError)
if CS.canTimeout:
self.events.add(EventName.canBusMissing)
elif not CS.canValid:
pass#self.events.add(EventName.canError)
# generic catch-all. ideally, a more specific event should be added above instead
has_disable_events = self.events.contains(ET.NO_ENTRY) and (self.events.contains(ET.SOFT_DISABLE) or self.events.contains(ET.IMMEDIATE_DISABLE))
no_system_errors = (not has_disable_events) or (len(self.events) == num_events)
if not self.sm.all_checks() and no_system_errors:
if not self.sm.all_alive():
self.events.add(EventName.commIssue)
elif not self.sm.all_freq_ok():
self.events.add(EventName.commIssueAvgFreq)
else:
self.events.add(EventName.commIssue)
logs = {
'invalid': [s for s, valid in self.sm.valid.items() if not valid],
'not_alive': [s for s, alive in self.sm.alive.items() if not alive],
'not_freq_ok': [s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],
}
if logs != self.logged_comm_issue:
cloudlog.event("commIssue", error=True, **logs)
self.logged_comm_issue = logs
else:
self.logged_comm_issue = None
if not (self.CP.notCar and self.joystick_mode):
if not self.sm['liveLocationKalman'].posenetOK:
self.events.add(EventName.posenetInvalid)
if not self.sm['liveLocationKalman'].deviceStable:
self.events.add(EventName.deviceFalling)
if not self.sm['liveLocationKalman'].inputsOK:
self.events.add(EventName.locationdTemporaryError)
if not self.sm['liveParameters'].valid and not TESTING_CLOSET and (not SIMULATION or REPLAY):
self.events.add(EventName.paramsdTemporaryError)
# conservative HW alert. if the data or frequency are off, locationd will throw an error
if any((self.sm.frame - self.sm.recv_frame[s])*DT_CTRL > 10. for s in self.sensor_packets):
self.events.add(EventName.sensorDataInvalid)
if not REPLAY:
# Check for mismatch between openpilot and car's PCM
cruise_mismatch = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)
self.cruise_mismatch_counter = self.cruise_mismatch_counter + 1 if cruise_mismatch else 0
if self.cruise_mismatch_counter > int(6. / DT_CTRL):
pass#self.events.add(EventName.cruiseMismatch)
# Check for FCW
stock_long_is_braking = self.enabled and not self.CP.openpilotLongitudinalControl and CS.aEgo < -1.25
model_fcw = self.sm['modelV2'].meta.hardBrakePredicted and not CS.brakePressed and not stock_long_is_braking
planner_fcw = self.sm['longitudinalPlan'].fcw and self.enabled
if (planner_fcw or model_fcw) and not (self.CP.notCar and self.joystick_mode):
self.events.add(EventName.fcw)
self.fcw_random_event_triggered = not self.random_event_triggered
elif self.fcw_random_event_triggered and self.frogpilot_toggles.random_events:
self.events.add(EventName.yourFrogTriedToKillMe)
self.fcw_random_event_triggered = False
self.random_event_triggered = True
for m in messaging.drain_sock(self.log_sock, wait_for_one=False):
try:
msg = m.androidLog.message
if any(err in msg for err in ("ERROR_CRC", "ERROR_ECC", "ERROR_STREAM_UNDERFLOW", "APPLY FAILED")):
csid = msg.split("CSID:")[-1].split(" ")[0]
evt = CSID_MAP.get(csid, None)
if evt is not None:
self.events.add(evt)
except UnicodeDecodeError:
pass
# TODO: fix simulator
if not SIMULATION or REPLAY:
# Not show in first 1 km to allow for driving out of garage. This event shows after 5 minutes
if not self.sm['liveLocationKalman'].gpsOK and self.sm['liveLocationKalman'].inputsOK and (self.distance_traveled > 1500):
self.events.add(EventName.noGps)
if self.sm['liveLocationKalman'].gpsOK:
self.distance_traveled = 0
self.distance_traveled += CS.vEgo * DT_CTRL
if self.sm['modelV2'].frameDropPerc > 20:
self.events.add(EventName.modeldLagging)
# Update FrogPilot events
self.update_frogpilot_events(CS)
def data_sample(self):
"""Receive data from sockets"""
car_state = messaging.recv_one(self.car_state_sock)
CS = car_state.carState if car_state else self.CS_prev
self.sm.update(0)
if not self.initialized:
all_valid = CS.canValid and self.sm.all_checks()
timed_out = self.sm.frame * DT_CTRL > 6.
if all_valid or timed_out or (SIMULATION and not REPLAY):
available_streams = VisionIpcClient.available_streams("camerad", block=False)
if VisionStreamType.VISION_STREAM_ROAD not in available_streams:
self.sm.ignore_alive.append('roadCameraState')
if VisionStreamType.VISION_STREAM_WIDE_ROAD not in available_streams:
self.sm.ignore_alive.append('wideRoadCameraState')
self.initialized = True
self.set_initial_state()
cloudlog.event(
"controlsd.initialized",
dt=self.sm.frame*DT_CTRL,
timeout=timed_out,
canValid=CS.canValid,
invalid=[s for s, valid in self.sm.valid.items() if not valid],
not_alive=[s for s, alive in self.sm.alive.items() if not alive],
not_freq_ok=[s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],
error=True,
)
# When the panda and controlsd do not agree on controls_allowed
# we want to disengage openpilot. However the status from the panda goes through
# another socket other than the CAN messages and one can arrive earlier than the other.
# Therefore we allow a mismatch for two samples, then we trigger the disengagement.
if not self.enabled:
self.mismatch_counter = 0
# All pandas not in silent mode must have controlsAllowed when openpilot is enabled
if self.enabled and any(not ps.controlsAllowed for ps in self.sm['pandaStates']
if ps.safetyModel not in IGNORED_SAFETY_MODES):
self.mismatch_counter += 1
return CS
def state_transition(self, CS):
"""Compute conditional state transitions and execute actions on state transitions"""
self.v_cruise_helper.update_v_cruise(CS, self.enabled, self.is_metric, self.speed_limit_changed, self.frogpilot_toggles)
# decrement the soft disable timer at every step, as it's reset on
# entrance in SOFT_DISABLING state
self.soft_disable_timer = max(0, self.soft_disable_timer - 1)
self.current_alert_types = [ET.PERMANENT]
# ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING
if self.state != State.disabled:
# user and immediate disable always have priority in a non-disabled state
if self.events.contains(ET.USER_DISABLE):
self.state = State.disabled
self.current_alert_types.append(ET.USER_DISABLE)
elif self.events.contains(ET.IMMEDIATE_DISABLE):
self.state = State.disabled
self.current_alert_types.append(ET.IMMEDIATE_DISABLE)
else:
# ENABLED
if self.state == State.enabled:
if self.events.contains(ET.SOFT_DISABLE):
self.state = State.softDisabling
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.current_alert_types.append(ET.SOFT_DISABLE)
elif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):
self.state = State.overriding
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
# SOFT DISABLING
elif self.state == State.softDisabling:
if not self.events.contains(ET.SOFT_DISABLE):
# no more soft disabling condition, so go back to ENABLED
self.state = State.enabled
elif self.soft_disable_timer > 0:
self.current_alert_types.append(ET.SOFT_DISABLE)
elif self.soft_disable_timer <= 0:
self.state = State.disabled
# PRE ENABLING
elif self.state == State.preEnabled:
if not self.events.contains(ET.PRE_ENABLE):
self.state = State.enabled
else:
self.current_alert_types.append(ET.PRE_ENABLE)
# OVERRIDING
elif self.state == State.overriding:
if self.events.contains(ET.SOFT_DISABLE):
self.state = State.softDisabling
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.current_alert_types.append(ET.SOFT_DISABLE)
elif not (self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL)):
self.state = State.enabled
else:
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
# DISABLED
elif self.state == State.disabled:
if self.events.contains(ET.ENABLE):
if self.events.contains(ET.NO_ENTRY):
self.current_alert_types.append(ET.NO_ENTRY)
else:
if self.events.contains(ET.PRE_ENABLE):
self.state = State.preEnabled
elif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):
self.state = State.overriding
else:
self.state = State.enabled
self.current_alert_types.append(ET.ENABLE)
self.v_cruise_helper.initialize_v_cruise(CS, self.experimental_mode, self.sm['frogpilotPlan'].unconfirmedSlcSpeedLimit, self.frogpilot_toggles)
# Check if openpilot is engaged and actuators are enabled
self.enabled = self.state in ENABLED_STATES
self.active = self.state in ACTIVE_STATES
if self.active or self.always_on_lateral_active:
self.current_alert_types.append(ET.WARNING)
def state_control(self, CS):
"""Given the state, this function returns a CarControl packet"""
# Update VehicleModel
lp = self.sm['liveParameters']
x = max(lp.stiffnessFactor, 0.1)
sr = max(self.frogpilot_toggles.steer_ratio, 0.1) if self.frogpilot_toggles.use_custom_steer_ratio else max(lp.steerRatio, 0.1)
self.VM.update_params(x, sr)
# Update Torque Params
if self.CP.lateralTuning.which() == 'torque':
torque_params = self.sm['liveTorqueParameters']
if self.sm.all_checks(['liveTorqueParameters']) and (torque_params.useParams or self.frogpilot_toggles.force_auto_tune):
self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
torque_params.frictionCoefficientFiltered)
long_plan = self.sm['longitudinalPlan']
model_v2 = self.sm['modelV2']
CC = car.CarControl.new_message()
CC.enabled = self.enabled
# Check which actuators can be enabled
standstill = CS.vEgo <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill
CC.latActive = (self.active or self.always_on_lateral_active) and self.speed_check and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \
(not standstill or self.joystick_mode)
CC.longActive = self.enabled and not self.events.contains(ET.OVERRIDE_LONGITUDINAL) and self.CP.openpilotLongitudinalControl
actuators = CC.actuators
actuators.longControlState = self.LoC.long_control_state
# Enable blinkers while lane changing
if model_v2.meta.laneChangeState != LaneChangeState.off:
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
if CS.leftBlinker or CS.rightBlinker:
self.last_blinker_frame = self.sm.frame
# State specific actions
if not CC.latActive:
self.LaC.reset()
if not CC.longActive:
self.LoC.reset()
if not self.joystick_mode:
# accel PID loop
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, self.v_cruise_helper.v_cruise_kph * CV.KPH_TO_MS, self.frogpilot_toggles)
actuators.accel = self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits)
if len(long_plan.speeds):
actuators.speed = long_plan.speeds[-1]
# Steering PID loop and lateral MPC
self.desired_curvature = clip_curvature(CS.vEgo, self.desired_curvature, model_v2.action.desiredCurvature)
actuators.curvature = self.desired_curvature
actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
self.steer_limited, self.desired_curvature,
self.sm['liveLocationKalman'],
model_data=self.sm['modelV2'])
else:
lac_log = log.ControlsState.LateralDebugState.new_message()
if self.sm.recv_frame['testJoystick'] > 0:
# reset joystick if it hasn't been received in a while
should_reset_joystick = (self.sm.frame - self.sm.recv_frame['testJoystick'])*DT_CTRL > 0.2
if not should_reset_joystick:
joystick_axes = self.sm['testJoystick'].axes
else:
joystick_axes = [0.0, 0.0]
if CC.longActive:
actuators.accel = 4.0*clip(joystick_axes[0], -1, 1)
if CC.latActive:
steer = clip(joystick_axes[1], -1, 1)
# max angle is 45 for angle-based cars, max curvature is 0.02
actuators.steer, actuators.steeringAngleDeg, actuators.curvature = steer, steer * 90., steer * -0.02
lac_log.active = self.active
lac_log.steeringAngleDeg = CS.steeringAngleDeg
lac_log.output = actuators.steer
lac_log.saturated = abs(actuators.steer) >= 0.9
if CS.steeringPressed:
self.last_steering_pressed_frame = self.sm.frame
recent_steer_pressed = (self.sm.frame - self.last_steering_pressed_frame)*DT_CTRL < 2.0
# Send a "steering required alert" if saturation count has reached the limit
if lac_log.active and not recent_steer_pressed and not self.CP.notCar:
if self.CP.lateralTuning.which() == 'torque' and not self.joystick_mode:
undershooting = abs(lac_log.desiredLateralAccel) / abs(1e-3 + lac_log.actualLateralAccel) > 1.2
turning = abs(lac_log.desiredLateralAccel) > 1.0
good_speed = CS.vEgo > 5
max_torque = abs(self.sm['carOutput'].actuatorsOutput.steer) > 0.99
if undershooting and turning and good_speed and max_torque and not self.random_event_triggered:
event_choices = [1, 2]
if self.sm.frame % (10000 // len(event_choices)) == 0 and self.frogpilot_toggles.random_events:
event_choice = random.choice(event_choices)
if event_choice == 1:
lac_log.active and self.events.add(EventName.firefoxSteerSaturated)
self.params_memory.put_int("CurrentRandomEvent", 1)
elif event_choice == 2:
lac_log.active and self.events.add(EventName.goatSteerSaturated)
self.random_event_triggered = True
else:
lac_log.active and self.events.add(EventName.goatSteerSaturated if self.frogpilot_toggles.goat_scream else EventName.steerSaturated)
elif lac_log.saturated:
# TODO probably should not use dpath_points but curvature
dpath_points = model_v2.position.y
if len(dpath_points):
# Check if we deviated from the path
# TODO use desired vs actual curvature
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
steering_value = actuators.steeringAngleDeg
else:
steering_value = actuators.steer
left_deviation = steering_value > 0 and dpath_points[0] < -0.20
right_deviation = steering_value < 0 and dpath_points[0] > 0.20
if left_deviation or right_deviation:
self.events.add(EventName.steerSaturated)
# Ensure no NaNs/Infs
for p in ACTUATOR_FIELDS:
attr = getattr(actuators, p)
if not isinstance(attr, SupportsFloat):
continue
if not math.isfinite(attr):
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
# decrement personality on distance button press
if self.CP.openpilotLongitudinalControl:
if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents) or self.onroad_distance_pressed:
menu_open = self.display_timer > 0 or not self.sm['frogpilotCarState'].hasMenu
if not (self.sm['frogpilotCarState'].distanceLongPressed or self.params_memory.get_bool("OnroadDistanceButtonPressed")) and menu_open:
self.personality = (self.personality - 1) % 3
self.params.put_nonblocking('LongitudinalPersonality', str(self.personality))
self.display_timer = 350
self.onroad_distance_pressed = self.params_memory.get_bool("OnroadDistanceButtonPressed")
self.display_timer -= 1
FPCC = self.update_frogpilot_variables(CS)
return CC, lac_log, FPCC
def publish_logs(self, CS, start_time, CC, lac_log, FPCC):
"""Send actuators and hud commands to the car, send controlsstate and MPC logging"""
# Orientation and angle rates can be useful for carcontroller
# Only calibrated (car) frame is relevant for the carcontroller
orientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value)
if len(orientation_value) > 2:
CC.orientationNED = orientation_value
angular_rate_value = list(self.sm['liveLocationKalman'].angularVelocityCalibrated.value)
if len(angular_rate_value) > 2:
CC.angularVelocity = angular_rate_value
CC.cruiseControl.override = self.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)
if self.joystick_mode and self.sm.recv_frame['testJoystick'] > 0 and self.sm['testJoystick'].buttons[0]:
CC.cruiseControl.cancel = True
speeds = self.sm['longitudinalPlan'].speeds
if len(speeds):
CC.cruiseControl.resume = self.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
hudControl = CC.hudControl
hudControl.setSpeed = float(self.v_cruise_helper.v_cruise_cluster_kph * CV.KPH_TO_MS)
hudControl.speedVisible = self.enabled
hudControl.lanesVisible = self.enabled
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
hudControl.leadDistanceBars = self.personality + 1
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
recent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown
ldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \
and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated
model_v2 = self.sm['modelV2']
desire_prediction = model_v2.meta.desirePrediction
if len(desire_prediction) and ldw_allowed:
right_lane_visible = model_v2.laneLineProbs[2] > 0.5
left_lane_visible = model_v2.laneLineProbs[1] > 0.5
l_lane_change_prob = desire_prediction[Desire.laneChangeLeft]
r_lane_change_prob = desire_prediction[Desire.laneChangeRight]
lane_lines = model_v2.laneLines
l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))
r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))
hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)
hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)
if hudControl.rightLaneDepart or hudControl.leftLaneDepart:
self.events.add(EventName.ldw)
clear_event_types = set()
if ET.WARNING not in self.current_alert_types:
clear_event_types.add(ET.WARNING)
if self.enabled:
clear_event_types.add(ET.NO_ENTRY)
alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer])
self.AM.add_many(self.sm.frame, alerts)
current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)
if current_alert:
hudControl.visualAlert = current_alert.visual_alert
if not self.CP.passive and self.initialized:
CO = self.sm['carOutput']
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.steer_limited = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
STEER_ANGLE_SATURATION_THRESHOLD
else:
self.steer_limited = abs(CC.actuators.steer - CO.actuatorsOutput.steer) > 1e-2
force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \
(self.state == State.softDisabling)
# Curvature & Steering angle
lp = self.sm['liveParameters']
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
# controlsState
dat = messaging.new_message('controlsState')
dat.valid = CS.canValid
controlsState = dat.controlsState
if current_alert:
controlsState.alertText1 = current_alert.alert_text_1
controlsState.alertText2 = current_alert.alert_text_2
controlsState.alertSize = current_alert.alert_size
controlsState.alertStatus = current_alert.alert_status
controlsState.alertBlinkingRate = current_alert.alert_rate
controlsState.alertType = current_alert.alert_type
controlsState.alertSound = current_alert.audible_alert
controlsState.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
controlsState.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
controlsState.enabled = self.enabled
controlsState.active = self.active
controlsState.curvature = curvature
controlsState.desiredCurvature = self.desired_curvature
controlsState.state = self.state
controlsState.engageable = not self.events.contains(ET.NO_ENTRY)
controlsState.longControlState = self.LoC.long_control_state
controlsState.vCruise = float(self.v_cruise_helper.v_cruise_kph)
controlsState.vCruiseCluster = float(self.v_cruise_helper.v_cruise_cluster_kph)
controlsState.upAccelCmd = float(self.LoC.pid.p)
controlsState.uiAccelCmd = float(self.LoC.pid.i)
controlsState.ufAccelCmd = float(self.LoC.pid.f)
controlsState.cumLagMs = -self.rk.remaining * 1000.
controlsState.startMonoTime = int(start_time * 1e9)
controlsState.forceDecel = bool(force_decel)
controlsState.experimentalMode = self.experimental_mode
controlsState.personality = self.personality
lat_tuning = self.CP.lateralTuning.which()
if self.joystick_mode:
controlsState.lateralControlState.debugState = lac_log
elif self.CP.steerControlType == car.CarParams.SteerControlType.angle:
controlsState.lateralControlState.angleState = lac_log
elif lat_tuning == 'pid':
controlsState.lateralControlState.pidState = lac_log
elif lat_tuning == 'torque':
controlsState.lateralControlState.torqueState = lac_log
self.pm.send('controlsState', dat)
# onroadEvents - logged every second or on change
if (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events.names != self.events_prev):
ce_send = messaging.new_message('onroadEvents', len(self.events))
ce_send.valid = True
ce_send.onroadEvents = self.events.to_msg()
self.pm.send('onroadEvents', ce_send)
self.events_prev = self.events.names.copy()
# carControl
cc_send = messaging.new_message('carControl')
cc_send.valid = CS.canValid
cc_send.carControl = CC
self.pm.send('carControl', cc_send)
# frogpilotCarControl
fpcc_send = messaging.new_message('frogpilotCarControl')
fpcc_send.valid = CS.canValid
fpcc_send.frogpilotCarControl = FPCC
self.pm.send('frogpilotCarControl', fpcc_send)
def step(self):
start_time = time.monotonic()
# Sample data from sockets and get a carState
CS = self.data_sample()
cloudlog.timestamp("Data sampled")
self.update_events(CS)
cloudlog.timestamp("Events updated")
if not self.CP.passive and self.initialized:
# Update control state
self.state_transition(CS)
# Compute actuators (runs PID loops and lateral MPC)
CC, lac_log, FPCC = self.state_control(CS)
# Publish data
self.publish_logs(CS, start_time, CC, lac_log, FPCC)
self.CS_prev = CS
def read_personality_param(self):
try:
return int(self.params.get('LongitudinalPersonality'))
except (ValueError, TypeError):
return log.LongitudinalPersonality.standard
def params_thread(self, evt):
while not evt.is_set():
self.is_metric = self.params.get_bool("IsMetric")
if self.CP.openpilotLongitudinalControl and not self.frogpilot_toggles.conditional_experimental_mode:
self.experimental_mode = self.params.get_bool("ExperimentalMode") or self.frogpilot_toggles.speed_limit_controller and SpeedLimitController.experimental_mode
self.personality = self.read_personality_param()
if self.CP.notCar:
self.joystick_mode = self.params.get_bool("JoystickDebugMode")
time.sleep(0.1)
# Update FrogPilot parameters
if FrogPilotVariables.toggles_updated:
FrogPilotVariables.update_frogpilot_params()
def controlsd_thread(self):
e = threading.Event()
t = threading.Thread(target=self.params_thread, args=(e, ))
try:
t.start()
while True:
self.step()
self.rk.monitor_time()
except SystemExit:
e.set()
t.join()
def update_frogpilot_events(self, CS):
if self.frogpilot_toggles.green_light_alert:
green_light = self.sm['modelV2'].position.x[ModelConstants.IDX_N - 1] > ModelConstants.IDX_N
green_light &= not CS.gasPressed
green_light &= not self.sm['longitudinalPlan'].hasLead
green_light &= self.previously_enabled
green_light &= CS.standstill
if green_light:
self.events.add(EventName.greenLight)
if self.sm.frame * DT_CTRL >= 10 and self.frogpilot_toggles.current_holiday_theme != 0 and not self.holiday_theme_alerted:
self.events.add(EventName.holidayActive)
self.holiday_theme_alerted = True
if self.frogpilot_toggles.lead_departing_alert and self.sm['frogpilotPlan'].leadDeparting:
self.events.add(EventName.leadDeparting)
if not self.openpilot_crashed_triggered and os.path.isfile(os.path.join(sentry.CRASHES_DIR, 'error.txt')):
if self.frogpilot_toggles.random_events:
self.events.add(EventName.openpilotCrashedRandomEvents)
else:
self.events.add(EventName.openpilotCrashed)
self.openpilot_crashed_triggered = True
if self.frogpilot_toggles.random_events:
acceleration = CS.aEgo
if not CS.gasPressed:
self.max_acceleration = max(acceleration, self.max_acceleration)
else:
self.max_acceleration = 0
if 3.5 > self.max_acceleration >= 3.0 and acceleration < 1.5:
self.events.add(EventName.accel30)
self.params_memory.put_int("CurrentRandomEvent", 2)
self.random_event_triggered = True
self.max_acceleration = 0
elif 4.0 > self.max_acceleration >= 3.5 and acceleration < 1.5:
self.events.add(EventName.accel35)
self.params_memory.put_int("CurrentRandomEvent", 3)
self.random_event_triggered = True
self.max_acceleration = 0
elif self.max_acceleration >= 4.0 and acceleration < 1.5:
self.events.add(EventName.accel40)
self.params_memory.put_int("CurrentRandomEvent", 4)
self.random_event_triggered = True
self.max_acceleration = 0
if self.sm['frogpilotPlan'].takingCurveQuickly:
self.events.add(EventName.dejaVuCurve)
self.params_memory.put_int("CurrentRandomEvent", 5)
self.random_event_triggered = True
conversion = 1 if self.is_metric else CV.KPH_TO_MPH
v_cruise = max(self.v_cruise_helper.v_cruise_kph, self.v_cruise_helper.v_cruise_cluster_kph) * conversion
if 70 > v_cruise >= 69:
if self.sm.frame % 25 == 0:
if v_cruise == self.previous_v_cruise and not self.vCruise69_alert_played:
self.events.add(EventName.vCruise69)
self.vCruise69_alert_played = True
self.previous_v_cruise = v_cruise
else:
self.vCruise69_alert_played = False
self.previous_v_cruise = v_cruise
if self.frogpilot_toggles.speed_limit_alert and self.speed_limit_changed:
self.events.add(EventName.speedLimitChanged)
if self.sm.frame * DT_CTRL == 5.5 and self.CP.lateralTuning.which() == 'torque' and self.CI.use_nnff:
self.events.add(EventName.torqueNNLoad)
if self.sm['modelV2'].meta.turnDirection == Desire.turnLeft:
self.events.add(EventName.turningLeft)
elif self.sm['modelV2'].meta.turnDirection == Desire.turnRight:
self.events.add(EventName.turningRight)
def update_frogpilot_variables(self, CS):
driving_gear = CS.gearShifter not in (GearShifter.neutral, GearShifter.park, GearShifter.reverse, GearShifter.unknown)
self.always_on_lateral_active |= CS.cruiseState.enabled or self.frogpilot_toggles.always_on_lateral_main
self.always_on_lateral_active &= CS.cruiseState.available
self.always_on_lateral_active &= self.frogpilot_toggles.always_on_lateral
self.always_on_lateral_active &= driving_gear
self.always_on_lateral_active &= self.speed_check
self.always_on_lateral_active &= not (CS.brakePressed and CS.vEgo < self.frogpilot_toggles.always_on_lateral_pause_speed) or CS.standstill
self.drive_distance += CS.vEgo * DT_CTRL
self.drive_time += DT_CTRL
if self.drive_time > 60 and CS.standstill:
distance_to_add = self.drive_distance / 1000
self.current_total_distance += distance_to_add
self.params_tracking.put_float_nonblocking("FrogPilotKilometers", self.current_total_distance)
self.drive_distance = 0
time_to_add = self.drive_time / 60
self.current_total_time += time_to_add
self.params_tracking.put_float_nonblocking("FrogPilotMinutes", self.current_total_time)
self.drive_time = 0
if self.sm.frame * DT_CTRL > 60 * 5 and not self.drive_added:
self.current_total_drives += 1
self.params_tracking.put_int_nonblocking("FrogPilotDrives", self.current_total_drives)
self.drive_added = True
if self.frogpilot_toggles.conditional_experimental_mode:
self.experimental_mode = self.sm['frogpilotPlan'].conditionalExperimentalActive
if any(be.pressed and be.type == FrogPilotButtonType.lkas for be in CS.buttonEvents) and self.frogpilot_toggles.experimental_mode_via_lkas:
if self.frogpilot_toggles.conditional_experimental_mode:
conditional_status = self.params_memory.get_int("CEStatus")
override_value = 0 if conditional_status in {1, 2, 3, 4, 5, 6} else 3 if conditional_status >= 7 else 4
self.params_memory.put_int("CEStatus", override_value)
else:
self.params.put_bool_nonblocking("ExperimentalMode", not self.experimental_mode)
self.previously_enabled |= (self.enabled or self.always_on_lateral_active) and CS.vEgo > CRUISING_SPEED
self.previously_enabled &= driving_gear
if self.random_event_triggered:
self.random_event_timer += DT_CTRL
if self.random_event_timer >= 4:
self.random_event_triggered = False
self.random_event_timer = 0
self.params_memory.remove("CurrentRandomEvent")
self.speed_check = CS.vEgo >= self.frogpilot_toggles.pause_lateral_below_speed
self.speed_check |= self.frogpilot_toggles.pause_lateral_below_signal and not (CS.leftBlinker or CS.rightBlinker)
self.speed_check |= CS.standstill
if self.frogpilot_toggles.speed_limit_confirmation:
current_speed_limit = self.sm['frogpilotPlan'].slcSpeedLimit
desired_speed_limit = self.sm['frogpilotPlan'].unconfirmedSlcSpeedLimit
limit_changed = desired_speed_limit != self.previous_speed_limit and abs(current_speed_limit - desired_speed_limit) > 1
speed_limit_decreased = limit_changed and self.previous_speed_limit > desired_speed_limit
speed_limit_increased = limit_changed and self.previous_speed_limit < desired_speed_limit
self.previous_speed_limit = desired_speed_limit
if self.CP.pcmCruise and self.speed_limit_changed:
if any(be.type == ButtonType.accelCruise for be in CS.buttonEvents):
self.params_memory.put_bool("SLCConfirmed", True)
self.params_memory.put_bool("SLCConfirmedPressed", True)
elif any(be.type == ButtonType.decelCruise for be in CS.buttonEvents):
self.params_memory.put_bool("SLCConfirmed", False)
self.params_memory.put_bool("SLCConfirmedPressed", True)
if speed_limit_decreased:
if self.frogpilot_toggles.speed_limit_confirmation_lower:
self.speed_limit_changed = True
else:
self.params_memory.put_bool("SLCConfirmed", True)
elif speed_limit_increased:
if self.frogpilot_toggles.speed_limit_confirmation_higher:
self.speed_limit_changed = True
else:
self.params_memory.put_bool("SLCConfirmed", True)
if self.params_memory.get_bool("SLCConfirmedPressed") or not abs(current_speed_limit - desired_speed_limit) > 1:
self.speed_limit_changed = False
self.params_memory.put_bool("SLCConfirmedPressed", False)
if self.speed_limit_changed:
self.speed_limit_timer += DT_CTRL
if self.speed_limit_timer >= 10:
self.speed_limit_changed = False
self.speed_limit_timer = 0
else:
self.speed_limit_timer = 0
else:
self.speed_limit_changed = False
FPCC = custom.FrogPilotCarControl.new_message()
FPCC.alwaysOnLateral = self.always_on_lateral_active
FPCC.speedLimitChanged = self.speed_limit_changed
return FPCC
def main():
config_realtime_process(4, Priority.CTRL_HIGH)
controls = Controls()
controls.controlsd_thread()
if __name__ == "__main__":
main()