Files
sunnypilot/system/statsd.py
DevTekVE 4b5de0eddb stats: sunnylink integration (#1454)
* sunnylink: add statsd process and related telemetry logging infrastructure

- Introduced `statsd_sp` process for handling Sunnylink-specific stats.
- Enhanced metrics logging with improved directory structure and data handling.

* sunnylink: re-enable and refine stat_handler for telemetry processing

- Reactivated `stat_handler` thread with improved path handling.
- Made `stat_handler` more flexible by allowing directory injection.

* statsd: fix formatting issue in telemetry string generation

- Corrected missing comma between `sunnylink_dongle_id` and `comma_dongle_id`.

* update statsd_sp process configuration for enhanced readiness logic

- Modified `statsd_sp` initialization to include `always_run` alongside `sunnylink_ready_shim`.
- Ensures robust process activation conditions.

* refactor(statsd): enhance and unify StatLogSP implementation

- Replaced custom `StatLogSP` in sunnylink with centralized implementation from `system.statsd`.
- Ensures consistent logic for StatLogSP handling across modules.

* fix

* refactor(statsd): add intercept parameter to StatLogSP for configurable logging

- Introduced optional `intercept` parameter to `StatLogSP` to manage `comma_statlog` initialization.
- Updated usage in `sunnylink` to disable interception where unnecessary.

* Dont complain

* feat(statsd): add raw metric type and SunnyPilot-specific stats collection

- Introduced `METRIC_TYPE.RAW` for base64-encoded raw data metrics.
- Added `sp_stats` thread to export SunnyPilot params as raw metrics.
- Enhanced telemetry handling with decoding and serialization updates.

* refactor(statsd): improve `sp_stats` error handling and param processing

- Enhanced exception handling for `params.get` to prevent crashes.
- Added support for nested dict values to be included in stats.

* refactor(statsd): adjust imports and minor code formatting updates

- Updated `Ratekeeper` import path for consistency with the `openpilot` module structure.
- Fixed minor formatting for improved readability.

* refactor(statsd): update typings and remove unused NoReturn annotation

- Removed unnecessary `NoReturn` typing for `stats_main` to simplify function definition.
- Adjusted `get_influxdb_line_raw` to refine typing for `value` parameter.

* cleanup

* init

* init

* slightly more

* staticmethod

* handle them all

* get them models

* log with route

* more

* car

* Revert "car"

This reverts commit fe1c90cf4d.

* handle capnp

* Revert "handle capnp"

This reverts commit c5aea68803.

* 1 more time

* Revert "1 more time"

This reverts commit a364474fa5.

* Cleaning to expose wider

* feat(interfaces, statsd): log car params to stats system

- Added `STATSLOGSP` import and logging to capture `carFingerprint` in metrics.
- Improved error handling in `get_influxdb_line_raw` for robust metric generation.

* refactor(interfaces): streamline car params logging to stats

- Simplified logging by directly converting `CP` to a dictionary.
- Removed legacy stats aggregation for clarity.

* feat(sunnylink): enable compression for stats in SunnyLink

- Added optional compression for stats payload to support large data.
- Updated `stat_handler` to handle compression and base64 encoding.

* fix(statsd): filter complex types in `get_influxdb_line_raw`

- Skips unsupported types (dict, list, bytes) to prevent formatting errors.
- Simplifies type annotation for `value` parameter.

* fix(statsd): use `json.dumps` for string conversion in `get_influxdb_line_raw`

- Ensures proper handling of special characters in values.
- Prevents potential formatting issues with raw `str()` conversion.

* refactor(interfaces, statsd): update parameter keys for stats logging

- Renamed logged keys for better clarity (`sunnypilot_params` → `sunnypilot.car_params`, `device_params`).
- Ensures consistency across data logs.

* bet

---------

Co-authored-by: Jason Wen <haibin.wen3@gmail.com>
2025-11-04 16:53:31 -05:00

235 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
import base64
import json
import os
from decimal import Decimal
import zmq
import time
import uuid
from pathlib import Path
from collections import defaultdict
from datetime import datetime, UTC, date
from typing import NoReturn
from openpilot.common.params import Params
from cereal.messaging import SubMaster
from openpilot.system.hardware.hw import Paths
from openpilot.common.swaglog import cloudlog
from openpilot.system.hardware import HARDWARE
from openpilot.common.file_helpers import atomic_write_in_dir
from openpilot.system.version import get_build_metadata
from openpilot.system.loggerd.config import STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S
class METRIC_TYPE:
GAUGE = 'g'
SAMPLE = 'sa'
RAW = 'r'
class StatLog:
def __init__(self):
self.pid = None
self.zctx = None
self.sock = None
self.stats_socket = STATS_SOCKET
def connect(self) -> None:
self.zctx = zmq.Context.instance() or zmq.Context()
self.sock = self.zctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, 10)
self.sock.connect(self.stats_socket)
self.pid = os.getpid()
def __del__(self):
if self.sock is not None:
self.sock.close()
if self.zctx is not None:
self.zctx.term()
def _send(self, metric: str) -> None:
if os.getpid() != self.pid:
self.connect()
try:
self.sock.send_string(metric, zmq.NOBLOCK)
except zmq.error.Again:
# drop :/
pass
def gauge(self, name: str, value: float) -> None:
self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
# Samples will be recorded in a buffer and at aggregation time,
# statistical properties will be logged (mean, count, percentiles, ...)
def sample(self, name: str, value: float):
self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
class StatLogSP(StatLog):
def __init__(self, intercept=True):
"""
Initializes the class instance with an optional parameter to determine
if statistical logging should be configured or not.
:param intercept: A boolean flag that indicates whether to initialize
the `comma_statlog`. If True, the `comma_statlog` attribute is
instantiated as a `StatLog` object. Defaults to True.
"""
super().__init__()
self.comma_statlog = StatLog() if intercept else None
self.stats_socket = f"{STATS_SOCKET}_sp"
def connect(self) -> None:
super().connect()
if self.comma_statlog:
self.comma_statlog.connect()
def __del__(self):
super().__del__()
if self.comma_statlog:
self.comma_statlog.__del__()
def _send(self, metric: str) -> None:
super()._send(metric)
if self.comma_statlog:
self.comma_statlog._send(metric)
@staticmethod
def default_converter(obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, set):
return list(obj)
if isinstance(obj, Decimal):
return float(obj)
return str(obj) # fallback for unknown types
def raw(self, name: str, value: dict) -> None:
encoded_dict = base64.b64encode(json.dumps(value, default=self.default_converter).encode("utf-8")).decode("utf-8")
self._send(f"{name}:{encoded_dict}|{METRIC_TYPE.RAW}")
def main() -> NoReturn:
dongle_id = Params().get("DongleId")
def get_influxdb_line(measurement: str, value: float | dict[str, float], timestamp: datetime, tags: dict) -> str:
res = f"{measurement}"
for k, v in tags.items():
res += f",{k}={str(v)}"
res += " "
if isinstance(value, float):
value = {'value': value}
for k, v in value.items():
res += f"{k}={v},"
res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
return res
# open statistics socket
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.PULL)
sock.bind(STATS_SOCKET)
STATS_DIR = Paths.stats_root()
# initialize stats directory
Path(STATS_DIR).mkdir(parents=True, exist_ok=True)
build_metadata = get_build_metadata()
# initialize tags
tags = {
'started': False,
'version': build_metadata.openpilot.version,
'branch': build_metadata.channel,
'dirty': build_metadata.openpilot.is_dirty,
'origin': build_metadata.openpilot.git_normalized_origin,
'deviceType': HARDWARE.get_device_type(),
}
# subscribe to deviceState for started state
sm = SubMaster(['deviceState'])
idx = 0
boot_uid = str(uuid.uuid4())[:8]
last_flush_time = time.monotonic()
gauges = {}
samples: dict[str, list[float]] = defaultdict(list)
try:
while True:
started_prev = sm['deviceState'].started
sm.update()
# Update metrics
while True:
try:
metric = sock.recv_string(zmq.NOBLOCK)
try:
metric_type = metric.split('|')[1]
metric_name = metric.split(':')[0]
metric_value = float(metric.split('|')[0].split(':')[1])
if metric_type == METRIC_TYPE.GAUGE:
gauges[metric_name] = metric_value
elif metric_type == METRIC_TYPE.SAMPLE:
samples[metric_name].append(metric_value)
else:
cloudlog.event("unknown metric type", metric_type=metric_type)
except Exception:
cloudlog.event("malformed metric", metric=metric)
except zmq.error.Again:
break
# flush when started state changes or after FLUSH_TIME_S
if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
result = ""
current_time = datetime.now(UTC)
tags['started'] = sm['deviceState'].started
for key, value in gauges.items():
result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
for key, values in samples.items():
values.sort()
sample_count = len(values)
sample_sum = sum(values)
stats = {
'count': sample_count,
'min': values[0],
'max': values[-1],
'mean': sample_sum / sample_count,
}
for percentile in [0.05, 0.5, 0.95]:
value = values[int(round(percentile * (sample_count - 1)))]
stats[f"p{int(percentile * 100)}"] = value
result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
# clear intermediate data
gauges.clear()
samples.clear()
last_flush_time = time.monotonic()
# check that we aren't filling up the drive
if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
if len(result) > 0:
stats_path = os.path.join(STATS_DIR, f"{boot_uid}_{idx}")
with atomic_write_in_dir(stats_path) as f:
f.write(result)
idx += 1
else:
cloudlog.error("stats dir full")
finally:
sock.close()
ctx.term()
if __name__ == "__main__":
main()
else:
statlog = StatLogSP(intercept=True)