slc in another pr

This commit is contained in:
Jason Wen
2025-06-06 18:54:05 -04:00
parent 89d332105f
commit 3a6987e6ba
16 changed files with 3 additions and 835 deletions

View File

@@ -195,14 +195,4 @@ inline static std::unordered_map<std::string, uint32_t> keys = {
{"OsmLocationUrl", PERSISTENT},
{"OsmWayTest", PERSISTENT},
{"Offroad_OSMUpdateRequired", CLEAR_ON_MANAGER_START},
// Speed Limit Control
{"SpeedLimitControl", PERSISTENT | BACKUP},
{"SpeedLimitControlPolicy", PERSISTENT | BACKUP},
{"SpeedLimitEngageType", PERSISTENT | BACKUP},
{"SpeedLimitOffsetType", PERSISTENT | BACKUP},
{"SpeedLimitValueOffset", PERSISTENT | BACKUP},
{"SpeedLimitWarningType", PERSISTENT | BACKUP},
{"SpeedLimitWarningOffsetType", PERSISTENT | BACKUP},
{"SpeedLimitWarningValueOffset", PERSISTENT | BACKUP},
};

View File

@@ -144,9 +144,6 @@ class LongitudinalPlanner(LongitudinalPlannerSP):
clipped_accel_coast_interp = np.interp(v_ego, [MIN_ALLOW_THROTTLE_SPEED, MIN_ALLOW_THROTTLE_SPEED*2], [accel_clip[1], clipped_accel_coast])
accel_clip[1] = min(accel_clip[1], clipped_accel_coast_interp)
# Get new v_cruise from Speed Limit Control
v_cruise = LongitudinalPlannerSP.update_v_cruise(self, sm, not reset_state, self.v_desired_filter.x, self.a_desired, v_cruise)
if force_slow_decel:
v_cruise = 0.0

View File

@@ -19,8 +19,7 @@ def main():
ldw = LaneDepartureWarning()
longitudinal_planner = LongitudinalPlanner(CP)
pm = messaging.PubMaster(['longitudinalPlan', 'driverAssistance', 'longitudinalPlanSP'])
sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'liveParameters', 'radarState', 'modelV2', 'selfdriveState',
"liveMapDataSP", "carStateSP"],
sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'liveParameters', 'radarState', 'modelV2', 'selfdriveState'],
poll='modelV2')
while True:

View File

@@ -88,7 +88,7 @@ class SelfdriveD(CruiseHelper):
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'livePose', 'liveDelay',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'controlsState', 'carControl', 'driverAssistance', 'alertDebug', 'userFlag'] + \
self.camera_packets + self.sensor_packets + self.gps_packets + ["longitudinalPlanSP"],
self.camera_packets + self.sensor_packets + self.gps_packets,
ignore_alive=ignore, ignore_avg_freq=ignore,
ignore_valid=ignore, frequency=int(1/DT_CTRL))
@@ -196,7 +196,6 @@ class SelfdriveD(CruiseHelper):
if not self.CP.notCar:
self.events.add_from_msg(self.sm['driverMonitoringState'].events)
self.events_sp.add_from_msg(self.sm['longitudinalPlanSP'].events)
# Add car events, ignore if CAN isn't valid
if CS.canValid:

View File

@@ -1,4 +1,3 @@
from cereal import custom
from openpilot.common.swaglog import cloudlog
LOOK_AHEAD_HORIZON_TIME = 15. # s. Time horizon for look ahead of turn speed sections to provide on liveMapDataSP msg.

View File

@@ -7,20 +7,14 @@ See the LICENSE.md file in the root directory for more details.
from cereal import messaging, custom
from opendbc.car import structs
from openpilot.selfdrive.car.cruise import V_CRUISE_UNSET
from openpilot.sunnypilot.selfdrive.controls.lib.dec.dec import DynamicExperimentalController
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.speed_limit_controller import SpeedLimitController
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
DecState = custom.LongitudinalPlanSP.DynamicExperimentalControl.DynamicExperimentalControlState
class LongitudinalPlannerSP:
def __init__(self, CP: structs.CarParams, mpc):
self.events_sp = EventsSP()
self.dec = DynamicExperimentalController(CP, mpc)
self.slc = SpeedLimitController(CP)
def get_mpc_mode(self) -> str | None:
if not self.dec.active():
@@ -28,17 +22,6 @@ class LongitudinalPlannerSP:
return self.dec.mode()
def update_v_cruise(self, sm: messaging.SubMaster, long_enabled: bool, v_ego: float, a_ego: float, v_cruise: float) -> float:
self.events_sp.clear()
self.slc.update(long_enabled, v_ego, a_ego, sm, v_cruise, self.events_sp)
v_cruise_slc = self.slc.speed_limit_offseted if self.slc.is_active else V_CRUISE_UNSET
v_cruise_final = min(v_cruise, v_cruise_slc)
return v_cruise_final
def update(self, sm: messaging.SubMaster) -> None:
self.dec.update(sm)
@@ -48,7 +31,6 @@ class LongitudinalPlannerSP:
plan_sp_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
longitudinalPlanSP = plan_sp_send.longitudinalPlanSP
longitudinalPlanSP.events = self.events_sp.to_msg()
# Dynamic Experimental Control
dec = longitudinalPlanSP.dec
@@ -56,13 +38,4 @@ class LongitudinalPlannerSP:
dec.enabled = self.dec.enabled()
dec.active = self.dec.active()
# Speed Limit Control
slc = longitudinalPlanSP.slc
slc.state = self.slc.state
slc.enabled = self.slc.is_enabled
slc.active = self.slc.is_active
slc.speedLimit = float(self.slc.speed_limit)
slc.speedLimitOffset = float(self.slc.speed_limit_offset)
slc.distToSpeedLimit = float(self.slc.distance)
pm.send('longitudinalPlanSP', plan_sp_send)

