From bcdeec3133643b4a6e2f16071cdcb130f2637c70 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Tue, 16 Dec 2025 13:27:14 -0800 Subject: [PATCH] Reduce pub-sub memory usage by 10x (#36884) less mem --- cereal/messaging/__init__.py | 16 +++++- cereal/messaging/socketmaster.cc | 5 +- cereal/services.py | 42 ++++++++------ msgq_repo | 2 +- selfdrive/debug/analyze-msg-size.py | 82 ++++++++++++++++++++++++++++ selfdrive/pandad/pandad.cc | 3 +- selfdrive/test/test_onroad.py | 2 +- system/loggerd/loggerd.cc | 2 +- system/loggerd/tests/test_loggerd.py | 15 +++-- tools/cabana/streams/devicestream.cc | 4 +- 10 files changed, 144 insertions(+), 29 deletions(-) create mode 100755 selfdrive/debug/analyze-msg-size.py diff --git a/cereal/messaging/__init__.py b/cereal/messaging/__init__.py index b03285f80a..0ad846f0f4 100644 --- a/cereal/messaging/__init__.py +++ b/cereal/messaging/__init__.py @@ -2,7 +2,7 @@ from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event from msgq.ipc_pyx import MultiplePublishersError, IpcError -from msgq import fake_event_handle, pub_sock, sub_sock, drain_sock_raw +from msgq import fake_event_handle, drain_sock_raw import msgq import os @@ -18,6 +18,20 @@ from openpilot.common.util import MovingAverage NO_TRAVERSAL_LIMIT = 2**64-1 +def pub_sock(endpoint: str) -> PubSocket: + service = SERVICE_LIST.get(endpoint) + segment_size = service.queue_size if service else 0 + return msgq.pub_sock(endpoint, segment_size) + + +def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1", + conflate: bool = False, timeout: Optional[int] = None) -> SubSocket: + service = SERVICE_LIST.get(endpoint) + segment_size = service.queue_size if service else 0 + return msgq.sub_sock(endpoint, poller=poller, addr=addr, conflate=conflate, + timeout=timeout, segment_size=segment_size) + + def reset_context(): msgq.context = Context() diff --git a/cereal/messaging/socketmaster.cc b/cereal/messaging/socketmaster.cc index 7f7e2795c4..dfeeb807ee 100644 --- a/cereal/messaging/socketmaster.cc +++ b/cereal/messaging/socketmaster.cc @@ -50,7 +50,7 @@ SubMaster::SubMaster(const std::vector &service_list, const std::v assert(services.count(std::string(name)) > 0); service serv = services.at(std::string(name)); - SubSocket *socket = SubSocket::create(message_context.context(), name, address ? address : "127.0.0.1", true); + SubSocket *socket = SubSocket::create(message_context.context(), name, address ? address : "127.0.0.1", true, true, serv.queue_size); assert(socket != 0); bool is_polled = inList(poll, name) || poll.empty(); if (is_polled) poller_->registerSocket(socket); @@ -187,7 +187,8 @@ SubMaster::~SubMaster() { PubMaster::PubMaster(const std::vector &service_list) { for (auto name : service_list) { assert(services.count(name) > 0); - PubSocket *socket = PubSocket::create(message_context.context(), name); + service serv = services.at(std::string(name)); + PubSocket *socket = PubSocket::create(message_context.context(), name, true, serv.queue_size); assert(socket); sockets_[name] = socket; } diff --git a/cereal/services.py b/cereal/services.py index f7269b79ce..e7350aceac 100755 --- a/cereal/services.py +++ b/cereal/services.py @@ -1,12 +1,22 @@ #!/usr/bin/env python3 +from enum import IntEnum from typing import Optional +# TODO: this should be automatically determined using the capnp schema +class QueueSize(IntEnum): + BIG = 10 * 1024 * 1024 # 10MB - video frames, large AI outputs + MEDIUM = 2 * 1024 * 1024 # 2MB - high freq (CAN), livestream + SMALL = 250 * 1024 # 250KB - most services + + class Service: - def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] = None): + def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] = None, + queue_size: QueueSize = QueueSize.SMALL): self.should_log = should_log self.frequency = frequency self.decimation = decimation + self.queue_size = queue_size _services: dict[str, tuple] = { @@ -20,15 +30,15 @@ _services: dict[str, tuple] = { "gpsNMEA": (True, 9.), "deviceState": (True, 2., 1), "touch": (True, 20., 1), - "can": (True, 100., 2053), # decimation gives ~3 msgs in a full segment - "controlsState": (True, 100., 10), + "can": (True, 100., 2053, QueueSize.BIG), # decimation gives ~3 msgs in a full segment + "controlsState": (True, 100., 10, QueueSize.MEDIUM), "selfdriveState": (True, 100., 10), "pandaStates": (True, 10., 1), "peripheralState": (True, 2., 1), "radarState": (True, 20., 5), "roadEncodeIdx": (False, 20., 1), "liveTracks": (True, 20.), - "sendcan": (True, 100., 139), + "sendcan": (True, 100., 139, QueueSize.MEDIUM), "logMessage": (True, 0.), "errorLogMessage": (True, 0., 1), "liveCalibration": (True, 4., 4), @@ -40,7 +50,7 @@ _services: dict[str, tuple] = { "carOutput": (True, 100., 10), "longitudinalPlan": (True, 20., 10), "driverAssistance": (True, 20., 20), - "procLog": (True, 0.5, 15), + "procLog": (True, 0.5, 15, QueueSize.BIG), "gpsLocationExternal": (True, 10., 10), "gpsLocation": (True, 1., 1), "ubloxGnss": (True, 10.), @@ -62,7 +72,7 @@ _services: dict[str, tuple] = { "wideRoadEncodeIdx": (False, 20., 1), "wideRoadCameraState": (True, 20., 20), "drivingModelData": (True, 20., 10), - "modelV2": (True, 20.), + "modelV2": (True, 20., None, QueueSize.BIG), "managerState": (True, 2., 1), "uploaderState": (True, 0., 1), "navInstruction": (True, 1., 10), @@ -74,21 +84,21 @@ _services: dict[str, tuple] = { "rawAudioData": (False, 20.), "bookmarkButton": (True, 0., 1), "audioFeedback": (True, 0., 1), + "roadEncodeData": (False, 20., None, QueueSize.BIG), + "driverEncodeData": (False, 20., None, QueueSize.BIG), + "wideRoadEncodeData": (False, 20., None, QueueSize.BIG), + "qRoadEncodeData": (False, 20., None, QueueSize.BIG), # debug "uiDebug": (True, 0., 1), "testJoystick": (True, 0.), "alertDebug": (True, 20., 5), - "roadEncodeData": (False, 20.), - "driverEncodeData": (False, 20.), - "wideRoadEncodeData": (False, 20.), - "qRoadEncodeData": (False, 20.), "livestreamWideRoadEncodeIdx": (False, 20.), "livestreamRoadEncodeIdx": (False, 20.), "livestreamDriverEncodeIdx": (False, 20.), - "livestreamWideRoadEncodeData": (False, 20.), - "livestreamRoadEncodeData": (False, 20.), - "livestreamDriverEncodeData": (False, 20.), + "livestreamWideRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), + "livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), + "livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM), "customReservedRawData0": (True, 0.), "customReservedRawData1": (True, 0.), "customReservedRawData2": (True, 0.), @@ -106,13 +116,13 @@ def build_header(): h += "#include \n" h += "#include \n" - h += "struct service { std::string name; bool should_log; float frequency; int decimation; };\n" + h += "struct service { std::string name; bool should_log; float frequency; int decimation; size_t queue_size; };\n" h += "static std::map services = {\n" for k, v in SERVICE_LIST.items(): should_log = "true" if v.should_log else "false" decimation = -1 if v.decimation is None else v.decimation - h += ' { "%s", {"%s", %s, %f, %d}},\n' % \ - (k, k, should_log, v.frequency, decimation) + h += ' { "%s", {"%s", %s, %f, %d, %d}},\n' % \ + (k, k, should_log, v.frequency, decimation, v.queue_size) h += "};\n" h += "#endif\n" diff --git a/msgq_repo b/msgq_repo index 92999f6bc1..345878d914 160000 --- a/msgq_repo +++ b/msgq_repo @@ -1 +1 @@ -Subproject commit 92999f6bc19c16170ff984473b43c799162faca1 +Subproject commit 345878d9141d0470d1a96f8fb5e59efb61dd5cf9 diff --git a/selfdrive/debug/analyze-msg-size.py b/selfdrive/debug/analyze-msg-size.py new file mode 100755 index 0000000000..69015a6be2 --- /dev/null +++ b/selfdrive/debug/analyze-msg-size.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +import argparse +from tqdm import tqdm + +from cereal.services import SERVICE_LIST, QueueSize +from openpilot.tools.lib.logreader import LogReader + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Analyze message sizes from a log route") + parser.add_argument("route", nargs="?", default="98395b7c5b27882e/000000a8--f87e7cd255", + help="Log route to analyze (default: 98395b7c5b27882e/000000a8--f87e7cd255)") + args = parser.parse_args() + + lr = LogReader(args.route) + + szs = {} + for msg in tqdm(lr): + sz = len(msg.as_builder().to_bytes()) + msg_type = msg.which() + if msg_type not in szs: + szs[msg_type] = {'min': sz, 'max': sz, 'sum': sz, 'count': 1} + else: + szs[msg_type]['min'] = min(szs[msg_type]['min'], sz) + szs[msg_type]['max'] = max(szs[msg_type]['max'], sz) + szs[msg_type]['sum'] += sz + szs[msg_type]['count'] += 1 + + print() + print(f"{'Service':<36} {'Min (KB)':>12} {'Max (KB)':>12} {'Avg (KB)':>12} {'KB/min':>12} {'KB/sec':>12} {'Minutes in 10MB':>18} {'Seconds in Queue':>18}") + print("-" * 132) + def sort_key(x): + k, v = x + avg = v['sum'] / v['count'] + freq = SERVICE_LIST.get(k, None) + freq_val = freq.frequency if freq else 0.0 + kb_per_min = (avg * freq_val * 60) / 1024 if freq_val > 0 else 0.0 + return kb_per_min + total_kb_per_min = 0.0 + RINGBUFFER_SIZE_KB = 10 * 1024 # 10MB old default + for k, v in sorted(szs.items(), key=sort_key, reverse=True): + avg = v['sum'] / v['count'] + service = SERVICE_LIST.get(k, None) + freq_val = service.frequency if service else 0.0 + queue_size_kb = (service.queue_size / 1024) if service else 250 # default to SMALL + kb_per_min = (avg * freq_val * 60) / 1024 if freq_val > 0 else 0.0 + kb_per_sec = kb_per_min / 60 + minutes_in_buffer = RINGBUFFER_SIZE_KB / kb_per_min if kb_per_min > 0 else float('inf') + seconds_in_queue = (queue_size_kb / kb_per_sec) if kb_per_sec > 0 else float('inf') + total_kb_per_min += kb_per_min + min_str = f"{minutes_in_buffer:.2f}" if minutes_in_buffer != float('inf') else "inf" + sec_queue_str = f"{seconds_in_queue:.2f}" if seconds_in_queue != float('inf') else "inf" + print(f"{k:<36} {v['min']/1024:>12.2f} {v['max']/1024:>12.2f} {avg/1024:>12.2f} {kb_per_min:>12.2f} {kb_per_sec:>12.2f} {min_str:>18} {sec_queue_str:>18}") + + # Summary section + print() + print(f"Total usage: {total_kb_per_min / 1024:.2f} MB/min") + + # Calculate memory usage: old (10MB for all) vs new (from services.py) + OLD_SIZE = 10 * 1024 * 1024 # 10MB was the old default + old_total = len(SERVICE_LIST) * OLD_SIZE + + new_total = sum(s.queue_size for s in SERVICE_LIST.values()) + + # Count by queue size + size_counts = {QueueSize.BIG: 0, QueueSize.MEDIUM: 0, QueueSize.SMALL: 0} + for s in SERVICE_LIST.values(): + size_counts[s.queue_size] += 1 + + savings_pct = (1 - new_total / old_total) * 100 + + print() + print(f"{'Queue Size Comparison':<40}") + print("-" * 60) + print(f"{'Old (10MB default):':<30} {old_total / 1024 / 1024:>10.2f} MB") + print(f"{'New (from services.py):':<30} {new_total / 1024 / 1024:>10.2f} MB") + print(f"{'Savings:':<30} {savings_pct:>10.1f}%") + print() + print(f"{'Breakdown:':<30}") + print(f" BIG (10MB): {size_counts[QueueSize.BIG]:>3} services") + print(f" MEDIUM (2MB): {size_counts[QueueSize.MEDIUM]:>3} services") + print(f" SMALL (250KB): {size_counts[QueueSize.SMALL]:>3} services") diff --git a/selfdrive/pandad/pandad.cc b/selfdrive/pandad/pandad.cc index a76cbc46e3..2fd4a4def2 100644 --- a/selfdrive/pandad/pandad.cc +++ b/selfdrive/pandad/pandad.cc @@ -11,6 +11,7 @@ #include "cereal/gen/cpp/car.capnp.h" #include "cereal/messaging/messaging.h" +#include "cereal/services.h" #include "common/ratekeeper.h" #include "common/swaglog.h" #include "common/timing.h" @@ -82,7 +83,7 @@ void can_send_thread(std::vector pandas, bool fake_send) { AlignedBuffer aligned_buf; std::unique_ptr context(Context::create()); - std::unique_ptr subscriber(SubSocket::create(context.get(), "sendcan")); + std::unique_ptr subscriber(SubSocket::create(context.get(), "sendcan", "127.0.0.1", false, true, services.at("sendcan").queue_size)); assert(subscriber != NULL); subscriber->setTimeout(100); diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index 972f09be30..ed3fca5fb0 100644 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -289,7 +289,7 @@ class TestOnroad: # check for big leaks. note that memory usage is # expected to go up while the MSGQ buffers fill up - assert np.average(mems) <= 82, "Average memory usage above 85%" + assert np.average(mems) <= 65, "Average memory usage too high" assert np.max(np.diff(mems)) <= 4, "Max memory increase too high" assert np.average(np.diff(mems)) <= 1, "Average memory increase too high" diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index 21de1ff33f..47da321024 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -238,7 +238,7 @@ void loggerd_thread() { if (it.should_log || (encoder && !livestream_encoder) || record_audio) { LOGD("logging %s", it.name.c_str()); - SubSocket * sock = SubSocket::create(ctx.get(), it.name); + SubSocket * sock = SubSocket::create(ctx.get(), it.name, "127.0.0.1", false, true, it.queue_size); assert(sock != NULL); poller->registerSocket(sock); service_state[sock] = { diff --git a/system/loggerd/tests/test_loggerd.py b/system/loggerd/tests/test_loggerd.py index 1cac16adcd..9703ac2f5f 100644 --- a/system/loggerd/tests/test_loggerd.py +++ b/system/loggerd/tests/test_loggerd.py @@ -24,7 +24,6 @@ from openpilot.system.version import get_version from openpilot.tools.lib.helpers import RE from openpilot.tools.lib.logreader import LogReader from msgq.visionipc import VisionIpcServer, VisionStreamType -from openpilot.common.transformations.camera import DEVICE_CAMERAS SentinelType = log.Sentinel.SentinelType @@ -99,13 +98,17 @@ class TestLoggerd: return sent_msgs def _publish_camera_and_audio_messages(self, num_segs=1, segment_length=5): - d = DEVICE_CAMERAS[("tici", "ar0231")] + # Use small frame sizes for testing (width, height, size, stride, uv_offset) + # NV12 format: size = stride * height * 1.5, uv_offset = stride * height + w, h = 320, 240 + frame_spec = (w, h, w * h * 3 // 2, w, w * h) streams = [ - (VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048 * 2346, 2048, 2048 * 1216), "roadCameraState"), - (VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048 * 2346, 2048, 2048 * 1216), "driverCameraState"), - (VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048 * 2346, 2048, 2048 * 1216), "wideRoadCameraState"), + (VisionStreamType.VISION_STREAM_ROAD, frame_spec, "roadCameraState"), + (VisionStreamType.VISION_STREAM_DRIVER, frame_spec, "driverCameraState"), + (VisionStreamType.VISION_STREAM_WIDE_ROAD, frame_spec, "wideRoadCameraState"), ] + sm = messaging.SubMaster(["roadEncodeData"]) pm = messaging.PubMaster([s for _, _, s in streams] + ["rawAudioData"]) vipc_server = VisionIpcServer("camerad") for stream_type, frame_spec, _ in streams: @@ -139,6 +142,8 @@ class TestLoggerd: for _, _, state in streams: assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001) + sm.update(100) # wait for encode data publish + managed_processes["loggerd"].stop() managed_processes["encoderd"].stop() diff --git a/tools/cabana/streams/devicestream.cc b/tools/cabana/streams/devicestream.cc index 6de63dfbbc..462dd7a361 100644 --- a/tools/cabana/streams/devicestream.cc +++ b/tools/cabana/streams/devicestream.cc @@ -3,6 +3,8 @@ #include #include +#include "cereal/services.h" + #include #include #include @@ -20,7 +22,7 @@ void DeviceStream::streamThread() { std::unique_ptr context(Context::create()); std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); - std::unique_ptr sock(SubSocket::create(context.get(), "can", address)); + std::unique_ptr sock(SubSocket::create(context.get(), "can", address, false, true, services.at("can").queue_size)); assert(sock != NULL); // run as fast as messages come in while (!QThread::currentThread()->isInterruptionRequested()) {