Files
sunnypilot/selfdrive/statsd.py
Robbe Derks af38179430 Add sample metric type to statsd (#23557)
* add sample stat, and make current power draw work on eon/c2

* fix power monitoring test

* little cleanup

* add type hinting to dict

* save as different values

* duh

* rip out power stat

* small fix

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* cleanup this too

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: d60c44e03c
2022-03-28 17:00:09 +02:00

163 lines
4.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import zmq
import time
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timezone
from typing import NoReturn, Union, List, Dict
from common.params import Params
from cereal.messaging import SubMaster
from selfdrive.swaglog import cloudlog
from selfdrive.hardware import HARDWARE
from common.file_helpers import atomic_write_in_dir
from selfdrive.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty
from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S
class METRIC_TYPE:
  """Type tags carried after the '|' in a statsd metric string "<name>:<value>|<type>"."""
  # Aggregator keeps only the most recent value reported before a flush.
  GAUGE: str = 'g'
  # Aggregator keeps every value and reports count/min/max/mean/percentiles at flush.
  SAMPLE: str = 'sa'
class StatLog:
  """Client that pushes metric strings to the statsd aggregator over a ZMQ PUSH socket.

  The socket is opened lazily on the first send, and reopened whenever the
  current PID differs from the PID that opened it, so a forked child creates
  its own context/socket instead of reusing the parent's.
  """

  def __init__(self) -> None:
    # PID of the process that opened self.sock; None until the first send.
    self.pid = None
    # Created by connect(); declared here so the attributes always exist.
    self.zctx = None
    self.sock = None

  def connect(self) -> None:
    """Create a new ZMQ context and PUSH socket connected to STATS_SOCKET."""
    self.zctx = zmq.Context()
    self.sock = self.zctx.socket(zmq.PUSH)
    # Bound how long (ms) unsent messages are kept around once the socket closes.
    self.sock.setsockopt(zmq.LINGER, 10)
    self.sock.connect(STATS_SOCKET)
    self.pid = os.getpid()

  def _send(self, metric: str) -> None:
    """Send one encoded metric without blocking; drop it if it cannot be queued."""
    # A forked child still carries its parent's self.pid, so a mismatch means
    # this process has not opened its own socket yet.
    if os.getpid() != self.pid:
      self.connect()
    try:
      self.sock.send_string(metric, zmq.NOBLOCK)
    except zmq.error.Again:
      # best effort: never block the caller, just drop the metric :/
      pass

  def gauge(self, name: str, value: float) -> None:
    """Report the current value of `name`; only the latest value per flush is kept."""
    self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")

  def sample(self, name: str, value: float) -> None:
    """Record one observation of `name`.

    Samples are buffered by the aggregator and, at flush time, summarized
    into statistics (count, min, max, mean, percentiles).
    """
    self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
def main() -> NoReturn:
  """Run the statsd aggregator loop forever.

  Metric strings of the form "<name>:<value>|<type>" are pulled from
  STATS_SOCKET. Gauges keep only their latest value; samples keep every value
  and are summarized (count, min, max, mean, p5/p50/p95) at flush time. A flush
  happens every STATS_FLUSH_TIME_S seconds and whenever deviceState.started
  changes; the formatted lines are written to a new file in STATS_DIR unless
  there is nothing to write or the directory already holds
  STATS_DIR_FILE_LIMIT entries.
  """
  dongle_id = Params().get("DongleId", encoding='utf-8')

  def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
    """Format one InfluxDB line-protocol style record.

    A bare float becomes a single `value` field; a dict contributes one field
    per key. dongle_id is always appended as a quoted string field, followed by
    the timestamp in nanoseconds.
    """
    tag_str = "".join(f",{k}={str(v)}" for k, v in tags.items())
    if isinstance(value, float):
      value = {'value': value}
    field_str = "".join(f"{k}={v}," for k, v in value.items())
    return f"{measurement}{tag_str} {field_str}dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"

  # open statistics socket (process-wide shared context, not a throwaway one)
  ctx = zmq.Context.instance()
  sock = ctx.socket(zmq.PULL)
  sock.bind(STATS_SOCKET)

  # initialize stats directory
  Path(STATS_DIR).mkdir(parents=True, exist_ok=True)

  # initialize tags
  tags = {
    'started': False,
    'version': get_short_version(),
    'branch': get_short_branch(),
    'dirty': is_dirty(),
    'origin': get_normalized_origin(),
    'deviceType': HARDWARE.get_device_type(),
  }

  # subscribe to deviceState for started state
  sm = SubMaster(['deviceState'])

  last_flush_time = time.monotonic()
  gauges: Dict[str, float] = {}
  samples: Dict[str, List[float]] = defaultdict(list)
  while True:
    started_prev = sm['deviceState'].started
    sm.update()

    # Drain every metric currently queued on the socket without blocking.
    while True:
      try:
        metric = sock.recv_string(zmq.NOBLOCK)
      except zmq.error.Again:
        break
      except UnicodeDecodeError:
        # The frame has already been received; skip it instead of crashing the daemon.
        cloudlog.event("malformed metric", error="invalid utf-8")
        continue

      try:
        metric_type = metric.split('|')[1]
        metric_name = metric.split(':')[0]
        metric_value = float(metric.split('|')[0].split(':')[1])
      except (IndexError, ValueError):
        cloudlog.event("malformed metric", metric=metric)
        continue

      if metric_type == METRIC_TYPE.GAUGE:
        gauges[metric_name] = metric_value
      elif metric_type == METRIC_TYPE.SAMPLE:
        samples[metric_name].append(metric_value)
      else:
        cloudlog.event("unknown metric type", metric_type=metric_type)

    # flush when started state changes or after FLUSH_TIME_S
    if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
      lines: List[str] = []
      current_time = datetime.now(timezone.utc)
      tags['started'] = sm['deviceState'].started

      for key, value in gauges.items():
        lines.append(get_influxdb_line(f"gauge.{key}", value, current_time, tags))

      for key, values in samples.items():
        # Lists here are never empty: defaultdict entries only appear on append.
        values.sort()
        sample_count = len(values)
        stats: Dict[str, float] = {
          'count': sample_count,
          'min': values[0],
          'max': values[-1],
          'mean': sum(values) / sample_count,
        }
        for percentile in [0.05, 0.5, 0.95]:
          # element at the rounded fractional position in the sorted list
          stats[f"p{round(percentile * 100)}"] = values[int(round(percentile * (sample_count - 1)))]
        lines.append(get_influxdb_line(f"sample.{key}", stats, current_time, tags))

      result = "".join(lines)

      # clear intermediate data
      gauges.clear()
      samples.clear()
      last_flush_time = time.monotonic()

      # check that we aren't filling up the drive
      if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
        if len(result) > 0:
          # Two flushes in the same second (e.g. a timed flush followed by a
          # started-state change) would map to the same name; the earlier
          # batch would then be clobbered or the write refused, depending on
          # atomic_write_in_dir. Keep the plain timestamp name when it is free.
          base_name = str(int(current_time.timestamp()))
          stats_path = os.path.join(STATS_DIR, base_name)
          suffix = 1
          while os.path.exists(stats_path):
            stats_path = os.path.join(STATS_DIR, f"{base_name}_{suffix}")
            suffix += 1
          with atomic_write_in_dir(stats_path) as f:
            f.write(result)
      else:
        cloudlog.error("stats dir full")
# Imported as a library: expose a shared client. Executed directly: run the aggregator.
if __name__ != "__main__":
  statlog = StatLog()
else:
  main()