View File

@@ -1,19 +0,0 @@
from cereal import custom
DEBUG = True
PARAMS_UPDATE_PERIOD = 2. # secs. Time between parameter updates.
TEMP_INACTIVE_GUARD_PERIOD = 1. # secs. Time to wait after activation before considering temp deactivation signal.
# Lookup table for speed limit percent offset depending on speed.
LIMIT_PERC_OFFSET_V = [0.1, 0.05, 0.038] # 55, 105, 135 km/h
LIMIT_PERC_OFFSET_BP = [13.9, 27.8, 36.1] # 50, 100, 130 km/h
# Constants for Limit controllers.
LIMIT_ADAPT_ACC = -1. # m/s^2 Ideal acceleration for the adapting (braking) phase when approaching speed limits.
LIMIT_MIN_ACC = -1.5 # m/s^2 Maximum deceleration allowed for limit controllers to provide.
LIMIT_MAX_ACC = 1.0 # m/s^2 Maximum acceleration allowed for limit controllers to provide while active.
LIMIT_MIN_SPEED = 8.33 # m/s, Minimum speed limit to provide as solution on limit controllers.
LIMIT_SPEED_OFFSET_TH = -1. # m/s Maximum offset between speed limit and current speed for adapting state.
LIMIT_MAX_MAP_DATA_AGE = 10. # s Maximum time to hold to map data, then consider it invalid inside limits controllers.
SpeedLimitControlState = custom.LongitudinalPlanSP.SpeedLimitControlState

View File

@@ -1,26 +0,0 @@
from enum import IntEnum
class Source(IntEnum):
none = 0
car_state = 1
map_data = 2
class Policy(IntEnum):
map_data_only = 0
car_state_only = 1
map_data_priority = 2
car_state_priority = 3
combined = 4
class Engage(IntEnum):
auto = 0
user_confirm = 1
class OffsetType(IntEnum):
default = 0
fixed = 1
percentage = 2

View File

@@ -1,19 +0,0 @@
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller import DEBUG, SpeedLimitControlState
from openpilot.common.swaglog import cloudlog
def debug(msg):
if not DEBUG:
return
cloudlog.debug(msg)
def description_for_state(speed_limit_control_state):
if speed_limit_control_state == SpeedLimitControlState.inactive:
return 'INACTIVE'
if speed_limit_control_state == SpeedLimitControlState.tempInactive:
return 'TEMP_INACTIVE'
if speed_limit_control_state == SpeedLimitControlState.adapting:
return 'ADAPTING'
if speed_limit_control_state == SpeedLimitControlState.active:
return 'ACTIVE'

View File

