182 lines
5.3 KiB
Python
Executable File
182 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os
|
|
import zmq
|
|
import time
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from typing import NoReturn
|
|
|
|
from openpilot.common.params import Params
|
|
from cereal.messaging import SubMaster
|
|
from openpilot.system.hardware.hw import Paths
|
|
from openpilot.common.swaglog import cloudlog
|
|
from openpilot.system.hardware import HARDWARE
|
|
from openpilot.common.file_helpers import atomic_write_in_dir
|
|
from openpilot.system.version import get_build_metadata
|
|
from openpilot.system.loggerd.config import STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S
|
|
|
|
|
|
class METRIC_TYPE:
|
|
GAUGE = 'g'
|
|
SAMPLE = 'sa'
|
|
|
|
class StatLog:
|
|
def __init__(self):
|
|
self.pid = None
|
|
self.zctx = None
|
|
self.sock = None
|
|
|
|
def connect(self) -> None:
|
|
self.zctx = zmq.Context()
|
|
self.sock = self.zctx.socket(zmq.PUSH)
|
|
self.sock.setsockopt(zmq.LINGER, 10)
|
|
self.sock.connect(STATS_SOCKET)
|
|
self.pid = os.getpid()
|
|
|
|
def __del__(self):
|
|
if self.sock is not None:
|
|
self.sock.close()
|
|
if self.zctx is not None:
|
|
self.zctx.term()
|
|
|
|
def _send(self, metric: str) -> None:
|
|
if os.getpid() != self.pid:
|
|
self.connect()
|
|
|
|
try:
|
|
self.sock.send_string(metric, zmq.NOBLOCK)
|
|
except zmq.error.Again:
|
|
# drop :/
|
|
pass
|
|
|
|
def gauge(self, name: str, value: float) -> None:
|
|
self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
|
|
|
|
# Samples will be recorded in a buffer and at aggregation time,
|
|
# statistical properties will be logged (mean, count, percentiles, ...)
|
|
def sample(self, name: str, value: float):
|
|
self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
|
|
|
|
|
|
def main() -> NoReturn:
|
|
dongle_id = Params().get("DongleId", encoding='utf-8')
|
|
def get_influxdb_line(measurement: str, value: float | dict[str, float], timestamp: datetime, tags: dict) -> str:
|
|
res = f"{measurement}"
|
|
for k, v in tags.items():
|
|
res += f",{k}={str(v)}"
|
|
res += " "
|
|
|
|
if isinstance(value, float):
|
|
value = {'value': value}
|
|
|
|
for k, v in value.items():
|
|
res += f"{k}={v},"
|
|
|
|
res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
|
|
return res
|
|
|
|
# open statistics socket
|
|
ctx = zmq.Context.instance()
|
|
sock = ctx.socket(zmq.PULL)
|
|
sock.bind(STATS_SOCKET)
|
|
|
|
STATS_DIR = Paths.stats_root()
|
|
|
|
# initialize stats directory
|
|
Path(STATS_DIR).mkdir(parents=True, exist_ok=True)
|
|
|
|
build_metadata = get_build_metadata()
|
|
|
|
# initialize tags
|
|
tags = {
|
|
'started': False,
|
|
'version': build_metadata.openpilot.version,
|
|
'branch': build_metadata.channel,
|
|
'dirty': build_metadata.openpilot.is_dirty,
|
|
'origin': build_metadata.openpilot.git_normalized_origin,
|
|
'deviceType': HARDWARE.get_device_type(),
|
|
}
|
|
|
|
# subscribe to deviceState for started state
|
|
sm = SubMaster(['deviceState'])
|
|
|
|
idx = 0
|
|
last_flush_time = time.monotonic()
|
|
gauges = {}
|
|
samples: dict[str, list[float]] = defaultdict(list)
|
|
try:
|
|
while True:
|
|
started_prev = sm['deviceState'].started
|
|
sm.update()
|
|
|
|
# Update metrics
|
|
while True:
|
|
try:
|
|
metric = sock.recv_string(zmq.NOBLOCK)
|
|
try:
|
|
metric_type = metric.split('|')[1]
|
|
metric_name = metric.split(':')[0]
|
|
metric_value = float(metric.split('|')[0].split(':')[1])
|
|
|
|
if metric_type == METRIC_TYPE.GAUGE:
|
|
gauges[metric_name] = metric_value
|
|
elif metric_type == METRIC_TYPE.SAMPLE:
|
|
samples[metric_name].append(metric_value)
|
|
else:
|
|
cloudlog.event("unknown metric type", metric_type=metric_type)
|
|
except Exception:
|
|
cloudlog.event("malformed metric", metric=metric)
|
|
except zmq.error.Again:
|
|
break
|
|
|
|
# flush when started state changes or after FLUSH_TIME_S
|
|
if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
|
|
result = ""
|
|
current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
|
|
tags['started'] = sm['deviceState'].started
|
|
|
|
for key, value in gauges.items():
|
|
result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
|
|
|
|
for key, values in samples.items():
|
|
values.sort()
|
|
sample_count = len(values)
|
|
sample_sum = sum(values)
|
|
|
|
stats = {
|
|
'count': sample_count,
|
|
'min': values[0],
|
|
'max': values[-1],
|
|
'mean': sample_sum / sample_count,
|
|
}
|
|
for percentile in [0.05, 0.5, 0.95]:
|
|
value = values[int(round(percentile * (sample_count - 1)))]
|
|
stats[f"p{int(percentile * 100)}"] = value
|
|
|
|
result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
|
|
|
|
# clear intermediate data
|
|
gauges.clear()
|
|
samples.clear()
|
|
last_flush_time = time.monotonic()
|
|
|
|
# check that we aren't filling up the drive
|
|
if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
|
|
if len(result) > 0:
|
|
stats_path = os.path.join(STATS_DIR, f"{current_time.timestamp():.0f}_{idx}")
|
|
with atomic_write_in_dir(stats_path) as f:
|
|
f.write(result)
|
|
idx += 1
|
|
else:
|
|
cloudlog.error("stats dir full")
|
|
finally:
|
|
sock.close()
|
|
ctx.term()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
else:
|
|
statlog = StatLog()
|