import pytest
import datetime
import os
import threading
import time
import uuid

from openpilot.common.params import Params, ParamKeyFlag, UnknownKeyName


class TestParams:
  """Tests for the ``Params`` store: put/get round trips, clearing, blocking reads and typed values."""

  def setup_method(self):
    """Create the ``Params`` handle used by the tests (pytest calls this before each test)."""
    self.params = Params()

  @staticmethod
  def _start_delayed_writer(write, delay=0.1):
    """Call ``write()`` on a background thread after ``delay`` seconds.

    Args:
      write: zero-argument callable that performs the param write.
      delay: seconds to sleep on the background thread before calling ``write``.

    Returns:
      The already-started ``threading.Thread``; callers should ``join()`` it so
      the writer cannot outlive the test that started it.
    """
    def _run():
      time.sleep(delay)
      write()

    writer = threading.Thread(target=_run)
    writer.start()
    return writer

  def test_params_put_and_get(self):
    """A string value written with ``put`` is returned unchanged by ``get``."""
    self.params.put("DongleId", "cb38263377b873ee")
    assert self.params.get("DongleId") == "cb38263377b873ee"

  def test_params_non_ascii(self):
    """Bytes that are not valid ASCII/UTF-8 round-trip unchanged through ``CarParams``."""
    st = b"\xe1\x90\xff"
    self.params.put("CarParams", st)
    assert self.params.get("CarParams") == st

  def test_params_get_cleared_manager_start(self):
    """``clear_all(CLEAR_ON_MANAGER_START)`` drops ``CarParams`` and stray files but keeps ``DongleId``."""
    self.params.put("CarParams", b"test")
    self.params.put("DongleId", "cb38263377b873ee")
    assert self.params.get("CarParams") == b"test"

    # Drop a file at the path of a random, unregistered key name.
    undefined_param = self.params.get_param_path(uuid.uuid4().hex)
    with open(undefined_param, "w") as f:
      f.write("test")
    assert os.path.isfile(undefined_param)

    self.params.clear_all(ParamKeyFlag.CLEAR_ON_MANAGER_START)
    assert self.params.get("CarParams") is None
    assert self.params.get("DongleId") is not None
    assert not os.path.isfile(undefined_param)

  def test_params_two_things(self):
    """Two different keys hold their own values independently."""
    self.params.put("DongleId", "bob")
    self.params.put("AthenadPid", 123)
    assert self.params.get("DongleId") == "bob"
    assert self.params.get("AthenadPid") == 123

  def test_params_get_block(self):
    """``get(..., block=True)`` returns the value once a delayed ``put`` has written it."""
    writer = self._start_delayed_writer(lambda: self.params.put("CarParams", b"test"))
    try:
      # The writer is still sleeping, so a plain get sees nothing yet.
      assert self.params.get("CarParams") is None
      assert self.params.get("CarParams", block=True) == b"test"
    finally:
      # Never leave the writer running past the end of this test.
      writer.join()

  def test_params_unknown_key_fails(self):
    """``get``, ``get_bool``, ``put`` and ``put_bool`` all raise ``UnknownKeyName`` for an unregistered key."""
    with pytest.raises(UnknownKeyName):
      self.params.get("swag")

    with pytest.raises(UnknownKeyName):
      self.params.get_bool("swag")

    with pytest.raises(UnknownKeyName):
      self.params.put("swag", "abc")

    with pytest.raises(UnknownKeyName):
      self.params.put_bool("swag", True)

  def test_remove_not_there(self):
    """Removing a key that is not set does not raise and leaves it unset."""
    assert self.params.get("CarParams") is None
    self.params.remove("CarParams")
    assert self.params.get("CarParams") is None

  def test_get_bool(self):
    """``get_bool`` is falsy for a removed key and follows values written by ``put_bool`` and ``put``."""
    self.params.remove("IsMetric")
    assert not self.params.get_bool("IsMetric")

    self.params.put_bool("IsMetric", True)
    assert self.params.get_bool("IsMetric")

    self.params.put_bool("IsMetric", False)
    assert not self.params.get_bool("IsMetric")

    self.params.put("IsMetric", True)
    assert self.params.get_bool("IsMetric")

    self.params.put("IsMetric", False)
    assert not self.params.get_bool("IsMetric")

  def test_put_non_blocking_with_get_block(self):
    """A blocking ``get`` on one handle sees a delayed ``put_nonblocking`` made through another handle."""
    q = Params()
    # The writer builds its own Params instance on the background thread.
    writer = self._start_delayed_writer(lambda: Params().put_nonblocking("CarParams", b"test"))
    try:
      assert q.get("CarParams") is None
      assert q.get("CarParams", block=True) == b"test"
    finally:
      writer.join()

  def test_put_bool_non_blocking_with_get_block(self):
    """A delayed ``put_bool_nonblocking(True)`` on ``CarParams`` is read back as ``b"1"`` by a blocking ``get``."""
    q = Params()
    writer = self._start_delayed_writer(lambda: Params().put_bool_nonblocking("CarParams", True))
    try:
      assert q.get("CarParams") is None
      assert q.get("CarParams", block=True) == b"1"
    finally:
      writer.join()

  def test_params_all_keys(self):
    """``all_keys`` returns more than 20 unique keys, including ``b"CarParams"``."""
    keys = Params().all_keys()

    # sanity checks
    assert len(keys) > 20
    assert len(keys) == len(set(keys))
    assert b"CarParams" in keys

  def test_params_default_value(self):
    """``return_default=True`` yields a typed default for some unset keys and ``None`` for others."""
    self.params.remove("LanguageSetting")
    self.params.remove("LongitudinalPersonality")
    self.params.remove("LiveParameters")

    # Without return_default (or with it False) an unset key reads as None.
    assert self.params.get("LanguageSetting") is None
    assert self.params.get("LanguageSetting", return_default=False) is None
    assert isinstance(self.params.get("LanguageSetting", return_default=True), str)
    assert isinstance(self.params.get("LongitudinalPersonality", return_default=True), int)
    # LiveParameters stays None even when a default is requested.
    assert self.params.get("LiveParameters") is None
    assert self.params.get("LiveParameters", return_default=True) is None

  def test_params_get_type(self):
    """``get`` hands back dict, int, bool and datetime values equal to what was ``put``."""
    # json
    self.params.put("ApiCache_FirehoseStats", {"a": 0})
    assert self.params.get("ApiCache_FirehoseStats") == {"a": 0}

    # int
    self.params.put("BootCount", 1441)
    assert self.params.get("BootCount") == 1441

    # bool
    self.params.put("AdbEnabled", True)
    assert self.params.get("AdbEnabled")
    assert isinstance(self.params.get("AdbEnabled"), bool)

    # time
    now = datetime.datetime.now(datetime.UTC)
    self.params.put("InstallDate", now)
    assert self.params.get("InstallDate") == now
|