mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-24 22:23:53 +08:00
* always c++ * Create C++ params class * get works * tests hang now * passes tests * cleanup string formatting * handle interrupt in blocking read * fix memory leak * remove unused constructor * Use delete_db_value directly * Rename put -> write_db_value * filename cleanup * no semicolons in cython * Update common/SConscript Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * add std::string version of delete_db_value * This is handled * cleanup encoding * Add clear method to clear all * add persistent params * fix android build * Should be called clear_all * only import params when needed * set params path on manager import * recusrively create directories * Fix function order * cleanup mkdirp Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> Co-authored-by: Comma Device <device@comma.ai>
87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
import os
|
|
import threading
|
|
import time
|
|
import tempfile
|
|
import shutil
|
|
import stat
|
|
import unittest
|
|
|
|
from common.params import Params, UnknownKeyName, put_nonblocking
|
|
|
|
class TestParams(unittest.TestCase):
    """Exercise ``Params`` against a fresh temporary directory per test."""

    def setUp(self):
        """Create a scratch directory and a ``Params`` instance rooted at it."""
        self.tmpdir = tempfile.mkdtemp()
        print("using", self.tmpdir)
        self.params = Params(self.tmpdir)

    def tearDown(self):
        """Remove the scratch directory created in ``setUp``."""
        shutil.rmtree(self.tmpdir)

    def _check_get_with_delayed_write(self, reader, write):
        """Read ``CarParams`` via ``reader`` while ``write`` runs 0.1 s later.

        ``write`` is a zero-argument callable executed on a background
        thread after a short sleep. The first ``get`` must see nothing yet;
        the second ``get`` (second positional argument ``True`` -- presumably
        the blocking flag, confirm against ``Params.get``) must return the
        written value.
        """
        def _delayed_writer():
            time.sleep(0.1)
            write()

        writer = threading.Thread(target=_delayed_writer)
        writer.start()
        try:
            self.assertIsNone(reader.get("CarParams"))
            self.assertEqual(reader.get("CarParams", True), b"test")
        finally:
            # Never let the writer outlive the test: tearDown deletes tmpdir,
            # and a late write would race with (or break) that removal.
            writer.join()

    def test_params_put_and_get(self):
        """A value stored as ``str`` is read back as ``bytes``."""
        self.params.put("DongleId", "cb38263377b873ee")
        self.assertEqual(self.params.get("DongleId"), b"cb38263377b873ee")

    def test_params_non_ascii(self):
        """Bytes that are not valid UTF-8 round-trip unchanged."""
        st = b"\xe1\x90\xff"
        self.params.put("CarParams", st)
        self.assertEqual(self.params.get("CarParams"), st)

    def test_params_get_cleared_panda_disconnect(self):
        """``panda_disconnect`` drops ``CarParams`` but keeps ``DongleId``."""
        self.params.put("CarParams", "test")
        self.params.put("DongleId", "cb38263377b873ee")
        self.assertEqual(self.params.get("CarParams"), b"test")
        self.params.panda_disconnect()
        self.assertIsNone(self.params.get("CarParams"))
        self.assertIsNotNone(self.params.get("DongleId"))

    def test_params_get_cleared_manager_start(self):
        """``manager_start`` drops ``CarParams`` but keeps ``DongleId``."""
        self.params.put("CarParams", "test")
        self.params.put("DongleId", "cb38263377b873ee")
        self.assertEqual(self.params.get("CarParams"), b"test")
        self.params.manager_start()
        self.assertIsNone(self.params.get("CarParams"))
        self.assertIsNotNone(self.params.get("DongleId"))

    def test_params_two_things(self):
        """Two different keys hold independent values."""
        self.params.put("DongleId", "bob")
        self.params.put("AthenadPid", "123")
        self.assertEqual(self.params.get("DongleId"), b"bob")
        self.assertEqual(self.params.get("AthenadPid"), b"123")

    def test_params_get_block(self):
        """``get`` with the extra ``True`` argument returns a later ``put``."""
        self._check_get_with_delayed_write(
            self.params,
            lambda: self.params.put("CarParams", "test"),
        )

    def test_params_unknown_key_fails(self):
        """Reading a key that is not registered raises ``UnknownKeyName``."""
        with self.assertRaises(UnknownKeyName):
            self.params.get("swag")

    def test_params_permissions(self):
        """The stored file is readable and writable by user, group and other."""
        permissions = (
            stat.S_IRUSR | stat.S_IWUSR
            | stat.S_IRGRP | stat.S_IWGRP
            | stat.S_IROTH | stat.S_IWOTH
        )

        self.params.put("DongleId", "cb38263377b873ee")
        st_mode = os.stat(os.path.join(self.tmpdir, "d", "DongleId")).st_mode
        self.assertEqual(st_mode & permissions, permissions)

    def test_delete_not_there(self):
        """Deleting a key that was never written is a harmless no-op."""
        self.assertIsNone(self.params.get("CarParams"))
        self.params.delete("CarParams")
        self.assertIsNone(self.params.get("CarParams"))

    def test_put_non_blocking_with_get_block(self):
        """A second ``Params`` on the same path sees a ``put_nonblocking`` write."""
        q = Params(self.tmpdir)
        self._check_get_with_delayed_write(
            q,
            lambda: put_nonblocking("CarParams", "test", self.tmpdir),
        )
|
|
|
|
|
# Run the test suite when this file is executed directly as a script;
# importing the module must not start a test run.
if __name__ == "__main__":
    unittest.main()