mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-03-03 03:13:54 +08:00
Render only the road view in clips rather than the full main layout, matching the updated UI module structure.
341 lines
14 KiB
Python
Executable File
341 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import threading
|
|
import queue
|
|
import multiprocessing
|
|
import itertools
|
|
import numpy as np
|
|
import tqdm
|
|
from argparse import ArgumentParser
|
|
from pathlib import Path
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
from openpilot.tools.lib.route import Route
|
|
from openpilot.tools.lib.logreader import LogReader
|
|
from openpilot.tools.lib.filereader import FileReader
|
|
from openpilot.tools.lib.framereader import FrameReader, ffprobe
|
|
from openpilot.selfdrive.test.process_replay.migration import migrate_all
|
|
from openpilot.common.prefix import OpenpilotPrefix
|
|
from openpilot.common.utils import Timer
|
|
from msgq.visionipc import VisionIpcServer, VisionStreamType
|
|
|
|
FRAMERATE = 20
|
|
DEMO_ROUTE, DEMO_START, DEMO_END = 'a2a0ccea32023010/2023-07-27--13-01-19', 90, 105
|
|
|
|
logger = logging.getLogger('clip')
|
|
|
|
|
|
def parse_args():
|
|
parser = ArgumentParser(description="Direct clip renderer")
|
|
parser.add_argument("route", nargs="?", help="Route ID (dongle/route or dongle/route/start/end)")
|
|
parser.add_argument("-s", "--start", type=int, help="Start time in seconds")
|
|
parser.add_argument("-e", "--end", type=int, help="End time in seconds")
|
|
parser.add_argument("-o", "--output", default="output.mp4", help="Output file path")
|
|
parser.add_argument("-d", "--data-dir", help="Local directory with route data")
|
|
parser.add_argument("-t", "--title", help="Title overlay text")
|
|
parser.add_argument("-f", "--file-size", type=float, default=9.0, help="Target file size in MB")
|
|
parser.add_argument("-x", "--speed", type=int, default=1, help="Speed multiplier")
|
|
parser.add_argument("--demo", action="store_true", help="Use demo route with default timing")
|
|
parser.add_argument("--big", action="store_true", default=True, help="Use big UI (2160x1080)")
|
|
parser.add_argument("--qcam", action="store_true", help="Use qcamera instead of fcamera")
|
|
parser.add_argument("--windowed", action="store_true", help="Show window")
|
|
parser.add_argument("--no-metadata", action="store_true", help="Disable metadata overlay")
|
|
parser.add_argument("--no-time-overlay", action="store_true", help="Disable time overlay")
|
|
args = parser.parse_args()
|
|
|
|
if args.demo:
|
|
args.route, args.start, args.end = args.route or DEMO_ROUTE, args.start or DEMO_START, args.end or DEMO_END
|
|
elif not args.route:
|
|
parser.error("route is required (or use --demo)")
|
|
|
|
if args.route and args.route.count('/') == 3:
|
|
parts = args.route.split('/')
|
|
args.route, args.start, args.end = '/'.join(parts[:2]), args.start or int(parts[2]), args.end or int(parts[3])
|
|
|
|
if args.start is None or args.end is None:
|
|
parser.error("--start and --end are required")
|
|
if args.end <= args.start:
|
|
parser.error(f"end ({args.end}) must be greater than start ({args.start})")
|
|
return args
|
|
|
|
|
|
def setup_env(output_path: str, big: bool = False, speed: int = 1, target_mb: float = 0, duration: int = 0):
|
|
os.environ.update({"RECORD": "1", "OFFSCREEN": "1", "RECORD_OUTPUT": str(Path(output_path).with_suffix(".mp4"))})
|
|
if speed > 1:
|
|
os.environ["RECORD_SPEED"] = str(speed)
|
|
if target_mb > 0 and duration > 0:
|
|
os.environ["RECORD_BITRATE"] = f"{int(target_mb * 8 * 1024 / (duration / speed))}k"
|
|
if big:
|
|
os.environ["BIG"] = "1"
|
|
|
|
|
|
def _download_segment(path: str) -> bytes:
|
|
with FileReader(path) as f:
|
|
return bytes(f.read())
|
|
|
|
|
|
def _parse_and_chunk_segment(args: tuple) -> list[dict]:
|
|
raw_data, fps = args
|
|
from openpilot.tools.lib.logreader import _LogFileReader
|
|
messages = migrate_all(list(_LogFileReader("", dat=raw_data, sort_by_time=True)))
|
|
if not messages:
|
|
return []
|
|
|
|
dt_ns, chunks, current, next_time = 1e9 / fps, [], {}, messages[0].logMonoTime + 1e9 / fps # type: ignore[var-annotated]
|
|
for msg in messages:
|
|
if msg.logMonoTime >= next_time:
|
|
chunks.append(current)
|
|
current, next_time = {}, next_time + dt_ns * ((msg.logMonoTime - next_time) // dt_ns + 1)
|
|
current[msg.which()] = msg
|
|
return chunks + [current] if current else chunks
|
|
|
|
|
|
def load_logs_parallel(log_paths: list[str], fps: int = 20) -> list[dict]:
|
|
num_workers = min(16, len(log_paths), (multiprocessing.cpu_count() or 1))
|
|
logger.info(f"Downloading {len(log_paths)} segments with {num_workers} workers...")
|
|
|
|
with ThreadPoolExecutor(max_workers=num_workers) as pool:
|
|
futures = {pool.submit(_download_segment, path): idx for idx, path in enumerate(log_paths)}
|
|
raw_data = {futures[f]: f.result() for f in as_completed(futures)}
|
|
|
|
logger.info("Parsing and chunking segments...")
|
|
with multiprocessing.Pool(num_workers) as pool:
|
|
return list(itertools.chain.from_iterable(pool.map(_parse_and_chunk_segment, [(raw_data[i], fps) for i in range(len(log_paths))])))
|
|
|
|
|
|
def patch_submaster(message_chunks, ui_state):
|
|
# Reset started_frame so alerts render correctly (recv_frame must be >= started_frame)
|
|
ui_state.started_frame = 0
|
|
ui_state.started_time = time.monotonic()
|
|
|
|
def mock_update(timeout=None):
|
|
sm, t = ui_state.sm, time.monotonic()
|
|
sm.updated = dict.fromkeys(sm.services, False)
|
|
if sm.frame < len(message_chunks):
|
|
for svc, msg in message_chunks[sm.frame].items():
|
|
if svc in sm.data:
|
|
sm.seen[svc] = sm.updated[svc] = sm.alive[svc] = sm.valid[svc] = True
|
|
sm.data[svc] = getattr(msg.as_builder(), svc)
|
|
sm.logMonoTime[svc], sm.recv_time[svc], sm.recv_frame[svc] = msg.logMonoTime, t, sm.frame
|
|
sm.frame += 1
|
|
ui_state.sm.update = mock_update
|
|
|
|
|
|
def get_frame_dimensions(camera_path: str) -> tuple[int, int]:
|
|
"""Get frame dimensions from a video file using ffprobe."""
|
|
probe = ffprobe(camera_path)
|
|
stream = probe["streams"][0]
|
|
return stream["width"], stream["height"]
|
|
|
|
|
|
def iter_segment_frames(camera_paths, start_time, end_time, fps=20, use_qcam=False, frame_size: tuple[int, int] | None = None):
|
|
frames_per_seg = fps * 60
|
|
start_frame, end_frame = int(start_time * fps), int(end_time * fps)
|
|
current_seg: int = -1
|
|
seg_frames: FrameReader | np.ndarray | None = None
|
|
|
|
for global_idx in range(start_frame, end_frame):
|
|
seg_idx, local_idx = global_idx // frames_per_seg, global_idx % frames_per_seg
|
|
|
|
if seg_idx != current_seg:
|
|
current_seg = seg_idx
|
|
path = camera_paths[seg_idx] if seg_idx < len(camera_paths) else None
|
|
if not path:
|
|
raise RuntimeError(f"No camera file for segment {seg_idx}")
|
|
|
|
if use_qcam:
|
|
w, h = frame_size or get_frame_dimensions(path)
|
|
with FileReader(path) as f:
|
|
result = subprocess.run(["ffmpeg", "-v", "quiet", "-i", "-", "-f", "rawvideo", "-pix_fmt", "nv12", "-"],
|
|
input=f.read(), capture_output=True)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"ffmpeg failed: {result.stderr.decode()}")
|
|
seg_frames = np.frombuffer(result.stdout, dtype=np.uint8).reshape(-1, w * h * 3 // 2)
|
|
else:
|
|
seg_frames = FrameReader(path, pix_fmt="nv12")
|
|
|
|
assert seg_frames is not None
|
|
frame = seg_frames[local_idx] if use_qcam else seg_frames.get(local_idx) # type: ignore[index, union-attr]
|
|
yield global_idx, frame
|
|
|
|
|
|
class FrameQueue:
|
|
def __init__(self, camera_paths, start_time, end_time, fps=20, prefetch_count=60, use_qcam=False):
|
|
# Probe first valid camera file for dimensions
|
|
first_path = next((p for p in camera_paths if p), None)
|
|
if not first_path:
|
|
raise RuntimeError("No valid camera paths")
|
|
self.frame_w, self.frame_h = get_frame_dimensions(first_path)
|
|
|
|
self._queue, self._stop, self._error = queue.Queue(maxsize=prefetch_count), threading.Event(), None
|
|
self._thread = threading.Thread(target=self._worker,
|
|
args=(camera_paths, start_time, end_time, fps, use_qcam, (self.frame_w, self.frame_h)), daemon=True)
|
|
self._thread.start()
|
|
|
|
def _worker(self, camera_paths, start_time, end_time, fps, use_qcam, frame_size):
|
|
try:
|
|
for idx, data in iter_segment_frames(camera_paths, start_time, end_time, fps, use_qcam, frame_size):
|
|
if self._stop.is_set():
|
|
break
|
|
self._queue.put((idx, data.tobytes()))
|
|
except Exception as e:
|
|
logger.exception("Decode error")
|
|
self._error = e
|
|
finally:
|
|
self._queue.put(None)
|
|
|
|
def get(self, timeout=60.0):
|
|
if self._error:
|
|
raise self._error
|
|
result = self._queue.get(timeout=timeout)
|
|
if result is None:
|
|
raise StopIteration("No more frames")
|
|
return result
|
|
|
|
def stop(self):
|
|
self._stop.set()
|
|
while not self._queue.empty():
|
|
try:
|
|
self._queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
self._thread.join(timeout=2.0)
|
|
|
|
|
|
def load_route_metadata(route):
|
|
from openpilot.common.params import Params, UnknownKeyName
|
|
lr = LogReader(route.log_paths()[0])
|
|
init_data, car_params = lr.first('initData'), lr.first('carParams')
|
|
|
|
params = Params()
|
|
for entry in init_data.params.entries:
|
|
try:
|
|
params.put(entry.key, params.cpp2python(entry.key, entry.value))
|
|
except UnknownKeyName:
|
|
pass
|
|
|
|
origin = init_data.gitRemote.split('/')[3] if len(init_data.gitRemote.split('/')) > 3 else 'unknown'
|
|
return {
|
|
'version': init_data.version, 'route': route.name.canonical_name,
|
|
'car': car_params.carFingerprint if car_params else 'unknown', 'origin': origin,
|
|
'branch': init_data.gitBranch, 'commit': init_data.gitCommit[:7], 'modified': str(init_data.dirty).lower(),
|
|
}
|
|
|
|
|
|
def draw_text_box(rl, text, x, y, size, gui_app, font, font_scale, color=None, center=False):
|
|
box_color, text_color = rl.Color(0, 0, 0, 85), color or rl.WHITE
|
|
# measure_text_ex is NOT auto-scaled, so multiply by font_scale
|
|
# draw_text_ex IS auto-scaled, so pass size directly
|
|
text_size = rl.measure_text_ex(font, text, size * font_scale, 0)
|
|
text_width, text_height = int(text_size.x), int(text_size.y)
|
|
if center:
|
|
x = (gui_app.width - text_width) // 2
|
|
rl.draw_rectangle(x - 8, y - 4, text_width + 16, text_height + 8, box_color)
|
|
rl.draw_text_ex(font, text, rl.Vector2(x, y), size, 0, text_color)
|
|
|
|
|
|
def render_overlays(rl, gui_app, font, font_scale, metadata, title, start_time, frame_idx, show_metadata, show_time):
|
|
if show_metadata and metadata and frame_idx < FRAMERATE * 5:
|
|
m = metadata
|
|
text = ", ".join([f"openpilot v{m['version']}", f"route: {m['route']}", f"car: {m['car']}", f"origin: {m['origin']}",
|
|
f"branch: {m['branch']}", f"commit: {m['commit']}", f"modified: {m['modified']}"])
|
|
# Truncate if too wide (leave 20px margin on each side)
|
|
max_width = gui_app.width - 40
|
|
while rl.measure_text_ex(font, text, 15 * font_scale, 0).x > max_width and len(text) > 20:
|
|
text = text[:-4] + "..."
|
|
draw_text_box(rl, text, 0, 8, 15, gui_app, font, font_scale, center=True)
|
|
|
|
if title:
|
|
draw_text_box(rl, title, 0, 60, 32, gui_app, font, font_scale, center=True)
|
|
|
|
if show_time:
|
|
t = start_time + frame_idx / FRAMERATE
|
|
time_text = f"{int(t)//60:02d}:{int(t)%60:02d}"
|
|
time_width = int(rl.measure_text_ex(font, time_text, 24 * font_scale, 0).x)
|
|
draw_text_box(rl, time_text, gui_app.width - time_width - 45, 45, 24, gui_app, font, font_scale)
|
|
|
|
|
|
def clip(route: Route, output: str, start: int, end: int, headless: bool = True, big: bool = False,
|
|
title: str | None = None, show_metadata: bool = True, show_time: bool = True, use_qcam: bool = False):
|
|
timer, duration = Timer(), end - start
|
|
|
|
import pyray as rl
|
|
if big:
|
|
from openpilot.selfdrive.ui.onroad.augmented_road_view import AugmentedRoadView
|
|
else:
|
|
from openpilot.selfdrive.ui.mici.onroad.augmented_road_view import AugmentedRoadView # type: ignore[assignment]
|
|
from openpilot.selfdrive.ui.ui_state import ui_state
|
|
from openpilot.system.ui.lib.application import gui_app, FontWeight, FONT_SCALE
|
|
timer.lap("import")
|
|
|
|
logger.info(f"Clipping {route.name.canonical_name}, {start}s-{end}s ({duration}s)")
|
|
seg_start, seg_end = start // 60, (end - 1) // 60 + 1
|
|
all_chunks = load_logs_parallel(route.log_paths()[seg_start:seg_end], fps=FRAMERATE)
|
|
timer.lap("logs")
|
|
|
|
frame_start = (start - seg_start * 60) * FRAMERATE
|
|
message_chunks = all_chunks[frame_start:frame_start + duration * FRAMERATE]
|
|
if not message_chunks:
|
|
logger.error("No messages to render")
|
|
sys.exit(1)
|
|
|
|
metadata = load_route_metadata(route) if show_metadata else None
|
|
if headless:
|
|
rl.set_config_flags(rl.ConfigFlags.FLAG_WINDOW_HIDDEN)
|
|
|
|
with OpenpilotPrefix(shared_download_cache=True):
|
|
camera_paths = route.qcamera_paths() if use_qcam else route.camera_paths()
|
|
frame_queue = FrameQueue(camera_paths, start, end, fps=FRAMERATE, use_qcam=use_qcam)
|
|
|
|
vipc = VisionIpcServer("camerad")
|
|
vipc.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 4, frame_queue.frame_w, frame_queue.frame_h)
|
|
vipc.start_listener()
|
|
|
|
patch_submaster(message_chunks, ui_state)
|
|
gui_app.init_window("clip", fps=FRAMERATE)
|
|
|
|
road_view = AugmentedRoadView()
|
|
road_view.set_rect(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
|
|
font = gui_app.font(FontWeight.NORMAL)
|
|
timer.lap("setup")
|
|
|
|
frame_idx = 0
|
|
with tqdm.tqdm(total=len(message_chunks), desc="Rendering", unit="frame") as pbar:
|
|
for should_render in gui_app.render():
|
|
if frame_idx >= len(message_chunks):
|
|
break
|
|
_, frame_bytes = frame_queue.get()
|
|
vipc.send(VisionStreamType.VISION_STREAM_ROAD, frame_bytes, frame_idx, int(frame_idx * 5e7), int(frame_idx * 5e7))
|
|
ui_state.update()
|
|
if should_render:
|
|
road_view.render()
|
|
render_overlays(rl, gui_app, font, FONT_SCALE, metadata, title, start, frame_idx, show_metadata, show_time)
|
|
frame_idx += 1
|
|
pbar.update(1)
|
|
timer.lap("render")
|
|
|
|
frame_queue.stop()
|
|
gui_app.close()
|
|
timer.lap("ffmpeg")
|
|
|
|
logger.info(f"Clip saved to: {Path(output).resolve()}")
|
|
logger.info(f"Generated {timer.fmt(duration)}")
|
|
|
|
|
|
def main():
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s\t%(message)s")
|
|
args = parse_args()
|
|
assert args.big, "Clips doesn't support mici UI yet. TODO: make it work"
|
|
|
|
setup_env(args.output, big=args.big, speed=args.speed, target_mb=args.file_size, duration=args.end - args.start)
|
|
clip(Route(args.route, data_dir=args.data_dir), args.output, args.start, args.end, not args.windowed,
|
|
args.big, args.title, not args.no_metadata, not args.no_time_overlay, args.qcam)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|