forked from mawei/dp
1
0
Fork 0
dp/common/realtime.py

95 lines
2.6 KiB
Python
Raw Permalink Normal View History

"""Utilities for reading real time clocks and keeping soft real time constraints."""
import gc
import os
import time
from collections import deque
from openpilot.common.threadname import getthreadname
from openpilot.system.hardware import PC
# time step for each process
DT_CTRL = 0.01 # controlsd
DT_MDL = 0.05 # model
DT_HW = 0.5 # hardwared and manager
DT_DMON = 0.05 # driver monitoring
class Priority:
# CORE 2
# - modeld = 55
# - camerad = 54
CTRL_LOW = 51 # plannerd & radard
# CORE 3
# - pandad = 55
CTRL_HIGH = 53
def set_realtime_priority(level: int) -> None:
if not PC:
os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(level))
def set_core_affinity(cores: list[int]) -> None:
if not PC:
os.sched_setaffinity(0, cores)
def config_realtime_process(cores: int | list[int], priority: int) -> None:
gc.disable()
set_realtime_priority(priority)
c = cores if isinstance(cores, list) else [cores, ]
set_core_affinity(c)
class Ratekeeper:
def __init__(self, rate: float, print_delay_threshold: float | None = 0.0) -> None:
"""Rate in Hz for ratekeeping. print_delay_threshold must be nonnegative."""
self._interval = 1. / rate
self._next_frame_time = time.monotonic() + self._interval
self._print_delay_threshold = print_delay_threshold
self._frame = 0
self._remaining = 0.0
self._thread_name = getthreadname()
self._dts = deque([self._interval], maxlen=100)
self._last_monitor_time = time.monotonic()
@property
def frame(self) -> int:
return self._frame
@property
def remaining(self) -> float:
return self._remaining
@property
def lagging(self) -> bool:
avg_dt = sum(self._dts) / len(self._dts)
expected_dt = self._interval * (1 / 0.9)
return avg_dt > expected_dt
# Maintain loop rate by calling this at the end of each loop
def keep_time(self) -> bool:
lagged = self.monitor_time()
if self._remaining > 0:
time.sleep(self._remaining)
return lagged
# Monitors the cumulative lag, but does not enforce a rate
def monitor_time(self) -> bool:
prev = self._last_monitor_time
self._last_monitor_time = time.monotonic()
self._dts.append(self._last_monitor_time - prev)
lagged = False
remaining = self._next_frame_time - time.monotonic()
self._next_frame_time += self._interval
if self._print_delay_threshold is not None and remaining < -self._print_delay_threshold:
print(f"{self._thread_name} lagging by {-remaining * 1000:.2f} ms")
lagged = True
self._frame += 1
self._remaining = remaining
return lagged