Files
onepilot/system/camerad/snapshot.py
github-actions[bot] 7fa972be6a sunnypilot v2026.02.09-4080
version: sunnypilot v2025.003.000 (dev)
date: 2026-02-09T02:04:38
master commit: 254f55ac15a40343d7255f2f098de3442e0c4a6f
2026-02-09 02:04:38 +00:00

132 lines
4.0 KiB
Python
Executable File

#!/usr/bin/env python3
import subprocess
import time
import numpy as np
from PIL import Image
import cereal.messaging as messaging
from msgq.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.system.hardware import PC
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
from openpilot.system.manager.process_config import managed_processes
VISION_STREAMS = {
"roadCameraState": VisionStreamType.VISION_STREAM_ROAD,
"driverCameraState": VisionStreamType.VISION_STREAM_DRIVER,
"wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD,
}
def jpeg_write(fn, dat):
img = Image.fromarray(dat)
img.save(fn, "JPEG")
def yuv_to_rgb(y, u, v):
ul = np.repeat(np.repeat(u, 2).reshape(u.shape[0], y.shape[1]), 2, axis=0).reshape(y.shape)
vl = np.repeat(np.repeat(v, 2).reshape(v.shape[0], y.shape[1]), 2, axis=0).reshape(y.shape)
yuv = np.dstack((y, ul, vl)).astype(np.int16)
yuv[:, :, 1:] -= 128
m = np.array([
[1.00000, 1.00000, 1.00000],
[0.00000, -0.39465, 2.03211],
[1.13983, -0.58060, 0.00000],
])
rgb = np.dot(yuv, m).clip(0, 255)
return rgb.astype(np.uint8)
def extract_image(buf):
# NV12 format: Y plane followed by interleaved UV plane
# UV plane size is stride * uv_height, where uv_height = align(height/2, 16)
uv_height = ((buf.height // 2) + 15) // 16 * 16
uv_plane_size = buf.stride * uv_height
y = np.array(buf.data[:buf.uv_offset], dtype=np.uint8).reshape((-1, buf.stride))[:buf.height, :buf.width]
uv_data = buf.data[buf.uv_offset:buf.uv_offset + uv_plane_size]
u = np.array(uv_data[::2], dtype=np.uint8).reshape((-1, buf.stride//2))[:buf.height//2, :buf.width//2]
v = np.array(uv_data[1::2], dtype=np.uint8).reshape((-1, buf.stride//2))[:buf.height//2, :buf.width//2]
return yuv_to_rgb(y, u, v)
def get_snapshots(frame="roadCameraState", front_frame="driverCameraState"):
sockets = [s for s in (frame, front_frame) if s is not None]
sm = messaging.SubMaster(sockets)
vipc_clients = {s: VisionIpcClient("camerad", VISION_STREAMS[s], True) for s in sockets}
# wait 4 sec from camerad startup for focus and exposure
while sm[sockets[0]].frameId < int(4. / DT_MDL):
sm.update()
for client in vipc_clients.values():
client.connect(True)
# grab images
rear, front = None, None
if frame is not None:
c = vipc_clients[frame]
rear = extract_image(c.recv())
if front_frame is not None:
c = vipc_clients[front_frame]
front = extract_image(c.recv())
return rear, front
def snapshot():
params = Params()
if (not params.get_bool("IsOffroad")) or params.get_bool("IsTakingSnapshot"):
print("Already taking snapshot")
return None, None
front_camera_allowed = params.get_bool("RecordFront")
params.put_bool("IsTakingSnapshot", True)
set_offroad_alert("Offroad_IsTakingSnapshot", True)
time.sleep(2.0) # Give hardwared time to read the param, or if just started give camerad time to start
# Check if camerad is already started
try:
subprocess.check_call(["pgrep", "camerad"])
print("Camerad already running")
params.put_bool("IsTakingSnapshot", False)
params.remove("Offroad_IsTakingSnapshot")
return None, None
except subprocess.CalledProcessError:
pass
try:
# Allow testing on replay on PC
if not PC:
managed_processes['camerad'].start()
frame = "wideRoadCameraState"
front_frame = "driverCameraState" if front_camera_allowed else None
rear, front = get_snapshots(frame, front_frame)
finally:
managed_processes['camerad'].stop()
params.put_bool("IsTakingSnapshot", False)
set_offroad_alert("Offroad_IsTakingSnapshot", False)
if not front_camera_allowed:
front = None
return rear, front
if __name__ == "__main__":
pic, fpic = snapshot()
if pic is not None:
print(pic.shape)
jpeg_write("/tmp/back.jpg", pic)
if fpic is not None:
jpeg_write("/tmp/front.jpg", fpic)
else:
print("Error taking snapshot")