Files
sunnypilot/common/params_pyx.pyx
Milan Medić 284450400e add Cython static analysis (#31491)
* Adding pre-commit hook for cython static analysis

* Adding changes to cython files to pass static analysis

* Revert "Adding changes to cython files to pass static analysis"

This reverts commit 9a0eb733199abd9eef1eac3d024ef2760348d67c.

* Adding ignore rule for indentation of 4 spaces (E111)

* Fixes for cython-lint static analysis

* Revert "Fixes for cython-lint static analysis"

This reverts commit 972741735b2bdc73460d65a4d7ea167dfc0f4644.

* Adding two new rules into ignore list (2 new lines after difinition of class)

* Adding fixes for cython static analysis
old-commit-hash: 69fb3c2ed5
2024-02-19 12:11:56 -08:00

119 lines
3.1 KiB
Cython

# distutils: language = c++
# cython: language_level = 3
from libcpp cimport bool
from libcpp.string cimport string
from libcpp.vector cimport vector
cdef extern from "common/params.h":
cpdef enum ParamKeyType:
PERSISTENT
CLEAR_ON_MANAGER_START
CLEAR_ON_ONROAD_TRANSITION
CLEAR_ON_OFFROAD_TRANSITION
DEVELOPMENT_ONLY
ALL
cdef cppclass c_Params "Params":
c_Params(string) except + nogil
string get(string, bool) nogil
bool getBool(string, bool) nogil
int remove(string) nogil
int put(string, string) nogil
void putNonBlocking(string, string) nogil
void putBoolNonBlocking(string, bool) nogil
int putBool(string, bool) nogil
bool checkKey(string) nogil
string getParamPath(string) nogil
void clearAll(ParamKeyType)
vector[string] allKeys()
def ensure_bytes(v):
return v.encode() if isinstance(v, str) else v
class UnknownKeyName(Exception):
pass
cdef class Params:
cdef c_Params* p
def __cinit__(self, d=""):
cdef string path = <string>d.encode()
with nogil:
self.p = new c_Params(path)
def __dealloc__(self):
del self.p
def clear_all(self, tx_type=ParamKeyType.ALL):
self.p.clearAll(tx_type)
def check_key(self, key):
key = ensure_bytes(key)
if not self.p.checkKey(key):
raise UnknownKeyName(key)
return key
def get(self, key, bool block=False, encoding=None):
cdef string k = self.check_key(key)
cdef string val
with nogil:
val = self.p.get(k, block)
if val == b"":
if block:
# If we got no value while running in blocked mode
# it means we got an interrupt while waiting
raise KeyboardInterrupt
else:
return None
return val if encoding is None else val.decode(encoding)
def get_bool(self, key, bool block=False):
cdef string k = self.check_key(key)
cdef bool r
with nogil:
r = self.p.getBool(k, block)
return r
def put(self, key, dat):
"""
Warning: This function blocks until the param is written to disk!
In very rare cases this can take over a second, and your code will hang.
Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but
in general try to avoid writing params as much as possible.
"""
cdef string k = self.check_key(key)
cdef string dat_bytes = ensure_bytes(dat)
with nogil:
self.p.put(k, dat_bytes)
def put_bool(self, key, bool val):
cdef string k = self.check_key(key)
with nogil:
self.p.putBool(k, val)
def put_nonblocking(self, key, dat):
cdef string k = self.check_key(key)
cdef string dat_bytes = ensure_bytes(dat)
with nogil:
self.p.putNonBlocking(k, dat_bytes)
def put_bool_nonblocking(self, key, bool val):
cdef string k = self.check_key(key)
with nogil:
self.p.putBoolNonBlocking(k, val)
def remove(self, key):
cdef string k = self.check_key(key)
with nogil:
self.p.remove(k)
def get_param_path(self, key=""):
cdef string key_bytes = ensure_bytes(key)
return self.p.getParamPath(key_bytes).decode("utf-8")
def all_keys(self):
return self.p.allKeys()