controlsd: pull out LDW (#33479)

* controlsd: pull out LDW

* cleanup

* good ol mypy
This commit is contained in:
Adeeb Shihadeh 2024-09-04 17:45:09 -07:00 committed by GitHub
parent 77f4f57e73
commit 73d31d50c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 57 additions and 34 deletions

View File

@ -22,6 +22,7 @@ from openpilot.common.gps import get_gps_location_service
from opendbc.car.car_helpers import get_car_interface
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert
from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature, get_startup_event
from openpilot.selfdrive.controls.lib.ldw import LaneDepartureWarning
from openpilot.selfdrive.controls.lib.events import Events, ET
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
@ -34,10 +35,6 @@ from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
from openpilot.system.hardware import HARDWARE
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1
CAMERA_OFFSET = 0.04
JOYSTICK_MAX_LAT_ACCEL = 2.5 # m/s^2
REPLAY = "REPLAY" in os.environ
@ -48,7 +45,6 @@ IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}
ThermalStatus = log.DeviceState.ThermalStatus
State = log.SelfdriveState.OpenpilotState
PandaType = log.PandaState.PandaType
Desire = log.Desire
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
EventName = car.OnroadEvent.EventName
@ -130,6 +126,8 @@ class Controls:
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.ldw = LaneDepartureWarning()
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CI)
@ -143,7 +141,6 @@ class Controls:
self.active = False
self.mismatch_counter = 0
self.cruise_mismatch_counter = 0
self.last_blinker_frame = 0
self.last_steering_pressed_frame = 0
self.distance_traveled = 0
self.last_functional_fan_frame = 0
@ -382,6 +379,13 @@ class Controls:
if self.sm['modelV2'].frameDropPerc > 20:
self.events.add(EventName.modeldLagging)
# decrement personality on distance button press
if self.CP.openpilotLongitudinalControl:
if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents):
self.personality = (self.personality - 1) % 3
self.params.put_nonblocking('LongitudinalPersonality', str(self.personality))
self.events.add(EventName.personalityChanged)
def data_sample(self):
"""Receive data from sockets"""
@ -557,13 +561,6 @@ class Controls:
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
# decrement personality on distance button press
if self.CP.openpilotLongitudinalControl:
if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents):
self.personality = (self.personality - 1) % 3
self.params.put_nonblocking('LongitudinalPersonality', str(self.personality))
self.events.add(EventName.personalityChanged)
return CC, lac_log
def publish_logs(self, CS, CC, lac_log):
@ -594,27 +591,12 @@ class Controls:
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
recent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown
ldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \
and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated
model_v2 = self.sm['modelV2']
desire_prediction = model_v2.meta.desirePrediction
if len(desire_prediction) and ldw_allowed:
right_lane_visible = model_v2.laneLineProbs[2] > 0.5
left_lane_visible = model_v2.laneLineProbs[1] > 0.5
l_lane_change_prob = desire_prediction[Desire.laneChangeLeft]
r_lane_change_prob = desire_prediction[Desire.laneChangeRight]
lane_lines = model_v2.laneLines
l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))
r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))
hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)
hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)
if hudControl.rightLaneDepart or hudControl.leftLaneDepart:
self.events.add(EventName.ldw)
self.ldw.update(self.sm.frame, self.sm['modelV2'], CS, CC)
if self.is_ldw_enabled and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibrated:
hudControl.leftLaneDepart = self.ldw.left
hudControl.rightLaneDepart = self.ldw.right
if self.ldw.warning:
self.events.add(EventName.ldw)
clear_event_types = set()
if ET.WARNING not in self.state_machine.current_alert_types:

View File

@ -0,0 +1,41 @@
from cereal import log
from openpilot.common.realtime import DT_CTRL
from openpilot.common.conversions import Conversions as CV
CAMERA_OFFSET = 0.04
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1
class LaneDepartureWarning:
def __init__(self):
self.left = False
self.right = False
self.last_blinker_frame = 0
def update(self, frame, modelV2, CS, CC):
if CS.leftBlinker or CS.rightBlinker:
self.last_blinker_frame = frame
recent_blinker = (frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldown
ldw_allowed = CS.vEgo > LDW_MIN_SPEED and not recent_blinker and not CC.latActive
desire_prediction = modelV2.meta.desirePrediction
if len(desire_prediction) and ldw_allowed:
right_lane_visible = modelV2.laneLineProbs[2] > 0.5
left_lane_visible = modelV2.laneLineProbs[1] > 0.5
l_lane_change_prob = desire_prediction[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_prediction[log.Desire.laneChangeRight]
lane_lines = modelV2.laneLines
l_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))
r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))
self.left = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)
self.right = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)
else:
self.left, self.right = False, False
@property
def warning(self) -> bool:
return bool(self.left or self.right)