diff --git a/selfdrive/ui/tests/.gitignore b/selfdrive/ui/tests/.gitignore index 98f2a5e8ce..90a32ffca2 100644 --- a/selfdrive/ui/tests/.gitignore +++ b/selfdrive/ui/tests/.gitignore @@ -3,7 +3,6 @@ test_translations test_ui/report_1 test_ui/raylib_report -diff/*.mp4 -diff/*.html +diff/**/*.mp4 +diff/**/*.html diff/.coverage -diff/htmlcov/ diff --git a/selfdrive/ui/tests/diff/replay.py b/selfdrive/ui/tests/diff/replay.py index a668cc776b..1cb0b42ada 100755 --- a/selfdrive/ui/tests/diff/replay.py +++ b/selfdrive/ui/tests/diff/replay.py @@ -1,41 +1,21 @@ #!/usr/bin/env python3 import os -import time +import argparse import coverage import pyray as rl -from dataclasses import dataclass -from openpilot.selfdrive.ui.tests.diff.diff import DIFF_OUT_DIR - -os.environ["RECORD"] = "1" -if "RECORD_OUTPUT" not in os.environ: - os.environ["RECORD_OUTPUT"] = "mici_ui_replay.mp4" - -os.environ["RECORD_OUTPUT"] = os.path.join(DIFF_OUT_DIR, os.environ["RECORD_OUTPUT"]) +from typing import Literal +from collections.abc import Callable +from cereal.messaging import PubMaster from openpilot.common.params import Params from openpilot.common.prefix import OpenpilotPrefix +from openpilot.selfdrive.ui.tests.diff.diff import DIFF_OUT_DIR from openpilot.system.version import terms_version, training_version -from openpilot.system.ui.lib.application import gui_app, MousePos, MouseEvent + +LayoutVariant = Literal["mici", "tizi"] FPS = 60 -HEADLESS = os.getenv("WINDOWED", "0") == "1" - - -@dataclass -class DummyEvent: - click: bool = False - # TODO: add some kind of intensity - swipe_left: bool = False - swipe_right: bool = False - swipe_down: bool = False - - -SCRIPT = [ - (0, DummyEvent()), - (FPS * 1, DummyEvent(click=True)), - (FPS * 2, DummyEvent(click=True)), - (FPS * 3, DummyEvent()), -] +HEADLESS = os.getenv("WINDOWED", "0") != "1" def setup_state(): @@ -44,66 +24,60 @@ def setup_state(): params.put("CompletedTrainingVersion", training_version) params.put("DongleId", "test123456789") params.put("UpdaterCurrentDescription", "0.10.1 / test-branch / abc1234 / Nov 30") - return None -def inject_click(coords): - events = [] - x, y = coords[0] - events.append(MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=True, left_released=False, left_down=False, t=time.monotonic())) - for x, y in coords[1:]: - events.append(MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=False, left_released=False, left_down=True, t=time.monotonic())) - x, y = coords[-1] - events.append(MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=False, left_released=True, left_down=False, t=time.monotonic())) - - with gui_app._mouse._lock: - gui_app._mouse._events.extend(events) - - -def handle_event(event: DummyEvent): - if event.click: - inject_click([(gui_app.width // 2, gui_app.height // 2)]) - if event.swipe_left: - inject_click([(gui_app.width * 3 // 4, gui_app.height // 2), - (gui_app.width // 4, gui_app.height // 2), - (0, gui_app.height // 2)]) - if event.swipe_right: - inject_click([(gui_app.width // 4, gui_app.height // 2), - (gui_app.width * 3 // 4, gui_app.height // 2), - (gui_app.width, gui_app.height // 2)]) - if event.swipe_down: - inject_click([(gui_app.width // 2, gui_app.height // 4), - (gui_app.width // 2, gui_app.height * 3 // 4), - (gui_app.width // 2, gui_app.height)]) - - -def run_replay(): - from openpilot.selfdrive.ui.ui_state import ui_state # import here for correct param setup (e.g. training guide) - from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout # import here for coverage +def run_replay(variant: LayoutVariant) -> None: + from openpilot.selfdrive.ui.ui_state import ui_state # Import within OpenpilotPrefix context so param values are setup correctly + from openpilot.system.ui.lib.application import gui_app # Import here for accurate coverage + from openpilot.selfdrive.ui.tests.diff.replay_script import build_script setup_state() os.makedirs(DIFF_OUT_DIR, exist_ok=True) - if not HEADLESS: + if HEADLESS: rl.set_config_flags(rl.FLAG_WINDOW_HIDDEN) gui_app.init_window("ui diff test", fps=FPS) - - main_layout = MiciMainLayout() + # Dynamically import main layout based on variant + if variant == "mici": + from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout as MainLayout + else: + from openpilot.selfdrive.ui.layouts.main import MainLayout + main_layout = MainLayout() main_layout.set_rect(rl.Rectangle(0, 0, gui_app.width, gui_app.height)) + pm = PubMaster(["deviceState", "pandaStates", "driverStateV2", "selfdriveState"]) + script = build_script(pm, main_layout, variant) script_index = 0 + + send_fn: Callable | None = None frame = 0 # Override raylib timing functions to return deterministic values based on frame count instead of real time rl.get_frame_time = lambda: 1.0 / FPS rl.get_time = lambda: frame / FPS + # Main loop to replay events and render frames for should_render in gui_app.render(): - while script_index < len(SCRIPT) and SCRIPT[script_index][0] == frame: - _, event = SCRIPT[script_index] - handle_event(event) + # Handle all events for the current frame + while script_index < len(script) and script[script_index][0] == frame: + _, event = script[script_index] + # Call setup function, if any + if event.setup: + event.setup() + # Send mouse events to the application + if event.mouse_events: + with gui_app._mouse._lock: + gui_app._mouse._events.extend(event.mouse_events) + # Update persistent send function + if event.send_fn is not None: + send_fn = event.send_fn + # Move to next script event script_index += 1 + # Keep sending cereal messages for persistent states (onroad, alerts) + if send_fn: + send_fn() + ui_state.update() if should_render: @@ -111,7 +85,7 @@ def run_replay(): frame += 1 - if script_index >= len(SCRIPT): + if script_index >= len(script): break gui_app.close() @@ -121,14 +95,34 @@ def run_replay(): def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--big', action='store_true', help='Use big UI layout (tizi/tici) instead of mici layout') + args = parser.parse_args() + + variant: LayoutVariant = 'tizi' if args.big else 'mici' + + if args.big: + os.environ["BIG"] = "1" + os.environ["RECORD"] = "1" + os.environ["RECORD_OUTPUT"] = os.path.join(DIFF_OUT_DIR, os.environ.get("RECORD_OUTPUT", f"{variant}_ui_replay.mp4")) + + print(f"Running {variant} UI replay...") with OpenpilotPrefix(): - cov = coverage.coverage(source=['openpilot.selfdrive.ui.mici']) + sources = ["openpilot.system.ui"] + if variant == "mici": + sources.append("openpilot.selfdrive.ui.mici") + omit = ["**/*tizi*", "**/*tici*"] # exclude files containing "tizi" or "tici" + else: + sources.extend(["openpilot.selfdrive.ui.layouts", "openpilot.selfdrive.ui.onroad", "openpilot.selfdrive.ui.widgets"]) + omit = ["**/*mici*"] # exclude files containing "mici" + cov = coverage.Coverage(source=sources, omit=omit) with cov.collect(): - run_replay() + run_replay(variant) cov.save() cov.report() - cov.html_report(directory=os.path.join(DIFF_OUT_DIR, 'htmlcov')) - print("HTML report: htmlcov/index.html") + directory = os.path.join(DIFF_OUT_DIR, f"htmlcov-{variant}") + cov.html_report(directory=directory) + print(f"HTML report: {directory}/index.html") if __name__ == "__main__": diff --git a/selfdrive/ui/tests/diff/replay_script.py b/selfdrive/ui/tests/diff/replay_script.py new file mode 100644 index 0000000000..9f2104ec49 --- /dev/null +++ b/selfdrive/ui/tests/diff/replay_script.py @@ -0,0 +1,249 @@ +from __future__ import annotations +from typing import TYPE_CHECKING +from collections.abc import Callable +from dataclasses import dataclass + +from cereal import car, log, messaging +from cereal.messaging import PubMaster +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert +from openpilot.selfdrive.ui.tests.diff.replay import FPS, LayoutVariant +from openpilot.system.updated.updated import parse_release_notes + +WAIT = int(FPS * 0.5) # Default frames to wait after events + +AlertSize = log.SelfdriveState.AlertSize +AlertStatus = log.SelfdriveState.AlertStatus + +BRANCH_NAME = "this-is-a-really-super-mega-ultra-max-extreme-ultimate-long-branch-name" + + +@dataclass +class ScriptEvent: + if TYPE_CHECKING: + # Only import for type checking to avoid excluding the application code from coverage + from openpilot.system.ui.lib.application import MouseEvent + + setup: Callable | None = None # Setup function to run prior to adding mouse events + mouse_events: list[MouseEvent] | None = None # Mouse events to send to the application on this event's frame + send_fn: Callable | None = None # When set, the main loop uses this as the new persistent sender + + +ScriptEntry = tuple[int, ScriptEvent] # (frame, event) + + +class Script: + def __init__(self, fps: int) -> None: + self.fps = fps + self.frame = 0 + self.entries: list[ScriptEntry] = [] + + def get_frame_time(self) -> float: + return self.frame / self.fps + + def add(self, event: ScriptEvent, before: int = 0, after: int = 0) -> None: + """Add event to the script, optionally with the given number of frames to wait before or after the event.""" + self.frame += before + self.entries.append((self.frame, event)) + self.frame += after + + def end(self) -> None: + """Add a final empty event to mark the end of the script.""" + self.add(ScriptEvent()) # Without this, it will just end on the last event without waiting for any specified delay after it + + def wait(self, frames: int) -> None: + """Add a delay for the given number of frames followed by an empty event.""" + self.add(ScriptEvent(), before=frames) + + def setup(self, fn: Callable, wait_after: int = WAIT) -> None: + """Add a setup function to be called immediately followed by a delay of the given number of frames.""" + self.add(ScriptEvent(setup=fn), after=wait_after) + + def set_send(self, fn: Callable, wait_after: int = WAIT) -> None: + """Set a new persistent send function to be called every frame.""" + self.add(ScriptEvent(send_fn=fn), after=wait_after) + + # TODO: Also add more complex gestures, like swipe or drag + def click(self, x: int, y: int, wait_after: int = WAIT, wait_between: int = 2) -> None: + """Add a click event to the script for the given position and specify frames to wait between mouse events or after the click.""" + # NOTE: By default we wait a couple frames between mouse events so pressed states will be rendered + from openpilot.system.ui.lib.application import MouseEvent, MousePos + + # TODO: Add support for long press (left_down=True) + mouse_down = MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=True, left_released=False, left_down=False, t=self.get_frame_time()) + self.add(ScriptEvent(mouse_events=[mouse_down]), after=wait_between) + mouse_up = MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=False, left_released=True, left_down=False, t=self.get_frame_time()) + self.add(ScriptEvent(mouse_events=[mouse_up]), after=wait_after) + + +# --- Setup functions --- + +def put_update_params(params: Params | None = None) -> None: + if params is None: + params = Params() + params.put("UpdaterCurrentReleaseNotes", parse_release_notes(BASEDIR)) + params.put("UpdaterNewReleaseNotes", parse_release_notes(BASEDIR)) + params.put("UpdaterTargetBranch", BRANCH_NAME) + + +def setup_offroad_alerts() -> None: + put_update_params(Params()) + set_offroad_alert("Offroad_TemperatureTooHigh", True, extra_text='99C') + set_offroad_alert("Offroad_ExcessiveActuation", True, extra_text='longitudinal') + set_offroad_alert("Offroad_IsTakingSnapshot", True) + + +def setup_update_available() -> None: + params = Params() + params.put_bool("UpdateAvailable", True) + params.put("UpdaterNewDescription", f"0.10.2 / {BRANCH_NAME} / 0a1b2c3 / Jan 01") + put_update_params(params) + + +def setup_developer_params() -> None: + CP = car.CarParams() + CP.alphaLongitudinalAvailable = True + Params().put("CarParamsPersistent", CP.to_bytes()) + + +# --- Send functions --- + +def send_onroad(pm: PubMaster) -> None: + ds = messaging.new_message('deviceState') + ds.deviceState.started = True + ds.deviceState.networkType = log.DeviceState.NetworkType.wifi + + ps = messaging.new_message('pandaStates', 1) + ps.pandaStates[0].pandaType = log.PandaState.PandaType.dos + ps.pandaStates[0].ignitionLine = True + + pm.send('deviceState', ds) + pm.send('pandaStates', ps) + + +def make_network_state_setup(pm: PubMaster, network_type) -> Callable: + def _send() -> None: + ds = messaging.new_message('deviceState') + ds.deviceState.networkType = network_type + pm.send('deviceState', ds) + return _send + + +def make_alert_setup(pm: PubMaster, size, text1, text2, status) -> Callable: + def _send() -> None: + send_onroad(pm) + alert = messaging.new_message('selfdriveState') + ss = alert.selfdriveState + ss.alertSize = size + ss.alertText1 = text1 + ss.alertText2 = text2 + ss.alertStatus = status + pm.send('selfdriveState', alert) + return _send + + +# --- Script builders --- + +def build_mici_script(pm: PubMaster, main_layout, script: Script) -> None: + """Build the replay script for the mici layout.""" + from openpilot.system.ui.lib.application import gui_app + + center = (gui_app.width // 2, gui_app.height // 2) + + # TODO: Explore more + script.wait(FPS) + script.click(*center, FPS) # Open settings + script.click(*center, FPS) # Open toggles + script.end() + + +def build_tizi_script(pm: PubMaster, main_layout, script: Script) -> None: + """Build the replay script for the tizi layout.""" + + def make_home_refresh_setup(fn: Callable) -> Callable: + """Return setup function that calls the given function to modify state and forces an immediate refresh on the home layout.""" + from openpilot.selfdrive.ui.layouts.main import MainState + + def setup(): + fn() + main_layout._layouts[MainState.HOME].last_refresh = 0 + + return setup + + # TODO: Better way of organizing the events + + # === Homescreen === + script.set_send(make_network_state_setup(pm, log.DeviceState.NetworkType.wifi)) + + # === Offroad Alerts (auto-transitions via HomeLayout refresh) === + script.setup(make_home_refresh_setup(setup_offroad_alerts)) + + # === Update Available (auto-transitions via HomeLayout refresh) === + script.setup(make_home_refresh_setup(setup_update_available)) + + # === Settings - Device (click sidebar settings button) === + script.click(150, 90) + script.click(1985, 790) # reset calibration confirmation + script.click(650, 750) # cancel + + # === Settings - Network === + script.click(278, 450) + script.click(1880, 100) # advanced network settings + script.click(630, 80) # back + + # === Settings - Toggles === + script.click(278, 600) + script.click(1200, 280) # experimental mode description + + # === Settings - Software === + script.setup(put_update_params, wait_after=0) + script.click(278, 720) + + # === Settings - Firehose === + script.click(278, 845) + + # === Settings - Developer (set CarParamsPersistent first) === + script.setup(setup_developer_params, wait_after=0) + script.click(278, 950) + script.click(2000, 960) # toggle alpha long + script.click(1500, 875) # confirm + + # === Keyboard modal (SSH keys button in developer panel) === + script.click(1930, 470) # click SSH keys + script.click(1930, 115) # click cancel on keyboard + + # === Close settings === + script.click(250, 160) + + # === Onroad === + script.set_send(lambda: send_onroad(pm)) + script.click(1000, 500) # click onroad to toggle sidebar + + # === Onroad alerts === + # Small alert (normal) + script.set_send(make_alert_setup(pm, AlertSize.small, "Small Alert", "This is a small alert", AlertStatus.normal)) + # Medium alert (userPrompt) + script.set_send(make_alert_setup(pm, AlertSize.mid, "Medium Alert", "This is a medium alert", AlertStatus.userPrompt)) + # Full alert (critical) + script.set_send(make_alert_setup(pm, AlertSize.full, "DISENGAGE IMMEDIATELY", "Driver Distracted", AlertStatus.critical)) + # Full alert multiline + script.set_send(make_alert_setup(pm, AlertSize.full, "Reverse\nGear", "", AlertStatus.normal)) + # Full alert long text + script.set_send(make_alert_setup(pm, AlertSize.full, "TAKE CONTROL IMMEDIATELY", "Calibration Invalid: Remount Device & Recalibrate", AlertStatus.userPrompt)) + + # End + script.end() + + +def build_script(pm: PubMaster, main_layout, variant: LayoutVariant) -> list[ScriptEntry]: + """Build the replay script for the appropriate layout variant and return list of script entries.""" + print(f"Building {variant} replay script...") + + script = Script(FPS) + builder = build_tizi_script if variant == 'tizi' else build_mici_script + builder(pm, main_layout, script) + + print(f"Built replay script with {len(script.entries)} events and {script.frame} frames ({script.get_frame_time():.2f} seconds)") + + return script.entries