mirror of https://github.com/commaai/openpilot.git
controlsd: add state machine tests (#24107)
* Handle NO_ENTRY in PRE_ENABLED
* add test
* add preEnabled NO_ENTRY test
* stash
* test soft disable
* tuples
* remove overriding until it's merged in
* use Events class
* fix tests and split out
* don't rely on controlsd's counter
old-commit-hash: d4f330447a
This commit is contained in:
parent
56bfed0c15
commit
8e11fbe2db
|
@ -38,9 +38,7 @@ NOSENSOR = "NOSENSOR" in os.environ
|
|||
IGNORE_PROCESSES = {"rtshield", "uploader", "deleter", "loggerd", "logmessaged", "tombstoned",
|
||||
"logcatd", "proclogd", "clocksd", "updated", "timezoned", "manage_athenad",
|
||||
"statsd", "shutdownd"} | \
|
||||
{k for k, v in managed_processes.items() if not v.enabled}
|
||||
|
||||
ACTUATOR_FIELDS = set(car.CarControl.Actuators.schema.fields.keys())
|
||||
{k for k, v in managed_processes.items() if not v.enabled}
|
||||
|
||||
ThermalStatus = log.DeviceState.ThermalStatus
|
||||
State = log.ControlsState.OpenpilotState
|
||||
|
@ -52,11 +50,15 @@ EventName = car.CarEvent.EventName
|
|||
ButtonEvent = car.CarState.ButtonEvent
|
||||
SafetyModel = car.CarParams.SafetyModel
|
||||
|
||||
IGNORED_SAFETY_MODES = [SafetyModel.silent, SafetyModel.noOutput]
|
||||
IGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput)
|
||||
CSID_MAP = {"0": EventName.roadCameraError, "1": EventName.wideRoadCameraError, "2": EventName.driverCameraError}
|
||||
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
|
||||
ACTIVE_STATES = (State.enabled, State.softDisabling)
|
||||
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
|
||||
|
||||
|
||||
class Controls:
|
||||
def __init__(self, sm=None, pm=None, can_sock=None):
|
||||
def __init__(self, sm=None, pm=None, can_sock=None, CI=None):
|
||||
config_realtime_process(4 if TICI else 3, Priority.CTRL_HIGH)
|
||||
|
||||
# Setup sockets
|
||||
|
@ -89,11 +91,15 @@ class Controls:
|
|||
if TICI:
|
||||
self.log_sock = messaging.sub_sock('androidLog')
|
||||
|
||||
# wait for one pandaState and one CAN packet
|
||||
print("Waiting for CAN messages...")
|
||||
get_one_can(self.can_sock)
|
||||
if CI is None:
|
||||
# wait for one pandaState and one CAN packet
|
||||
print("Waiting for CAN messages...")
|
||||
get_one_can(self.can_sock)
|
||||
|
||||
self.CI, self.CP = get_car(self.can_sock, self.pm.sock['sendcan'])
|
||||
else:
|
||||
self.CI, self.CP = CI, CI.CP
|
||||
|
||||
self.CI, self.CP = get_car(self.can_sock, self.pm.sock['sendcan'])
|
||||
self.CP.alternativeExperience = 0 # see panda/board/safety_declarations.h for allowed values
|
||||
|
||||
# read params
|
||||
|
@ -415,7 +421,7 @@ class Controls:
|
|||
|
||||
self.current_alert_types = [ET.PERMANENT]
|
||||
|
||||
# ENABLED, PRE ENABLING, SOFT DISABLING
|
||||
# ENABLED, SOFT DISABLING, PRE ENABLING
|
||||
if self.state != State.disabled:
|
||||
# user and immediate disable always have priority in a non-disabled state
|
||||
if self.events.any(ET.USER_DISABLE):
|
||||
|
@ -448,7 +454,10 @@ class Controls:
|
|||
|
||||
# PRE ENABLING
|
||||
elif self.state == State.preEnabled:
|
||||
if not self.events.any(ET.PRE_ENABLE):
|
||||
if self.events.any(ET.NO_ENTRY):
|
||||
self.state = State.disabled
|
||||
self.current_alert_types.append(ET.NO_ENTRY)
|
||||
elif not self.events.any(ET.PRE_ENABLE):
|
||||
self.state = State.enabled
|
||||
else:
|
||||
self.current_alert_types.append(ET.PRE_ENABLE)
|
||||
|
@ -468,14 +477,12 @@ class Controls:
|
|||
if not self.CP.pcmCruise:
|
||||
self.v_cruise_kph = initialize_v_cruise(CS.vEgo, CS.buttonEvents, self.v_cruise_kph_last)
|
||||
|
||||
# Check if actuators are enabled
|
||||
self.active = self.state in (State.enabled, State.softDisabling)
|
||||
# Check if openpilot is engaged and actuators are enabled
|
||||
self.enabled = self.state in ENABLED_STATES
|
||||
self.active = self.state in ACTIVE_STATES
|
||||
if self.active:
|
||||
self.current_alert_types.append(ET.WARNING)
|
||||
|
||||
# Check if openpilot is engaged
|
||||
self.enabled = self.active or self.state == State.preEnabled
|
||||
|
||||
def state_control(self, CS):
|
||||
"""Given the state, this function returns a CarControl packet"""
|
||||
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
|
||||
from cereal import car, log
|
||||
from common.realtime import DT_CTRL
|
||||
from selfdrive.car.car_helpers import interfaces
|
||||
from selfdrive.controls.controlsd import Controls, SOFT_DISABLE_TIME
|
||||
from selfdrive.controls.lib.events import Events, ET, Alert, Priority, AlertSize, AlertStatus, VisualAlert, \
|
||||
AudibleAlert, EVENTS
|
||||
|
||||
State = log.ControlsState.OpenpilotState
|
||||
|
||||
# The event types that maintain the current state
|
||||
MAINTAIN_STATES = {State.enabled: None, State.disabled: None, State.softDisabling: ET.SOFT_DISABLE,
|
||||
State.preEnabled: ET.PRE_ENABLE}
|
||||
ALL_STATES = tuple((state for state in State.schema.enumerants.values() if
|
||||
state != State.overriding)) # TODO: remove overriding exception
|
||||
# The event types checked in DISABLED section of state machine
|
||||
ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE)
|
||||
|
||||
|
||||
def make_event(event_types):
|
||||
event = {}
|
||||
for ev in event_types:
|
||||
event[ev] = Alert("", "", AlertStatus.normal, AlertSize.small, Priority.LOW,
|
||||
VisualAlert.none, AudibleAlert.none, 1.)
|
||||
EVENTS[0] = event
|
||||
return 0
|
||||
|
||||
|
||||
class TestStateMachine(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
CarInterface, CarController, CarState = interfaces["mock"]
|
||||
CP = CarInterface.get_params("mock")
|
||||
CI = CarInterface(CP, CarController, CarState)
|
||||
|
||||
self.controlsd = Controls(CI=CI)
|
||||
self.controlsd.events = Events()
|
||||
self.controlsd.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
|
||||
self.CS = car.CarState()
|
||||
|
||||
def test_immediate_disable(self):
|
||||
for state in ALL_STATES:
|
||||
self.controlsd.events.add(make_event([MAINTAIN_STATES[state], ET.IMMEDIATE_DISABLE]))
|
||||
self.controlsd.state = state
|
||||
self.controlsd.state_transition(self.CS)
|
||||
self.assertEqual(State.disabled, self.controlsd.state)
|
||||
self.controlsd.events.clear()
|
||||
|
||||
def test_user_disable(self):
|
||||
for state in ALL_STATES:
|
||||
self.controlsd.events.add(make_event([MAINTAIN_STATES[state], ET.USER_DISABLE]))
|
||||
self.controlsd.state = state
|
||||
self.controlsd.state_transition(self.CS)
|
||||
self.assertEqual(State.disabled, self.controlsd.state)
|
||||
self.controlsd.events.clear()
|
||||
|
||||
def test_soft_disable(self):
|
||||
for state in ALL_STATES:
|
||||
if state == State.preEnabled: # preEnabled considers NO_ENTRY instead
|
||||
continue
|
||||
self.controlsd.events.add(make_event([MAINTAIN_STATES[state], ET.SOFT_DISABLE]))
|
||||
self.controlsd.state = state
|
||||
self.controlsd.state_transition(self.CS)
|
||||
self.assertEqual(self.controlsd.state, State.disabled if state == State.disabled else State.softDisabling)
|
||||
self.controlsd.events.clear()
|
||||
|
||||
def test_soft_disable_timer(self):
|
||||
self.controlsd.state = State.enabled
|
||||
self.controlsd.events.add(make_event([ET.SOFT_DISABLE]))
|
||||
self.controlsd.state_transition(self.CS)
|
||||
for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)):
|
||||
self.assertEqual(self.controlsd.state, State.softDisabling)
|
||||
self.controlsd.state_transition(self.CS)
|
||||
|
||||
self.assertEqual(self.controlsd.state, State.disabled)
|
||||
|
||||
def test_no_entry(self):
|
||||
# disabled with enable events
|
||||
for et in ENABLE_EVENT_TYPES:
|
||||
self.controlsd.events.add(make_event([ET.NO_ENTRY, et]))
|
||||
self.controlsd.state_transition(self.CS)
|
||||
self.assertEqual(self.controlsd.state, State.disabled)
|
||||
self.controlsd.events.clear()
|
||||
|
||||
def test_no_entry_pre_enable(self):
|
||||
# preEnabled with preEnabled event
|
||||
self.controlsd.state = State.preEnabled
|
||||
self.controlsd.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE]))
|
||||
self.controlsd.state_transition(self.CS)
|
||||
self.assertEqual(self.controlsd.state, State.disabled)
|
||||
|
||||
def test_maintain_states(self):
|
||||
# Given current state's event type, we should maintain state
|
||||
for state in ALL_STATES:
|
||||
self.controlsd.state = state
|
||||
self.controlsd.events.add(make_event([MAINTAIN_STATES[state]]))
|
||||
self.controlsd.state_transition(self.CS)
|
||||
self.assertEqual(self.controlsd.state, state)
|
||||
self.controlsd.events.clear()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
Reference in New Issue