diff --git a/sunnypilot/selfdrive/car/interfaces.py b/sunnypilot/selfdrive/car/interfaces.py
index 59cfeabd6e..2cfbd1eec8 100644
--- a/sunnypilot/selfdrive/car/interfaces.py
+++ b/sunnypilot/selfdrive/car/interfaces.py
@@ -15,6 +15,8 @@ from openpilot.sunnypilot.selfdrive.controls.lib.speed_limit.helpers import set_
 import openpilot.system.sentry as sentry
 
+from openpilot.sunnypilot.sunnylink.statsd import STATSLOGSP  # use the openpilot.-prefixed path like every other import here; a second import path would create a duplicate module (and a second STATSLOGSP instance)
+
 
 def log_fingerprint(CP: structs.CarParams) -> None:
   if CP.carFingerprint == "MOCK":
@@ -100,6 +102,9 @@ def setup_interfaces(CI: CarInterfaceBase, params: Params = None) -> None:
   _initialize_torque_lateral_control(CI, CP, enforce_torque, nnlc_enabled)
   _cleanup_unsupported_params(CP, CP_SP)
 
+  STATSLOGSP.raw('sunnypilot.car_params', CP.to_dict())
+  # STATSLOGSP.raw('sunnypilot_params.car_params_sp', CP_SP.to_dict())  # https://github.com/sunnypilot/opendbc/pull/361
+
 
 def initialize_params(params) -> list[dict[str, Any]]:
   keys: list = []
diff --git a/sunnypilot/sunnylink/athena/sunnylinkd.py b/sunnypilot/sunnylink/athena/sunnylinkd.py
index d1b38d656f..4c431c5d34 100755
--- a/sunnypilot/sunnylink/athena/sunnylinkd.py
+++ b/sunnypilot/sunnylink/athena/sunnylinkd.py
@@ -16,8 +16,9 @@ from functools import partial
 from openpilot.common.params import Params
 from openpilot.common.realtime import set_core_affinity
 from openpilot.common.swaglog import cloudlog
+from openpilot.system.hardware.hw import Paths
 from openpilot.system.athena.athenad import ws_send, jsonrpc_handler, \
-  recv_queue, UploadQueueCache, upload_queue, cur_upload_items, backoff, ws_manage, log_handler, start_local_proxy_shim, upload_handler
+  recv_queue, UploadQueueCache, upload_queue, cur_upload_items, backoff, ws_manage, log_handler, start_local_proxy_shim, upload_handler, stat_handler
 from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException,
                        create_connection, WebSocketConnectionClosedException)
