#!/usr/bin/env python3 from enum import IntEnum from typing import Optional # TODO: this should be automatically determined using the capnp schema class QueueSize(IntEnum): BIG = 10 * 1024 * 1024 # 10MB - video frames, large AI outputs MEDIUM = 2 * 1024 * 1024 # 2MB - high freq (CAN), livestream SMALL = 250 * 1024 # 250KB - most services class Service: def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] = None, queue_size: QueueSize = QueueSize.SMALL): self.should_log = should_log self.frequency = frequency self.decimation = decimation self.queue_size = queue_size _services: dict[str, tuple] = { # service: (should_log, frequency, qlog decimation (optional)) # note: the "EncodeIdx" packets will still be in the log "gyroscope": (True, 104., 104), "accelerometer": (True, 104., 104), "magnetometer": (True, 25.), "lightSensor": (True, 100., 100), "temperatureSensor": (True, 2., 200), "gpsNMEA": (True, 9.), "deviceState": (True, 2., 1), "touch": (True, 20., 1), "can": (True, 100., 2053, QueueSize.BIG), # decimation gives ~3 msgs in a full segment "controlsState": (True, 100., 10, QueueSize.MEDIUM), "selfdriveState": (True, 100., 10), "pandaStates": (True, 10., 1), "peripheralState": (True, 2., 1), "radarState": (True, 20., 5), "roadEncodeIdx": (False, 20., 1), "liveTracks": (True, 20.), "sendcan": (True, 100., 139, QueueSize.MEDIUM), "logMessage": (True, 0.), "errorLogMessage": (True, 0., 1), "liveCalibration": (True, 4., 4), "liveTorqueParameters": (True, 4., 1), "liveDelay": (True, 4., 1), "androidLog": (True, 0.), "carState": (True, 100., 10), "carControl": (True, 100., 10), "carOutput": (True, 100., 10), "longitudinalPlan": (True, 20., 10), "driverAssistance": (True, 20., 20), "procLog": (True, 0.5, 15, QueueSize.BIG), "gpsLocationExternal": (True, 10., 10), "gpsLocation": (True, 1., 1), "ubloxGnss": (True, 10.), "qcomGnss": (True, 2.), "gnssMeasurements": (True, 10., 10), "clocks": (True, 0.1, 1), "ubloxRaw": (True, 20.), "livePose": (True, 20., 4), "liveParameters": (True, 20., 5), "cameraOdometry": (True, 20., 10), "thumbnail": (True, 1 / 60., 1), "onroadEvents": (True, 1., 1), "carParams": (True, 0.02, 1), "roadCameraState": (True, 20., 20), "driverCameraState": (True, 20., 20), "driverEncodeIdx": (False, 20., 1), "driverStateV2": (True, 20., 10), "driverMonitoringState": (True, 20., 10), "wideRoadEncodeIdx": (False, 20., 1), "wideRoadCameraState": (True, 20., 20), "drivingModelData": (True, 20., 10), "modelV2": (True, 20., None, QueueSize.BIG), "managerState": (True, 2., 1), "uploaderState": (True, 0., 1), "navInstruction": (True, 1., 10), "navRoute": (True, 0.), "navThumbnail": (True, 0.), "qRoadEncodeIdx": (False, 20.), "userBookmark": (True, 0., 1), "soundPressure": (True, 10., 10), "rawAudioData": (False, 20.), "bookmarkButton": (True, 0., 1), "audioFeedback": (True, 0., 1), "roadEncodeData": (False, 20., None, QueueSize.BIG), "driverEncodeData": (False, 20., None, QueueSize.BIG), "wideRoadEncodeData": (False, 20., None, QueueSize.BIG), "qRoadEncodeData": (False, 20., None, QueueSize.BIG), # sunnypilot "modelManagerSP": (False, 1., 1, QueueSize.BIG), "backupManagerSP": (False, 1., 1, QueueSize.BIG), "selfdriveStateSP": (True, 100., 10), "longitudinalPlanSP": (True, 20., 10), "onroadEventsSP": (True, 1., 1), "carParamsSP": (True, 0.02, 1), "carControlSP": (True, 100., 10), "carStateSP": (True, 100., 10), "liveMapDataSP": (True, 1., 1), "modelDataV2SP": (True, 20., None, QueueSize.BIG), "liveLocationKalman": (True, 20.), # debug "uiDebug": (True, 0., 1), "testJoystick": (True, 0.), "alertDebug": (True, 20., 5), "livestreamWideRoadEncodeIdx": (False, 20.), "livestreamRoadEncodeIdx": (False, 20.), "livestreamDriverEncodeIdx": (False, 20.), "livestreamWideRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM), "customReservedRawData0": (True, 0.), "customReservedRawData1": (True, 0.), "customReservedRawData2": (True, 0.), } SERVICE_LIST = {name: Service(*vals) for idx, (name, vals) in enumerate(_services.items())} def build_header(): h = "" h += "/* THIS IS AN AUTOGENERATED FILE, PLEASE EDIT services.py */\n" h += "#ifndef __SERVICES_H\n" h += "#define __SERVICES_H\n" h += "#include \n" h += "#include \n" h += "struct service { std::string name; bool should_log; float frequency; int decimation; size_t queue_size; };\n" h += "static std::map services = {\n" for k, v in SERVICE_LIST.items(): should_log = "true" if v.should_log else "false" decimation = -1 if v.decimation is None else v.decimation h += ' { "%s", {"%s", %s, %f, %d, %d}},\n' % \ (k, k, should_log, v.frequency, decimation, v.queue_size) h += "};\n" h += "#endif\n" return h if __name__ == "__main__": print(build_header())