@@ -1,344 +0,0 @@
from collections.abc import Callable
import numpy as np
import time
from cereal import messaging, custom
from openpilot.common.conversions import Conversions as CV
from openpilot.common.params import Params
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller import LIMIT_PERC_OFFSET_BP, LIMIT_PERC_OFFSET_V, \
PARAMS_UPDATE_PERIOD, TEMP_INACTIVE_GUARD_PERIOD, LIMIT_SPEED_OFFSET_TH, SpeedLimitControlState
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.common import Source, Policy, Engage, OffsetType
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.helpers import description_for_state, debug
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.speed_limit_resolver import SpeedLimitResolver
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
from openpilot.selfdrive.modeld.constants import ModelConstants
EventNameSP = custom.OnroadEventSP.EventName
ACTIVE_STATES = (SpeedLimitControlState.active, SpeedLimitControlState.adapting)
ENABLED_STATES = (SpeedLimitControlState.preActive, SpeedLimitControlState.tempInactive, *ACTIVE_STATES)
class SpeedLimitController:
_speed_limit: float
_distance: float
_source: Source
_v_ego: float
_a_ego: float
_v_offset: float
def __init__(self, CP):
self._params = Params()
self._CP = CP
self._policy = self._read_policy_param()
self._resolver = SpeedLimitResolver(self._policy)
self._last_params_update = 0.0
self._last_op_engaged_time = 0.0
self._is_metric = self._params.get_bool("IsMetric")
self._is_enabled = self._params.get_bool("SpeedLimitControl")
self._op_engaged = False
self._op_engaged_prev = False
self._v_ego = 0.
self._a_ego = 0.
self._v_offset = 0.
self._v_cruise_setpoint = 0.
self._v_cruise_setpoint_prev = 0.
self._v_cruise_setpoint_changed = False
self._speed_limit = 0.
self._speed_limit_prev = 0.
self._speed_limit_changed = False
self._distance = 0.
self._source = Source.none
self._state = SpeedLimitControlState.inactive
self._state_prev = SpeedLimitControlState.inactive
self._gas_pressed = False
self._pcm_cruise_op_long = CP.openpilotLongitudinalControl and CP.pcmCruise
self._offset_type = OffsetType(self._read_int_param("SpeedLimitOffsetType"))
self._offset_value = self._read_int_param("SpeedLimitValueOffset")
self._warning_type = self._read_int_param("SpeedLimitWarningType")
self._warning_offset_type = OffsetType(self._read_int_param("SpeedLimitWarningOffsetType"))
self._warning_offset_value = self._read_int_param("SpeedLimitWarningValueOffset")
self._engage_type = self._read_engage_type_param()
self._current_time = 0.
self._v_cruise_rounded = 0.
self._v_cruise_prev_rounded = 0.
self._speed_limit_offsetted_rounded = 0.
self._speed_limit_warning_offsetted_rounded = 0.
self._speed_factor = CV.MS_TO_KPH if self._is_metric else CV.MS_TO_MPH
# Mapping functions to state transitions
self.state_transition_strategy = {
# Transition functions for each state
SpeedLimitControlState.inactive: self.transition_state_from_inactive,
SpeedLimitControlState.tempInactive: self.transition_state_from_temp_inactive,
SpeedLimitControlState.adapting: self.transition_state_from_adapting,
SpeedLimitControlState.active: self.transition_state_from_active,
SpeedLimitControlState.preActive: self.transition_state_from_pre_active,
}
# FIXME-SP: unused?
# Solution functions mapped to respective states
self.acceleration_solutions = {
# Solution functions for each state
SpeedLimitControlState.tempInactive: self.get_current_acceleration_as_target,
SpeedLimitControlState.inactive: self.get_current_acceleration_as_target,
SpeedLimitControlState.adapting: self.get_adapting_state_target_acceleration,
SpeedLimitControlState.active: self.get_active_state_target_acceleration,
SpeedLimitControlState.preActive: self.get_current_acceleration_as_target,
}
@property
def state(self) -> SpeedLimitControlState:
return self._state
@state.setter
def state(self, value) -> None:
if value != self._state:
debug(f'Speed Limit Controller state: {description_for_state(value)}')
if value == SpeedLimitControlState.tempInactive:
# Reset previous speed limit to current value as to prevent going out of tempInactive in
# a single cycle when the speed limit changes at the same time the user has temporarily deactivated it.
self._speed_limit_prev = self._speed_limit
self._state = value
@property
def is_enabled(self) -> bool:
return self.state in ENABLED_STATES and self._is_enabled
@property
def is_active(self) -> bool:
return self.state in ACTIVE_STATES and self._is_enabled
@property
def speed_limit_offseted(self) -> float:
return self._speed_limit + self.speed_limit_offset
@property
def speed_limit_offset(self) -> float:
return self._get_offset(self._offset_type, self._offset_value)
@property
def speed_limit_warning_offset(self) -> float:
return self._get_offset(self._warning_offset_type, self._warning_offset_value)
@property
def speed_limit(self) -> float:
return self._speed_limit
@property
def distance(self) -> float:
return self._distance
@property
def source(self) -> Source:
return self._source
def _get_offset(self, offset_type: OffsetType, offset_value: int) -> float:
if offset_type == OffsetType.default:
return float(np.interp(self._speed_limit, LIMIT_PERC_OFFSET_BP, LIMIT_PERC_OFFSET_V) * self._speed_limit)
elif offset_type == OffsetType.fixed:
return offset_value * (CV.KPH_TO_MS if self._is_metric else CV.MPH_TO_MS)
elif offset_type == OffsetType.percentage:
return offset_value * 0.01 * self._speed_limit
else:
raise NotImplementedError("Offset not supported")
def _update_v_cruise_setpoint_prev(self) -> None:
self._v_cruise_setpoint_prev = self._v_cruise_setpoint
def _update_params(self) -> None:
if self._current_time > self._last_params_update + PARAMS_UPDATE_PERIOD:
self._is_enabled = self._params.get_bool("SpeedLimitControl")
self._offset_type = OffsetType(self._read_int_param("SpeedLimitOffsetType"))
self._offset_value = self._read_int_param("SpeedLimitValueOffset")
self._warning_type = self._read_int_param("SpeedLimitWarningType")
self._warning_offset_type = OffsetType(self._read_int_param("SpeedLimitWarningOffsetType"))
self._warning_offset_value = self._read_int_param("SpeedLimitWarningValueOffset")
self._policy = self._read_policy_param()
self._is_metric = self._params.get_bool("IsMetric")
self._speed_factor = CV.MS_TO_KPH if self._is_metric else CV.MS_TO_MPH
self._resolver.change_policy(self._policy)
self._engage_type = self._read_engage_type_param()
self._last_params_update = self._current_time
def _read_policy_param(self) -> Policy:
try:
return Policy(int(self._params.get("SpeedLimitControlPolicy", encoding='utf8')))
except (ValueError, TypeError):
return Policy.car_state_priority
def _read_engage_type_param(self) -> Engage:
if self._pcm_cruise_op_long:
return Engage.auto
return Engage(self._read_int_param("SpeedLimitEngageType", validator=lambda x: np.clip(x, Engage.auto, Engage.user_confirm)))
def _read_int_param(self, key: str, default: int = 0, validator: Callable[[int], int] = None) -> int:
try:
val = int(self._params.get(key, encoding='utf8'))
if validator is not None:
return validator(val)
return val
except (ValueError, TypeError):
return default
def _update_calculations(self) -> None:
# Update current velocity offset (error)
self._v_offset = self.speed_limit_offseted - self._v_ego
# Track the time op becomes active to prevent going to tempInactive right away after
# op enabling since controlsd will change the cruise speed every time on enabling and this will
# cause a temp inactive transition if the controller is updated before controlsd sets actual cruise
# speed.
if not self._op_engaged_prev and self._op_engaged:
self._last_op_engaged_time = self._current_time
# Update change tracking variables
self._speed_limit_changed = self._speed_limit != self._speed_limit_prev
self._v_cruise_setpoint_changed = self._v_cruise_setpoint != self._v_cruise_setpoint_prev
self._speed_limit_prev = self._speed_limit
if self._engage_type != Engage.user_confirm:
self._update_v_cruise_setpoint_prev()
self._op_engaged_prev = self._op_engaged
self._v_cruise_rounded = int(round(self._v_cruise_setpoint * self._speed_factor))
self._v_cruise_prev_rounded = int(round(self._v_cruise_setpoint_prev * self._speed_factor))
self._speed_limit_offsetted_rounded = 0 if self._speed_limit == 0 else int(round((self._speed_limit + self.speed_limit_offset) * self._speed_factor))
self._speed_limit_warning_offsetted_rounded = 0 if self._speed_limit == 0 else \
int(round((self._speed_limit + self.speed_limit_warning_offset) * self._speed_factor))
def transition_state_from_inactive(self) -> None:
""" Make state transition from inactive state """
if self._engage_type == Engage.user_confirm:
if (((self._last_op_engaged_time + 7.) >= self._current_time >= (self._last_op_engaged_time + 2.)) or
self._speed_limit_changed):
if self._speed_limit_changed:
self._last_op_engaged_time = self._current_time - 2. # immediately prompt confirmation
self.state = SpeedLimitControlState.preActive
elif self._engage_type == Engage.auto:
if self._v_offset < LIMIT_SPEED_OFFSET_TH:
self.state = SpeedLimitControlState.adapting
else:
self.state = SpeedLimitControlState.active
def transition_state_from_temp_inactive(self) -> None:
""" Make state transition from temporary inactive state """
if self._speed_limit_changed:
if self._engage_type == Engage.user_confirm:
self._last_op_engaged_time = self._current_time - 2. # immediately prompt confirmation
self.state = SpeedLimitControlState.preActive
elif self._engage_type == Engage.auto:
self.state = SpeedLimitControlState.inactive
def transition_state_from_pre_active(self) -> None:
""" Make state transition from preActive state """
if self._current_time >= (self._last_op_engaged_time + 7.):
self.state = SpeedLimitControlState.inactive
elif (self._last_op_engaged_time + 7.) > self._current_time > (self._last_op_engaged_time + 2.):
if self._speed_limit_changed:
self._last_op_engaged_time = self._current_time - 2. # immediately prompt confirmation
elif self._v_cruise_prev_rounded < self._speed_limit_offsetted_rounded:
if self._v_cruise_setpoint > self._v_cruise_setpoint_prev:
self.state = SpeedLimitControlState.active
elif self._v_cruise_prev_rounded > self._speed_limit_offsetted_rounded:
if self._v_cruise_setpoint < self._v_cruise_setpoint_prev:
self.state = SpeedLimitControlState.active
elif self._v_cruise_prev_rounded == self._speed_limit_offsetted_rounded:
self.state = SpeedLimitControlState.active
def transition_state_from_adapting(self) -> None:
""" Make state transition from adapting state """
if self._v_offset >= LIMIT_SPEED_OFFSET_TH:
self.state = SpeedLimitControlState.active
def transition_state_from_active(self) -> None:
""" Make state transition from active state """
if self._engage_type == Engage.user_confirm:
if self._state_prev == SpeedLimitControlState.active:
if self._v_cruise_setpoint_changed and self._v_cruise_rounded != self._speed_limit_offsetted_rounded:
self.state = SpeedLimitControlState.tempInactive
elif self._speed_limit_changed:
self._last_op_engaged_time = self._current_time - 2. # immediately prompt confirmation
self.state = SpeedLimitControlState.preActive
elif self._engage_type == Engage.auto:
if self._v_offset < LIMIT_SPEED_OFFSET_TH:
self.state = SpeedLimitControlState.adapting
def _state_transition(self) -> None:
self._state_prev = self._state
# In any case, if op is disabled, or speed limit control is disabled
# or the reported speed limit is 0 or gas is pressed, deactivate.
if not self._op_engaged or not self._is_enabled or self._speed_limit == 0:
self.state = SpeedLimitControlState.inactive
return
# In any case, we deactivate the speed limit controller temporarily if the user changes the cruise speed.
# Ignore if a minimum amount of time has not passed since activation. This is to prevent temp inactivations
# due to controlsd logic changing cruise setpoint when going active.
if self._engage_type == Engage.auto and self._v_cruise_setpoint_changed and \
self._current_time > (self._last_op_engaged_time + TEMP_INACTIVE_GUARD_PERIOD):
self.state = SpeedLimitControlState.tempInactive
return
self.state_transition_strategy[self.state]()
if self._engage_type == Engage.user_confirm:
self._update_v_cruise_setpoint_prev()
def get_current_acceleration_as_target(self) -> float:
""" When state is inactive or tempInactive, preserve current acceleration """
return self._a_ego
def get_adapting_state_target_acceleration(self) -> float:
""" In adapting state, calculate target acceleration based on speed limit and current velocity """
if self.distance > 0:
return (self.speed_limit_offseted ** 2 - self._v_ego ** 2) / (2. * self.distance)
return self._v_offset / float(ModelConstants.T_IDXS[CONTROL_N])
def get_active_state_target_acceleration(self) -> float:
""" In active state, aim to keep speed constant around control time horizon """
return self._v_offset / float(ModelConstants.T_IDXS[CONTROL_N])
def _update_events(self, events_sp: EventsSP) -> None:
if self._speed_limit > 0 and self._warning_type == 2 and \
self._speed_limit_warning_offsetted_rounded < int(round(self._v_ego * self._speed_factor)):
events_sp.add(EventNameSP.speedLimitPreActive)
if not self.is_active:
if self._state == SpeedLimitControlState.preActive and self._state_prev != SpeedLimitControlState.preActive and \
self._v_cruise_rounded != self._speed_limit_offsetted_rounded:
events_sp.add(EventNameSP.speedLimitPreActive)
else:
if self._engage_type == Engage.user_confirm:
if self._state_prev == SpeedLimitControlState.preActive:
events_sp.add(EventNameSP.speedLimitConfirmed)
events_sp.add(EventNameSP.speedLimitActive)
elif self._engage_type == Engage.auto:
if self._state_prev not in ACTIVE_STATES:
events_sp.add(EventNameSP.speedLimitActive)
elif self._speed_limit_changed != 0:
events_sp.add(EventNameSP.speedLimitValueChange)
def update(self, enabled: bool, v_ego: float, a_ego: float, sm: messaging.SubMaster, v_cruise_setpoint: float, events_sp: EventsSP) -> None:
_car_state = sm['carState']
self._op_engaged = enabled and self._CP.openpilotLongitudinalControl
self._v_ego = v_ego
self._a_ego = a_ego
self._v_cruise_setpoint = v_cruise_setpoint
self._gas_pressed = _car_state.gasPressed
self._current_time = time.monotonic()
self._speed_limit, self._distance, self._source = self._resolver.resolve(v_ego, self.speed_limit, sm)
self._update_params()
self._update_calculations()
self._state_transition()
self._update_events(events_sp)

