DEC: Kalman and the Magic Mode Switcheroo (#1026)
* dec with kalmanfilter * updat pytest * Update sunnypilot/selfdrive/controls/lib/dec/constants.py Co-authored-by: DevTekVE <devtekve@gmail.com> --------- Co-authored-by: DevTekVE <devtekve@gmail.com>
This commit is contained in:
@@ -1,25 +1,17 @@
|
||||
class WMACConstants:
|
||||
LEAD_WINDOW_SIZE = 5
|
||||
LEAD_PROB = 0.5
|
||||
# Lead detection parameters
|
||||
LEAD_WINDOW_SIZE = 6 # Stable detection window
|
||||
LEAD_PROB = 0.45 # Balanced threshold for lead detection
|
||||
|
||||
SLOW_DOWN_WINDOW_SIZE = 5
|
||||
SLOW_DOWN_PROB = 0.6
|
||||
# Slow down detection parameters
|
||||
SLOW_DOWN_WINDOW_SIZE = 5 # Responsive but stable
|
||||
SLOW_DOWN_PROB = 0.3 # Balanced threshold for slow down scenarios
|
||||
|
||||
# Optimized slow down distance curve - smooth and progressive
|
||||
SLOW_DOWN_BP = [0., 10., 20., 30., 40., 50., 55., 60.]
|
||||
SLOW_DOWN_DIST = [25., 38., 55., 75., 95., 115., 130., 150.]
|
||||
SLOW_DOWN_DIST = [32., 46., 64., 86., 108., 130., 145., 165.]
|
||||
|
||||
SLOWNESS_WINDOW_SIZE = 12
|
||||
SLOWNESS_PROB = 0.5
|
||||
SLOWNESS_CRUISE_OFFSET = 1.05
|
||||
|
||||
DANGEROUS_TTC_WINDOW_SIZE = 3
|
||||
DANGEROUS_TTC = 2.3
|
||||
|
||||
MPC_FCW_WINDOW_SIZE = 10
|
||||
MPC_FCW_PROB = 0.5
|
||||
|
||||
|
||||
class SNG_State:
|
||||
off = 0
|
||||
stopped = 1
|
||||
going = 2
|
||||
# Slowness detection parameters
|
||||
SLOWNESS_WINDOW_SIZE = 10 # Stable slowness detection
|
||||
SLOWNESS_PROB = 0.55 # Clear threshold for slowness
|
||||
SLOWNESS_CRUISE_OFFSET = 1.025 # Conservative cruise speed offset
|
||||
|
||||
@@ -1,88 +1,133 @@
|
||||
# The MIT License
|
||||
#
|
||||
# Copyright (c) 2019-, Rick Lan, dragonpilot community, and a number of other of contributors.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
#
|
||||
# Version = 2025-1-18
|
||||
"""
|
||||
Copyright (c) 2021-, rav4kumar, Haibin Wen, sunnypilot, and a number of other contributors.
|
||||
|
||||
import numpy as np
|
||||
This file is part of sunnypilot and is licensed under the MIT License.
|
||||
See the LICENSE.md file in the root directory for more details.
|
||||
"""
|
||||
# Version = 2025-6-30
|
||||
|
||||
from cereal import messaging
|
||||
from opendbc.car import structs
|
||||
from numpy import interp
|
||||
from openpilot.common.params import Params
|
||||
from openpilot.common.realtime import DT_MDL
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants, SNG_State
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants
|
||||
from typing import Literal
|
||||
|
||||
# d-e2e, from modeldata.h
|
||||
TRAJECTORY_SIZE = 33
|
||||
SET_MODE_TIMEOUT = 15
|
||||
|
||||
HIGHWAY_CRUISE_KPH = 70
|
||||
|
||||
STOP_AND_GO_FRAME = 60
|
||||
|
||||
SET_MODE_TIMEOUT = 10
|
||||
|
||||
V_ACC_MIN = 9.72
|
||||
# Define the valid mode types
|
||||
ModeType = Literal['acc', 'blended']
|
||||
|
||||
|
||||
class GenericMovingAverageCalculator:
|
||||
def __init__(self, window_size):
|
||||
self.window_size = window_size
|
||||
self.data = []
|
||||
self.total = 0
|
||||
class SmoothKalmanFilter:
|
||||
"""Enhanced Kalman filter with smoothing for stable decision making."""
|
||||
|
||||
def add_data(self, value: float) -> None:
|
||||
if len(self.data) == self.window_size:
|
||||
self.total -= self.data.pop(0)
|
||||
self.data.append(value)
|
||||
self.total += value
|
||||
def __init__(self, initial_value=0, measurement_noise=0.1, process_noise=0.01,
|
||||
alpha=1.0, smoothing_factor=0.85):
|
||||
self.x = initial_value
|
||||
self.P = 1.0
|
||||
self.R = measurement_noise
|
||||
self.Q = process_noise
|
||||
self.alpha = alpha
|
||||
self.smoothing_factor = smoothing_factor
|
||||
self.initialized = False
|
||||
self.history = []
|
||||
self.max_history = 10
|
||||
self.confidence = 0.0
|
||||
|
||||
def get_moving_average(self) -> float | None:
|
||||
return None if len(self.data) == 0 else self.total / len(self.data)
|
||||
def add_data(self, measurement):
|
||||
if len(self.history) >= self.max_history:
|
||||
self.history.pop(0)
|
||||
self.history.append(measurement)
|
||||
|
||||
def reset_data(self) -> None:
|
||||
self.data = []
|
||||
self.total = 0
|
||||
if not self.initialized:
|
||||
self.x = measurement
|
||||
self.initialized = True
|
||||
self.confidence = 0.1
|
||||
return
|
||||
|
||||
self.P = self.alpha * self.P + self.Q
|
||||
|
||||
K = self.P / (self.P + self.R)
|
||||
effective_K = K * (1.0 - self.smoothing_factor) + self.smoothing_factor * 0.1
|
||||
|
||||
innovation = measurement - self.x
|
||||
self.x = self.x + effective_K * innovation
|
||||
self.P = (1 - effective_K) * self.P
|
||||
|
||||
if abs(innovation) < 0.1:
|
||||
self.confidence = min(1.0, self.confidence + 0.05)
|
||||
else:
|
||||
self.confidence = max(0.1, self.confidence - 0.02)
|
||||
|
||||
def get_value(self):
|
||||
return self.x if self.initialized else None
|
||||
|
||||
def get_confidence(self):
|
||||
return self.confidence
|
||||
|
||||
def reset_data(self):
|
||||
self.initialized = False
|
||||
self.history = []
|
||||
self.confidence = 0.0
|
||||
|
||||
|
||||
class WeightedMovingAverageCalculator:
|
||||
def __init__(self, window_size):
|
||||
self.window_size = window_size
|
||||
self.data = []
|
||||
self.weights = np.linspace(1, 3, window_size) # Linear weights, adjust as needed
|
||||
class ModeTransitionManager:
|
||||
"""Manages smooth transitions between driving modes with hysteresis."""
|
||||
|
||||
def add_data(self, value: float) -> None:
|
||||
if len(self.data) == self.window_size:
|
||||
self.data.pop(0)
|
||||
self.data.append(value)
|
||||
def __init__(self):
|
||||
self.current_mode: ModeType = 'acc'
|
||||
self.mode_confidence = {'acc': 1.0, 'blended': 0.0}
|
||||
self.transition_timeout = 0
|
||||
self.min_mode_duration = 10
|
||||
self.mode_duration = 0
|
||||
self.emergency_override = False
|
||||
|
||||
def get_weighted_average(self) -> float | None:
|
||||
if len(self.data) == 0:
|
||||
return None
|
||||
weighted_sum: float = float(np.dot(self.data, self.weights[-len(self.data):]))
|
||||
weight_total: float = float(np.sum(self.weights[-len(self.data):]))
|
||||
return weighted_sum / weight_total
|
||||
def request_mode(self, mode: ModeType, confidence: float = 1.0, emergency: bool = False):
|
||||
# Emergency override for critical situations (stops, collisions)
|
||||
if emergency:
|
||||
self.emergency_override = True
|
||||
self.current_mode = mode
|
||||
self.transition_timeout = SET_MODE_TIMEOUT
|
||||
self.mode_duration = 0
|
||||
return
|
||||
|
||||
def reset_data(self) -> None:
|
||||
self.data = []
|
||||
self.mode_confidence[mode] = min(1.0, self.mode_confidence[mode] + 0.1 * confidence)
|
||||
for m in self.mode_confidence:
|
||||
if m != mode:
|
||||
self.mode_confidence[m] = max(0.0, self.mode_confidence[m] - 0.05)
|
||||
|
||||
# Require minimum duration in current mode (unless emergency)
|
||||
if self.mode_duration < self.min_mode_duration and not self.emergency_override:
|
||||
return
|
||||
|
||||
# Hysteresis: higher threshold for mode changes
|
||||
confidence_threshold = 0.6 if mode != self.current_mode else 0.3 # Lower threshold for faster response
|
||||
|
||||
if self.mode_confidence[mode] > confidence_threshold:
|
||||
if mode != self.current_mode and self.transition_timeout == 0:
|
||||
self.transition_timeout = SET_MODE_TIMEOUT
|
||||
self.current_mode = mode
|
||||
self.mode_duration = 0
|
||||
|
||||
def update(self):
|
||||
if self.transition_timeout > 0:
|
||||
self.transition_timeout -= 1
|
||||
self.mode_duration += 1
|
||||
|
||||
# Reset emergency override after some time
|
||||
if self.emergency_override and self.mode_duration > 20:
|
||||
self.emergency_override = False
|
||||
|
||||
# Gradual confidence decay
|
||||
for mode in self.mode_confidence:
|
||||
self.mode_confidence[mode] *= 0.98
|
||||
|
||||
def get_mode(self) -> ModeType:
|
||||
return self.current_mode
|
||||
|
||||
|
||||
class DynamicExperimentalController:
|
||||
@@ -92,51 +137,59 @@ class DynamicExperimentalController:
|
||||
self._params = params or Params()
|
||||
self._enabled: bool = self._params.get_bool("DynamicExperimentalControl")
|
||||
self._active: bool = False
|
||||
self._mode: str = 'acc'
|
||||
self._frame: int = 0
|
||||
self._urgency = 0.0
|
||||
|
||||
# Use weighted moving average for filtering leads
|
||||
self._lead_gmac = WeightedMovingAverageCalculator(window_size=WMACConstants.LEAD_WINDOW_SIZE)
|
||||
self._mode_manager = ModeTransitionManager()
|
||||
|
||||
# Smooth filters for stable decision making with faster response for critical scenarios
|
||||
self._lead_filter = SmoothKalmanFilter(
|
||||
measurement_noise=0.15,
|
||||
process_noise=0.05,
|
||||
alpha=1.02,
|
||||
smoothing_factor=0.8
|
||||
)
|
||||
|
||||
self._slow_down_filter = SmoothKalmanFilter(
|
||||
measurement_noise=0.1,
|
||||
process_noise=0.1,
|
||||
alpha=1.05,
|
||||
smoothing_factor=0.7
|
||||
)
|
||||
|
||||
self._slowness_filter = SmoothKalmanFilter(
|
||||
measurement_noise=0.1,
|
||||
process_noise=0.06,
|
||||
alpha=1.015,
|
||||
smoothing_factor=0.92
|
||||
)
|
||||
|
||||
self._mpc_fcw_filter = SmoothKalmanFilter(
|
||||
measurement_noise=0.2,
|
||||
process_noise=0.1,
|
||||
alpha=1.1,
|
||||
smoothing_factor=0.5
|
||||
)
|
||||
self._has_lead_filtered = False
|
||||
self._has_lead_filtered_prev = False
|
||||
|
||||
self._slow_down_gmac = WeightedMovingAverageCalculator(window_size=WMACConstants.SLOW_DOWN_WINDOW_SIZE)
|
||||
self._has_slow_down: bool = False
|
||||
self._slow_down_confidence: float = 0.0
|
||||
|
||||
self._has_blinkers = False
|
||||
|
||||
self._slowness_gmac = WeightedMovingAverageCalculator(window_size=WMACConstants.SLOWNESS_WINDOW_SIZE)
|
||||
self._has_slowness: bool = False
|
||||
|
||||
self._has_nav_instruction = False
|
||||
|
||||
self._dangerous_ttc_gmac = WeightedMovingAverageCalculator(window_size=WMACConstants.DANGEROUS_TTC_WINDOW_SIZE)
|
||||
self._has_dangerous_ttc: bool = False
|
||||
|
||||
self._v_ego_kph = 0.
|
||||
self._v_cruise_kph = 0.
|
||||
|
||||
self._has_lead = False
|
||||
|
||||
self._has_slow_down = False
|
||||
self._has_slowness = False
|
||||
self._has_mpc_fcw = False
|
||||
self._v_ego_kph = 0.0
|
||||
self._v_cruise_kph = 0.0
|
||||
self._has_standstill = False
|
||||
self._has_standstill_prev = False
|
||||
|
||||
self._sng_transit_frame = 0
|
||||
self._sng_state = SNG_State.off
|
||||
|
||||
self._mpc_fcw_gmac = WeightedMovingAverageCalculator(window_size=WMACConstants.MPC_FCW_WINDOW_SIZE)
|
||||
self._has_mpc_fcw: bool = False
|
||||
self._mpc_fcw_crash_cnt = 0
|
||||
|
||||
self._set_mode_timeout = 0
|
||||
self._standstill_count = 0
|
||||
# debug
|
||||
self._endpoint_x = float('inf')
|
||||
self._expected_distance = 0.0
|
||||
self._trajectory_valid = False
|
||||
|
||||
def _read_params(self) -> None:
|
||||
if self._frame % int(1. / DT_MDL) == 0:
|
||||
self._enabled = self._params.get_bool("DynamicExperimentalControl")
|
||||
|
||||
def mode(self) -> str:
|
||||
return str(self._mode)
|
||||
return self._mode_manager.get_mode()
|
||||
|
||||
def enabled(self) -> bool:
|
||||
return self._enabled
|
||||
@@ -144,46 +197,9 @@ class DynamicExperimentalController:
|
||||
def active(self) -> bool:
|
||||
return self._active
|
||||
|
||||
@staticmethod
|
||||
def _anomaly_detection(recent_data: list[float], threshold: float = 2.0, context_check: bool = True) -> bool:
|
||||
"""
|
||||
Basic anomaly detection using standard deviation.
|
||||
"""
|
||||
if len(recent_data) < 5:
|
||||
return False
|
||||
mean: float = float(np.mean(recent_data))
|
||||
std_dev: float = float(np.std(recent_data))
|
||||
anomaly: bool = bool(recent_data[-1] > mean + threshold * std_dev)
|
||||
|
||||
# Context check to ensure repeated anomaly
|
||||
if context_check:
|
||||
return np.count_nonzero(np.array(recent_data) > mean + threshold * std_dev) > 1
|
||||
return anomaly
|
||||
|
||||
def _adaptive_slowdown_threshold(self) -> float:
|
||||
"""
|
||||
Adapts the slow-down threshold based on vehicle speed and recent behavior.
|
||||
"""
|
||||
slowdown_scaling_factor: float = (1.0 + 0.03 * np.log(1 + len(self._slow_down_gmac.data)))
|
||||
adaptive_threshold: float = float(
|
||||
interp(self._v_ego_kph, WMACConstants.SLOW_DOWN_BP, WMACConstants.SLOW_DOWN_DIST) * slowdown_scaling_factor
|
||||
)
|
||||
return adaptive_threshold
|
||||
|
||||
def _smoothed_lead_detection(self, lead_prob: float, smoothing_factor: float = 0.2):
|
||||
"""
|
||||
Smoothing the lead detection to avoid erratic behavior.
|
||||
"""
|
||||
lead_filtering: float = (1 - smoothing_factor) * self._has_lead_filtered + smoothing_factor * lead_prob
|
||||
return lead_filtering > WMACConstants.LEAD_PROB
|
||||
|
||||
def _adaptive_lead_prob_threshold(self) -> float:
|
||||
"""
|
||||
Adapts lead probability threshold based on driving conditions.
|
||||
"""
|
||||
if self._v_ego_kph > HIGHWAY_CRUISE_KPH:
|
||||
return float(WMACConstants.LEAD_PROB + 0.1) # Increase the threshold on highways
|
||||
return float(WMACConstants.LEAD_PROB)
|
||||
def set_mpc_fcw_crash_cnt(self) -> None:
|
||||
"""Set MPC FCW crash count"""
|
||||
self._mpc_fcw_crash_cnt = self._mpc.crash_cnt
|
||||
|
||||
def _update_calculations(self, sm: messaging.SubMaster) -> None:
|
||||
car_state = sm['carState']
|
||||
@@ -192,186 +208,168 @@ class DynamicExperimentalController:
|
||||
|
||||
self._v_ego_kph = car_state.vEgo * 3.6
|
||||
self._v_cruise_kph = car_state.vCruise
|
||||
self._has_lead = lead_one.status
|
||||
self._has_standstill = car_state.standstill
|
||||
|
||||
# fcw detection
|
||||
self._mpc_fcw_gmac.add_data(self._mpc_fcw_crash_cnt > 0)
|
||||
if _mpc_fcw_weighted_average := self._mpc_fcw_gmac.get_weighted_average():
|
||||
self._has_mpc_fcw = _mpc_fcw_weighted_average > WMACConstants.MPC_FCW_PROB
|
||||
else:
|
||||
self._has_mpc_fcw = False
|
||||
|
||||
# nav enable detection
|
||||
# self._has_nav_instruction = md.navEnabledDEPRECATED and maneuver_distance / max(car_state.vEgo, 1) < 13
|
||||
|
||||
# lead detection with smoothing
|
||||
self._lead_gmac.add_data(lead_one.status)
|
||||
self._has_lead_filtered = (self._lead_gmac.get_weighted_average() or -1.) > WMACConstants.LEAD_PROB
|
||||
#lead_prob = self._lead_gmac.get_weighted_average() or 0
|
||||
#self._has_lead_filtered = self._smoothed_lead_detection(lead_prob)
|
||||
|
||||
# adaptive slow down detection
|
||||
adaptive_threshold = self._adaptive_slowdown_threshold()
|
||||
slow_down_trigger = len(md.orientation.x) == len(md.position.x) == TRAJECTORY_SIZE and md.position.x[TRAJECTORY_SIZE - 1] < adaptive_threshold
|
||||
self._slow_down_gmac.add_data(slow_down_trigger)
|
||||
if _has_slow_down_weighted_average := self._slow_down_gmac.get_weighted_average():
|
||||
self._has_slow_down = _has_slow_down_weighted_average > WMACConstants.SLOW_DOWN_PROB
|
||||
self._slow_down_confidence = _has_slow_down_weighted_average # Store confidence level
|
||||
else:
|
||||
self._has_slow_down = False
|
||||
self._slow_down_confidence = 0.0 # No confidence if no slowdown
|
||||
|
||||
# anomaly detection for slow down events
|
||||
if self._anomaly_detection(self._slow_down_gmac.data):
|
||||
self._slow_down_confidence *= 0.85 # Reduce confidence
|
||||
self._has_slow_down = self._slow_down_confidence > WMACConstants.SLOW_DOWN_PROB
|
||||
|
||||
# blinker detection
|
||||
self._has_blinkers = car_state.leftBlinker or car_state.rightBlinker
|
||||
|
||||
# sng detection
|
||||
# standstill detection
|
||||
if self._has_standstill:
|
||||
self._sng_state = SNG_State.stopped
|
||||
self._sng_transit_frame = 0
|
||||
self._standstill_count = min(20, self._standstill_count + 1)
|
||||
else:
|
||||
if self._sng_transit_frame == 0:
|
||||
if self._sng_state == SNG_State.stopped:
|
||||
self._sng_state = SNG_State.going
|
||||
self._sng_transit_frame = STOP_AND_GO_FRAME
|
||||
elif self._sng_state == SNG_State.going:
|
||||
self._sng_state = SNG_State.off
|
||||
elif self._sng_transit_frame > 0:
|
||||
self._sng_transit_frame -= 1
|
||||
self._standstill_count = max(0, self._standstill_count - 1)
|
||||
|
||||
# slowness detection
|
||||
if not self._has_standstill:
|
||||
self._slowness_gmac.add_data(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET))
|
||||
if _slowness_weighted_average := self._slowness_gmac.get_weighted_average():
|
||||
self._has_slowness = _slowness_weighted_average > WMACConstants.SLOWNESS_PROB
|
||||
else:
|
||||
self._has_slowness = False
|
||||
# Lead detection
|
||||
self._lead_filter.add_data(float(lead_one.status))
|
||||
lead_value = self._lead_filter.get_value() or 0.0
|
||||
self._has_lead_filtered = lead_value > WMACConstants.LEAD_PROB
|
||||
|
||||
# dangerous TTC detection
|
||||
if not self._has_lead_filtered and self._has_lead_filtered_prev:
|
||||
self._dangerous_ttc_gmac.reset_data()
|
||||
self._has_dangerous_ttc = False
|
||||
# MPC FCW detection
|
||||
fcw_filtered_value = self._mpc_fcw_filter.get_value() or 0.0
|
||||
self._mpc_fcw_filter.add_data(float(self._mpc_fcw_crash_cnt > 0))
|
||||
self._has_mpc_fcw = fcw_filtered_value > 0.5
|
||||
|
||||
if self._has_lead and car_state.vEgo >= 0.01:
|
||||
self._dangerous_ttc_gmac.add_data(lead_one.dRel / car_state.vEgo)
|
||||
# Slow down detection
|
||||
self._calculate_slow_down(md)
|
||||
|
||||
if _dangerous_ttc_weighted_average := self._dangerous_ttc_gmac.get_weighted_average():
|
||||
self._has_dangerous_ttc = _dangerous_ttc_weighted_average <= WMACConstants.DANGEROUS_TTC
|
||||
else:
|
||||
self._has_dangerous_ttc = False
|
||||
# Slowness detection
|
||||
if not (self._standstill_count > 5) and not self._has_slow_down:
|
||||
current_slowness = float(self._v_ego_kph <= (self._v_cruise_kph * WMACConstants.SLOWNESS_CRUISE_OFFSET))
|
||||
self._slowness_filter.add_data(current_slowness)
|
||||
slowness_value = self._slowness_filter.get_value() or 0.0
|
||||
|
||||
# keep prev values
|
||||
self._has_standstill_prev = self._has_standstill
|
||||
self._has_lead_filtered_prev = self._has_lead_filtered
|
||||
# Hysteresis for slowness
|
||||
threshold = WMACConstants.SLOWNESS_PROB * (0.8 if self._has_slowness else 1.1)
|
||||
self._has_slowness = slowness_value > threshold
|
||||
|
||||
def _calculate_slow_down(self, md):
|
||||
"""Calculate urgency based on trajectory endpoint vs expected distance."""
|
||||
|
||||
# Reset to safe defaults
|
||||
urgency = 0.0
|
||||
self._endpoint_x = float('inf')
|
||||
self._trajectory_valid = False
|
||||
|
||||
#Require exact trajectory size
|
||||
position_valid = len(md.position.x) == TRAJECTORY_SIZE
|
||||
orientation_valid = len(md.orientation.x) == TRAJECTORY_SIZE
|
||||
|
||||
if not (position_valid and orientation_valid):
|
||||
# Invalid trajectory - this itself might indicate a stop scenario
|
||||
# Apply moderate urgency for incomplete trajectories at speed
|
||||
if self._v_ego_kph > 20.0:
|
||||
urgency = 0.3
|
||||
|
||||
self._slow_down_filter.add_data(urgency)
|
||||
urgency_filtered = self._slow_down_filter.get_value() or 0.0
|
||||
self._has_slow_down = urgency_filtered > WMACConstants.SLOW_DOWN_PROB
|
||||
self._urgency = urgency_filtered
|
||||
return
|
||||
|
||||
# We have a valid full trajectory
|
||||
self._trajectory_valid = True
|
||||
|
||||
# Use the exact endpoint (33rd point, index 32)
|
||||
endpoint_x = md.position.x[TRAJECTORY_SIZE - 1]
|
||||
self._endpoint_x = endpoint_x
|
||||
|
||||
# Get expected distance based on current speed using tuned constants
|
||||
expected_distance = interp(self._v_ego_kph,
|
||||
WMACConstants.SLOW_DOWN_BP,
|
||||
WMACConstants.SLOW_DOWN_DIST)
|
||||
self._expected_distance = expected_distance
|
||||
|
||||
# Calculate urgency based on trajectory shortage
|
||||
if endpoint_x < expected_distance:
|
||||
shortage = expected_distance - endpoint_x
|
||||
shortage_ratio = shortage / expected_distance
|
||||
|
||||
# Base urgency on shortage ratio
|
||||
urgency = min(1.0, shortage_ratio * 2.0)
|
||||
|
||||
# Increase urgency for very short trajectories (imminent stops)
|
||||
critical_distance = expected_distance * 0.3
|
||||
if endpoint_x < critical_distance:
|
||||
urgency = min(1.0, urgency * 2.0)
|
||||
|
||||
# Speed-based urgency adjustment
|
||||
if self._v_ego_kph > 25.0:
|
||||
speed_factor = 1.0 + (self._v_ego_kph - 25.0) / 80.0
|
||||
urgency = min(1.0, urgency * speed_factor)
|
||||
|
||||
# Apply filtering but with less smoothing for stops
|
||||
self._slow_down_filter.add_data(urgency)
|
||||
urgency_filtered = self._slow_down_filter.get_value() or 0.0
|
||||
|
||||
# Update state with lower threshold for better stop detection
|
||||
self._has_slow_down = urgency_filtered > (WMACConstants.SLOW_DOWN_PROB * 0.8)
|
||||
self._urgency = urgency_filtered
|
||||
|
||||
def _radarless_mode(self) -> None:
|
||||
# when mpc fcw crash prob is high
|
||||
# use blended to slow down quickly
|
||||
"""Radarless mode decision logic with emergency handling."""
|
||||
|
||||
# EMERGENCY: MPC FCW - immediate blended mode
|
||||
if self._has_mpc_fcw:
|
||||
self._set_mode('blended')
|
||||
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
|
||||
return
|
||||
|
||||
# Nav enabled and distance to upcoming turning is 300 or below
|
||||
# if self._has_nav_instruction:
|
||||
# self._set_mode('blended')
|
||||
# return
|
||||
|
||||
# when blinker is on and speed is driving below V_ACC_MIN: blended
|
||||
# we don't want it to switch mode at higher speed, blended may trigger hard brake
|
||||
# if self._has_blinkers and self._v_ego_kph < V_ACC_MIN:
|
||||
# self._set_mode('blended')
|
||||
# return
|
||||
|
||||
# when at highway cruise and SNG: blended
|
||||
# ensuring blended mode is used because acc is bad at catching SNG lead car
|
||||
# especially those who accel very fast and then brake very hard.
|
||||
# if self._sng_state == SNG_State.going and self._v_cruise_kph >= V_ACC_MIN:
|
||||
# self._set_mode('blended')
|
||||
# return
|
||||
|
||||
# when standstill: blended
|
||||
# in case of lead car suddenly move away under traffic light, acc mode won't brake at traffic light.
|
||||
if self._has_standstill:
|
||||
self._set_mode('blended')
|
||||
# Standstill: use blended
|
||||
if self._standstill_count > 3:
|
||||
self._mode_manager.request_mode('blended', confidence=0.9)
|
||||
return
|
||||
|
||||
# when detecting slow down scenario: blended
|
||||
# e.g. traffic light, curve, stop sign etc.
|
||||
# Slow down scenarios: emergency for high urgency, normal for lower urgency
|
||||
if self._has_slow_down:
|
||||
self._set_mode('blended')
|
||||
if self._urgency > 0.7:
|
||||
# Emergency: immediate blended mode for high urgency stops
|
||||
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
|
||||
else:
|
||||
# Normal: blended with urgency-based confidence
|
||||
confidence = min(1.0, self._urgency * 1.5)
|
||||
self._mode_manager.request_mode('blended', confidence=confidence)
|
||||
return
|
||||
|
||||
# when detecting lead slow down: blended
|
||||
# use blended for higher braking capability
|
||||
if self._has_dangerous_ttc:
|
||||
self._set_mode('blended')
|
||||
# Driving slow: use ACC (but not if actively slowing down)
|
||||
if self._has_slowness and not self._has_slow_down:
|
||||
self._mode_manager.request_mode('acc', confidence=0.8)
|
||||
return
|
||||
|
||||
# car driving at speed lower than set speed: acc
|
||||
if self._has_slowness:
|
||||
self._set_mode('acc')
|
||||
return
|
||||
|
||||
self._set_mode('acc')
|
||||
# Default: ACC
|
||||
self._mode_manager.request_mode('acc', confidence=0.7)
|
||||
|
||||
def _radar_mode(self) -> None:
|
||||
# when mpc fcw crash prob is high
|
||||
# use blended to slow down quickly
|
||||
"""Radar mode with emergency handling."""
|
||||
|
||||
# EMERGENCY: MPC FCW - immediate blended mode
|
||||
if self._has_mpc_fcw:
|
||||
self._set_mode('blended')
|
||||
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
|
||||
return
|
||||
|
||||
# If there is a filtered lead, the vehicle is not in standstill, and the lead vehicle's yRel meets the condition,
|
||||
if self._has_lead_filtered and not self._has_standstill:
|
||||
self._set_mode('acc')
|
||||
# If lead detected and not in standstill: always use ACC
|
||||
if self._has_lead_filtered and not (self._standstill_count > 3):
|
||||
self._mode_manager.request_mode('acc', confidence=1.0)
|
||||
return
|
||||
|
||||
# when blinker is on and speed is driving below V_ACC_MIN: blended
|
||||
# we don't want it to switch mode at higher speed, blended may trigger hard brake
|
||||
# if self._has_blinkers and self._v_ego_kph < V_ACC_MIN:
|
||||
# self._set_mode('blended')
|
||||
# return
|
||||
|
||||
# when standstill: blended
|
||||
# in case of lead car suddenly move away under traffic light, acc mode won't brake at traffic light.
|
||||
if self._has_standstill:
|
||||
self._set_mode('blended')
|
||||
return
|
||||
|
||||
# when detecting slow down scenario: blended
|
||||
# e.g. traffic light, curve, stop sign etc.
|
||||
# Slow down scenarios: emergency for high urgency, normal for lower urgency
|
||||
if self._has_slow_down:
|
||||
self._set_mode('blended')
|
||||
if self._urgency > 0.7:
|
||||
# Emergency: immediate blended mode for high urgency stops
|
||||
self._mode_manager.request_mode('blended', confidence=1.0, emergency=True)
|
||||
else:
|
||||
# Normal: blended with urgency-based confidence
|
||||
confidence = min(1.0, self._urgency * 1.3)
|
||||
self._mode_manager.request_mode('blended', confidence=confidence)
|
||||
return
|
||||
|
||||
# car driving at speed lower than set speed: acc
|
||||
if self._has_slowness:
|
||||
self._set_mode('acc')
|
||||
# Standstill: use blended
|
||||
if self._standstill_count > 3:
|
||||
self._mode_manager.request_mode('blended', confidence=0.9)
|
||||
return
|
||||
|
||||
# Nav enabled and distance to upcoming turning is 300 or below
|
||||
# if self._has_nav_instruction:
|
||||
# self._set_mode('blended')
|
||||
# return
|
||||
# Driving slow: use ACC (but not if actively slowing down)
|
||||
if self._has_slowness and not self._has_slow_down:
|
||||
self._mode_manager.request_mode('acc', confidence=0.8)
|
||||
return
|
||||
|
||||
self._set_mode('acc')
|
||||
|
||||
def set_mpc_fcw_crash_cnt(self) -> None:
|
||||
self._mpc_fcw_crash_cnt = self._mpc.crash_cnt
|
||||
|
||||
def _set_mode(self, mode: str) -> None:
|
||||
if self._set_mode_timeout == 0:
|
||||
self._mode = mode
|
||||
if mode == 'blended':
|
||||
self._set_mode_timeout = SET_MODE_TIMEOUT
|
||||
|
||||
if self._set_mode_timeout > 0:
|
||||
self._set_mode_timeout -= 1
|
||||
# Default: ACC
|
||||
self._mode_manager.request_mode('acc', confidence=0.7)
|
||||
|
||||
def update(self, sm: messaging.SubMaster) -> None:
|
||||
self._read_params()
|
||||
@@ -385,6 +383,6 @@ class DynamicExperimentalController:
|
||||
else:
|
||||
self._radar_mode()
|
||||
|
||||
self._mode_manager.update()
|
||||
self._active = sm['selfdriveState'].experimentalMode and self._enabled
|
||||
|
||||
self._frame += 1
|
||||
|
||||
@@ -1,248 +1,94 @@
|
||||
import pytest
|
||||
import numpy as np
|
||||
from openpilot.common.params import Params
|
||||
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.dec.constants import WMACConstants, SNG_State
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController, TRAJECTORY_SIZE, STOP_AND_GO_FRAME
|
||||
|
||||
class MockInterp:
|
||||
def __call__(self, x, xp, fp):
|
||||
return np.interp(x, xp, fp)
|
||||
|
||||
class MockCarState:
|
||||
def __init__(self, v_ego=0., standstill=False, left_blinker=False, right_blinker=False):
|
||||
self.vEgo = v_ego
|
||||
self.standstill = standstill
|
||||
self.leftBlinker = left_blinker
|
||||
self.rightBlinker = right_blinker
|
||||
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
|
||||
|
||||
class MockLeadOne:
|
||||
def __init__(self, status=False, d_rel=0):
|
||||
def __init__(self, status=0.0):
|
||||
self.status = status
|
||||
self.dRel = d_rel
|
||||
|
||||
class MockRadarState:
|
||||
def __init__(self, status=0.0):
|
||||
self.leadOne = MockLeadOne(status=status)
|
||||
|
||||
class MockCarState:
|
||||
def __init__(self, vEgo=0.0, vCruise=0.0, standstill=False):
|
||||
self.vEgo = vEgo
|
||||
self.vCruise = vCruise
|
||||
self.standstill = standstill
|
||||
|
||||
class MockModelData:
|
||||
def __init__(self, x_vals=None, positions=None):
|
||||
self.orientation = type('Orientation', (), {'x': x_vals})()
|
||||
self.position = type('Position', (), {'x': positions})()
|
||||
def __init__(self, valid=True):
|
||||
size = 33 if valid else 10 # incomplete if invalid
|
||||
self.position = type("Pos", (), {"x": [0.0] * size})()
|
||||
self.orientation = type("Ori", (), {"x": [0.0] * size})()
|
||||
|
||||
class MockControlState:
|
||||
def __init__(self, v_cruise=0):
|
||||
self.vCruise = v_cruise
|
||||
class MockSelfDriveState:
|
||||
def __init__(self, experimentalMode=False):
|
||||
self.experimentalMode = experimentalMode
|
||||
|
||||
class MockParams:
|
||||
def get_bool(self, name):
|
||||
return True
|
||||
|
||||
@pytest.fixture
|
||||
def interp(monkeypatch):
|
||||
mock_interp = MockInterp()
|
||||
monkeypatch.setattr('numpy.interp', mock_interp)
|
||||
return mock_interp
|
||||
def default_sm():
|
||||
sm = {
|
||||
'carState': MockCarState(vEgo=10.0, vCruise=20.0),
|
||||
'radarState': MockRadarState(status=1.0),
|
||||
'modelV2': MockModelData(valid=True),
|
||||
'selfdriveState': MockSelfDriveState(experimentalMode=True),
|
||||
}
|
||||
return sm
|
||||
|
||||
@pytest.fixture
|
||||
def controller(interp):
|
||||
params = Params()
|
||||
params.put_bool("DynamicExperimentalControl", True)
|
||||
return DynamicExperimentalController()
|
||||
def mock_cp():
|
||||
class CP:
|
||||
radarUnavailable = False
|
||||
return CP()
|
||||
|
||||
def test_initial_state(controller):
|
||||
"""Test initial state of the controller"""
|
||||
assert controller._mode == 'acc'
|
||||
assert not controller._has_lead
|
||||
assert not controller._has_standstill
|
||||
assert controller._sng_state == SNG_State.off
|
||||
assert not controller._has_lead_filtered
|
||||
assert not controller._has_slow_down
|
||||
assert not controller._has_dangerous_ttc
|
||||
assert not controller._has_mpc_fcw
|
||||
@pytest.fixture
|
||||
def mock_mpc():
|
||||
class MPC:
|
||||
crash_cnt = 0
|
||||
return MPC()
|
||||
|
||||
@pytest.mark.parametrize("has_radar", [True, False], ids=["with_radar", "without_radar"])
|
||||
def test_standstill_detection(controller, has_radar):
|
||||
"""Test standstill detection and state transitions"""
|
||||
car_state = MockCarState(standstill=True)
|
||||
lead_one = MockLeadOne()
|
||||
md = MockModelData(x_vals=[0] * TRAJECTORY_SIZE, positions=[150] * TRAJECTORY_SIZE)
|
||||
controls_state = MockControlState()
|
||||
# Fake Kalman Filter that always returns a given value
|
||||
class FakeKalman:
|
||||
def __init__(self, value=1.0):
|
||||
self.value = value
|
||||
def add_data(self, v): pass
|
||||
def get_value(self): return self.value
|
||||
def get_confidence(self): return 1.0
|
||||
def reset_data(self): pass
|
||||
|
||||
# Test transition to standstill
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
assert controller._sng_state == SNG_State.stopped
|
||||
assert controller.get_mpc_mode() == 'blended'
|
||||
def test_initial_mode_is_acc(mock_cp, mock_mpc):
|
||||
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
|
||||
assert controller.mode() == "acc"
|
||||
|
||||
# Test transition from standstill to moving
|
||||
car_state.standstill = False
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
assert controller._sng_state == SNG_State.going
|
||||
def test_standstill_triggers_blended(mock_cp, mock_mpc, default_sm):
|
||||
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
|
||||
default_sm['carState'].standstill = True
|
||||
for _ in range(10):
|
||||
controller.update(default_sm)
|
||||
assert controller.mode() == "blended"
|
||||
|
||||
# Test complete transition to normal driving
|
||||
for _ in range(STOP_AND_GO_FRAME + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
assert controller._sng_state == SNG_State.off
|
||||
def test_emergency_blended_on_fcw(mock_cp, mock_mpc, default_sm):
|
||||
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
|
||||
mock_mpc.crash_cnt = 1 # simulate FCW
|
||||
for _ in range(2):
|
||||
controller.update(default_sm)
|
||||
assert controller.mode() == "blended"
|
||||
|
||||
@pytest.mark.parametrize("has_radar", [True, False], ids=["with_radar", "without_radar"])
|
||||
def test_lead_detection(controller, has_radar):
|
||||
"""Test lead vehicle detection and filtering"""
|
||||
car_state = MockCarState(v_ego=20) # 72 kph
|
||||
lead_one = MockLeadOne(status=True, d_rel=50) # Safe distance
|
||||
md = MockModelData(x_vals=[0] * TRAJECTORY_SIZE, positions=[150] * TRAJECTORY_SIZE)
|
||||
controls_state = MockControlState(v_cruise=72)
|
||||
def test_radarless_slowdown_triggers_blended(mock_cp, mock_mpc, default_sm):
|
||||
mock_cp.radarUnavailable = True
|
||||
controller = DynamicExperimentalController(mock_cp, mock_mpc, params=MockParams())
|
||||
|
||||
# Let moving average stabilize
|
||||
for _ in range(WMACConstants.LEAD_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
# Force conditions to simulate slowdown
|
||||
controller._slow_down_filter = FakeKalman(value=1.0) # Ensure urgency triggers slowdown
|
||||
controller._v_ego_kph = 35.0
|
||||
default_sm['modelV2'] = MockModelData(valid=False) # Incomplete trajectory
|
||||
|
||||
assert controller._has_lead_filtered
|
||||
expected_mode = 'acc' if has_radar else 'blended'
|
||||
assert controller.get_mpc_mode() == expected_mode
|
||||
for _ in range(3):
|
||||
controller.update(default_sm)
|
||||
|
||||
# Test lead loss detection
|
||||
lead_one.status = False
|
||||
for _ in range(WMACConstants.LEAD_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert not controller._has_lead_filtered
|
||||
|
||||
@pytest.mark.parametrize("has_radar", [True, False], ids=["with_radar", "without_radar"])
|
||||
def test_slow_down_detection(controller, has_radar):
|
||||
"""Test slow down detection based on trajectory"""
|
||||
car_state = MockCarState(v_ego=10/3.6) # 10 kph
|
||||
lead_one = MockLeadOne()
|
||||
x_vals = [0] * TRAJECTORY_SIZE
|
||||
positions = [20] * TRAJECTORY_SIZE # Position within slow down threshold
|
||||
md = MockModelData(x_vals=x_vals, positions=positions)
|
||||
controls_state = MockControlState(v_cruise=30)
|
||||
|
||||
# Test slow down detection
|
||||
for _ in range(WMACConstants.SLOW_DOWN_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert controller._has_slow_down
|
||||
assert controller.get_mpc_mode() == 'blended'
|
||||
|
||||
# Test slow down recovery
|
||||
positions = [200] * TRAJECTORY_SIZE # Position outside slow down threshold
|
||||
md = MockModelData(x_vals=x_vals, positions=positions)
|
||||
for _ in range(WMACConstants.SLOW_DOWN_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert not controller._has_slow_down
|
||||
|
||||
@pytest.mark.parametrize("has_radar", [True, False], ids=["with_radar", "without_radar"])
|
||||
def test_dangerous_ttc_detection(controller, has_radar):
|
||||
"""Test Time-To-Collision detection and handling"""
|
||||
car_state = MockCarState(v_ego=10) # 36 kph
|
||||
lead_one = MockLeadOne(status=True)
|
||||
md = MockModelData(x_vals=[0] * TRAJECTORY_SIZE, positions=[150] * TRAJECTORY_SIZE)
|
||||
controls_state = MockControlState(v_cruise=36)
|
||||
|
||||
# First establish normal conditions with lead
|
||||
lead_one.dRel = 100 # Safe distance
|
||||
for _ in range(WMACConstants.LEAD_WINDOW_SIZE + 1): # First establish lead detection
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert controller._has_lead_filtered # Verify lead is detected
|
||||
|
||||
# Now test dangerous TTC detection
|
||||
lead_one.dRel = 10 # 10m distance - should trigger dangerous TTC
|
||||
# TTC = dRel/vEgo = 10/10 = 1s (which is less than DANGEROUS_TTC = 2.3s)
|
||||
|
||||
# Need to update multiple times to allow the weighted average to stabilize
|
||||
for _ in range(WMACConstants.DANGEROUS_TTC_WINDOW_SIZE * 2):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert controller._has_dangerous_ttc, "TTC of 1s should be considered dangerous"
|
||||
expected_mode = 'acc' if has_radar else 'blended'
|
||||
assert controller.get_mpc_mode() == expected_mode, f"Should be in [{expected_mode}] mode with dangerous TTC"
|
||||
|
||||
@pytest.mark.parametrize("has_radar", [True, False], ids=["with_radar", "without_radar"])
|
||||
def test_mode_transitions(controller, has_radar):
|
||||
"""Test comprehensive mode transitions under different conditions"""
|
||||
# Initialize with normal driving conditions
|
||||
car_state = MockCarState(v_ego=25) # 90 kph
|
||||
lead_one = MockLeadOne(status=False)
|
||||
md = MockModelData(x_vals=[0] * TRAJECTORY_SIZE, positions=[200] * TRAJECTORY_SIZE)
|
||||
controls_state = MockControlState(v_cruise=100)
|
||||
|
||||
def stabilize_filters():
|
||||
"""Helper to let all moving averages stabilize"""
|
||||
for _ in range(max(WMACConstants.LEAD_WINDOW_SIZE, WMACConstants.SLOW_DOWN_WINDOW_SIZE,
|
||||
WMACConstants.DANGEROUS_TTC_WINDOW_SIZE, WMACConstants.MPC_FCW_WINDOW_SIZE) + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
# Test 1: Normal driving -> ACC mode
|
||||
stabilize_filters()
|
||||
assert controller.get_mpc_mode() == 'acc', "Should be in ACC mode under normal driving conditions"
|
||||
|
||||
# Test 2: Standstill -> Blended mode
|
||||
car_state.standstill = True
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
assert controller.get_mpc_mode() == 'blended', "Should be in blended mode during standstill"
|
||||
|
||||
# Test 3: Lead car appears -> ACC mode
|
||||
car_state = MockCarState(v_ego=20) # Reset car state
|
||||
lead_one.status = True
|
||||
lead_one.dRel = 50 # Safe distance
|
||||
stabilize_filters()
|
||||
assert not controller._has_dangerous_ttc, "Should not have dangerous TTC"
|
||||
assert controller.get_mpc_mode() == 'acc', "Should be in ACC mode with safe lead distance"
|
||||
|
||||
# Test 4: Dangerous TTC -> Blended mode
|
||||
car_state = MockCarState(v_ego=20) # 72 kph
|
||||
lead_one.status = True
|
||||
lead_one.dRel = 50 # First establish normal lead detection
|
||||
|
||||
# First establish lead detection
|
||||
for _ in range(WMACConstants.LEAD_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert controller._has_lead_filtered # Verify lead is detected
|
||||
|
||||
# Now create dangerous TTC condition
|
||||
lead_one.dRel = 20 # This creates a TTC of 1s, well below DANGEROUS_TTC
|
||||
|
||||
for _ in range(WMACConstants.DANGEROUS_TTC_WINDOW_SIZE * 2):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert controller._has_dangerous_ttc, "Should detect dangerous TTC condition"
|
||||
expected_mode = 'acc' if has_radar else 'blended'
|
||||
assert controller.get_mpc_mode() == expected_mode, f"Should be in [{expected_mode}] mode with dangerous TTC"
|
||||
|
||||
@pytest.mark.parametrize("has_radar", [True, False], ids=["with_radar", "without_radar"])
|
||||
def test_mpc_fcw_handling(controller, has_radar):
|
||||
"""Test MPC FCW crash count handling and mode transitions"""
|
||||
car_state = MockCarState(v_ego=20)
|
||||
lead_one = MockLeadOne()
|
||||
md = MockModelData(x_vals=[0] * TRAJECTORY_SIZE, positions=[150] * TRAJECTORY_SIZE)
|
||||
controls_state = MockControlState(v_cruise=72)
|
||||
|
||||
# Test FCW activation
|
||||
controller.set_mpc_fcw_crash_cnt(5)
|
||||
for _ in range(WMACConstants.MPC_FCW_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert controller._has_mpc_fcw
|
||||
assert controller.get_mpc_mode() == 'blended'
|
||||
|
||||
# Test FCW recovery
|
||||
controller.set_mpc_fcw_crash_cnt(0)
|
||||
for _ in range(WMACConstants.MPC_FCW_WINDOW_SIZE + 1):
|
||||
controller.update(not has_radar, car_state, lead_one, md, controls_state)
|
||||
|
||||
assert not controller._has_mpc_fcw
|
||||
|
||||
def test_radar_unavailable_handling(controller):
|
||||
"""Test behavior transitions between radar available and unavailable states"""
|
||||
car_state = MockCarState(v_ego=27.78) # 100 kph
|
||||
lead_one = MockLeadOne(status=True, d_rel=50)
|
||||
md = MockModelData(x_vals=[0] * TRAJECTORY_SIZE, positions=[150] * TRAJECTORY_SIZE)
|
||||
controls_state = MockControlState(v_cruise=100)
|
||||
|
||||
# Test with radar available
|
||||
for _ in range(WMACConstants.LEAD_WINDOW_SIZE + 1):
|
||||
controller.update(False, car_state, lead_one, md, controls_state)
|
||||
radar_mode = controller.get_mpc_mode()
|
||||
|
||||
# Test with radar unavailable
|
||||
for _ in range(WMACConstants.LEAD_WINDOW_SIZE + 1):
|
||||
controller.update(True, car_state, lead_one, md, controls_state)
|
||||
radarless_mode = controller.get_mpc_mode()
|
||||
|
||||
assert radar_mode is not None
|
||||
assert radarless_mode is not None
|
||||
assert controller.mode() == "blended"
|
||||
|
||||
Reference in New Issue
Block a user