edge/opendbc/can/parser_pyx.pyx

147 lines
3.9 KiB
Cython

# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3
from cython.operator cimport dereference as deref, preincrement as preinc
from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.unordered_set cimport unordered_set
from libc.stdint cimport uint32_t
from .common cimport CANParser as cpp_CANParser
from .common cimport dbc_lookup, SignalValue, DBC
import numbers
from collections import defaultdict
cdef class CANParser:
cdef:
cpp_CANParser *can
const DBC *dbc
vector[SignalValue] can_values
cdef readonly:
dict vl
dict vl_all
dict ts_nanos
string dbc_name
def __init__(self, dbc_name, messages, bus=0):
self.dbc_name = dbc_name
self.dbc = dbc_lookup(dbc_name)
if not self.dbc:
raise RuntimeError(f"Can't find DBC: {dbc_name}")
self.vl = {}
self.vl_all = {}
self.ts_nanos = {}
msg_name_to_address = {}
address_to_msg_name = {}
for i in range(self.dbc[0].msgs.size()):
msg = self.dbc[0].msgs[i]
name = msg.name.decode("utf8")
msg_name_to_address[name] = msg.address
address_to_msg_name[msg.address] = name
# Convert message names into addresses and check existence in DBC
cdef vector[pair[uint32_t, int]] message_v
for i in range(len(messages)):
c = messages[i]
address = c[0] if isinstance(c[0], numbers.Number) else msg_name_to_address.get(c[0])
if address not in address_to_msg_name:
raise RuntimeError(f"could not find message {repr(c[0])} in DBC {self.dbc_name}")
message_v.push_back((address, c[1]))
name = address_to_msg_name[address]
self.vl[address] = {}
self.vl[name] = self.vl[address]
self.vl_all[address] = {}
self.vl_all[name] = self.vl_all[address]
self.ts_nanos[address] = {}
self.ts_nanos[name] = self.ts_nanos[address]
self.can = new cpp_CANParser(bus, dbc_name, message_v)
self.update_strings([])
def __dealloc__(self):
if self.can:
del self.can
def update_strings(self, strings, sendcan=False):
for v in self.vl_all.values():
for l in v.values(): # no-cython-lint
l.clear()
cdef vector[SignalValue] new_vals
cdef unordered_set[uint32_t] updated_addrs
self.can.update_strings(strings, new_vals, sendcan)
cdef vector[SignalValue].iterator it = new_vals.begin()
cdef SignalValue* cv
while it != new_vals.end():
cv = &deref(it)
# Cast char * directly to unicode
cv_name = <unicode>cv.name
self.vl[cv.address][cv_name] = cv.value
self.vl_all[cv.address][cv_name] = cv.all_values
self.ts_nanos[cv.address][cv_name] = cv.ts_nanos
updated_addrs.insert(cv.address)
preinc(it)
return updated_addrs
@property
def can_valid(self):
return self.can.can_valid
@property
def bus_timeout(self):
return self.can.bus_timeout
cdef class CANDefine():
cdef:
const DBC *dbc
cdef public:
dict dv
string dbc_name
def __init__(self, dbc_name):
self.dbc_name = dbc_name
self.dbc = dbc_lookup(dbc_name)
if not self.dbc:
raise RuntimeError(f"Can't find DBC: '{dbc_name}'")
address_to_msg_name = {}
for i in range(self.dbc[0].msgs.size()):
msg = self.dbc[0].msgs[i]
name = msg.name.decode("utf8")
address = msg.address
address_to_msg_name[address] = name
dv = defaultdict(dict)
for i in range(self.dbc[0].vals.size()):
val = self.dbc[0].vals[i]
sgname = val.name.decode("utf8")
def_val = val.def_val.decode("utf8")
address = val.address
msgname = address_to_msg_name[address]
# separate definition/value pairs
def_val = def_val.split()
values = [int(v) for v in def_val[::2]]
defs = def_val[1::2]
# two ways to lookup: address or msg name
dv[address][sgname] = dict(zip(values, defs))
dv[msgname][sgname] = dv[address][sgname]
self.dv = dict(dv)