View File

@@ -1,125 +0,0 @@
import time
import numpy as np
from cereal import messaging, custom
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller import LIMIT_MAX_MAP_DATA_AGE, LIMIT_ADAPT_ACC
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.common import Source, Policy
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.helpers import debug
class SpeedLimitResolver:
_sm: messaging.SubMaster
_limit_solutions: dict[Source, float] # Store for speed limit solutions from different sources
_distance_solutions: dict[Source, float] # Store for distance to current speed limit start for different sources
_v_ego: float
_current_speed_limit: float
def __init__(self, policy: Policy):
self._limit_solutions = {}
self._distance_solutions = {}
self._policy = policy
self._policy_to_sources_map = {
Policy.car_state_only: [Source.car_state],
Policy.car_state_priority: [Source.car_state, Source.map_data],
Policy.map_data_priority: [Source.map_data, Source.car_state],
Policy.map_data_only: [Source.map_data],
Policy.combined: [Source.car_state, Source.map_data],
}
for source in Source:
self._reset_limit_sources(source)
def change_policy(self, policy: Policy) -> None:
self._policy = policy
def _reset_limit_sources(self, source: Source) -> None:
self._limit_solutions[source] = 0.
self._distance_solutions[source] = 0.
def _is_sock_updated(self, sock: str) -> bool:
return self._sm.alive[sock] and self._sm.updated[sock]
def resolve(self, v_ego: float, current_speed_limit: float, sm: messaging.SubMaster) -> tuple[float, float, Source]:
self._v_ego = v_ego
self._current_speed_limit = current_speed_limit
self._sm = sm
self._resolve_limit_sources()
return self._consolidate()
def _resolve_limit_sources(self) -> None:
"""Get limit solutions from each data source"""
self._get_from_car_state()
self._get_from_map_data()
def _get_from_car_state(self) -> None:
if not self._is_sock_updated('carStateSP'):
debug('SL: No carStateSP instruction for speed limit')
return
self._reset_limit_sources(Source.car_state)
self._limit_solutions[Source.car_state] = self._sm['carStateSP'].speedLimit
self._distance_solutions[Source.car_state] = 0.
def _get_from_map_data(self) -> None:
if not self._is_sock_updated("liveMapDataSP"):
debug('SL: No map data for speed limit')
return
# Load limits from map_data
self._reset_limit_sources(Source.map_data)
self._process_map_data(self._sm["liveMapDataSP"])
def _process_map_data(self, map_data: custom.LiveMapDataSP) -> None:
gps_fix_age = time.time() - map_data.lastGpsTimestamp * 1e-3
if gps_fix_age > LIMIT_MAX_MAP_DATA_AGE:
debug(f'SL: Ignoring map data as is too old. Age: {gps_fix_age}')
return
speed_limit = map_data.speedLimit if map_data.speedLimitValid else 0.
next_speed_limit = map_data.speedLimitAhead if map_data.speedLimitAheadValid else 0.
self._calculate_map_data_limits(speed_limit, next_speed_limit, map_data)
def _calculate_map_data_limits(self, speed_limit: float, next_speed_limit: float, map_data: custom.LiveMapDataSP) -> None:
distance_since_fix = self._v_ego * (time.time() - map_data.lastGpsTimestamp * 1e-3)
distance_to_speed_limit_ahead = max(0., map_data.speedLimitAheadDistance - distance_since_fix)
self._limit_solutions[Source.map_data] = speed_limit
self._distance_solutions[Source.map_data] = 0.
if 0. < next_speed_limit < self._v_ego:
adapt_time = (next_speed_limit - self._v_ego) / LIMIT_ADAPT_ACC
adapt_distance = self._v_ego * adapt_time + 0.5 * LIMIT_ADAPT_ACC * adapt_time ** 2
if distance_to_speed_limit_ahead <= adapt_distance:
self._limit_solutions[Source.map_data] = next_speed_limit
self._distance_solutions[Source.map_data] = distance_to_speed_limit_ahead
def _consolidate(self) -> tuple[float, float, Source]:
source = self._get_source_solution_according_to_policy()
self.speed_limit = self._limit_solutions[source] if source else 0.
self.distance = self._distance_solutions[source] if source else 0.
self.source = source or Source.none
debug(f'SL: *** Speed Limit set: {self.speed_limit}, distance: {self.distance}, source: {self.source}')
return self.speed_limit, self.distance, self.source
def _get_source_solution_according_to_policy(self) -> Source | None:
sources_for_policy = self._policy_to_sources_map[self._policy]
if self._policy != Policy.combined:
# They are ordered in the order of preference, so we pick the first that's non zero
for source in sources_for_policy:
if self._limit_solutions[source] > 0.:
return Source(source)
limits = np.array([self._limit_solutions[source] for source in sources_for_policy], dtype=float)
sources = np.array([source.value for source in sources_for_policy], dtype=int)
if len(limits) > 0:
min_idx = np.argmin(limits)
return Source(sources[min_idx])
return None

