mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-19 01:53:57 +08:00
* Safe and efficient asynchronous writing parameters
* call putNonBlocking in locationd
* remove space
* ->AsyncWriter
* remove semicolon
* use member function
* asyc write multiple times
* add test case for AsyncWriter
* merge master
* add missing include
* public
* cleanup
* create once
* cleanup
* update that
* explicit waiting
* improve test case
* pass prefix to asywriter
* move to params
* assert(queue.empty())
* add comment
* add todo
* test_power_monitoring: remove patch
* rm laikad.py
* fix import
---------
Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: 3c4c4d1f7f
114 lines
3.4 KiB
Python
114 lines
3.4 KiB
Python
import os
|
|
import threading
|
|
import time
|
|
import uuid
|
|
import unittest
|
|
|
|
from openpilot.common.params import Params, ParamKeyType, UnknownKeyName
|
|
|
|
class TestParams(unittest.TestCase):
|
|
def setUp(self):
|
|
self.params = Params()
|
|
|
|
def test_params_put_and_get(self):
|
|
self.params.put("DongleId", "cb38263377b873ee")
|
|
assert self.params.get("DongleId") == b"cb38263377b873ee"
|
|
|
|
def test_params_non_ascii(self):
|
|
st = b"\xe1\x90\xff"
|
|
self.params.put("CarParams", st)
|
|
assert self.params.get("CarParams") == st
|
|
|
|
def test_params_get_cleared_manager_start(self):
|
|
self.params.put("CarParams", "test")
|
|
self.params.put("DongleId", "cb38263377b873ee")
|
|
assert self.params.get("CarParams") == b"test"
|
|
|
|
undefined_param = self.params.get_param_path(uuid.uuid4().hex)
|
|
with open(undefined_param, "w") as f:
|
|
f.write("test")
|
|
assert os.path.isfile(undefined_param)
|
|
|
|
self.params.clear_all(ParamKeyType.CLEAR_ON_MANAGER_START)
|
|
assert self.params.get("CarParams") is None
|
|
assert self.params.get("DongleId") is not None
|
|
assert not os.path.isfile(undefined_param)
|
|
|
|
def test_params_two_things(self):
|
|
self.params.put("DongleId", "bob")
|
|
self.params.put("AthenadPid", "123")
|
|
assert self.params.get("DongleId") == b"bob"
|
|
assert self.params.get("AthenadPid") == b"123"
|
|
|
|
def test_params_get_block(self):
|
|
def _delayed_writer():
|
|
time.sleep(0.1)
|
|
self.params.put("CarParams", "test")
|
|
threading.Thread(target=_delayed_writer).start()
|
|
assert self.params.get("CarParams") is None
|
|
assert self.params.get("CarParams", True) == b"test"
|
|
|
|
def test_params_unknown_key_fails(self):
|
|
with self.assertRaises(UnknownKeyName):
|
|
self.params.get("swag")
|
|
|
|
with self.assertRaises(UnknownKeyName):
|
|
self.params.get_bool("swag")
|
|
|
|
with self.assertRaises(UnknownKeyName):
|
|
self.params.put("swag", "abc")
|
|
|
|
with self.assertRaises(UnknownKeyName):
|
|
self.params.put_bool("swag", True)
|
|
|
|
def test_remove_not_there(self):
|
|
assert self.params.get("CarParams") is None
|
|
self.params.remove("CarParams")
|
|
assert self.params.get("CarParams") is None
|
|
|
|
def test_get_bool(self):
|
|
self.params.remove("IsMetric")
|
|
self.assertFalse(self.params.get_bool("IsMetric"))
|
|
|
|
self.params.put_bool("IsMetric", True)
|
|
self.assertTrue(self.params.get_bool("IsMetric"))
|
|
|
|
self.params.put_bool("IsMetric", False)
|
|
self.assertFalse(self.params.get_bool("IsMetric"))
|
|
|
|
self.params.put("IsMetric", "1")
|
|
self.assertTrue(self.params.get_bool("IsMetric"))
|
|
|
|
self.params.put("IsMetric", "0")
|
|
self.assertFalse(self.params.get_bool("IsMetric"))
|
|
|
|
def test_put_non_blocking_with_get_block(self):
|
|
q = Params()
|
|
def _delayed_writer():
|
|
time.sleep(0.1)
|
|
Params().put_nonblocking("CarParams", "test")
|
|
threading.Thread(target=_delayed_writer).start()
|
|
assert q.get("CarParams") is None
|
|
assert q.get("CarParams", True) == b"test"
|
|
|
|
def test_put_bool_non_blocking_with_get_block(self):
|
|
q = Params()
|
|
def _delayed_writer():
|
|
time.sleep(0.1)
|
|
Params().put_bool_nonblocking("CarParams", True)
|
|
threading.Thread(target=_delayed_writer).start()
|
|
assert q.get("CarParams") is None
|
|
assert q.get("CarParams", True) == b"1"
|
|
|
|
def test_params_all_keys(self):
|
|
keys = Params().all_keys()
|
|
|
|
# sanity checks
|
|
assert len(keys) > 20
|
|
assert len(keys) == len(set(keys))
|
|
assert b"CarParams" in keys
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|