params: safe and efficient async writing parameters (#25912)

* Safe and efficient asynchronous writing parameters

* call putNonBlocking in locationd

* remove space

* ->AsyncWriter

* remove semicolon

* use member function

* asyc write multiple times

* add test case for AsyncWriter

* merge master

* add missing include

* public

* cleanup

* create once

* cleanup

* update that

* explicit waiting

* improve test case

---------

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
This commit is contained in:
Dean Lee 2023-09-06 23:50:28 +08:00 committed by GitHub
parent 0afcf12368
commit 0d797f4e8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 123 additions and 39 deletions

View File

@ -29,7 +29,7 @@ Export('_common', '_gpucommon')
if GetOption('extras'):
env.Program('tests/test_common',
['tests/test_runner.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'],
['tests/test_runner.cc', 'tests/test_params.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'],
LIBS=[_common, 'json11', 'zmq', 'pthread'])
# Cython bindings

View File

@ -7,6 +7,7 @@
#include <csignal>
#include <unordered_map>
#include "common/queue.h"
#include "common/swaglog.h"
#include "common/util.h"
#include "system/hardware/hw.h"
@ -327,3 +328,33 @@ void Params::clearAll(ParamKeyType key_type) {
fsync_dir(getParamPath());
}
void Params::putNonBlocking(const std::string &key, const std::string &val) {
static AsyncWriter async_writer;
async_writer.queue({params_path, key, val});
}
// AsyncWriter
AsyncWriter::~AsyncWriter() {
if (future.valid()) {
future.wait();
}
}
void AsyncWriter::queue(const std::tuple<std::string, std::string, std::string> &dat) {
q.push(dat);
// start thread on demand
if (!future.valid() || future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) {
future = std::async(std::launch::async, &AsyncWriter::write, this);
}
}
void AsyncWriter::write() {
std::tuple<std::string, std::string, std::string> dat;
while (q.try_pop(dat, 0)) {
auto &[path, key, value] = dat;
Params(path).put(key, value);
}
}

View File

@ -1,9 +1,13 @@
#pragma once
#include <future>
#include <map>
#include <string>
#include <tuple>
#include <vector>
#include "common/queue.h"
enum ParamKeyType {
PERSISTENT = 0x02,
CLEAR_ON_MANAGER_START = 0x04,
@ -42,8 +46,25 @@ public:
inline int putBool(const std::string &key, bool val) {
return put(key.c_str(), val ? "1" : "0", 1);
}
void putNonBlocking(const std::string &key, const std::string &val);
inline void putBoolNonBlocking(const std::string &key, bool val) {
putNonBlocking(key, val ? "1" : "0");
}
private:
std::string params_path;
std::string prefix;
};
class AsyncWriter {
public:
AsyncWriter() {}
~AsyncWriter();
void queue(const std::tuple<std::string, std::string, std::string> &dat);
private:
void write();
std::future<void> future;
SafeQueue<std::tuple<std::string, std::string, std::string>> q;
};

View File

@ -1,10 +1,7 @@
from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName, put_nonblocking, \
put_bool_nonblocking
from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName
assert Params
assert ParamKeyType
assert UnknownKeyName
assert put_nonblocking
assert put_bool_nonblocking
if __name__ == "__main__":
import sys

View File

@ -3,7 +3,6 @@
from libcpp cimport bool
from libcpp.string cimport string
from libcpp.vector cimport vector
import threading
cdef extern from "common/params.h":
cpdef enum ParamKeyType:
@ -19,6 +18,8 @@ cdef extern from "common/params.h":
bool getBool(string, bool) nogil
int remove(string) nogil
int put(string, string) nogil
void putNonBlocking(string, string) nogil
void putBoolNonBlocking(string, bool) nogil
int putBool(string, bool) nogil
bool checkKey(string) nogil
string getParamPath(string) nogil
@ -79,7 +80,7 @@ cdef class Params:
"""
Warning: This function blocks until the param is written to disk!
In very rare cases this can take over a second, and your code will hang.
Use the put_nonblocking helper function in time sensitive code, but
Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but
in general try to avoid writing params as much as possible.
"""
cdef string k = self.check_key(key)
@ -92,6 +93,17 @@ cdef class Params:
with nogil:
self.p.putBool(k, val)
def put_nonblocking(self, key, dat):
cdef string k = self.check_key(key)
cdef string dat_bytes = ensure_bytes(dat)
with nogil:
self.p.putNonBlocking(k, dat_bytes)
def put_bool_nonblocking(self, key, bool val):
cdef string k = self.check_key(key)
with nogil:
self.p.putBoolNonBlocking(k, val)
def remove(self, key):
cdef string k = self.check_key(key)
with nogil:
@ -103,9 +115,3 @@ cdef class Params:
def all_keys(self):
return self.p.allKeys()
def put_nonblocking(key, val, d=""):
threading.Thread(target=lambda: Params(d).put(key, val)).start()
def put_bool_nonblocking(key, bool val, d=""):
threading.Thread(target=lambda: Params(d).put_bool(key, val)).start()

View File

@ -0,0 +1,27 @@
#include "catch2/catch.hpp"
#define private public
#include "common/params.h"
#include "common/util.h"
TEST_CASE("Params/asyncWriter") {
char tmp_path[] = "/tmp/asyncWriter_XXXXXX";
const std::string param_path = mkdtemp(tmp_path);
Params params(param_path);
auto param_names = {"CarParams", "IsMetric"};
{
AsyncWriter async_writer;
for (const auto &name : param_names) {
async_writer.queue({param_path, name, "1"});
// param is empty
REQUIRE(params.get(name).empty());
}
// check if thread is running
REQUIRE(async_writer.future.valid());
REQUIRE(async_writer.future.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout);
}
// check results
for (const auto &name : param_names) {
REQUIRE(params.get(name) == "1");
}
}

View File

@ -4,7 +4,7 @@ import time
import uuid
import unittest
from openpilot.common.params import Params, ParamKeyType, UnknownKeyName, put_nonblocking, put_bool_nonblocking
from openpilot.common.params import Params, ParamKeyType, UnknownKeyName
class TestParams(unittest.TestCase):
def setUp(self):
@ -86,7 +86,7 @@ class TestParams(unittest.TestCase):
q = Params()
def _delayed_writer():
time.sleep(0.1)
put_nonblocking("CarParams", "test")
Params().put_nonblocking("CarParams", "test")
threading.Thread(target=_delayed_writer).start()
assert q.get("CarParams") is None
assert q.get("CarParams", True) == b"test"
@ -95,7 +95,7 @@ class TestParams(unittest.TestCase):
q = Params()
def _delayed_writer():
time.sleep(0.1)
put_bool_nonblocking("CarParams", True)
Params().put_bool_nonblocking("CarParams", True)
threading.Thread(target=_delayed_writer).start()
assert q.get("CarParams") is None
assert q.get("CarParams", True) == b"1"

View File

@ -8,7 +8,7 @@ from cereal import car, log
from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.profiler import Profiler
from openpilot.common.params import Params, put_nonblocking, put_bool_nonblocking
from openpilot.common.params import Params
import cereal.messaging as messaging
from cereal.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV
@ -129,8 +129,8 @@ class Controls:
# Write CarParams for radard
cp_bytes = self.CP.to_bytes()
self.params.put("CarParams", cp_bytes)
put_nonblocking("CarParamsCache", cp_bytes)
put_nonblocking("CarParamsPersistent", cp_bytes)
self.params.put_nonblocking("CarParamsCache", cp_bytes)
self.params.put_nonblocking("CarParamsPersistent", cp_bytes)
# cleanup old params
if not self.CP.experimentalLongitudinalAvailable or is_release_branch():
@ -449,7 +449,7 @@ class Controls:
self.initialized = True
self.set_initial_state()
put_bool_nonblocking("ControlsReady", True)
self.params.put_bool_nonblocking("ControlsReady", True)
# Check for CAN timeout
if not can_strs:

View File

@ -15,7 +15,7 @@ from typing import List, NoReturn, Optional
from cereal import log
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.params import Params, put_nonblocking
from openpilot.common.params import Params
from openpilot.common.realtime import set_realtime_priority
from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot
from openpilot.system.swaglog import cloudlog
@ -63,8 +63,8 @@ class Calibrator:
self.not_car = False
# Read saved calibration
params = Params()
calibration_params = params.get("CalibrationParams")
self.params = Params()
calibration_params = self.params.get("CalibrationParams")
rpy_init = RPY_INIT
wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT
height = HEIGHT_INIT
@ -162,7 +162,7 @@ class Calibrator:
write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5)
if self.param_put and write_this_cycle:
put_nonblocking("CalibrationParams", self.get_msg().to_bytes())
self.params.put_nonblocking("CalibrationParams", self.get_msg().to_bytes())
def handle_v_ego(self, v_ego: float) -> None:
self.v_ego = v_ego