View File

@@ -1,78 +0,0 @@
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
from cereal import custom
from openpilot.sunnypilot.selfdrive.selfdrived.events import EventsSP
EventNameSP = custom.OnroadEventSP.EventName
State = custom.LongitudinalPlanSP.SpeedLimitControlState
ACTIVE_STATES = (State.active, State.adapting)
ENABLED_STATES = (State.preActive, State.tempInactive, *ACTIVE_STATES)
class StateMachine:
def __init__(self):
self.state = State.inactive
def update(self, events_sp: EventsSP) -> tuple[bool, bool]:
# INACTIVE
if self.state == State.inactive:
if events_sp.has(EventNameSP.speedLimitEnable):
self.state = State.preActive
elif events_sp.has(EventNameSP.speedLimitAdapting):
self.state = State.adapting
elif events_sp.has(EventNameSP.speedLimitActive):
self.state = State.active
# PRE ACTIVE
elif self.state == State.preActive:
if events_sp.has(EventNameSP.speedLimitDisable):
self.state = State.inactive
elif events_sp.has(EventNameSP.speedLimitUserCancel):
self.state = State.inactive
elif events_sp.has(EventNameSP.speedLimitUserConfirm):
self.state = State.active
elif events_sp.has(EventNameSP.speedLimitActive):
self.state = State.active
# ACTIVE
elif self.state == State.active:
if events_sp.has(EventNameSP.speedLimitDisable):
self.state = State.inactive
elif events_sp.has(EventNameSP.speedLimitUserCancel):
self.state = State.tempInactive
elif events_sp.has(EventNameSP.speedLimitAdapting):
self.state = State.adapting
elif events_sp.has(EventNameSP.speedLimitValueChange):
# For user confirm mode, transition to preActive on speed limit change
if events_sp.has(EventNameSP.speedLimitEnable):
self.state = State.preActive
# ADAPTING
elif self.state == State.adapting:
if events_sp.has(EventNameSP.speedLimitDisable):
self.state = State.inactive
elif events_sp.has(EventNameSP.speedLimitUserCancel):
self.state = State.tempInactive
elif events_sp.has(EventNameSP.speedLimitReached):
self.state = State.active
# TEMP INACTIVE
elif self.state == State.tempInactive:
if events_sp.has(EventNameSP.speedLimitDisable):
self.state = State.inactive
elif events_sp.has(EventNameSP.speedLimitValueChange):
# When speed limit changes, reactivate
if events_sp.has(EventNameSP.speedLimitEnable):
self.state = State.preActive
else:
self.state = State.inactive
enabled = self.state in ENABLED_STATES
active = self.state in ACTIVE_STATES
return enabled, active

