mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-19 07:44:00 +08:00
Add sample metric type to statsd (#23557)
* add sample stat, and make current power draw work on eon/c2 * fix power monitoring test * little cleanup * add type hinting to dict * save as different values * duh * rip out power stat * small fix Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * cleanup this too Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
This commit is contained in:
@@ -3,8 +3,9 @@ import os
|
||||
import zmq
|
||||
import time
|
||||
from pathlib import Path
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from typing import NoReturn
|
||||
from typing import NoReturn, Union, List, Dict
|
||||
|
||||
from common.params import Params
|
||||
from cereal.messaging import SubMaster
|
||||
@@ -17,6 +18,7 @@ from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCK
|
||||
|
||||
class METRIC_TYPE:
|
||||
GAUGE = 'g'
|
||||
SAMPLE = 'sa'
|
||||
|
||||
class StatLog:
|
||||
def __init__(self):
|
||||
@@ -42,14 +44,27 @@ class StatLog:
|
||||
def gauge(self, name: str, value: float) -> None:
|
||||
self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
|
||||
|
||||
# Samples will be recorded in a buffer and at aggregation time,
|
||||
# statistical properties will be logged (mean, count, percentiles, ...)
|
||||
def sample(self, name: str, value: float):
|
||||
self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
|
||||
|
||||
|
||||
def main() -> NoReturn:
|
||||
dongle_id = Params().get("DongleId", encoding='utf-8')
|
||||
def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict) -> str:
|
||||
def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
|
||||
res = f"{measurement}"
|
||||
for k, v in tags.items():
|
||||
res += f",{k}={str(v)}"
|
||||
res += f" value={value},dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
|
||||
res += " "
|
||||
|
||||
if isinstance(value, float):
|
||||
value = {'value': value}
|
||||
|
||||
for k, v in value.items():
|
||||
res += f"{k}={v},"
|
||||
|
||||
res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
|
||||
return res
|
||||
|
||||
# open statistics socket
|
||||
@@ -75,6 +90,7 @@ def main() -> NoReturn:
|
||||
|
||||
last_flush_time = time.monotonic()
|
||||
gauges = {}
|
||||
samples: Dict[str, List[float]] = defaultdict(list)
|
||||
while True:
|
||||
started_prev = sm['deviceState'].started
|
||||
sm.update()
|
||||
@@ -86,10 +102,12 @@ def main() -> NoReturn:
|
||||
try:
|
||||
metric_type = metric.split('|')[1]
|
||||
metric_name = metric.split(':')[0]
|
||||
metric_value = metric.split('|')[0].split(':')[1]
|
||||
metric_value = float(metric.split('|')[0].split(':')[1])
|
||||
|
||||
if metric_type == METRIC_TYPE.GAUGE:
|
||||
gauges[metric_name] = metric_value
|
||||
elif metric_type == METRIC_TYPE.SAMPLE:
|
||||
samples[metric_name].append(metric_value)
|
||||
else:
|
||||
cloudlog.event("unknown metric type", metric_type=metric_type)
|
||||
except Exception:
|
||||
@@ -103,11 +121,29 @@ def main() -> NoReturn:
|
||||
current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
tags['started'] = sm['deviceState'].started
|
||||
|
||||
for gauge_key in gauges:
|
||||
result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags)
|
||||
for key, value in gauges.items():
|
||||
result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
|
||||
|
||||
for key, values in samples.items():
|
||||
values.sort()
|
||||
sample_count = len(values)
|
||||
sample_sum = sum(values)
|
||||
|
||||
stats = {
|
||||
'count': sample_count,
|
||||
'min': values[0],
|
||||
'max': values[-1],
|
||||
'mean': sample_sum / sample_count,
|
||||
}
|
||||
for percentile in [0.05, 0.5, 0.95]:
|
||||
value = values[int(round(percentile * (sample_count - 1)))]
|
||||
stats[f"p{int(percentile * 100)}"] = value
|
||||
|
||||
result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
|
||||
|
||||
# clear intermediate data
|
||||
gauges = {}
|
||||
gauges.clear()
|
||||
samples.clear()
|
||||
last_flush_time = time.monotonic()
|
||||
|
||||
# check that we aren't filling up the drive
|
||||
|
||||
Reference in New Issue
Block a user