* device side of statsd

* need to start it

* enable in manager

* add sleep

* cleanup

* remove aggregates for now and standardize on industry terms

* manager needs main

* need to have a try/except

* atomic_write_on_fs_tmp does not work

* cleaner

* use dump

Co-authored-by: Willem Melching <willem.melching@gmail.com>

* one file at a time

* limit amount of files

* move to influx line protocol and cleanup

* needs to be a list

* fix timezone bug

* actually rate limit

* add to release

* normalized origin

* also log deviceType

* more stats

Co-authored-by: Willem Melching <willem.melching@gmail.com>
This commit is contained in:
Robbe Derks 2022-01-10 15:21:48 +01:00 committed by GitHub
parent 8eec818ae6
commit 1b49ce6ec4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 201 additions and 3 deletions

View File

@ -77,6 +77,7 @@ selfdrive/tombstoned.py
selfdrive/pandad.py
selfdrive/updated.py
selfdrive/rtshield.py
selfdrive/statsd.py
selfdrive/athena/__init__.py
selfdrive/athena/athenad.py

View File

@ -11,6 +11,7 @@ import select
import socket
import threading
import time
import tempfile
from collections import namedtuple
from functools import partial
from typing import Any
@ -31,6 +32,7 @@ from selfdrive.loggerd.config import ROOT
from selfdrive.loggerd.xattr_cache import getxattr, setxattr
from selfdrive.swaglog import cloudlog, SWAGLOG_DIR
from selfdrive.version import get_version, get_origin, get_short_branch, get_commit
from selfdrive.statsd import STATS_DIR
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
@ -48,7 +50,7 @@ dispatcher["echo"] = lambda s: s
recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue()
upload_queue: Any = queue.Queue()
log_send_queue: Any = queue.Queue()
low_priority_send_queue: Any = queue.Queue()
log_recv_queue: Any = queue.Queue()
cancelled_uploads: Any = set()
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0))
@ -86,6 +88,7 @@ def handle_long_poll(ws):
threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'),
threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
threading.Thread(target=log_handler, args=(end_event,), name='log_handler'),
threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'),
] + [
threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}')
for x in range(HANDLER_THREADS)
@ -447,7 +450,7 @@ def log_handler(end_event):
"jsonrpc": "2.0",
"id": log_entry
}
log_send_queue.put_nowait(json.dumps(jsonrpc))
low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
curr_log = log_entry
except OSError:
pass # file could be deleted by log rotation
@ -478,6 +481,32 @@ def log_handler(end_event):
cloudlog.exception("athena.log_handler.exception")
def stat_handler(end_event):
while not end_event.is_set():
last_scan = 0
curr_scan = sec_since_boot()
try:
if curr_scan - last_scan > 10:
stat_filenames = list(filter(lambda name: not name.startswith(tempfile.gettempprefix()), os.listdir(STATS_DIR)))
if len(stat_filenames) > 0:
stat_path = os.path.join(STATS_DIR, stat_filenames[0])
with open(stat_path) as f:
jsonrpc = {
"method": "storeStats",
"params": {
"stats": f.read()
},
"jsonrpc": "2.0",
"id": stat_filenames[0]
}
low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
os.remove(stat_path)
last_scan = curr_scan
except Exception:
cloudlog.exception("athena.stat_handler.exception")
time.sleep(0.1)
def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
while not (end_event.is_set() or global_end_event.is_set()):
try:
@ -550,7 +579,7 @@ def ws_send(ws, end_event):
try:
data = send_queue.get_nowait()
except queue.Empty:
data = log_send_queue.get(timeout=1)
data = low_priority_send_queue.get(timeout=1)
for i in range(0, len(data), WS_FRAME_SIZE):
frame = data[i:i+WS_FRAME_SIZE]
last = i + WS_FRAME_SIZE >= len(data)

View File

@ -13,6 +13,13 @@ else:
CAMERA_FPS = 20
SEGMENT_LENGTH = 60
STATS_DIR_FILE_LIMIT = 10000
STATS_SOCKET = "ipc:///tmp/stats"
if PC:
STATS_DIR = os.path.join(str(Path.home()), ".comma", "stats")
else:
STATS_DIR = "/data/stats/"
STATS_FLUSH_TIME_S = 60
def get_available_percent(default=None):
try:

