Reduce pub-sub memory usage by 10x (#36884)

less mem
This commit is contained in:
Adeeb Shihadeh
2025-12-16 13:27:14 -08:00
committed by GitHub
parent 545f7c6f2a
commit bcdeec3133
10 changed files with 144 additions and 29 deletions

View File

@@ -2,7 +2,7 @@
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
from msgq.ipc_pyx import MultiplePublishersError, IpcError
from msgq import fake_event_handle, pub_sock, sub_sock, drain_sock_raw
from msgq import fake_event_handle, drain_sock_raw
import msgq
import os
@@ -18,6 +18,20 @@ from openpilot.common.util import MovingAverage
NO_TRAVERSAL_LIMIT = 2**64-1
def pub_sock(endpoint: str) -> PubSocket:
service = SERVICE_LIST.get(endpoint)
segment_size = service.queue_size if service else 0
return msgq.pub_sock(endpoint, segment_size)
def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1",
conflate: bool = False, timeout: Optional[int] = None) -> SubSocket:
service = SERVICE_LIST.get(endpoint)
segment_size = service.queue_size if service else 0
return msgq.sub_sock(endpoint, poller=poller, addr=addr, conflate=conflate,
timeout=timeout, segment_size=segment_size)
def reset_context():
msgq.context = Context()

View File