View File

@@ -1,128 +0,0 @@
import random
import time
import pytest
from pytest_mock import MockerFixture
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller import LIMIT_MAX_MAP_DATA_AGE
# from selfdrive.controls.lib.speed_limit_controller_tbd import SpeedLimitResolver as OriginalSpeedLimitResolver
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.speed_limit_resolver import SpeedLimitResolver as RefactoredSpeedLimitResolver
from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit_controller.common import Source, Policy
def create_mock(properties, mocker: MockerFixture):
mock = mocker.MagicMock()
for _property, value in properties.items():
setattr(mock, _property, value)
return mock
def setup_sm_mock(mocker: MockerFixture):
cruise_speed_limit = random.uniform(0, 120)
live_map_data_limit = random.uniform(0, 120)
car_state = create_mock({
'gasPressed': False,
'brakePressed': False,
'standstill': False,
}, mocker)
car_state_sp = create_mock({
'speedLimit': cruise_speed_limit,
}, mocker)
live_map_data = create_mock({
'speedLimit': live_map_data_limit,
'speedLimitValid': True,
'speedLimitAhead': 0.,
'speedLimitAheadValid': 0.,
'speedLimitAheadDistance': 0.,
'lastGpsTimestamp': time.time() * 1e3,
}, mocker)
sm_mock = mocker.MagicMock()
sm_mock.__getitem__.side_effect = lambda key: {
'carState': car_state,
'liveMapDataSP': live_map_data,
'carStateSP': car_state_sp,
}[key]
return sm_mock
parametrized_policies = pytest.mark.parametrize(
"policy, sm_key, function_key", [
(Policy.car_state_only, 'carStateSP', 'car_state'),
(Policy.car_state_priority, 'carStateSP', 'car_state'),
(Policy.map_data_only, 'liveMapDataSP', 'map_data'),
(Policy.map_data_priority, 'liveMapDataSP', 'map_data'),
],
ids=lambda val: val.name if hasattr(val, 'name') else str(val)
)
@pytest.mark.parametrize("resolver_class", [RefactoredSpeedLimitResolver], ids=["Refactored"])
class TestSpeedLimitResolverValidation:
@pytest.mark.parametrize("policy", list(Policy), ids=lambda policy: policy.name)
def test_initial_state(self, resolver_class, policy):
resolver = resolver_class(policy)
for source in Source:
if source in resolver._limit_solutions:
assert resolver._limit_solutions[source] == 0.
assert resolver._distance_solutions[source] == 0.
@parametrized_policies
def test_resolver(self, resolver_class, policy, sm_key, function_key, mocker: MockerFixture):
resolver = resolver_class(policy)
sm_mock = setup_sm_mock(mocker)
source_speed_limit = sm_mock[sm_key].speedLimit
# Assert the resolver
speed_limit, _, source = resolver.resolve(source_speed_limit, 0, sm_mock)
assert speed_limit == source_speed_limit
assert source == Source[function_key]
def test_resolver_combined(self, resolver_class, mocker: MockerFixture):
resolver = resolver_class(Policy.combined)
sm_mock = setup_sm_mock(mocker)
socket_to_source = {'carStateSP': Source.car_state, 'liveMapDataSP': Source.map_data}
minimum_key, minimum_speed_limit = min(
((key, sm_mock[key].speedLimit) for key in
socket_to_source.keys()), key=lambda x: x[1])
# Assert the resolver
speed_limit, _, source = resolver.resolve(minimum_speed_limit, 0, sm_mock)
assert speed_limit == minimum_speed_limit
assert source == socket_to_source[minimum_key]
@parametrized_policies
def test_parser(self, resolver_class, policy, sm_key, function_key, mocker: MockerFixture):
resolver = resolver_class(policy)
sm_mock = setup_sm_mock(mocker)
source_speed_limit = sm_mock[sm_key].speedLimit
# Assert the parsing
speed_limit, _, source = resolver.resolve(source_speed_limit, 0, sm_mock)
assert resolver._limit_solutions[Source[function_key]] == source_speed_limit
assert resolver._distance_solutions[Source[function_key]] == 0.
@pytest.mark.parametrize("policy", list(Policy), ids=lambda policy: policy.name)
def test_resolve_interaction_in_update(self, resolver_class, policy, mocker: MockerFixture):
v_ego = 50
resolver = resolver_class(policy)
sm_mock = setup_sm_mock(mocker)
_speed_limit, _distance, _source = resolver.resolve(v_ego, 0, sm_mock)
# After resolution
assert _speed_limit is not None
assert _distance is not None
assert _source is not None
@pytest.mark.parametrize("policy", list(Policy), ids=lambda policy: policy.name)
def test_old_map_data_ignored(self, resolver_class, policy, mocker: MockerFixture):
resolver = resolver_class(policy)
sm_mock = mocker.MagicMock()
sm_mock['liveMapDataSP'].lastGpsTimestamp = (time.time() - 2 * LIMIT_MAX_MAP_DATA_AGE) * 1e3
resolver._sm = sm_mock
resolver._get_from_map_data()
assert resolver._limit_solutions[Source.map_data] == 0.
assert resolver._distance_solutions[Source.map_data] == 0.

