diff --git a/release/files_common b/release/files_common index 4f8a149b80..f2b10004b9 100644 --- a/release/files_common +++ b/release/files_common @@ -77,6 +77,7 @@ selfdrive/tombstoned.py selfdrive/pandad.py selfdrive/updated.py selfdrive/rtshield.py +selfdrive/statsd.py selfdrive/athena/__init__.py selfdrive/athena/athenad.py diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index a0f17397a4..a0c6a0c844 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -11,6 +11,7 @@ import select import socket import threading import time +import tempfile from collections import namedtuple from functools import partial from typing import Any @@ -31,6 +32,7 @@ from selfdrive.loggerd.config import ROOT from selfdrive.loggerd.xattr_cache import getxattr, setxattr from selfdrive.swaglog import cloudlog, SWAGLOG_DIR from selfdrive.version import get_version, get_origin, get_short_branch, get_commit +from selfdrive.statsd import STATS_DIR ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) @@ -48,7 +50,7 @@ dispatcher["echo"] = lambda s: s recv_queue: Any = queue.Queue() send_queue: Any = queue.Queue() upload_queue: Any = queue.Queue() -log_send_queue: Any = queue.Queue() +low_priority_send_queue: Any = queue.Queue() log_recv_queue: Any = queue.Queue() cancelled_uploads: Any = set() UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0)) @@ -86,6 +88,7 @@ def handle_long_poll(ws): threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'), threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'), threading.Thread(target=log_handler, args=(end_event,), name='log_handler'), + threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'), ] + [ threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}') for x in range(HANDLER_THREADS) @@ -447,7 +450,7 @@ def log_handler(end_event): "jsonrpc": "2.0", "id": log_entry } - log_send_queue.put_nowait(json.dumps(jsonrpc)) + low_priority_send_queue.put_nowait(json.dumps(jsonrpc)) curr_log = log_entry except OSError: pass # file could be deleted by log rotation @@ -478,6 +481,32 @@ def log_handler(end_event): cloudlog.exception("athena.log_handler.exception") +def stat_handler(end_event): + while not end_event.is_set(): + last_scan = 0 + curr_scan = sec_since_boot() + try: + if curr_scan - last_scan > 10: + stat_filenames = list(filter(lambda name: not name.startswith(tempfile.gettempprefix()), os.listdir(STATS_DIR))) + if len(stat_filenames) > 0: + stat_path = os.path.join(STATS_DIR, stat_filenames[0]) + with open(stat_path) as f: + jsonrpc = { + "method": "storeStats", + "params": { + "stats": f.read() + }, + "jsonrpc": "2.0", + "id": stat_filenames[0] + } + low_priority_send_queue.put_nowait(json.dumps(jsonrpc)) + os.remove(stat_path) + last_scan = curr_scan + except Exception: + cloudlog.exception("athena.stat_handler.exception") + time.sleep(0.1) + + def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event): while not (end_event.is_set() or global_end_event.is_set()): try: @@ -550,7 +579,7 @@ def ws_send(ws, end_event): try: data = send_queue.get_nowait() except queue.Empty: - data = log_send_queue.get(timeout=1) + data = low_priority_send_queue.get(timeout=1) for i in range(0, len(data), WS_FRAME_SIZE): frame = data[i:i+WS_FRAME_SIZE] last = i + WS_FRAME_SIZE >= len(data) diff --git a/selfdrive/loggerd/config.py b/selfdrive/loggerd/config.py index b626073ce4..6cd20a68ab 100644 --- a/selfdrive/loggerd/config.py +++ b/selfdrive/loggerd/config.py @@ -13,6 +13,13 @@ else: CAMERA_FPS = 20 SEGMENT_LENGTH = 60 +STATS_DIR_FILE_LIMIT = 10000 +STATS_SOCKET = "ipc:///tmp/stats" +if PC: + STATS_DIR = os.path.join(str(Path.home()), ".comma", "stats") +else: + STATS_DIR = "/data/stats/" +STATS_FLUSH_TIME_S = 60 def get_available_percent(default=None): try: diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py index 2e3741350e..b5fc01364e 100644 --- a/selfdrive/manager/process_config.py +++ b/selfdrive/manager/process_config.py @@ -36,6 +36,7 @@ procs = [ PythonProcess("tombstoned", "selfdrive.tombstoned", enabled=not PC, persistent=True), PythonProcess("updated", "selfdrive.updated", enabled=not PC, persistent=True), PythonProcess("uploader", "selfdrive.loggerd.uploader", persistent=True), + PythonProcess("statsd", "selfdrive.statsd", persistent=True), # EON only PythonProcess("rtshield", "selfdrive.rtshield", enabled=EON), diff --git a/selfdrive/statsd.py b/selfdrive/statsd.py new file mode 100755 index 0000000000..8fd1f60ca1 --- /dev/null +++ b/selfdrive/statsd.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +import os +import zmq +import time +from pathlib import Path +from datetime import datetime, timezone +from common.params import Params +from cereal.messaging import SubMaster +from selfdrive.swaglog import cloudlog +from selfdrive.hardware import HARDWARE +from common.file_helpers import atomic_write_in_dir +from selfdrive.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty +from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S + + +class METRIC_TYPE: + GAUGE = 'g' + +class StatLog: + def __init__(self): + self.pid = None + + def connect(self): + self.zctx = zmq.Context() + self.sock = self.zctx.socket(zmq.PUSH) + self.sock.setsockopt(zmq.LINGER, 10) + self.sock.connect(STATS_SOCKET) + self.pid = os.getpid() + + def _send(self, metric: str): + if os.getpid() != self.pid: + self.connect() + + try: + self.sock.send_string(metric, zmq.NOBLOCK) + except zmq.error.Again: + # drop :/ + pass + + def gauge(self, name: str, value: float): + self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}") + + +def main(): + def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict): + res = f"{measurement}" + for tag_key in tags.keys(): + res += f",{tag_key}={str(tags[tag_key])}" + res += f" value={value} {int(timestamp.timestamp() * 1e9)}\n" + return res + + # open statistics socket + ctx = zmq.Context().instance() + sock = ctx.socket(zmq.PULL) + sock.bind(STATS_SOCKET) + + # initialize stats directory + Path(STATS_DIR).mkdir(parents=True, exist_ok=True) + + # initialize tags + tags = { + 'dongleId': Params().get("DongleId", encoding='utf-8'), + 'started': False, + 'version': get_short_version(), + 'branch': get_short_branch(), + 'dirty': is_dirty(), + 'origin': get_normalized_origin(), + 'deviceType': HARDWARE.get_device_type(), + } + + # subscribe to deviceState for started state + sm = SubMaster(['deviceState']) + + last_flush_time = time.monotonic() + gauges = {} + while True: + try: + metric = sock.recv_string(zmq.NOBLOCK) + try: + metric_type = metric.split('|')[1] + metric_name = metric.split(':')[0] + metric_value = metric.split('|')[0].split(':')[1] + + if metric_type == METRIC_TYPE.GAUGE: + gauges[metric_name] = metric_value + else: + cloudlog.event("unknown metric type", metric_type=metric_type) + except Exception: + cloudlog.event("malformed metric", metric=metric) + except zmq.error.Again: + time.sleep(1e-3) + + started_prev = sm['deviceState'].started + sm.update(0) + + # flush when started state changes or after FLUSH_TIME_S + if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev): + result = "" + current_time = datetime.utcnow().replace(tzinfo=timezone.utc) + tags['started'] = sm['deviceState'].started + + for gauge_key in gauges.keys(): + result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags) + + # clear intermediate data + gauges = {} + last_flush_time = time.monotonic() + + # check that we aren't filling up the drive + if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT: + if len(result) > 0: + stats_path = os.path.join(STATS_DIR, str(int(current_time.timestamp()))) + with atomic_write_in_dir(stats_path) as f: + f.write(result) + else: + cloudlog.error("stats dir full") + + +if __name__ == "__main__": + main() +else: + statlog = StatLog() diff --git a/selfdrive/thermald/power_monitoring.py b/selfdrive/thermald/power_monitoring.py index 8e24eb3cf1..0dc4c5a1d3 100644 --- a/selfdrive/thermald/power_monitoring.py +++ b/selfdrive/thermald/power_monitoring.py @@ -9,6 +9,7 @@ from common.params import Params, put_nonblocking from common.realtime import sec_since_boot from selfdrive.hardware import HARDWARE from selfdrive.swaglog import cloudlog +from selfdrive.statsd import statlog CAR_VOLTAGE_LOW_PASS_K = 0.091 # LPF gain for 5s tau (dt/tau / (dt/tau + 1)) @@ -56,6 +57,7 @@ class PowerMonitoring: # Low-pass battery voltage self.car_voltage_instant_mV = peripheralState.voltage self.car_voltage_mV = ((peripheralState.voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 - CAR_VOLTAGE_LOW_PASS_K))) + statlog.gauge("car_voltage", self.car_voltage_mV / 1e3) # Cap the car battery power and save it in a param every 10-ish seconds self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0) diff --git a/selfdrive/thermald/thermald.py b/selfdrive/thermald/thermald.py index 4ddf37f418..dd106d4ccb 100755 --- a/selfdrive/thermald/thermald.py +++ b/selfdrive/thermald/thermald.py @@ -23,6 +23,7 @@ from selfdrive.loggerd.config import get_available_percent from selfdrive.swaglog import cloudlog from selfdrive.thermald.power_monitoring import PowerMonitoring from selfdrive.version import terms_version, training_version +from selfdrive.statsd import statlog ThermalStatus = log.DeviceState.ThermalStatus NetworkType = log.DeviceState.NetworkType @@ -291,8 +292,12 @@ def thermald_thread() -> NoReturn: msg.deviceState.networkInfo = network_info if nvme_temps is not None: msg.deviceState.nvmeTempC = nvme_temps + for i, temp in enumerate(nvme_temps): + statlog.gauge(f"nvme_temperature{i}", temp) if modem_temps is not None: msg.deviceState.modemTempC = modem_temps + for i, temp in enumerate(modem_temps): + statlog.gauge(f"modem_temperature{i}", temp) msg.deviceState.screenBrightnessPercent = HARDWARE.get_screen_brightness() msg.deviceState.batteryPercent = HARDWARE.get_battery_capacity() @@ -409,6 +414,23 @@ def thermald_thread() -> NoReturn: should_start_prev = should_start startup_conditions_prev = startup_conditions.copy() + # log more stats + statlog.gauge("free_space_percent", msg.deviceState.freeSpacePercent) + statlog.gauge("gpu_usage_percent", msg.deviceState.gpuUsagePercent) + statlog.gauge("memory_usage_percent", msg.deviceState.memoryUsagePercent) + for i, usage in enumerate(msg.deviceState.cpuUsagePercent): + statlog.gauge(f"cpu{i}_usage_percent", usage) + for i, temp in enumerate(msg.deviceState.cpuTempC): + statlog.gauge(f"cpu{i}_temperature", temp) + for i, temp in enumerate(msg.deviceState.gpuTempC): + statlog.gauge(f"gpu{i}_temperature", temp) + statlog.gauge("memory_temperature", msg.deviceState.memoryTempC) + statlog.gauge("ambient_temperature", msg.deviceState.ambientTempC) + for i, temp in enumerate(msg.deviceState.pmicTempC): + statlog.gauge(f"pmic{i}_temperature", temp) + statlog.gauge("fan_speed_percent_desired", msg.deviceState.fanSpeedPercentDesired) + statlog.gauge("screen_brightness_percent", msg.deviceState.screenBrightnessPercent) + # report to server once every 10 minutes if (count % int(600. / DT_TRML)) == 0: if EON and started_ts is None and msg.deviceState.memoryUsagePercent > 40: diff --git a/selfdrive/version.py b/selfdrive/version.py index a0cb61875e..0c1a44f236 100644 --- a/selfdrive/version.py +++ b/selfdrive/version.py @@ -53,12 +53,24 @@ def get_origin(default: Optional[str] = None) -> Optional[str]: return run_cmd_default(["git", "config", "--get", "remote.origin.url"], default=default) +@cache +def get_normalized_origin(default: Optional[str] = None) -> Optional[str]: + return get_origin()\ + .replace("git@", "", 1)\ + .replace(".git", "", 1)\ + .replace("https://", "", 1)\ + .replace(":", "/", 1) + + @cache def get_version() -> str: with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "common", "version.h")) as _versionf: version = _versionf.read().split('"')[1] return version +@cache +def get_short_version() -> str: + return get_version().split('-')[0] @cache def is_prebuilt() -> bool: @@ -117,7 +129,9 @@ if __name__ == "__main__": print(f"Dirty: {is_dirty()}") print(f"Version: {get_version()}") + print(f"Short version: {get_short_version()}") print(f"Origin: {get_origin()}") + print(f"Normalized origin: {get_normalized_origin()}") print(f"Branch: {get_branch()}") print(f"Short branch: {get_short_branch()}") print(f"Prebuilt: {is_prebuilt()}")