View File

@ -36,6 +36,7 @@ procs = [
PythonProcess("tombstoned", "selfdrive.tombstoned", enabled=not PC, persistent=True),
PythonProcess("updated", "selfdrive.updated", enabled=not PC, persistent=True),
PythonProcess("uploader", "selfdrive.loggerd.uploader", persistent=True),
PythonProcess("statsd", "selfdrive.statsd", persistent=True),
# EON only
PythonProcess("rtshield", "selfdrive.rtshield", enabled=EON),

122
selfdrive/statsd.py Executable file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
import os
import zmq
import time
from pathlib import Path
from datetime import datetime, timezone
from common.params import Params
from cereal.messaging import SubMaster
from selfdrive.swaglog import cloudlog
from selfdrive.hardware import HARDWARE
from common.file_helpers import atomic_write_in_dir
from selfdrive.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty
from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S
class METRIC_TYPE:
GAUGE = 'g'
class StatLog:
def __init__(self):
self.pid = None
def connect(self):
self.zctx = zmq.Context()
self.sock = self.zctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, 10)
self.sock.connect(STATS_SOCKET)
self.pid = os.getpid()
def _send(self, metric: str):
if os.getpid() != self.pid:
self.connect()
try:
self.sock.send_string(metric, zmq.NOBLOCK)
except zmq.error.Again:
# drop :/
pass
def gauge(self, name: str, value: float):
self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
def main():
def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict):
res = f"{measurement}"
for tag_key in tags.keys():
res += f",{tag_key}={str(tags[tag_key])}"
res += f" value={value} {int(timestamp.timestamp() * 1e9)}\n"
return res
# open statistics socket
ctx = zmq.Context().instance()
sock = ctx.socket(zmq.PULL)
sock.bind(STATS_SOCKET)
# initialize stats directory
Path(STATS_DIR).mkdir(parents=True, exist_ok=True)
# initialize tags
tags = {
'dongleId': Params().get("DongleId", encoding='utf-8'),
'started': False,
'version': get_short_version(),
'branch': get_short_branch(),
'dirty': is_dirty(),
'origin': get_normalized_origin(),
'deviceType': HARDWARE.get_device_type(),
}
# subscribe to deviceState for started state
sm = SubMaster(['deviceState'])
last_flush_time = time.monotonic()
gauges = {}
while True:
try:
metric = sock.recv_string(zmq.NOBLOCK)
try:
metric_type = metric.split('|')[1]
metric_name = metric.split(':')[0]
metric_value = metric.split('|')[0].split(':')[1]
if metric_type == METRIC_TYPE.GAUGE:
gauges[metric_name] = metric_value
else:
cloudlog.event("unknown metric type", metric_type=metric_type)
except Exception:
cloudlog.event("malformed metric", metric=metric)
except zmq.error.Again:
time.sleep(1e-3)
started_prev = sm['deviceState'].started
sm.update(0)
# flush when started state changes or after FLUSH_TIME_S
if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
result = ""
current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
tags['started'] = sm['deviceState'].started
for gauge_key in gauges.keys():
result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags)
# clear intermediate data
gauges = {}
last_flush_time = time.monotonic()
# check that we aren't filling up the drive
if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
if len(result) > 0:
stats_path = os.path.join(STATS_DIR, str(int(current_time.timestamp())))
with atomic_write_in_dir(stats_path) as f:
f.write(result)
else:
cloudlog.error("stats dir full")
if __name__ == "__main__":
main()
else:
statlog = StatLog()

View File

@ -9,6 +9,7 @@ from common.params import Params, put_nonblocking
from common.realtime import sec_since_boot
from selfdrive.hardware import HARDWARE
from selfdrive.swaglog import cloudlog
from selfdrive.statsd import statlog
CAR_VOLTAGE_LOW_PASS_K = 0.091 # LPF gain for 5s tau (dt/tau / (dt/tau + 1))
@ -56,6 +57,7 @@ class PowerMonitoring:
# Low-pass battery voltage
self.car_voltage_instant_mV = peripheralState.voltage
self.car_voltage_mV = ((peripheralState.voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 - CAR_VOLTAGE_LOW_PASS_K)))
statlog.gauge("car_voltage", self.car_voltage_mV / 1e3)
# Cap the car battery power and save it in a param every 10-ish seconds
self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)