@@ -51,7 +52,7 @@ def handle_long_poll(ws: WebSocket, exit_event:
threading.Event | None) -> None: threading.Thread(target=ws_queue, args=(end_event,), name='ws_queue'), threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'), # threading.Thread(target=sunny_log_handler, args=(end_event, comma_prime_cellular_end_event), name='log_handler'), - # threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'), + threading.Thread(target=stat_handler, args=(end_event, Paths.stats_sp_root(), True), name='stat_handler'), ] + [ threading.Thread(target=jsonrpc_handler, args=(end_event, partial(startLocalProxy, end_event),), name=f'worker_{x}') for x in range(HANDLER_THREADS) diff --git a/sunnypilot/sunnylink/statsd.py b/sunnypilot/sunnylink/statsd.py new file mode 100755 index 0000000000..eefff63516 --- /dev/null +++ b/sunnypilot/sunnylink/statsd.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +import base64 +import json +import os +import threading +import traceback + +import zmq +import time +import uuid +from pathlib import Path +from collections import defaultdict +from datetime import datetime, UTC + +from openpilot.common.params import Params +from cereal.messaging import SubMaster +from openpilot.system.hardware.hw import Paths +from openpilot.common.swaglog import cloudlog +from openpilot.system.hardware import HARDWARE +from openpilot.common.file_helpers import atomic_write_in_dir +from openpilot.system.version import get_build_metadata +from openpilot.system.loggerd.config import STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S +from openpilot.system.statsd import METRIC_TYPE, StatLogSP +from openpilot.common.realtime import Ratekeeper + +STATSLOGSP = StatLogSP(intercept=False) + +def sp_stats(end_event): + """Collect sunnypilot-specific statistics and send as raw metrics.""" + rk = Ratekeeper(.1, print_delay_threshold=None) + statlogsp = STATSLOGSP + params = Params() + + def flatten_dict(d, parent_key='', sep='.'): + items = {} + if isinstance(d, dict): + for k, v in d.items(): + 
new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        items.update(flatten_dict(v, new_key, sep=sep))
+    elif isinstance(d, (list, tuple)):
+      for i, v in enumerate(d):
+        new_key = f"{parent_key}[{i}]"
+        items.update(flatten_dict(v, new_key, sep=sep))
+    else:
+      items[parent_key] = d
+    return items
+
+  # Collect sunnypilot parameters
+  stats_dict = {}  # NOTE(review): persists across loop iterations, so previously-seen keys keep being re-sent even if the param disappears — confirm intended
+
+  param_keys = [
+    'SunnylinkEnabled',
+    'AutoLaneChangeBsmDelay',
+    'AutoLaneChangeTimer',
+    'CarPlatformBundle',
+    'CurrentRoute',
+    'DevUIInfo',
+    'EnableCopyparty',
+    'IntelligentCruiseButtonManagement',
+    'QuietMode',
+    'RainbowMode',
+    'ShowAdvancedControls',
+    'Mads',
+    'MadsMainCruiseAllowed',
+    'MadsSteeringMode',
+    'MadsUnifiedEngagementMode',
+    'ModelManager_ActiveBundle',
+    'ModelManager_Favs',
+    'EnableSunnylinkUploader',
+    # 'SunnylinkEnabled' was listed twice; duplicate removed (already first in this list)
+    'InstallDate',
+    'UptimeOffroad',
+    'UptimeOnroad',
+  ]
+
+  while not end_event.is_set():
+    try:
+      for key in param_keys:
+
+        try:
+          value = params.get(key)
+        except Exception as e:
+          stats_dict[key] = str(e)  # store the message: the raw exception object would serialize as a repr and pollute the metric fields
+          continue
+
+        if value is None:
+          continue
+
+        if isinstance(value, (dict, list, tuple)):
+          stats_dict.update(flatten_dict(value, key))
+        else:
+          stats_dict[key] = value
+
+      if stats_dict:
+        statlogsp.raw('sunnypilot.device_params', stats_dict)
+    except Exception as e:
+      cloudlog.error(f"Exception {e}")
+    finally:
+      rk.keep_time()
+
+
+def stats_main(end_event):
+  comma_dongle_id = Params().get("DongleId")
+  sunnylink_dongle_id = Params().get("SunnylinkDongleId")
+
+  def get_influxdb_line(measurement: str, value: float | dict[str, float], timestamp: datetime, tags: dict) -> str:
+    res = f"{measurement}"
+    for k, v in tags.items():
+      res += f",{k}={str(v)}"
+    res += " "
+
+    if isinstance(value, float):
+      value = {'value': value}
+
+    for k, v in value.items():
+      res += f"{k}={str(v)},"
+
+    res += f"sunnylink_dongle_id=\"{sunnylink_dongle_id}\",comma_dongle_id=\"{comma_dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
+    return res
+
+  def get_influxdb_line_raw(measurement: str, value: dict, timestamp: datetime, tags: dict) -> str:
+    res = f"{measurement}"
+    try:
+      custom_tags = ""
+      for k, v in tags.items():
+        custom_tags += f",{k}={str(v)}"
+      res += custom_tags
+
+      fields = ""
+      for k, v in value.items():
+        # Skip complex types - only keep simple scalar values
+        if isinstance(v, (dict, list, bytes, bytearray)):
+          continue
+
+        fields += f"{k}={json.dumps(v)},"
+
+      res += f" {fields}"
+    except Exception as e:
+      cloudlog.error(f"Unable to get influxdb line for: {value}")
+      res += f",invalid=1 reason=\"{e}\","  # string field values must be quoted in influx line protocol; unquoted text makes the whole line unparseable
+
+    res += f"sunnylink_dongle_id=\"{sunnylink_dongle_id}\",comma_dongle_id=\"{comma_dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
+    return res
+
+  # open statistics socket
+  ctx = zmq.Context.instance()
+  sock = ctx.socket(zmq.PULL)
+  sock.bind(f"{STATS_SOCKET}_sp")
+
+  STATS_DIR = Paths.stats_sp_root()
+
+  # initialize stats directory
+  Path(STATS_DIR).mkdir(parents=True, exist_ok=True)
+
+  build_metadata = get_build_metadata()
+
+  # initialize tags
+  tags = {
+    'started': False,
+    'version': build_metadata.openpilot.version,
+    'branch': build_metadata.channel,
+    'dirty': build_metadata.openpilot.is_dirty,
+    'origin': build_metadata.openpilot.git_normalized_origin,
+    'deviceType': HARDWARE.get_device_type(),
+  }
+
+  # subscribe to deviceState for started state
+  sm = SubMaster(['deviceState'])
+
+  idx = 0
+  boot_uid = str(uuid.uuid4())[:8]
+  last_flush_time = time.monotonic()
+  gauges = {}
+  samples: dict[str, list[float]] = defaultdict(list)
+  raws: dict = defaultdict()  # defaultdict with no factory behaves like a plain dict
+  try:
+    while not end_event.is_set():
+      started_prev = sm['deviceState'].started
+      sm.update()
+
+      # Update metrics
+      while True:
+        try:
+          metric = sock.recv_string(zmq.NOBLOCK)
+          try:
+            metric_type = metric.split('|')[1]
+            metric_name = metric.split(':')[0]
+            metric_value_raw = metric.split('|')[0].split(':')[1]
+
+            if metric_type == METRIC_TYPE.GAUGE:
+              metric_value = float(metric_value_raw)
+              gauges[metric_name] = metric_value
+            elif metric_type == METRIC_TYPE.SAMPLE:
+              metric_value = float(metric_value_raw)
+              samples[metric_name].append(metric_value)
+            elif metric_type == METRIC_TYPE.RAW:
+              raws[metric_name] = metric_value_raw
+            else:
+              cloudlog.event("unknown metric type", metric_type=metric_type)
+          except Exception:
+            print(traceback.format_exc())
+            cloudlog.event("malformed metric", metric=metric)
+        except zmq.error.Again:
+          break
+
+      # flush when started state changes or after FLUSH_TIME_S
+      if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
+        result = ""
+        current_time = datetime.now(UTC)
+        tags['started'] = sm['deviceState'].started
+
+        for key, value in raws.items():
+          decoded_value = json.loads(base64.b64decode(value).decode('utf-8'))  # NOTE(review): a malformed payload raises out of the loop and kills this thread — consider a per-item guard
+          result += get_influxdb_line_raw(key, decoded_value, current_time, tags)
+
+        for key, value in gauges.items():
+          result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
+
+        for key, values in samples.items():
+          values.sort()
+          sample_count = len(values)
+          sample_sum = sum(values)
+
+          stats = {
+            'count': sample_count,
+            'min': values[0],
+            'max': values[-1],
+            'mean': sample_sum / sample_count,
+          }
+          for percentile in [0.05, 0.5, 0.95]:
+            value = values[int(round(percentile * (sample_count - 1)))]
+            stats[f"p{int(percentile * 100)}"] = value
+
+          result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
+
+        # clear intermediate data
+        gauges.clear()
+        samples.clear()
+        raws.clear()  # BUG FIX: raws was never cleared, so every raw metric was re-flushed to disk on every cycle
+        last_flush_time = time.monotonic()
+        # check that we aren't filling up the drive
+        if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
+          if len(result) > 0:
+            stats_path = os.path.join(STATS_DIR, f"{boot_uid}_{idx}")
+            with atomic_write_in_dir(stats_path) as f:
+              f.write(result)
+            idx += 1
+        else:
+          cloudlog.error("stats dir full")
+  finally:
+    sock.close()
+    ctx.term()  # NOTE(review): this terminates the shared zmq.Context.instance(); confirm nothing else in this process still uses it
+
+
+def main():
+  rk = Ratekeeper(1, print_delay_threshold=None)
+  end_event = threading.Event()
+
+  threads = [
+    threading.Thread(target=stats_main, args=(end_event,)),
+    threading.Thread(target=sp_stats, args=(end_event,)),
+  ]
+
+  for t in threads:
+    t.start()
+
+  try:
+    while all(t.is_alive() for t in threads):
+      rk.keep_time()
+  finally:
+    end_event.set()
+
+  for t in threads:
+    t.join()
+
+
+if __name__ == "__main__":
+  main()
diff --git a/system/athena/athenad.py b/system/athena/athenad.py
index 42c9cf8a1c..716733021a 100755
--- a/system/athena/athenad.py
+++ b/system/athena/athenad.py
@@ -744,26 +744,40 @@ def log_handler(end_event: threading.Event, log_attr_name=LOG_ATTR_NAME) -> None
     cloudlog.exception("athena.log_handler.exception")
 
 
-def stat_handler(end_event: threading.Event) -> None:
-  STATS_DIR = Paths.stats_root()
+def stat_handler(end_event: threading.Event, stats_dir=None, is_sunnylink=False) -> None:
+  stats_dir = stats_dir or Paths.stats_root()
   last_scan = 0.0
   while not end_event.is_set():
     curr_scan = time.monotonic()
     try:
       if curr_scan - last_scan > 10:
-        stat_filenames = list(filter(lambda name: not name.startswith(tempfile.gettempprefix()), os.listdir(STATS_DIR)))
+        stat_filenames = list(filter(lambda name: not name.startswith(tempfile.gettempprefix()), os.listdir(stats_dir)))
         if len(stat_filenames) > 0:
-          stat_path = os.path.join(STATS_DIR, stat_filenames[0])
+          stat_path = os.path.join(stats_dir, stat_filenames[0])
           with open(stat_path) as f:
+            payload = f.read()
+            is_compressed = False
+
+            # sunnylink uploads are size-constrained, so the payload is always gzip-compressed
+            if is_sunnylink:
+              # base64-encode so the compressed bytes survive the JSON-RPC string field
+              compressed_data = gzip.compress(payload.encode())
+              payload = base64.b64encode(compressed_data).decode()
+              is_compressed = True
+
             jsonrpc = {
               "method": "storeStats",
               "params": {
-                "stats": f.read()
+                "stats": payload
              },
              "jsonrpc": "2.0",
              "id": stat_filenames[0]
            }
+
+          if is_sunnylink and is_compressed:
+            jsonrpc["params"]["compressed"] = is_compressed
+
           low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
           os.remove(stat_path)
       last_scan = curr_scan
diff --git a/system/hardware/hw.py b/system/hardware/hw.py
index d24857e8bd..3527aac872 100644
--- a/system/hardware/hw.py
+++ b/system/hardware/hw.py
@@ -55,6 +55,13 @@ class Paths:
     else:
       return "/data/stats/"
 
+  @staticmethod
+  def stats_sp_root() -> str:
+    if PC:
+      return str(Path(Paths.comma_home()) / "stats_sp")  # BUG FIX: "stats" collided with stats_root() on PC, so the two statsd daemons/stat_handlers would consume each other's files
+    else:
+      return "/data/stats_sp/"
+
   @staticmethod
   def config_root() -> str:
     if PC:
diff --git a/system/manager/process_config.py b/system/manager/process_config.py
index c5f20e511b..6e52635a64 100644
--- a/system/manager/process_config.py
+++ b/system/manager/process_config.py
@@ -164,6 +164,7 @@ procs = [
   # sunnylink <3
   DaemonProcess("manage_sunnylinkd", "sunnypilot.sunnylink.athena.manage_sunnylinkd", "SunnylinkdPid"),
   PythonProcess("sunnylink_registration_manager", "sunnypilot.sunnylink.registration_manager", sunnylink_need_register_shim),
+  PythonProcess("statsd_sp", "sunnypilot.sunnylink.statsd", and_(always_run, sunnylink_ready_shim)),
 ]
 
 # sunnypilot
diff --git a/system/statsd.py b/system/statsd.py
index d60064fc91..89ffa0d6fc 100755
--- a/system/statsd.py
+++ b/system/statsd.py
@@ -1,11 +1,15 @@
 #!/usr/bin/env python3
+import base64
+import json
 import os
+from decimal import Decimal
+
 import zmq
 import time
 import uuid
 from pathlib import Path
 from collections import defaultdict
-from datetime import datetime, UTC
+from datetime import datetime, UTC, date
 from typing import NoReturn
 
 from openpilot.common.params import Params
@@ -21,18 +25,21 @@ from openpilot.system.loggerd.config import STATS_DIR_FILE_LIMIT, STATS_SOCKET,
 class METRIC_TYPE:
   GAUGE = 'g'
   SAMPLE = 'sa'
+  RAW = 'r'
+
 
 class StatLog:
   def __init__(self):
     self.pid = None
     self.zctx = None
     self.sock = None
+    self.stats_socket = STATS_SOCKET
 
   def connect(self) -> None:
-    self.zctx = zmq.Context()
+    self.zctx = zmq.Context.instance()  # instance() never returns None, so the former "or zmq.Context()" arm was dead code; NOTE(review): confirm __del__ terminating this shared context is safe
     self.sock = self.zctx.socket(zmq.PUSH)
     self.sock.setsockopt(zmq.LINGER, 10)
-    self.sock.connect(STATS_SOCKET)
+
self.sock.connect(self.stats_socket) self.pid = os.getpid() def __del__(self): @@ -60,6 +67,50 @@ class StatLog: self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}") +class StatLogSP(StatLog): + def __init__(self, intercept=True): + """ + Initializes the class instance with an optional parameter to determine + if statistical logging should be configured or not. + + :param intercept: A boolean flag that indicates whether to initialize + the `comma_statlog`. If True, the `comma_statlog` attribute is + instantiated as a `StatLog` object. Defaults to True. + """ + super().__init__() + self.comma_statlog = StatLog() if intercept else None + self.stats_socket = f"{STATS_SOCKET}_sp" + + def connect(self) -> None: + super().connect() + if self.comma_statlog: + self.comma_statlog.connect() + + def __del__(self): + super().__del__() + if self.comma_statlog: + self.comma_statlog.__del__() + + def _send(self, metric: str) -> None: + super()._send(metric) + if self.comma_statlog: + self.comma_statlog._send(metric) + + @staticmethod + def default_converter(obj): + if isinstance(obj, (datetime, date)): + return obj.isoformat() + if isinstance(obj, set): + return list(obj) + if isinstance(obj, Decimal): + return float(obj) + return str(obj) # fallback for unknown types + + def raw(self, name: str, value: dict) -> None: + encoded_dict = base64.b64encode(json.dumps(value, default=self.default_converter).encode("utf-8")).decode("utf-8") + self._send(f"{name}:{encoded_dict}|{METRIC_TYPE.RAW}") + + def main() -> NoReturn: dongle_id = Params().get("DongleId") def get_influxdb_line(measurement: str, value: float | dict[str, float], timestamp: datetime, tags: dict) -> str: @@ -180,4 +231,4 @@ def main() -> NoReturn: if __name__ == "__main__": main() else: - statlog = StatLog() + statlog = StatLogSP(intercept=True)