@@ -50,7 +50,7 @@ SubMaster::SubMaster(const std::vector<const char *> &service_list, const std::v
assert(services.count(std::string(name)) > 0);
service serv = services.at(std::string(name));
SubSocket *socket = SubSocket::create(message_context.context(), name, address ? address : "127.0.0.1", true);
SubSocket *socket = SubSocket::create(message_context.context(), name, address ? address : "127.0.0.1", true, true, serv.queue_size);
assert(socket != 0);
bool is_polled = inList(poll, name) || poll.empty();
if (is_polled) poller_->registerSocket(socket);
@@ -187,7 +187,8 @@ SubMaster::~SubMaster() {
PubMaster::PubMaster(const std::vector<const char *> &service_list) {
for (auto name : service_list) {
assert(services.count(name) > 0);
PubSocket *socket = PubSocket::create(message_context.context(), name);
service serv = services.at(std::string(name));
PubSocket *socket = PubSocket::create(message_context.context(), name, true, serv.queue_size);
assert(socket);
sockets_[name] = socket;
}

View File

@@ -1,12 +1,22 @@
#!/usr/bin/env python3
from enum import IntEnum
from typing import Optional
# TODO: this should be automatically determined using the capnp schema
class QueueSize(IntEnum):
BIG = 10 * 1024 * 1024 # 10MB - video frames, large AI outputs
MEDIUM = 2 * 1024 * 1024 # 2MB - high freq (CAN), livestream
SMALL = 250 * 1024 # 250KB - most services
class Service:
def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] = None):
def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] = None,
queue_size: QueueSize = QueueSize.SMALL):
self.should_log = should_log
self.frequency = frequency
self.decimation = decimation
self.queue_size = queue_size
_services: dict[str, tuple] = {
@@ -20,15 +30,15 @@ _services: dict[str, tuple] = {
"gpsNMEA": (True, 9.),
"deviceState": (True, 2., 1),
"touch": (True, 20., 1),
"can": (True, 100., 2053), # decimation gives ~3 msgs in a full segment
"controlsState": (True, 100., 10),
"can": (True, 100., 2053, QueueSize.BIG), # decimation gives ~3 msgs in a full segment
"controlsState": (True, 100., 10, QueueSize.MEDIUM),
"selfdriveState": (True, 100., 10),
"pandaStates": (True, 10., 1),
"peripheralState": (True, 2., 1),
"radarState": (True, 20., 5),
"roadEncodeIdx": (False, 20., 1),
"liveTracks": (True, 20.),
"sendcan": (True, 100., 139),
"sendcan": (True, 100., 139, QueueSize.MEDIUM),
"logMessage": (True, 0.),
"errorLogMessage": (True, 0., 1),
"liveCalibration": (True, 4., 4),
@@ -40,7 +50,7 @@ _services: dict[str, tuple] = {
"carOutput": (True, 100., 10),
"longitudinalPlan": (True, 20., 10),
"driverAssistance": (True, 20., 20),
"procLog": (True, 0.5, 15),
"procLog": (True, 0.5, 15, QueueSize.BIG),
"gpsLocationExternal": (True, 10., 10),
"gpsLocation": (True, 1., 1),
"ubloxGnss": (True, 10.),
@@ -62,7 +72,7 @@ _services: dict[str, tuple] = {
"wideRoadEncodeIdx": (False, 20., 1),
"wideRoadCameraState": (True, 20., 20),
"drivingModelData": (True, 20., 10),
"modelV2": (True, 20.),
"modelV2": (True, 20., None, QueueSize.BIG),
"managerState": (True, 2., 1),
"uploaderState": (True, 0., 1),
"navInstruction": (True, 1., 10),
@@ -74,21 +84,21 @@ _services: dict[str, tuple] = {
"rawAudioData": (False, 20.),
"bookmarkButton": (True, 0., 1),
"audioFeedback": (True, 0., 1),
"roadEncodeData": (False, 20., None, QueueSize.BIG),
"driverEncodeData": (False, 20., None, QueueSize.BIG),
"wideRoadEncodeData": (False, 20., None, QueueSize.BIG),
"qRoadEncodeData": (False, 20., None, QueueSize.BIG),
# debug
"uiDebug": (True, 0., 1),
"testJoystick": (True, 0.),
"alertDebug": (True, 20., 5),
"roadEncodeData": (False, 20.),
"driverEncodeData": (False, 20.),
"wideRoadEncodeData": (False, 20.),
"qRoadEncodeData": (False, 20.),
"livestreamWideRoadEncodeIdx": (False, 20.),
"livestreamRoadEncodeIdx": (False, 20.),
"livestreamDriverEncodeIdx": (False, 20.),
"livestreamWideRoadEncodeData": (False, 20.),
"livestreamRoadEncodeData": (False, 20.),
"livestreamDriverEncodeData": (False, 20.),
"livestreamWideRoadEncodeData": (False, 20., None, QueueSize.MEDIUM),
"livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM),
"livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM),
"customReservedRawData0": (True, 0.),
"customReservedRawData1": (True, 0.),
"customReservedRawData2": (True, 0.),
@@ -106,13 +116,13 @@ def build_header():
h += "#include <map>\n"
h += "#include <string>\n"
h += "struct service { std::string name; bool should_log; float frequency; int decimation; };\n"
h += "struct service { std::string name; bool should_log; float frequency; int decimation; size_t queue_size; };\n"
h += "static std::map<std::string, service> services = {\n"
for k, v in SERVICE_LIST.items():
should_log = "true" if v.should_log else "false"
decimation = -1 if v.decimation is None else v.decimation
h += ' { "%s", {"%s", %s, %f, %d}},\n' % \
(k, k, should_log, v.frequency, decimation)
h += ' { "%s", {"%s", %s, %f, %d, %d}},\n' % \
(k, k, should_log, v.frequency, decimation, v.queue_size)
h += "};\n"
h += "#endif\n"

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
import argparse
from tqdm import tqdm
from cereal.services import SERVICE_LIST, QueueSize
from openpilot.tools.lib.logreader import LogReader
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Analyze message sizes from a log route")
parser.add_argument("route", nargs="?", default="98395b7c5b27882e/000000a8--f87e7cd255",
help="Log route to analyze (default: 98395b7c5b27882e/000000a8--f87e7cd255)")
args = parser.parse_args()
lr = LogReader(args.route)
szs = {}
for msg in tqdm(lr):
sz = len(msg.as_builder().to_bytes())
msg_type = msg.which()
if msg_type not in szs:
szs[msg_type] = {'min': sz, 'max': sz, 'sum': sz, 'count': 1}
else:
szs[msg_type]['min'] = min(szs[msg_type]['min'], sz)
szs[msg_type]['max'] = max(szs[msg_type]['max'], sz)
szs[msg_type]['sum'] += sz
szs[msg_type]['count'] += 1
print()
print(f"{'Service':<36} {'Min (KB)':>12} {'Max (KB)':>12} {'Avg (KB)':>12} {'KB/min':>12} {'KB/sec':>12} {'Minutes in 10MB':>18} {'Seconds in Queue':>18}")
print("-" * 132)
def sort_key(x):
k, v = x
avg = v['sum'] / v['count']
freq = SERVICE_LIST.get(k, None)
freq_val = freq.frequency if freq else 0.0
kb_per_min = (avg * freq_val * 60) / 1024 if freq_val > 0 else 0.0
return kb_per_min
total_kb_per_min = 0.0
RINGBUFFER_SIZE_KB = 10 * 1024 # 10MB old default
for k, v in sorted(szs.items(), key=sort_key, reverse=True):
avg = v['sum'] / v['count']
service = SERVICE_LIST.get(k, None)
freq_val = service.frequency if service else 0.0
queue_size_kb = (service.queue_size / 1024) if service else 250 # default to SMALL
kb_per_min = (avg * freq_val * 60) / 1024 if freq_val > 0 else 0.0
kb_per_sec = kb_per_min / 60
minutes_in_buffer = RINGBUFFER_SIZE_KB / kb_per_min if kb_per_min > 0 else float('inf')
seconds_in_queue = (queue_size_kb / kb_per_sec) if kb_per_sec > 0 else float('inf')
total_kb_per_min += kb_per_min
min_str = f"{minutes_in_buffer:.2f}" if minutes_in_buffer != float('inf') else "inf"
sec_queue_str = f"{seconds_in_queue:.2f}" if seconds_in_queue != float('inf') else "inf"
print(f"{k:<36} {v['min']/1024:>12.2f} {v['max']/1024:>12.2f} {avg/1024:>12.2f} {kb_per_min:>12.2f} {kb_per_sec:>12.2f} {min_str:>18} {sec_queue_str:>18}")
# Summary section
print()
print(f"Total usage: {total_kb_per_min / 1024:.2f} MB/min")
# Calculate memory usage: old (10MB for all) vs new (from services.py)
OLD_SIZE = 10 * 1024 * 1024 # 10MB was the old default
old_total = len(SERVICE_LIST) * OLD_SIZE
new_total = sum(s.queue_size for s in SERVICE_LIST.values())
# Count by queue size
size_counts = {QueueSize.BIG: 0, QueueSize.MEDIUM: 0, QueueSize.SMALL: 0}
for s in SERVICE_LIST.values():
size_counts[s.queue_size] += 1
savings_pct = (1 - new_total / old_total) * 100
print()
print(f"{'Queue Size Comparison':<40}")
print("-" * 60)
print(f"{'Old (10MB default):':<30} {old_total / 1024 / 1024:>10.2f} MB")
print(f"{'New (from services.py):':<30} {new_total / 1024 / 1024:>10.2f} MB")
print(f"{'Savings:':<30} {savings_pct:>10.1f}%")
print()
print(f"{'Breakdown:':<30}")
print(f" BIG (10MB): {size_counts[QueueSize.BIG]:>3} services")
print(f" MEDIUM (2MB): {size_counts[QueueSize.MEDIUM]:>3} services")
print(f" SMALL (250KB): {size_counts[QueueSize.SMALL]:>3} services")

View File

@@ -11,6 +11,7 @@
#include "cereal/gen/cpp/car.capnp.h"
#include "cereal/messaging/messaging.h"
#include "cereal/services.h"
#include "common/ratekeeper.h"
#include "common/swaglog.h"
#include "common/timing.h"
@@ -82,7 +83,7 @@ void can_send_thread(std::vector<Panda *> pandas, bool fake_send) {
AlignedBuffer aligned_buf;
std::unique_ptr<Context> context(Context::create());
std::unique_ptr<SubSocket> subscriber(SubSocket::create(context.get(), "sendcan"));
std::unique_ptr<SubSocket> subscriber(SubSocket::create(context.get(), "sendcan", "127.0.0.1", false, true, services.at("sendcan").queue_size));
assert(subscriber != NULL);
subscriber->setTimeout(100);

View File

@@ -289,7 +289,7 @@ class TestOnroad:
# check for big leaks. note that memory usage is
# expected to go up while the MSGQ buffers fill up
assert np.average(mems) <= 82, "Average memory usage above 85%"
assert np.average(mems) <= 65, "Average memory usage too high"
assert np.max(np.diff(mems)) <= 4, "Max memory increase too high"
assert np.average(np.diff(mems)) <= 1, "Average memory increase too high"

View File

@@ -238,7 +238,7 @@ void loggerd_thread() {
if (it.should_log || (encoder && !livestream_encoder) || record_audio) {
LOGD("logging %s", it.name.c_str());
SubSocket * sock = SubSocket::create(ctx.get(), it.name);
SubSocket * sock = SubSocket::create(ctx.get(), it.name, "127.0.0.1", false, true, it.queue_size);
assert(sock != NULL);
poller->registerSocket(sock);
service_state[sock] = {

View File

@@ -24,7 +24,6 @@ from openpilot.system.version import get_version
from openpilot.tools.lib.helpers import RE
from openpilot.tools.lib.logreader import LogReader
from msgq.visionipc import VisionIpcServer, VisionStreamType
from openpilot.common.transformations.camera import DEVICE_CAMERAS
SentinelType = log.Sentinel.SentinelType
@@ -99,13 +98,17 @@ class TestLoggerd:
return sent_msgs
def _publish_camera_and_audio_messages(self, num_segs=1, segment_length=5):
d = DEVICE_CAMERAS[("tici", "ar0231")]
# Use small frame sizes for testing (width, height, size, stride, uv_offset)
# NV12 format: size = stride * height * 1.5, uv_offset = stride * height
w, h = 320, 240
frame_spec = (w, h, w * h * 3 // 2, w, w * h)
streams = [
(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048 * 2346, 2048, 2048 * 1216), "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048 * 2346, 2048, 2048 * 1216), "driverCameraState"),
(VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048 * 2346, 2048, 2048 * 1216), "wideRoadCameraState"),
(VisionStreamType.VISION_STREAM_ROAD, frame_spec, "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, frame_spec, "driverCameraState"),
(VisionStreamType.VISION_STREAM_WIDE_ROAD, frame_spec, "wideRoadCameraState"),
]
sm = messaging.SubMaster(["roadEncodeData"])
pm = messaging.PubMaster([s for _, _, s in streams] + ["rawAudioData"])
vipc_server = VisionIpcServer("camerad")
for stream_type, frame_spec, _ in streams:
@@ -139,6 +142,8 @@ class TestLoggerd:
for _, _, state in streams:
assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001)
sm.update(100) # wait for encode data publish
managed_processes["loggerd"].stop()
managed_processes["encoderd"].stop()

View File

@@ -3,6 +3,8 @@
#include <memory>
#include <string>
#include "cereal/services.h"
#include <QButtonGroup>
#include <QFormLayout>
#include <QRadioButton>
@@ -20,7 +22,7 @@ void DeviceStream::streamThread() {
std::unique_ptr<Context> context(Context::create());
std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString();
std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address));
std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address, false, true, services.at("can").queue_size));
assert(sock != NULL);
// run as fast as messages come in
while (!QThread::currentThread()->isInterruptionRequested()) {