View File

@ -23,6 +23,7 @@ from selfdrive.loggerd.config import get_available_percent
from selfdrive.swaglog import cloudlog
from selfdrive.thermald.power_monitoring import PowerMonitoring
from selfdrive.version import terms_version, training_version
from selfdrive.statsd import statlog
ThermalStatus = log.DeviceState.ThermalStatus
NetworkType = log.DeviceState.NetworkType
@ -291,8 +292,12 @@ def thermald_thread() -> NoReturn:
msg.deviceState.networkInfo = network_info
if nvme_temps is not None:
msg.deviceState.nvmeTempC = nvme_temps
for i, temp in enumerate(nvme_temps):
statlog.gauge(f"nvme_temperature{i}", temp)
if modem_temps is not None:
msg.deviceState.modemTempC = modem_temps
for i, temp in enumerate(modem_temps):
statlog.gauge(f"modem_temperature{i}", temp)
msg.deviceState.screenBrightnessPercent = HARDWARE.get_screen_brightness()
msg.deviceState.batteryPercent = HARDWARE.get_battery_capacity()
@ -409,6 +414,23 @@ def thermald_thread() -> NoReturn:
should_start_prev = should_start
startup_conditions_prev = startup_conditions.copy()
# log more stats
statlog.gauge("free_space_percent", msg.deviceState.freeSpacePercent)
statlog.gauge("gpu_usage_percent", msg.deviceState.gpuUsagePercent)
statlog.gauge("memory_usage_percent", msg.deviceState.memoryUsagePercent)
for i, usage in enumerate(msg.deviceState.cpuUsagePercent):
statlog.gauge(f"cpu{i}_usage_percent", usage)
for i, temp in enumerate(msg.deviceState.cpuTempC):
statlog.gauge(f"cpu{i}_temperature", temp)
for i, temp in enumerate(msg.deviceState.gpuTempC):
statlog.gauge(f"gpu{i}_temperature", temp)
statlog.gauge("memory_temperature", msg.deviceState.memoryTempC)
statlog.gauge("ambient_temperature", msg.deviceState.ambientTempC)
for i, temp in enumerate(msg.deviceState.pmicTempC):
statlog.gauge(f"pmic{i}_temperature", temp)
statlog.gauge("fan_speed_percent_desired", msg.deviceState.fanSpeedPercentDesired)
statlog.gauge("screen_brightness_percent", msg.deviceState.screenBrightnessPercent)
# report to server once every 10 minutes
if (count % int(600. / DT_TRML)) == 0:
if EON and started_ts is None and msg.deviceState.memoryUsagePercent > 40:

View File

@ -53,12 +53,24 @@ def get_origin(default: Optional[str] = None) -> Optional[str]:
return run_cmd_default(["git", "config", "--get", "remote.origin.url"], default=default)
@cache
def get_normalized_origin(default: Optional[str] = None) -> Optional[str]:
return get_origin()\
.replace("git@", "", 1)\
.replace(".git", "", 1)\
.replace("https://", "", 1)\
.replace(":", "/", 1)
@cache
def get_version() -> str:
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "common", "version.h")) as _versionf:
version = _versionf.read().split('"')[1]
return version
@cache
def get_short_version() -> str:
return get_version().split('-')[0]
@cache
def is_prebuilt() -> bool:
@ -117,7 +129,9 @@ if __name__ == "__main__":
print(f"Dirty: {is_dirty()}")
print(f"Version: {get_version()}")
print(f"Short version: {get_short_version()}")
print(f"Origin: {get_origin()}")
print(f"Normalized origin: {get_normalized_origin()}")
print(f"Branch: {get_branch()}")
print(f"Short branch: {get_short_branch()}")
print(f"Prebuilt: {is_prebuilt()}")