openpilot0/system/manager/test/test_manager.py

77 lines
2.3 KiB
Python

import os
import pytest
import signal
import time
from parameterized import parameterized
from cereal import car
from openpilot.common.params import Params
import openpilot.system.manager.manager as manager
from openpilot.system.manager.process import ensure_running
from openpilot.system.manager.process_config import managed_processes
from openpilot.system.hardware import HARDWARE
# NOTE(review): presumably puts the uploader into a fake/no-op mode for tests;
# the consumer of FAKEUPLOAD is not visible in this file - confirm.
os.environ['FAKEUPLOAD'] = "1"
# Upper bound, in seconds, on a single manager.main() run in test_startup_time.
MAX_STARTUP_TIME = 3
# Process names passed as not_run to ensure_running in test_clean_exit, which
# also asserts that none of them show up among the started processes.
BLACKLIST_PROCS = ['manage_athenad', 'pandad', 'pigeond']
@pytest.mark.tici
class TestManager:
    """Tests for the manager's prepare path, startup time and process shutdown."""

    def setup_method(self):
        """Turn power save off and clear all params before each test."""
        HARDWARE.set_power_save(False)

        # ensure clean CarParams
        Params().clear_all()

    def teardown_method(self):
        """Run the manager's cleanup after each test."""
        manager.manager_cleanup()

    @staticmethod
    def _run_prepare_only():
        """Run manager.main() with PREPAREONLY set, then restore the environment.

        The variable is put back to its previous state (or removed if it was
        not set) so that it does not leak into tests that run later in the
        same process.
        """
        previous = os.environ.get('PREPAREONLY')
        os.environ['PREPAREONLY'] = '1'
        try:
            manager.main()
        finally:
            if previous is None:
                os.environ.pop('PREPAREONLY', None)
            else:
                os.environ['PREPAREONLY'] = previous

    def test_manager_prepare(self):
        """manager.main() completes without raising when PREPAREONLY is set."""
        self._run_prepare_only()

    def test_blacklisted_procs(self):
        """Guard that the blacklist is non-empty so not_run is actually exercised."""
        # TODO: ensure there are blacklisted procs until we have a dedicated test
        assert BLACKLIST_PROCS, "No blacklisted procs to test not_run"

    @parameterized.expand([(i,) for i in range(10)])
    def test_startup_time(self, index):
        """A PREPAREONLY run of manager.main() finishes within MAX_STARTUP_TIME.

        Repeated ten times through parameterized.expand; ``index`` only
        distinguishes the repetitions and is otherwise unused.
        """
        start = time.monotonic()
        self._run_prepare_only()
        t = time.monotonic() - start
        assert t < MAX_STARTUP_TIME, f"startup took {t}s, expected <{MAX_STARTUP_TIME}s"

    @pytest.mark.skip(reason="this test is flaky the way it's currently written, should be moved to test_onroad")
    def test_clean_exit(self, subtests):
        """
        Ensure all processes exit cleanly when stopped.
        """
        HARDWARE.set_power_save(False)
        manager.manager_init()

        CP = car.CarParams.new_message()
        procs = ensure_running(managed_processes.values(), True, Params(), CP, not_run=BLACKLIST_PROCS)

        # give the started processes time to come up before checking them
        time.sleep(10)

        for p in procs:
            with subtests.test(proc=p.name):
                state = p.get_process_state_msg()
                assert state.running, f"{p.name} not running"
                exit_code = p.stop(retry=False)

                assert p.name not in BLACKLIST_PROCS, f"{p.name} was started"
                assert exit_code is not None, f"{p.name} failed to exit"

                # TODO: interrupted blocking read exits with 1 in cereal. use a more unique return code
                # processes flagged with sigkill are expected to die from SIGKILL
                exit_codes = [-signal.SIGKILL] if p.sigkill else [0, 1]
                assert exit_code in exit_codes, f"{p.name} died with {exit_code}"