View File

@ -11,7 +11,7 @@ from typing import List, Optional, Dict, Any
import numpy as np
from cereal import log, messaging
from openpilot.common.params import Params, put_nonblocking
from openpilot.common.params import Params
from laika import AstroDog
from laika.constants import SECS_IN_HR, SECS_IN_MIN
from laika.downloader import DownloadFailed
@ -82,6 +82,8 @@ class Laikad:
valid_ephem_types: Valid ephemeris types to be used by AstroDog
save_ephemeris: If true saves and loads nav and orbit ephemeris to cache.
"""
self.params = Params()
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types,
clear_old_ephemeris=True, cache_dir=DOWNLOADS_CACHE_FOLDER)
self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom)
@ -113,7 +115,7 @@ class Laikad:
if not self.save_ephemeris:
return
cache_bytes = Params().get(EPHEMERIS_CACHE)
cache_bytes = self.params.get(EPHEMERIS_CACHE)
if not cache_bytes:
return
@ -141,7 +143,7 @@ class Laikad:
if len(valid_navs) > 0:
ephem_cache = ephemeris_structs.EphemerisCache(glonassEphemerides=[e.data for e in valid_navs if e.prn[0]=='R'],
gpsEphemerides=[e.data for e in valid_navs if e.prn[0]=='G'])
put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes())
self.params.put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes())
cloudlog.debug("Cache saved")
self.last_cached_t = self.last_report_time

View File

@ -669,9 +669,10 @@ void Localizer::configure_gnss_source(const LocalizerGnssSource &source) {
}
int Localizer::locationd_thread() {
Params params;
LocalizerGnssSource source;
const char* gps_location_socket;
if (Params().getBool("UbloxAvailable")) {
if (params.getBool("UbloxAvailable")) {
source = LocalizerGnssSource::UBLOX;
gps_location_socket = "gpsLocationExternal";
} else {
@ -728,10 +729,7 @@ int Localizer::locationd_thread() {
VectorXd posGeo = this->get_position_geodetic();
std::string lastGPSPosJSON = util::string_format(
"{\"latitude\": %.15f, \"longitude\": %.15f, \"altitude\": %.15f}", posGeo(0), posGeo(1), posGeo(2));
std::thread([] (const std::string gpsjson) {
Params().put("LastGPSPosition", gpsjson);
}, lastGPSPosJSON).detach();
params.putNonBlocking("LastGPSPosition", lastGPSPosJSON);
}
cnt++;
}

View File

@ -7,7 +7,7 @@ import numpy as np
import cereal.messaging as messaging
from cereal import car
from cereal import log
from openpilot.common.params import Params, put_nonblocking
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, DT_MDL
from openpilot.common.numpy_fast import clip
from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States
@ -247,7 +247,7 @@ def main(sm=None, pm=None):
'stiffnessFactor': liveParameters.stiffnessFactor,
'angleOffsetAverageDeg': liveParameters.angleOffsetAverageDeg,
}
put_nonblocking("LiveParameters", json.dumps(params))
params_reader.put_nonblocking("LiveParameters", json.dumps(params))
pm.send('liveParameters', msg)

View File

@ -4,7 +4,7 @@ import gc
import cereal.messaging as messaging
from cereal import car
from cereal import log
from openpilot.common.params import Params, put_bool_nonblocking
from openpilot.common.params import Params
from openpilot.common.realtime import set_realtime_priority
from openpilot.selfdrive.controls.lib.events import Events
from openpilot.selfdrive.monitoring.driver_monitor import DriverStatus
@ -14,13 +14,15 @@ def dmonitoringd_thread(sm=None, pm=None):
gc.disable()
set_realtime_priority(2)
params = Params()
if pm is None:
pm = messaging.PubMaster(['driverMonitoringState'])
if sm is None:
sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'controlsState', 'modelV2'], poll=['driverStateV2'])
driver_status = DriverStatus(rhd_saved=Params().get_bool("IsRhdDetected"))
driver_status = DriverStatus(rhd_saved=params.get_bool("IsRhdDetected"))
sm['liveCalibration'].calStatus = log.LiveCalibrationData.Status.invalid
sm['liveCalibration'].rpyCalib = [0, 0, 0]
@ -87,7 +89,7 @@ def dmonitoringd_thread(sm=None, pm=None):
if (sm['driverStateV2'].frameId % 6000 == 0 and
driver_status.wheelpos_learner.filtered_stat.n > driver_status.settings._WHEELPOS_FILTER_MIN_COUNT and
driver_status.wheel_on_right == (driver_status.wheelpos_learner.filtered_stat.M > driver_status.settings._WHEELPOS_THRESHOLD)):
put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right)
params.put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right)
def main(sm=None, pm=None):
dmonitoringd_thread(sm, pm)

View File

@ -2,7 +2,7 @@ import time
import threading
from typing import Optional
from openpilot.common.params import Params, put_nonblocking
from openpilot.common.params import Params
from openpilot.system.hardware import HARDWARE
from openpilot.system.swaglog import cloudlog
from openpilot.selfdrive.statsd import statlog
@ -60,7 +60,7 @@ class PowerMonitoring:
self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh)
if now - self.last_save_time >= 10:
put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
self.last_save_time = now
# First measurement, set integration time