ui: add big (tizi) replay (#37198)

* init: tizi_replay.py from pr 37123

* separate coverage folder

* ui replay: adjust HOLD constant, fix coverage, use separate folder for coverage

* openpilot prefix

* fix directory

* fix ui_state

* fix settings click pos

* remove

* attempt merge replay files

* remove

* todo

* fix recording

* spacing

* simplify

* comment

* refactor hold

* refactor: remove layout definitions from VARIANTS and import conditionally in run_replay

* refactor:  remove VARIANTS config

* add argparser with --big flag and improve coverage sources

* refactor

* lowercase

* refactor: combine scripts

* add types

* refactor: move imports for gui_app and ui_state to improve coverage and organization

* update

* update script

* comment

* fix headless

* todo

* fix: get_time and get_frame_time determinism

* todo

* remove file accidently commited

* fix: improve inject_click and handle_event for deterministic event timestamps

* comment

* simplify add

* refactor script building

* fix mici clicks

* pass in pm

* fix wifi state

* refactor clicks

* more refactor

* click cancel instead of remove overlay

* setup_send_fn

* add setup fn

* dummy update

* change

* remove todo

* rename fn to frame_fn

* refactor

* fix workflow

* rename raylib ui preview to old

* rename mici workflow

* fix diff videos

* ignore sub html and mp4 files

* rename for diff

* rename for diff again (mici)

* use ScriptEvent instead of DummyEvent, and move mouse events directly to it; rename hold to wait

* fix: only import MouseEvent for type hint to fix coverage

* adjust settings button click

* clarify

* move ScriptEvent to replay_script

* add handle_event function

* remove passing in setup function, and refactor click events

* clean

* formatting

* refactor

* no import

* comment

* refactor

* refactor setup functions to replay_setup

* refactor

* add ReplayContext

* refactor

* move more setup functions

* refactor and simplify

* refactor

* refactor: add Script class

* refactor: enhance Script event handling and add wait functionality

* refactor

* remove setup_and_click

* use script.setup instead

* comments

* rename wait_frames to wait_after

* add comments

* revert workflows

* revert rename

* move arg parsing to main

* remove quotes

* add type

* return types

* type

* VariantType

* rename to LayoutVariant

* clarify

* switch

* todo

* Revert "fix diff videos"

This reverts commit 7a6e45a409cb7e6d7a330317639fcee74ef8bd31.

* add todos

* add more coverage

* wait 2 frames by default

* add comment

* comment

* switch

* fix space

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* remove extra

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Remove unnecessary blank line in ReplayContext class

* simplify

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
This commit is contained in:
David
2026-02-15 22:03:30 -06:00
committed by GitHub
parent c393973916
commit 03a4f7ef9a
3 changed files with 318 additions and 76 deletions

View File

@@ -3,7 +3,6 @@ test_translations
test_ui/report_1
test_ui/raylib_report
diff/*.mp4
diff/*.html
diff/**/*.mp4
diff/**/*.html
diff/.coverage
diff/htmlcov/

View File

@@ -1,41 +1,21 @@
#!/usr/bin/env python3
import os
import time
import argparse
import coverage
import pyray as rl
from dataclasses import dataclass
from openpilot.selfdrive.ui.tests.diff.diff import DIFF_OUT_DIR
os.environ["RECORD"] = "1"
if "RECORD_OUTPUT" not in os.environ:
os.environ["RECORD_OUTPUT"] = "mici_ui_replay.mp4"
os.environ["RECORD_OUTPUT"] = os.path.join(DIFF_OUT_DIR, os.environ["RECORD_OUTPUT"])
from typing import Literal
from collections.abc import Callable
from cereal.messaging import PubMaster
from openpilot.common.params import Params
from openpilot.common.prefix import OpenpilotPrefix
from openpilot.selfdrive.ui.tests.diff.diff import DIFF_OUT_DIR
from openpilot.system.version import terms_version, training_version
from openpilot.system.ui.lib.application import gui_app, MousePos, MouseEvent
LayoutVariant = Literal["mici", "tizi"]
FPS = 60
HEADLESS = os.getenv("WINDOWED", "0") == "1"
@dataclass
class DummyEvent:
click: bool = False
# TODO: add some kind of intensity
swipe_left: bool = False
swipe_right: bool = False
swipe_down: bool = False
SCRIPT = [
(0, DummyEvent()),
(FPS * 1, DummyEvent(click=True)),
(FPS * 2, DummyEvent(click=True)),
(FPS * 3, DummyEvent()),
]
HEADLESS = os.getenv("WINDOWED", "0") != "1"
def setup_state():
@@ -44,66 +24,60 @@ def setup_state():
params.put("CompletedTrainingVersion", training_version)
params.put("DongleId", "test123456789")
params.put("UpdaterCurrentDescription", "0.10.1 / test-branch / abc1234 / Nov 30")
return None
def inject_click(coords):
events = []
x, y = coords[0]
events.append(MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=True, left_released=False, left_down=False, t=time.monotonic()))
for x, y in coords[1:]:
events.append(MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=False, left_released=False, left_down=True, t=time.monotonic()))
x, y = coords[-1]
events.append(MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=False, left_released=True, left_down=False, t=time.monotonic()))
with gui_app._mouse._lock:
gui_app._mouse._events.extend(events)
def handle_event(event: DummyEvent):
if event.click:
inject_click([(gui_app.width // 2, gui_app.height // 2)])
if event.swipe_left:
inject_click([(gui_app.width * 3 // 4, gui_app.height // 2),
(gui_app.width // 4, gui_app.height // 2),
(0, gui_app.height // 2)])
if event.swipe_right:
inject_click([(gui_app.width // 4, gui_app.height // 2),
(gui_app.width * 3 // 4, gui_app.height // 2),
(gui_app.width, gui_app.height // 2)])
if event.swipe_down:
inject_click([(gui_app.width // 2, gui_app.height // 4),
(gui_app.width // 2, gui_app.height * 3 // 4),
(gui_app.width // 2, gui_app.height)])
def run_replay():
from openpilot.selfdrive.ui.ui_state import ui_state # import here for correct param setup (e.g. training guide)
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout # import here for coverage
def run_replay(variant: LayoutVariant) -> None:
from openpilot.selfdrive.ui.ui_state import ui_state # Import within OpenpilotPrefix context so param values are setup correctly
from openpilot.system.ui.lib.application import gui_app # Import here for accurate coverage
from openpilot.selfdrive.ui.tests.diff.replay_script import build_script
setup_state()
os.makedirs(DIFF_OUT_DIR, exist_ok=True)
if not HEADLESS:
if HEADLESS:
rl.set_config_flags(rl.FLAG_WINDOW_HIDDEN)
gui_app.init_window("ui diff test", fps=FPS)
main_layout = MiciMainLayout()
# Dynamically import main layout based on variant
if variant == "mici":
from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout as MainLayout
else:
from openpilot.selfdrive.ui.layouts.main import MainLayout
main_layout = MainLayout()
main_layout.set_rect(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
pm = PubMaster(["deviceState", "pandaStates", "driverStateV2", "selfdriveState"])
script = build_script(pm, main_layout, variant)
script_index = 0
send_fn: Callable | None = None
frame = 0
# Override raylib timing functions to return deterministic values based on frame count instead of real time
rl.get_frame_time = lambda: 1.0 / FPS
rl.get_time = lambda: frame / FPS
# Main loop to replay events and render frames
for should_render in gui_app.render():
while script_index < len(SCRIPT) and SCRIPT[script_index][0] == frame:
_, event = SCRIPT[script_index]
handle_event(event)
# Handle all events for the current frame
while script_index < len(script) and script[script_index][0] == frame:
_, event = script[script_index]
# Call setup function, if any
if event.setup:
event.setup()
# Send mouse events to the application
if event.mouse_events:
with gui_app._mouse._lock:
gui_app._mouse._events.extend(event.mouse_events)
# Update persistent send function
if event.send_fn is not None:
send_fn = event.send_fn
# Move to next script event
script_index += 1
# Keep sending cereal messages for persistent states (onroad, alerts)
if send_fn:
send_fn()
ui_state.update()
if should_render:
@@ -111,7 +85,7 @@ def run_replay():
frame += 1
if script_index >= len(SCRIPT):
if script_index >= len(script):
break
gui_app.close()
@@ -121,14 +95,34 @@ def run_replay():
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--big', action='store_true', help='Use big UI layout (tizi/tici) instead of mici layout')
args = parser.parse_args()
variant: LayoutVariant = 'tizi' if args.big else 'mici'
if args.big:
os.environ["BIG"] = "1"
os.environ["RECORD"] = "1"
os.environ["RECORD_OUTPUT"] = os.path.join(DIFF_OUT_DIR, os.environ.get("RECORD_OUTPUT", f"{variant}_ui_replay.mp4"))
print(f"Running {variant} UI replay...")
with OpenpilotPrefix():
cov = coverage.coverage(source=['openpilot.selfdrive.ui.mici'])
sources = ["openpilot.system.ui"]
if variant == "mici":
sources.append("openpilot.selfdrive.ui.mici")
omit = ["**/*tizi*", "**/*tici*"] # exclude files containing "tizi" or "tici"
else:
sources.extend(["openpilot.selfdrive.ui.layouts", "openpilot.selfdrive.ui.onroad", "openpilot.selfdrive.ui.widgets"])
omit = ["**/*mici*"] # exclude files containing "mici"
cov = coverage.Coverage(source=sources, omit=omit)
with cov.collect():
run_replay()
run_replay(variant)
cov.save()
cov.report()
cov.html_report(directory=os.path.join(DIFF_OUT_DIR, 'htmlcov'))
print("HTML report: htmlcov/index.html")
directory = os.path.join(DIFF_OUT_DIR, f"htmlcov-{variant}")
cov.html_report(directory=directory)
print(f"HTML report: {directory}/index.html")
if __name__ == "__main__":

View File

@@ -0,0 +1,249 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from collections.abc import Callable
from dataclasses import dataclass
from cereal import car, log, messaging
from cereal.messaging import PubMaster
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
from openpilot.selfdrive.ui.tests.diff.replay import FPS, LayoutVariant
from openpilot.system.updated.updated import parse_release_notes
WAIT = int(FPS * 0.5) # Default frames to wait after events
AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus
BRANCH_NAME = "this-is-a-really-super-mega-ultra-max-extreme-ultimate-long-branch-name"
@dataclass
class ScriptEvent:
if TYPE_CHECKING:
# Only import for type checking to avoid excluding the application code from coverage
from openpilot.system.ui.lib.application import MouseEvent
setup: Callable | None = None # Setup function to run prior to adding mouse events
mouse_events: list[MouseEvent] | None = None # Mouse events to send to the application on this event's frame
send_fn: Callable | None = None # When set, the main loop uses this as the new persistent sender
ScriptEntry = tuple[int, ScriptEvent] # (frame, event)
class Script:
def __init__(self, fps: int) -> None:
self.fps = fps
self.frame = 0
self.entries: list[ScriptEntry] = []
def get_frame_time(self) -> float:
return self.frame / self.fps
def add(self, event: ScriptEvent, before: int = 0, after: int = 0) -> None:
"""Add event to the script, optionally with the given number of frames to wait before or after the event."""
self.frame += before
self.entries.append((self.frame, event))
self.frame += after
def end(self) -> None:
"""Add a final empty event to mark the end of the script."""
self.add(ScriptEvent()) # Without this, it will just end on the last event without waiting for any specified delay after it
def wait(self, frames: int) -> None:
"""Add a delay for the given number of frames followed by an empty event."""
self.add(ScriptEvent(), before=frames)
def setup(self, fn: Callable, wait_after: int = WAIT) -> None:
"""Add a setup function to be called immediately followed by a delay of the given number of frames."""
self.add(ScriptEvent(setup=fn), after=wait_after)
def set_send(self, fn: Callable, wait_after: int = WAIT) -> None:
"""Set a new persistent send function to be called every frame."""
self.add(ScriptEvent(send_fn=fn), after=wait_after)
# TODO: Also add more complex gestures, like swipe or drag
def click(self, x: int, y: int, wait_after: int = WAIT, wait_between: int = 2) -> None:
"""Add a click event to the script for the given position and specify frames to wait between mouse events or after the click."""
# NOTE: By default we wait a couple frames between mouse events so pressed states will be rendered
from openpilot.system.ui.lib.application import MouseEvent, MousePos
# TODO: Add support for long press (left_down=True)
mouse_down = MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=True, left_released=False, left_down=False, t=self.get_frame_time())
self.add(ScriptEvent(mouse_events=[mouse_down]), after=wait_between)
mouse_up = MouseEvent(pos=MousePos(x, y), slot=0, left_pressed=False, left_released=True, left_down=False, t=self.get_frame_time())
self.add(ScriptEvent(mouse_events=[mouse_up]), after=wait_after)
# --- Setup functions ---
def put_update_params(params: Params | None = None) -> None:
if params is None:
params = Params()
params.put("UpdaterCurrentReleaseNotes", parse_release_notes(BASEDIR))
params.put("UpdaterNewReleaseNotes", parse_release_notes(BASEDIR))
params.put("UpdaterTargetBranch", BRANCH_NAME)
def setup_offroad_alerts() -> None:
put_update_params(Params())
set_offroad_alert("Offroad_TemperatureTooHigh", True, extra_text='99C')
set_offroad_alert("Offroad_ExcessiveActuation", True, extra_text='longitudinal')
set_offroad_alert("Offroad_IsTakingSnapshot", True)
def setup_update_available() -> None:
params = Params()
params.put_bool("UpdateAvailable", True)
params.put("UpdaterNewDescription", f"0.10.2 / {BRANCH_NAME} / 0a1b2c3 / Jan 01")
put_update_params(params)
def setup_developer_params() -> None:
CP = car.CarParams()
CP.alphaLongitudinalAvailable = True
Params().put("CarParamsPersistent", CP.to_bytes())
# --- Send functions ---
def send_onroad(pm: PubMaster) -> None:
ds = messaging.new_message('deviceState')
ds.deviceState.started = True
ds.deviceState.networkType = log.DeviceState.NetworkType.wifi
ps = messaging.new_message('pandaStates', 1)
ps.pandaStates[0].pandaType = log.PandaState.PandaType.dos
ps.pandaStates[0].ignitionLine = True
pm.send('deviceState', ds)
pm.send('pandaStates', ps)
def make_network_state_setup(pm: PubMaster, network_type) -> Callable:
def _send() -> None:
ds = messaging.new_message('deviceState')
ds.deviceState.networkType = network_type
pm.send('deviceState', ds)
return _send
def make_alert_setup(pm: PubMaster, size, text1, text2, status) -> Callable:
def _send() -> None:
send_onroad(pm)
alert = messaging.new_message('selfdriveState')
ss = alert.selfdriveState
ss.alertSize = size
ss.alertText1 = text1
ss.alertText2 = text2
ss.alertStatus = status
pm.send('selfdriveState', alert)
return _send
# --- Script builders ---
def build_mici_script(pm: PubMaster, main_layout, script: Script) -> None:
"""Build the replay script for the mici layout."""
from openpilot.system.ui.lib.application import gui_app
center = (gui_app.width // 2, gui_app.height // 2)
# TODO: Explore more
script.wait(FPS)
script.click(*center, FPS) # Open settings
script.click(*center, FPS) # Open toggles
script.end()
def build_tizi_script(pm: PubMaster, main_layout, script: Script) -> None:
"""Build the replay script for the tizi layout."""
def make_home_refresh_setup(fn: Callable) -> Callable:
"""Return setup function that calls the given function to modify state and forces an immediate refresh on the home layout."""
from openpilot.selfdrive.ui.layouts.main import MainState
def setup():
fn()
main_layout._layouts[MainState.HOME].last_refresh = 0
return setup
# TODO: Better way of organizing the events
# === Homescreen ===
script.set_send(make_network_state_setup(pm, log.DeviceState.NetworkType.wifi))
# === Offroad Alerts (auto-transitions via HomeLayout refresh) ===
script.setup(make_home_refresh_setup(setup_offroad_alerts))
# === Update Available (auto-transitions via HomeLayout refresh) ===
script.setup(make_home_refresh_setup(setup_update_available))
# === Settings - Device (click sidebar settings button) ===
script.click(150, 90)
script.click(1985, 790) # reset calibration confirmation
script.click(650, 750) # cancel
# === Settings - Network ===
script.click(278, 450)
script.click(1880, 100) # advanced network settings
script.click(630, 80) # back
# === Settings - Toggles ===
script.click(278, 600)
script.click(1200, 280) # experimental mode description
# === Settings - Software ===
script.setup(put_update_params, wait_after=0)
script.click(278, 720)
# === Settings - Firehose ===
script.click(278, 845)
# === Settings - Developer (set CarParamsPersistent first) ===
script.setup(setup_developer_params, wait_after=0)
script.click(278, 950)
script.click(2000, 960) # toggle alpha long
script.click(1500, 875) # confirm
# === Keyboard modal (SSH keys button in developer panel) ===
script.click(1930, 470) # click SSH keys
script.click(1930, 115) # click cancel on keyboard
# === Close settings ===
script.click(250, 160)
# === Onroad ===
script.set_send(lambda: send_onroad(pm))
script.click(1000, 500) # click onroad to toggle sidebar
# === Onroad alerts ===
# Small alert (normal)
script.set_send(make_alert_setup(pm, AlertSize.small, "Small Alert", "This is a small alert", AlertStatus.normal))
# Medium alert (userPrompt)
script.set_send(make_alert_setup(pm, AlertSize.mid, "Medium Alert", "This is a medium alert", AlertStatus.userPrompt))
# Full alert (critical)
script.set_send(make_alert_setup(pm, AlertSize.full, "DISENGAGE IMMEDIATELY", "Driver Distracted", AlertStatus.critical))
# Full alert multiline
script.set_send(make_alert_setup(pm, AlertSize.full, "Reverse\nGear", "", AlertStatus.normal))
# Full alert long text
script.set_send(make_alert_setup(pm, AlertSize.full, "TAKE CONTROL IMMEDIATELY", "Calibration Invalid: Remount Device & Recalibrate", AlertStatus.userPrompt))
# End
script.end()
def build_script(pm: PubMaster, main_layout, variant: LayoutVariant) -> list[ScriptEntry]:
"""Build the replay script for the appropriate layout variant and return list of script entries."""
print(f"Building {variant} replay script...")
script = Script(FPS)
builder = build_tizi_script if variant == 'tizi' else build_mici_script
builder(pm, main_layout, script)
print(f"Built replay script with {len(script.entries)} events and {script.frame} frames ({script.get_frame_time():.2f} seconds)")
return script.entries