import numpy as np from msgq.visionipc import VisionIpcServer, VisionStreamType from cereal import messaging from openpilot.tools.sim.lib.common import W, H def rgb_to_nv12(rgb): """Convert RGB image to NV12 (YUV420) format using BT.601 coefficients.""" h, w = rgb.shape[:2] r = rgb[:, :, 0].astype(np.int32) g = rgb[:, :, 1].astype(np.int32) b = rgb[:, :, 2].astype(np.int32) # Y plane - BT.601 coefficients (matches original OpenCL kernel) y = (((b * 13 + g * 65 + r * 33) + 64) >> 7) + 16 y = np.clip(y, 0, 255).astype(np.uint8) # Subsample RGB for UV (2x2 box filter) r_sub = (r[0::2, 0::2] + r[0::2, 1::2] + r[1::2, 0::2] + r[1::2, 1::2] + 2) >> 2 g_sub = (g[0::2, 0::2] + g[0::2, 1::2] + g[1::2, 0::2] + g[1::2, 1::2] + 2) >> 2 b_sub = (b[0::2, 0::2] + b[0::2, 1::2] + b[1::2, 0::2] + b[1::2, 1::2] + 2) >> 2 # U and V planes u = np.clip((b_sub * 56 - g_sub * 37 - r_sub * 19 + 0x8080) >> 8, 0, 255).astype(np.uint8) v = np.clip((r_sub * 56 - g_sub * 47 - b_sub * 9 + 0x8080) >> 8, 0, 255).astype(np.uint8) # Interleave UV for NV12 format uv = np.empty((h // 2, w), dtype=np.uint8) uv[:, 0::2] = u uv[:, 1::2] = v return np.concatenate([y.ravel(), uv.ravel()]).tobytes() class Camerad: """Simulates the camerad daemon""" def __init__(self, dual_camera): self.pm = messaging.PubMaster(['roadCameraState', 'wideRoadCameraState']) self.frame_road_id = 0 self.frame_wide_id = 0 self.vipc_server = VisionIpcServer("camerad") self.vipc_server.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 5, W, H) if dual_camera: self.vipc_server.create_buffers(VisionStreamType.VISION_STREAM_WIDE_ROAD, 5, W, H) self.vipc_server.start_listener() def cam_send_yuv_road(self, yuv): self._send_yuv(yuv, self.frame_road_id, 'roadCameraState', VisionStreamType.VISION_STREAM_ROAD) self.frame_road_id += 1 def cam_send_yuv_wide_road(self, yuv): self._send_yuv(yuv, self.frame_wide_id, 'wideRoadCameraState', VisionStreamType.VISION_STREAM_WIDE_ROAD) self.frame_wide_id += 1 def rgb_to_yuv(self, rgb): """Convert RGB to NV12 YUV format.""" assert rgb.shape == (H, W, 3), f"{rgb.shape}" assert rgb.dtype == np.uint8 return rgb_to_nv12(rgb) def _send_yuv(self, yuv, frame_id, pub_type, yuv_type): eof = int(frame_id * 0.05 * 1e9) self.vipc_server.send(yuv_type, yuv, frame_id, eof, eof) dat = messaging.new_message(pub_type, valid=True) msg = { "frameId": frame_id, "transform": [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0] } setattr(dat, pub_type, msg) self.pm.send(pub_type, dat)