View File

@@ -1,6 +1,4 @@
import cereal.messaging as messaging
from cereal import log, car, custom
from openpilot.common.conversions import Conversions as CV
from openpilot.sunnypilot.selfdrive.selfdrived.events_base import EventsBase, Priority, ET, Alert, \
NoEntryAlert, ImmediateDisableAlert, EngagementAlert, NormalPermanentAlert, AlertCallbackType, wrong_car_mode_alert
@@ -16,17 +14,6 @@ EventNameSP = custom.OnroadEventSP.EventName
EVENT_NAME_SP = {v: k for k, v in EventNameSP.schema.enumerants.items()}
def speed_limit_adjust_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert:
speedLimit = sm['longitudinalPlanSP'].slc.speedLimit
speed = round(speedLimit * (CV.MS_TO_KPH if metric else CV.MS_TO_MPH))
message = f'Adjusting to {speed} {"km/h" if metric else "mph"} speed limit'
return Alert(
message,
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 4.)
class EventsSP(EventsBase):
def __init__(self):
super().__init__()
@@ -145,34 +132,6 @@ EVENTS_SP: dict[int, dict[str, Alert | AlertCallbackType]] = {
EventNameSP.pedalPressedAlertOnly: {
ET.WARNING: NoEntryAlert("Pedal Pressed")
},
EventNameSP.speedLimitPreActive: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none, AudibleAlert.none, .45), # TODO-SP: AudibleAlert.promptSingleLow
},
EventNameSP.speedLimitActive: {
ET.WARNING: Alert(
"Set speed changed to match posted speed limit",
"",
AlertStatus.normal, AlertSize.small,
Priority.LOW, VisualAlert.none, AudibleAlert.none, 3.),
},
EventNameSP.speedLimitConfirmed: {
ET.WARNING: Alert(
"",
"",
AlertStatus.normal, AlertSize.none,
Priority.MID, VisualAlert.none, AudibleAlert.none, .45), # TODO-SP: AudibleAlert.promptSingleHigh
},
EventNameSP.speedLimitValueChange: {
ET.WARNING: speed_limit_adjust_alert,
},
}
}

View File

@@ -63,15 +63,6 @@ def manager_init() -> None:
("ModelManager_ModelsCache", ""),
("NeuralNetworkLateralControl", "0"),
("QuietMode", "0"),
("SpeedLimitControl", "0"),
("SpeedLimitControlPolicy", "3"),
("SpeedLimitEngageType", "0"),
("SpeedLimitOffsetType", "0"),
("SpeedLimitValueOffset", "0"),
("SpeedLimitWarningType", "0"),
("SpeedLimitWarningOffsetType", "0"),
("SpeedLimitWarningValueOffset", "0"),
]
if params.get_bool("RecordFrontLock"):