openpilot1/selfdrive/controls/controlsd.py

1040 lines
46 KiB
Python

#!/usr/bin/env python3
import os
import math
import time
import threading
from typing import SupportsFloat
import cereal.messaging as messaging
from cereal import car, custom, log
from msgq.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV
from openpilot.common.git import get_short_branch
from openpilot.common.numpy_fast import clip
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.car.car_helpers import get_car_interface, get_startup_event
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, clip_curvature
from openpilot.selfdrive.controls.lib.events import Events, ET
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.system.hardware import HARDWARE
from openpilot.selfdrive.frogpilot.controls.lib.frogpilot_acceleration import get_max_allowed_accel
from openpilot.selfdrive.frogpilot.frogpilot_variables import CRUISING_SPEED, NON_DRIVING_GEARS, get_frogpilot_toggles
SOFT_DISABLE_TIME = 3 # seconds
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1
CAMERA_OFFSET = 0.04
REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
TESTING_CLOSET = "TESTING_CLOSET" in os.environ
IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}
ThermalStatus = log.DeviceState.ThermalStatus
State = log.ControlsState.OpenpilotState
PandaType = log.PandaState.PandaType
Desire = log.Desire
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
EventName = car.CarEvent.EventName
ButtonType = car.CarState.ButtonEvent.Type
FrogPilotButtonType = custom.FrogPilotCarState.ButtonEvent.Type
SafetyModel = car.CarParams.SafetyModel
IGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput)
CSID_MAP = {"1": EventName.roadCameraError, "2": EventName.wideRoadCameraError, "0": EventName.driverCameraError}
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
class Controls:
def __init__(self, CI=None):
self.params = Params()
###################################################
self.params_memory = Params("/dev/shm/params")
self.params_memory.put_bool("KeyResume", False)
self.params_memory.put_bool("KeyCancel", False)
###################################################
if CI is None:
cloudlog.info("controlsd is waiting for CarParams")
with car.CarParams.from_bytes(self.params.get("CarParams", block=True)) as msg:
# TODO: this shouldn't need to be a builder
self.CP = msg.as_builder()
cloudlog.info("controlsd got CarParams")
# Uses car interface helper functions, altering state won't be considered by card for actuation
self.CI = get_car_interface(self.CP)
else:
self.CI, self.CP = CI, CI.CP
# Ensure the current branch is cached, otherwise the first iteration of controlsd lags
self.branch = get_short_branch()
self.block_user = self.branch == "FrogPilot-Development" and self.params.get("DongleId", encoding='utf-8') != "FrogsGoMoo"
# Setup sockets
self.pm = messaging.PubMaster(['controlsState', 'carControl', 'onroadEvents', 'frogpilotCarControl'])
self.sensor_packets = ["accelerometer", "gyroscope"]
self.camera_packets = ["roadCameraState", "driverCameraState", "wideRoadCameraState"]
self.log_sock = messaging.sub_sock('androidLog')
# TODO: de-couple controlsd with card/conflate on carState without introducing controls mismatches
self.car_state_sock = messaging.sub_sock('carState', timeout=20)
ignore = self.sensor_packets + ['testJoystick']
if SIMULATION:
ignore += ['driverCameraState', 'managerState']
if REPLAY:
# no vipc in replay will make them ignored anyways
ignore += ['roadCameraState', 'wideRoadCameraState']
if get_frogpilot_toggles(True).radarless_model:
ignore += ['radarState']
########################
ignore += ['driverMonitoringState']
self.params.put_bool_nonblocking("DmModelInitialized", True)
########################
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'testJoystick', 'frogpilotCarState', 'frogpilotPlan'] + self.camera_packets + self.sensor_packets,
######################################
ignore_alive=ignore, ignore_avg_freq=ignore+['radarState', 'testJoystick', 'driverMonitoringState'], ignore_valid=['testJoystick', ],
######################################
frequency=int(1/DT_CTRL))
self.joystick_mode = self.params.get_bool("JoystickDebugMode")
# read params
self.is_metric = self.params.get_bool("IsMetric")
self.is_ldw_enabled = self.params.get_bool("IsLdwEnabled")
# detect sound card presence and ensure successful init
sounds_available = HARDWARE.get_sound_card_online()
car_recognized = self.CP.carName != 'mock'
# cleanup old params
if not self.CP.experimentalLongitudinalAvailable:
self.params.remove("ExperimentalLongitudinalEnabled")
if not self.CP.openpilotLongitudinalControl:
self.params.remove("ExperimentalMode")
self.CS_prev = car.CarState.new_message()
self.AM = AlertManager()
self.events = Events()
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CI)
self.initialized = False
self.state = State.disabled
self.enabled = False
self.active = False
self.soft_disable_timer = 0
self.mismatch_counter = 0
self.cruise_mismatch_counter = 0
self.last_blinker_frame = 0
self.last_steering_pressed_frame = 0
self.distance_traveled = 0
self.last_functional_fan_frame = 0
self.events_prev = []
self.current_alert_types = [ET.PERMANENT]
self.logged_comm_issue = None
self.not_running_prev = None
self.steer_limited = False
self.desired_curvature = 0.0
self.experimental_mode = False
self.personality = self.read_personality_param()
self.v_cruise_helper = VCruiseHelper(self.CP)
self.recalibrating_seen = False
self.can_log_mono_time = 0
self.startup_event = get_startup_event(car_recognized, not self.CP.passive, len(self.CP.carFw) > 0, self.block_user, get_frogpilot_toggles(True))
if not sounds_available:
self.events.add(EventName.soundsUnavailable, static=True)
if not car_recognized:
self.events.add(EventName.carUnrecognized, static=True)
if len(self.CP.carFw) > 0:
set_offroad_alert("Offroad_CarUnrecognized", True)
else:
set_offroad_alert("Offroad_NoFirmware", True)
elif self.CP.passive:
self.events.add(EventName.dashcamMode, static=True)
# controlsd is driven by carState, expected at 100Hz
self.rk = Ratekeeper(100, print_delay_threshold=None)
# FrogPilot variables
self.frogpilot_toggles = get_frogpilot_toggles(True)
self.params_memory = Params("/dev/shm/params")
self.always_on_lateral_active = False
self.always_on_lateral_active_previously = False
self.fcw_event_triggered = False
self.no_entry_alert_triggered = False
self.onroad_distance_pressed = False
self.resume_pressed = False
self.resume_previously_pressed = False
self.steer_saturated_event_triggered = False
self.radarless_model = self.frogpilot_toggles.radarless_model
self.use_old_long = self.CP.carName == "hyundai" and not self.params.get_bool("NewLongAPI")
self.use_old_long |= self.CP.carName == "gm" and not self.params.get_bool("NewLongAPIGM")
self.display_timer = 0
def set_initial_state(self):
if REPLAY:
controls_state = self.params.get("ReplayControlsState")
if controls_state is not None:
with log.ControlsState.from_bytes(controls_state) as controls_state:
self.v_cruise_helper.v_cruise_kph = controls_state.vCruise
if any(ps.controlsAllowed for ps in self.sm['pandaStates']):
self.state = State.enabled
def update_events(self, CS):
"""Compute onroadEvents from carState"""
self.events.clear()
# Add joystick event, static on cars, dynamic on nonCars
if self.joystick_mode:
self.events.add(EventName.joystickDebug)
self.startup_event = None
# Add startup event
if self.startup_event is not None:
self.events.add(self.startup_event)
self.startup_event = None
# Don't add any more events if not initialized
if not self.initialized:
self.events.add(EventName.controlsInitializing)
return
# no more events while in dashcam mode
if self.CP.passive:
return
# Block resume if cruise never previously enabled
self.resume_pressed = any(be.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for be in CS.buttonEvents)
if not self.CP.pcmCruise and not self.v_cruise_helper.v_cruise_initialized and self.resume_pressed:
self.events.add(EventName.resumeBlocked)
if not self.CP.notCar:
self.events.add_from_msg(self.sm['driverMonitoringState'].events)
# Add car events, ignore if CAN isn't valid
if CS.canValid:
self.events.add_from_msg(CS.events)
# Create events for temperature, disk space, and memory
if self.sm['deviceState'].thermalStatus >= ThermalStatus.red:
self.events.add(EventName.overheat)
if self.sm['deviceState'].freeSpacePercent < 7 and not SIMULATION:
# under 7% of space free no enable allowed
self.events.add(EventName.outOfSpace)
if self.sm['deviceState'].memoryUsagePercent > 90 and not SIMULATION:
self.events.add(EventName.lowMemory)
# TODO: enable this once loggerd CPU usage is more reasonable
#cpus = list(self.sm['deviceState'].cpuUsagePercent)
#if max(cpus, default=0) > 95 and not SIMULATION:
# self.events.add(EventName.highCpuUsage)
# Alert if fan isn't spinning for 5 seconds
if self.sm['peripheralState'].pandaType != log.PandaState.PandaType.unknown:
if self.sm['peripheralState'].fanSpeedRpm < 500 and self.sm['deviceState'].fanSpeedPercentDesired > 50:
# allow enough time for the fan controller in the panda to recover from stalls
if (self.sm.frame - self.last_functional_fan_frame) * DT_CTRL > 15.0:
self.events.add(EventName.fanMalfunction)
else:
self.last_functional_fan_frame = self.sm.frame
# Handle calibration status
cal_status = self.sm['liveCalibration'].calStatus
if cal_status != log.LiveCalibrationData.Status.calibrated:
if cal_status == log.LiveCalibrationData.Status.uncalibrated:
self.events.add(EventName.calibrationIncomplete)
elif cal_status == log.LiveCalibrationData.Status.recalibrating:
if not self.recalibrating_seen:
set_offroad_alert("Offroad_Recalibration", True)
self.recalibrating_seen = True
self.events.add(EventName.calibrationRecalibrating)
else:
self.events.add(EventName.calibrationInvalid)
# Handle lane change
###############################################################
self.ChangeLane_Reminder = self.params.get_bool("ChangeLaneReminder")
###############################################################
if self.sm['modelV2'].meta.laneChangeState == LaneChangeState.preLaneChange:
direction = self.sm['modelV2'].meta.laneChangeDirection
if (CS.leftBlindspot and direction == LaneChangeDirection.left) or \
(CS.rightBlindspot and direction == LaneChangeDirection.right):
if self.frogpilot_toggles.loud_blindspot_alert:
self.events.add(EventName.laneChangeBlockedLoud)
else:
self.events.add(EventName.laneChangeBlocked)
else:
if direction == LaneChangeDirection.left:
if self.sm['frogpilotPlan'].laneWidthLeft >= self.frogpilot_toggles.lane_detection_width:
self.events.add(EventName.preLaneChangeLeft)
else:
self.events.add(EventName.noLaneAvailable)
else:
if self.sm['frogpilotPlan'].laneWidthRight >= self.frogpilot_toggles.lane_detection_width:
self.events.add(EventName.preLaneChangeRight)
else:
self.events.add(EventName.noLaneAvailable)
elif self.sm['modelV2'].meta.laneChangeState in (LaneChangeState.laneChangeStarting,
LaneChangeState.laneChangeFinishing):
##########################################################
if self.ChangeLane_Reminder :
self.events.add(EventName.laneChange)
##########################################################
for i, pandaState in enumerate(self.sm['pandaStates']):
# All pandas must match the list of safetyConfigs, and if outside this list, must be silent or noOutput
if i < len(self.CP.safetyConfigs):
safety_mismatch = pandaState.safetyModel != self.CP.safetyConfigs[i].safetyModel or \
pandaState.safetyParam != self.CP.safetyConfigs[i].safetyParam or \
pandaState.alternativeExperience != self.CP.alternativeExperience
else:
safety_mismatch = pandaState.safetyModel not in IGNORED_SAFETY_MODES
# safety mismatch allows some time for pandad to set the safety mode and publish it back from panda
if (safety_mismatch and self.sm.frame*DT_CTRL > 10.) or pandaState.safetyRxChecksInvalid or self.mismatch_counter >= 200:
self.events.add(EventName.controlsMismatch)
if log.PandaState.FaultType.relayMalfunction in pandaState.faults:
self.events.add(EventName.relayMalfunction)
# Handle HW and system malfunctions
# Order is very intentional here. Be careful when modifying this.
# All events here should at least have NO_ENTRY and SOFT_DISABLE.
num_events = len(self.events)
not_running = {p.name for p in self.sm['managerState'].processes if not p.running and p.shouldBeRunning}
if self.sm.recv_frame['managerState'] and (not_running - IGNORE_PROCESSES):
self.events.add(EventName.processNotRunning)
if not_running != self.not_running_prev:
cloudlog.event("process_not_running", not_running=not_running, error=True)
self.not_running_prev = not_running
else:
if not SIMULATION and not self.rk.lagging:
if not self.sm.all_alive(self.camera_packets):
self.events.add(EventName.cameraMalfunction)
elif not self.sm.all_freq_ok(self.camera_packets):
self.events.add(EventName.cameraFrameRate)
if not REPLAY and self.rk.lagging:
self.events.add(EventName.controlsdLagging)
if not self.radarless_model:
if len(self.sm['radarState'].radarErrors) or ((not self.rk.lagging or REPLAY) and not self.sm.all_checks(['radarState'])):
self.events.add(EventName.radarFault)
if not self.sm.valid['pandaStates']:
self.events.add(EventName.usbError)
if CS.canTimeout:
self.events.add(EventName.canBusMissing)
elif not CS.canValid:
self.events.add(EventName.canError)
###################################################
speedover_reminder = self.params.get_bool('speedoverreminder')
nav_reminder = self.params.get_bool("NavReminder")
speedover_reminder_status = self.params_memory.get_int("speedoverreminderstatus")
nav_reminder_status = self.params_memory.get_int("NavReminderstatus")
###################################################
#超速提醒
if speedover_reminder :
if self.sm['frogpilotPlan'].speedover:
self.events.add(EventName.speedover)
speedover_reminder_status = 1
# print("[PONTEST][controlsd.py][update_events()] speedover_reminder_status1=", speedover_reminder_status)
else:
speedover_reminder_status = 0
# print("[PONTEST][controlsd.py][update_events()] speedover_reminder_status0=", speedover_reminder_status)
self.params_memory.put_int('speedoverreminderstatus',speedover_reminder_status)
# 依車速調整跟車距離
if self.params.get_bool("Speeddistance") :
v_ego_kph = CS.vEgo*3.6
if v_ego_kph < 60:
if self.params.get_int("LongitudinalPersonality") != 0 :
self.params.put_int("LongitudinalPersonality", 0)
elif v_ego_kph > 60 and v_ego_kph < 90:
if self.params.get_int("LongitudinalPersonality") != 1 :
self.params.put_int("LongitudinalPersonality",1)
elif v_ego_kph > 90 and v_ego_kph < 120:
if self.params.get_int("LongitudinalPersonality") != 1 :
self.params.put_int("LongitudinalPersonality",1)
self.params_memory.put_bool("FrogPilotTogglesUpdated", True)
##################NAV語音#####################################################
if nav_reminder:
if self.params_memory.get_bool("navTurn") and nav_reminder_status == 0:
self.events.add(EventName.navturn)
nav_reminder_status = 1
elif nav_reminder_status == 1:
if self.params_memory.get_bool("navUturn"):
self.events.add(EventName.navuturn)
elif self.params_memory.get_bool("navturnRight"):
self.events.add(EventName.navturnright)
elif self.params_memory.get_bool("navturnLeft"):
self.events.add(EventName.navturnleft)
elif self.params_memory.get_bool("navSharpright"):
self.events.add(EventName.navsharpright)
elif self.params_memory.get_bool("navSharpleft"):
self.events.add(EventName.navsharpleft)
elif self.params_memory.get_bool("navOfframp"):
self.events.add(EventName.navofframp)
nav_reminder_status = 0
self.params_memory.put_int('NavReminderstatus', nav_reminder_status)
#############################################################################
# generic catch-all. ideally, a more specific event should be added above instead
has_disable_events = self.events.contains(ET.NO_ENTRY) and (self.events.contains(ET.SOFT_DISABLE) or self.events.contains(ET.IMMEDIATE_DISABLE))
no_system_errors = (not has_disable_events) or (len(self.events) == num_events)
if not self.sm.all_checks() and no_system_errors:
if not self.sm.all_alive():
self.events.add(EventName.commIssue)
elif not self.sm.all_freq_ok():
self.events.add(EventName.commIssueAvgFreq)
else:
self.events.add(EventName.commIssue)
logs = {
'invalid': [s for s, valid in self.sm.valid.items() if not valid],
'not_alive': [s for s, alive in self.sm.alive.items() if not alive],
'not_freq_ok': [s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],
}
if logs != self.logged_comm_issue:
cloudlog.event("commIssue", error=True, **logs)
self.logged_comm_issue = logs
else:
self.logged_comm_issue = None
if not (self.CP.notCar and self.joystick_mode):
if not self.sm['liveLocationKalman'].posenetOK:
self.events.add(EventName.posenetInvalid)
if not self.sm['liveLocationKalman'].deviceStable:
self.events.add(EventName.deviceFalling)
if not self.sm['liveLocationKalman'].inputsOK:
self.events.add(EventName.locationdTemporaryError)
if not self.sm['liveParameters'].valid and not TESTING_CLOSET and (not SIMULATION or REPLAY):
self.events.add(EventName.paramsdTemporaryError)
# conservative HW alert. if the data or frequency are off, locationd will throw an error
if any((self.sm.frame - self.sm.recv_frame[s])*DT_CTRL > 10. for s in self.sensor_packets):
self.events.add(EventName.sensorDataInvalid)
if not REPLAY:
# Check for mismatch between openpilot and car's PCM
cruise_mismatch = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)
self.cruise_mismatch_counter = self.cruise_mismatch_counter + 1 if cruise_mismatch else 0
if self.cruise_mismatch_counter > int(6. / DT_CTRL):
self.events.add(EventName.cruiseMismatch)
# Check for FCW
stock_long_is_braking = self.enabled and not self.CP.openpilotLongitudinalControl and CS.aEgo < -1.25
model_fcw = self.sm['modelV2'].meta.hardBrakePredicted and not CS.brakePressed and not stock_long_is_braking
planner_fcw = self.sm['longitudinalPlan'].fcw and self.enabled
if (planner_fcw or model_fcw) and not (self.CP.notCar and self.joystick_mode):
self.events.add(EventName.fcw)
self.fcw_event_triggered = True
for m in messaging.drain_sock(self.log_sock, wait_for_one=False):
try:
msg = m.androidLog.message
if any(err in msg for err in ("ERROR_CRC", "ERROR_ECC", "ERROR_STREAM_UNDERFLOW", "APPLY FAILED")):
csid = msg.split("CSID:")[-1].split(" ")[0]
evt = CSID_MAP.get(csid, None)
if evt is not None:
self.events.add(evt)
except UnicodeDecodeError:
pass
# TODO: fix simulator
if not SIMULATION or REPLAY:
# Not show in first 1 km to allow for driving out of garage. This event shows after 5 minutes
if not self.sm['liveLocationKalman'].gpsOK and self.sm['liveLocationKalman'].inputsOK and (self.distance_traveled > 1500):
self.events.add(EventName.noGps)
if self.sm['liveLocationKalman'].gpsOK:
self.distance_traveled = 0
self.distance_traveled += CS.vEgo * DT_CTRL
if self.sm['modelV2'].frameDropPerc > 20:
self.events.add(EventName.modeldLagging)
# Add FrogPilot events
self.events.add_from_msg(self.sm['frogpilotPlan'].frogpilotEvents)
if self.block_user:
return EventName.blockUser
def data_sample(self):
"""Receive data from sockets"""
car_state = messaging.recv_one(self.car_state_sock)
CS = car_state.carState if car_state else self.CS_prev
self.sm.update(0)
if not self.initialized:
all_valid = CS.canValid and self.sm.all_checks()
timed_out = self.sm.frame * DT_CTRL > 6.
if all_valid or timed_out or (SIMULATION and not REPLAY):
available_streams = VisionIpcClient.available_streams("camerad", block=False)
if VisionStreamType.VISION_STREAM_ROAD not in available_streams:
self.sm.ignore_alive.append('roadCameraState')
if VisionStreamType.VISION_STREAM_WIDE_ROAD not in available_streams:
self.sm.ignore_alive.append('wideRoadCameraState')
self.initialized = True
self.set_initial_state()
cloudlog.event(
"controlsd.initialized",
dt=self.sm.frame*DT_CTRL,
timeout=timed_out,
canValid=CS.canValid,
invalid=[s for s, valid in self.sm.valid.items() if not valid],
not_alive=[s for s, alive in self.sm.alive.items() if not alive],
not_freq_ok=[s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],
error=True,
)
# When the panda and controlsd do not agree on controls_allowed
# we want to disengage openpilot. However the status from the panda goes through
# another socket other than the CAN messages and one can arrive earlier than the other.
# Therefore we allow a mismatch for two samples, then we trigger the disengagement.
if not self.enabled:
self.mismatch_counter = 0
# All pandas not in silent mode must have controlsAllowed when openpilot is enabled
if self.enabled and any(not ps.controlsAllowed for ps in self.sm['pandaStates']
if ps.safetyModel not in IGNORED_SAFETY_MODES):
self.mismatch_counter += 1
return CS
def state_transition(self, CS):
"""Compute conditional state transitions and execute actions on state transitions"""
self.v_cruise_helper.update_v_cruise(CS, self.enabled, self.is_metric, self.sm['frogpilotPlan'].speedLimitChanged, self.frogpilot_toggles)
# decrement the soft disable timer at every step, as it's reset on
# entrance in SOFT_DISABLING state
self.soft_disable_timer = max(0, self.soft_disable_timer - 1)
self.current_alert_types = [ET.PERMANENT]
# ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING
if self.state != State.disabled:
# user and immediate disable always have priority in a non-disabled state
if self.events.contains(ET.USER_DISABLE):
self.state = State.disabled
self.current_alert_types.append(ET.USER_DISABLE)
elif self.events.contains(ET.IMMEDIATE_DISABLE):
self.state = State.disabled
self.current_alert_types.append(ET.IMMEDIATE_DISABLE)
else:
# ENABLED
if self.state == State.enabled:
if self.events.contains(ET.SOFT_DISABLE):
self.state = State.softDisabling
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.current_alert_types.append(ET.SOFT_DISABLE)
elif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):
self.state = State.overriding
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
# SOFT DISABLING
elif self.state == State.softDisabling:
if not self.events.contains(ET.SOFT_DISABLE):
# no more soft disabling condition, so go back to ENABLED
self.state = State.enabled
elif self.soft_disable_timer > 0:
self.current_alert_types.append(ET.SOFT_DISABLE)
elif self.soft_disable_timer <= 0:
self.state = State.disabled
# PRE ENABLING
elif self.state == State.preEnabled:
if not self.events.contains(ET.PRE_ENABLE):
self.state = State.enabled
else:
self.current_alert_types.append(ET.PRE_ENABLE)
# OVERRIDING
elif self.state == State.overriding:
if self.events.contains(ET.SOFT_DISABLE):
self.state = State.softDisabling
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.current_alert_types.append(ET.SOFT_DISABLE)
elif not (self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL)):
self.state = State.enabled
else:
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
# DISABLED
elif self.state == State.disabled:
if self.events.contains(ET.ENABLE):
if self.events.contains(ET.NO_ENTRY):
self.current_alert_types.append(ET.NO_ENTRY)
self.no_entry_alert_triggered = True
else:
if self.events.contains(ET.PRE_ENABLE):
self.state = State.preEnabled
elif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):
self.state = State.overriding
else:
self.state = State.enabled
self.current_alert_types.append(ET.ENABLE)
self.v_cruise_helper.initialize_v_cruise(CS, self.experimental_mode, self.sm['frogpilotPlan'].unconfirmedSlcSpeedLimit, self.frogpilot_toggles)
# Check if openpilot is engaged and actuators are enabled
self.enabled = self.state in ENABLED_STATES
self.active = self.state in ACTIVE_STATES
if self.active or self.always_on_lateral_active:
self.current_alert_types.append(ET.WARNING)
def state_control(self, CS):
"""Given the state, this function returns a CarControl packet"""
# Update VehicleModel
lp = self.sm['liveParameters']
x = max(lp.stiffnessFactor, 0.1)
sr = max(self.frogpilot_toggles.steer_ratio, 0.1) if self.frogpilot_toggles.use_custom_steer_ratio else max(lp.steerRatio, 0.1)
self.VM.update_params(x, sr)
# Update Torque Params
if self.CP.lateralTuning.which() == 'torque':
torque_params = self.sm['liveTorqueParameters']
friction = self.frogpilot_toggles.steer_friction if self.frogpilot_toggles.use_custom_steer_friction else torque_params.frictionCoefficientFiltered
lat_accel_factor = self.frogpilot_toggles.steer_lat_accel_factor if self.frogpilot_toggles.use_custom_lat_accel_factor else torque_params.latAccelFactorFiltered
if self.sm.all_checks(['liveTorqueParameters']) and (torque_params.useParams or self.frogpilot_toggles.force_auto_tune) and not self.frogpilot_toggles.force_auto_tune_off:
self.LaC.update_live_torque_params(lat_accel_factor, torque_params.latAccelOffsetFiltered,
friction)
long_plan = self.sm['longitudinalPlan']
model_v2 = self.sm['modelV2']
CC = car.CarControl.new_message()
CC.enabled = self.enabled
# Check which actuators can be enabled
standstill = CS.vEgo <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill
CC.latActive = (self.active or self.always_on_lateral_active) and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \
(not standstill or self.joystick_mode) and self.sm['frogpilotPlan'].lateralCheck
CC.longActive = self.enabled and not self.events.contains(ET.OVERRIDE_LONGITUDINAL) and self.CP.openpilotLongitudinalControl
actuators = CC.actuators
actuators.longControlState = self.LoC.long_control_state
# Enable blinkers while lane changing
if model_v2.meta.laneChangeState != LaneChangeState.off:
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
if CS.leftBlinker or CS.rightBlinker:
self.last_blinker_frame = self.sm.frame
# State specific actions
if not CC.latActive:
self.LaC.reset()
if not CC.longActive:
if self.use_old_long:
self.LoC.reset_old_long(v_pid=CS.vEgo)
else:
self.LoC.reset()
if not self.joystick_mode:
# accel PID loop
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, self.v_cruise_helper.v_cruise_kph * CV.KPH_TO_MS, self.frogpilot_toggles)
if self.frogpilot_toggles.sport_plus:
pid_accel_limits = (pid_accel_limits[0], get_max_allowed_accel(CS.vEgo))
if self.use_old_long:
t_since_plan = (self.sm.frame - self.sm.recv_frame['longitudinalPlan']) * DT_CTRL
actuators.accel = self.LoC.update_old_long(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan)
else:
actuators.accel = self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop or self.sm['frogpilotPlan'].forcingStopLength <= 0, pid_accel_limits)
if len(long_plan.speeds):
actuators.speed = long_plan.speeds[-1]
# Steering PID loop and lateral MPC
self.desired_curvature = clip_curvature(CS.vEgo, self.desired_curvature, model_v2.action.desiredCurvature)
actuators.curvature = self.desired_curvature
actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
self.steer_limited, self.desired_curvature,
self.sm['liveLocationKalman'],
model_data=self.sm['modelV2'], frogpilot_toggles=self.frogpilot_toggles)
else:
lac_log = log.ControlsState.LateralDebugState.new_message()
if self.sm.recv_frame['testJoystick'] > 0:
# reset joystick if it hasn't been received in a while
should_reset_joystick = (self.sm.frame - self.sm.recv_frame['testJoystick'])*DT_CTRL > 0.2
if not should_reset_joystick:
joystick_axes = self.sm['testJoystick'].axes
else:
joystick_axes = [0.0, 0.0]
if CC.longActive:
actuators.accel = 4.0*clip(joystick_axes[0], -1, 1)
if CC.latActive:
steer = clip(joystick_axes[1], -1, 1)
# max angle is 45 for angle-based cars, max curvature is 0.02
actuators.steer, actuators.steeringAngleDeg, actuators.curvature = steer, steer * 90., steer * -0.02
lac_log.active = self.active
lac_log.steeringAngleDeg = CS.steeringAngleDeg
lac_log.output = actuators.steer
lac_log.saturated = abs(actuators.steer) >= 0.9
if CS.steeringPressed:
self.last_steering_pressed_frame = self.sm.frame
recent_steer_pressed = (self.sm.frame - self.last_steering_pressed_frame)*DT_CTRL < 2.0
# Send a "steering required alert" if saturation count has reached the limit
if lac_log.active and not recent_steer_pressed and not self.CP.notCar:
if self.CP.lateralTuning.which() == 'torque' and not self.joystick_mode:
undershooting = abs(lac_log.desiredLateralAccel) / abs(1e-3 + lac_log.actualLateralAccel) > 1.2
turning = abs(lac_log.desiredLateralAccel) > 1.0
good_speed = CS.vEgo > 5
max_torque = abs(self.sm['carOutput'].actuatorsOutput.steer) > 0.99
if undershooting and turning and good_speed and max_torque:
lac_log.active and self.events.add(EventName.goatSteerSaturated if self.frogpilot_toggles.goat_scream_alert else EventName.steerSaturated)
self.steer_saturated_event_triggered = True
else:
self.steer_saturated_event_triggered = False
elif lac_log.saturated:
# TODO probably should not use dpath_points but curvature
dpath_points = model_v2.position.y
if len(dpath_points):
# Check if we deviated from the path
# TODO use desired vs actual curvature
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
steering_value = actuators.steeringAngleDeg
else:
steering_value = actuators.steer
left_deviation = steering_value > 0 and dpath_points[0] < -0.20
right_deviation = steering_value < 0 and dpath_points[0] > 0.20
if left_deviation or right_deviation:
self.events.add(EventName.steerSaturated)
# Ensure no NaNs/Infs
for p in ACTUATOR_FIELDS:
attr = getattr(actuators, p)
if not isinstance(attr, SupportsFloat):
continue
if not math.isfinite(attr):
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
# decrement personality on distance button press
if self.CP.openpilotLongitudinalControl:
if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents) or self.onroad_distance_pressed:
menu_open = self.display_timer > 0 or not self.sm['frogpilotCarState'].hasMenu
if not (self.sm['frogpilotCarState'].distanceLongPressed or self.params_memory.get_bool("OnroadDistanceButtonPressed")) and menu_open:
self.personality = (self.personality - 1) % 3
self.params.put_nonblocking('LongitudinalPersonality', str(self.personality))
self.display_timer = 350
self.onroad_distance_pressed = self.params_memory.get_bool("OnroadDistanceButtonPressed")
self.display_timer -= 1
FPCC = self.update_frogpilot_variables(CS)
return CC, lac_log, FPCC
def update_frogpilot_variables(self, CS):
self.always_on_lateral_active |= self.frogpilot_toggles.always_on_lateral_main or CS.cruiseState.enabled
self.always_on_lateral_active &= self.frogpilot_toggles.always_on_lateral and CS.cruiseState.available
self.always_on_lateral_active &= CS.gearShifter not in NON_DRIVING_GEARS
self.always_on_lateral_active &= self.sm['frogpilotPlan'].lateralCheck
self.always_on_lateral_active &= self.sm['liveCalibration'].calPerc >= 1
self.always_on_lateral_active &= not (self.frogpilot_toggles.always_on_lateral_lkas and self.sm['frogpilotCarState'].alwaysOnLateralDisabled)
self.always_on_lateral_active &= not (CS.brakePressed and CS.vEgo < self.frogpilot_toggles.always_on_lateral_pause_speed) or CS.standstill
self.always_on_lateral_active = bool(self.always_on_lateral_active)
if self.frogpilot_toggles.conditional_experimental_mode or self.frogpilot_toggles.slc_fallback_experimental_mode:
self.experimental_mode = self.sm['frogpilotPlan'].experimentalMode
if any(be.pressed and be.type == FrogPilotButtonType.lkas for be in CS.buttonEvents):
if self.frogpilot_toggles.experimental_mode_via_lkas and self.enabled:
if self.frogpilot_toggles.conditional_experimental_mode:
conditional_status = self.params_memory.get_int("CEStatus")
override_value = 0 if conditional_status in {1, 2, 3, 4, 5, 6} else 3 if conditional_status >= 7 else 4
self.params_memory.put_int("CEStatus", override_value)
else:
self.experimental_mode = not self.experimental_mode
self.params.put_bool_nonblocking("ExperimentalMode", self.experimental_mode)
if self.sm.frame % 10 == 0 or self.resume_pressed:
self.resume_previously_pressed = self.resume_pressed
FPCC = custom.FrogPilotCarControl.new_message()
FPCC.alwaysOnLateralActive = self.always_on_lateral_active
FPCC.fcwEventTriggered = self.fcw_event_triggered
FPCC.noEntryEventTriggered = self.no_entry_alert_triggered
FPCC.resumePressed = self.resume_previously_pressed
FPCC.steerSaturatedEventTriggered = self.steer_saturated_event_triggered
return FPCC
def publish_logs(self, CS, start_time, CC, lac_log, FPCC):
"""Send actuators and hud commands to the car, send controlsstate and MPC logging"""
# Orientation and angle rates can be useful for carcontroller
# Only calibrated (car) frame is relevant for the carcontroller
orientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value)
if len(orientation_value) > 2:
CC.orientationNED = orientation_value
angular_rate_value = list(self.sm['liveLocationKalman'].angularVelocityCalibrated.value)
if len(angular_rate_value) > 2:
CC.angularVelocity = angular_rate_value
CC.cruiseControl.override = self.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)
if self.joystick_mode and self.sm.recv_frame['testJoystick'] > 0 and self.sm['testJoystick'].buttons[0]:
CC.cruiseControl.cancel = True
speeds = self.sm['longitudinalPlan'].speeds
if len(speeds):
CC.cruiseControl.resume = self.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
hudControl = CC.hudControl
hudControl.setSpeed = float(self.v_cruise_helper.v_cruise_cluster_kph * CV.KPH_TO_MS)
hudControl.speedVisible = self.enabled
hudControl.lanesVisible = self.enabled
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
hudControl.leadDistanceBars = self.personality + 1
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
recent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown
ldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \
and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated
model_v2 = self.sm['modelV2']
desire_prediction = model_v2.meta.desirePrediction
if len(desire_prediction) and ldw_allowed:
right_lane_visible = model_v2.laneLineProbs[2] > 0.5
left_lane_visible = model_v2.laneLineProbs[1] > 0.5
l_lane_change_prob = desire_prediction[Desire.laneChangeLeft]
r_lane_change_prob = desire_prediction[Desire.laneChangeRight]
lane_lines = model_v2.laneLines
l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))
r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))
hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)
hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)
if hudControl.rightLaneDepart or hudControl.leftLaneDepart:
self.events.add(EventName.ldw)
clear_event_types = set()
if ET.WARNING not in self.current_alert_types:
clear_event_types.add(ET.WARNING)
if self.enabled:
clear_event_types.add(ET.NO_ENTRY)
alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer, self.frogpilot_toggles])
self.AM.add_many(self.sm.frame, alerts)
current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)
if current_alert:
hudControl.visualAlert = current_alert.visual_alert
if not self.CP.passive and self.initialized:
CO = self.sm['carOutput']
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.steer_limited = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
STEER_ANGLE_SATURATION_THRESHOLD
else:
self.steer_limited = abs(CC.actuators.steer - CO.actuatorsOutput.steer) > 1e-2
force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \
(self.state == State.softDisabling)
# Curvature & Steering angle
lp = self.sm['liveParameters']
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
# controlsState
dat = messaging.new_message('controlsState')
dat.valid = CS.canValid
controlsState = dat.controlsState
if current_alert:
controlsState.alertText1 = current_alert.alert_text_1
controlsState.alertText2 = current_alert.alert_text_2
controlsState.alertSize = current_alert.alert_size
controlsState.alertStatus = current_alert.alert_status
controlsState.alertBlinkingRate = current_alert.alert_rate
controlsState.alertType = current_alert.alert_type
controlsState.alertSound = current_alert.audible_alert
controlsState.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
controlsState.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
controlsState.enabled = self.enabled
controlsState.active = self.active
controlsState.curvature = curvature
controlsState.desiredCurvature = self.desired_curvature
controlsState.state = self.state
controlsState.engageable = not self.events.contains(ET.NO_ENTRY)
controlsState.longControlState = self.LoC.long_control_state
controlsState.vPid = float(self.LoC.v_pid)
controlsState.vCruise = float(self.v_cruise_helper.v_cruise_kph)
controlsState.vCruiseCluster = float(self.v_cruise_helper.v_cruise_cluster_kph)
controlsState.upAccelCmd = float(self.LoC.pid.p)
controlsState.uiAccelCmd = float(self.LoC.pid.i)
controlsState.ufAccelCmd = float(self.LoC.pid.f)
controlsState.cumLagMs = -self.rk.remaining * 1000.
controlsState.startMonoTime = int(start_time * 1e9)
controlsState.forceDecel = bool(force_decel)
controlsState.experimentalMode = self.experimental_mode
controlsState.personality = self.personality
lat_tuning = self.CP.lateralTuning.which()
if self.joystick_mode:
controlsState.lateralControlState.debugState = lac_log
elif self.CP.steerControlType == car.CarParams.SteerControlType.angle:
controlsState.lateralControlState.angleState = lac_log
elif lat_tuning == 'pid':
controlsState.lateralControlState.pidState = lac_log
elif lat_tuning == 'torque':
controlsState.lateralControlState.torqueState = lac_log
self.pm.send('controlsState', dat)
# onroadEvents - logged every second or on change
if (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events.names != self.events_prev):
ce_send = messaging.new_message('onroadEvents', len(self.events))
ce_send.valid = True
ce_send.onroadEvents = self.events.to_msg()
self.pm.send('onroadEvents', ce_send)
self.events_prev = self.events.names.copy()
# carControl
cc_send = messaging.new_message('carControl')
cc_send.valid = CS.canValid
cc_send.carControl = CC
self.pm.send('carControl', cc_send)
# frogpilotCarControl
fpcc_send = messaging.new_message('frogpilotCarControl')
fpcc_send.valid = CS.canValid
fpcc_send.frogpilotCarControl = FPCC
self.pm.send('frogpilotCarControl', fpcc_send)
def step(self):
start_time = time.monotonic()
# Sample data from sockets and get a carState
CS = self.data_sample()
cloudlog.timestamp("Data sampled")
self.update_events(CS)
cloudlog.timestamp("Events updated")
if not self.CP.passive and self.initialized:
# Update control state
self.state_transition(CS)
# Compute actuators (runs PID loops and lateral MPC)
CC, lac_log, FPCC = self.state_control(CS)
# Publish data
self.publish_logs(CS, start_time, CC, lac_log, FPCC)
self.CS_prev = CS
def read_personality_param(self):
try:
return int(self.params.get('LongitudinalPersonality'))
except (ValueError, TypeError):
return log.LongitudinalPersonality.standard
def params_thread(self, evt):
while not evt.is_set():
self.is_metric = self.params.get_bool("IsMetric")
self.experimental_mode = self.params.get_bool("ExperimentalMode") and self.CP.openpilotLongitudinalControl
self.personality = self.read_personality_param()
if self.CP.notCar:
self.joystick_mode = self.params.get_bool("JoystickDebugMode")
time.sleep(0.1)
# Update FrogPilot parameters
if self.sm['frogpilotPlan'].togglesUpdated:
self.frogpilot_toggles = get_frogpilot_toggles()
def controlsd_thread(self):
e = threading.Event()
t = threading.Thread(target=self.params_thread, args=(e, ))
try:
t.start()
while True:
self.step()
self.rk.monitor_time()
except SystemExit:
e.set()
t.join()
def main():
config_realtime_process(4, Priority.CTRL_HIGH)
controls = Controls()
controls.controlsd_thread()
if __name__ == "__main__":
main()