diff --git a/SConstruct b/SConstruct index 094503cfa..ca5b7b6cb 100644 --- a/SConstruct +++ b/SConstruct @@ -14,7 +14,6 @@ Decider('MD5-timestamp') SetOption('num_jobs', max(1, int(os.cpu_count()/2))) -AddOption('--kaitai', action='store_true', help='Regenerate kaitai struct parsers') AddOption('--asan', action='store_true', help='turn on ASAN') AddOption('--ubsan', action='store_true', help='turn on UBSan') AddOption('--mutation', action='store_true', help='generate mutation-ready code') @@ -202,7 +201,6 @@ SConscript(['rednose/SConscript']) # Build system services SConscript([ - 'system/ubloxd/SConscript', 'system/loggerd/SConscript', ]) diff --git a/pyproject.toml b/pyproject.toml index 19491ba53..1be5c395f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,9 +33,6 @@ dependencies = [ "pyopenssl < 24.3.0", "pyaudio", - # ubloxd (TODO: just use struct) - "kaitaistruct", - # panda "libusb1", "spidev; platform_system == 'Linux'", diff --git a/system/ubloxd/SConscript b/system/ubloxd/SConscript deleted file mode 100644 index 9eb50760b..000000000 --- a/system/ubloxd/SConscript +++ /dev/null @@ -1,11 +0,0 @@ -Import('env') - -if GetOption('kaitai'): - current_dir = Dir('./generated/').srcnode().abspath - python_cmd = f"kaitai-struct-compiler --target python --outdir {current_dir} $SOURCES" - env.Command(File('./generated/ubx.py'), 'ubx.ksy', python_cmd) - env.Command(File('./generated/gps.py'), 'gps.ksy', python_cmd) - env.Command(File('./generated/glonass.py'), 'glonass.ksy', python_cmd) - # kaitai issue: https://github.com/kaitai-io/kaitai_struct/issues/910 - py_glonass_fix = env.Command(None, File('./generated/glonass.py'), "sed -i 's/self._io.align_to_byte()/# self._io.align_to_byte()/' $SOURCES") - env.Depends(py_glonass_fix, File('./generated/glonass.py')) diff --git a/system/ubloxd/binary_struct.py b/system/ubloxd/binary_struct.py new file mode 100644 index 000000000..7b229620a --- /dev/null +++ b/system/ubloxd/binary_struct.py @@ -0,0 +1,280 @@ +""" +Binary struct parsing DSL. + +Defines a declarative schema for binary messages using dataclasses +and type annotations. +""" + +import struct +from enum import Enum +from dataclasses import dataclass, is_dataclass +from typing import Annotated, Any, TypeVar, get_args, get_origin + + +class FieldType: + """Base class for field type descriptors.""" + + +@dataclass(frozen=True) +class IntType(FieldType): + bits: int + signed: bool + big_endian: bool = False + +@dataclass(frozen=True) +class FloatType(FieldType): + bits: int + +@dataclass(frozen=True) +class BitsType(FieldType): + bits: int + +@dataclass(frozen=True) +class BytesType(FieldType): + size: int + +@dataclass(frozen=True) +class ArrayType(FieldType): + element_type: Any + count_field: str + +@dataclass(frozen=True) +class SwitchType(FieldType): + selector: str + cases: dict[Any, Any] + default: Any = None + +@dataclass(frozen=True) +class EnumType(FieldType): + base_type: FieldType + enum_cls: type[Enum] + +@dataclass(frozen=True) +class ConstType(FieldType): + base_type: FieldType + expected: Any + +@dataclass(frozen=True) +class SubstreamType(FieldType): + length_field: str + element_type: Any + +# Common types - little endian +u8 = IntType(8, False) +u16 = IntType(16, False) +u32 = IntType(32, False) +s8 = IntType(8, True) +s16 = IntType(16, True) +s32 = IntType(32, True) +f32 = FloatType(32) +f64 = FloatType(64) +# Big endian variants +u16be = IntType(16, False, big_endian=True) +u32be = IntType(32, False, big_endian=True) +s16be = IntType(16, True, big_endian=True) +s32be = IntType(32, True, big_endian=True) + + +def bits(n: int) -> BitsType: + """Create a bit-level field type.""" + return BitsType(n) + +def bytes_field(size: int) -> BytesType: + """Create a fixed-size bytes field.""" + return BytesType(size) + +def array(element_type: Any, count_field: str) -> ArrayType: + """Create an array/repeated field.""" + return ArrayType(element_type, count_field) + +def switch(selector: str, cases: dict[Any, Any], default: Any = None) -> SwitchType: + """Create a switch-on field.""" + return SwitchType(selector, cases, default) + +def enum(base_type: Any, enum_cls: type[Enum]) -> EnumType: + """Create an enum-wrapped field.""" + field_type = _field_type_from_spec(base_type) + if field_type is None: + raise TypeError(f"Unsupported field type: {base_type!r}") + return EnumType(field_type, enum_cls) + +def const(base_type: Any, expected: Any) -> ConstType: + """Create a constant-value field.""" + field_type = _field_type_from_spec(base_type) + if field_type is None: + raise TypeError(f"Unsupported field type: {base_type!r}") + return ConstType(field_type, expected) + +def substream(length_field: str, element_type: Any) -> SubstreamType: + """Parse a fixed-length substream using an inner schema.""" + return SubstreamType(length_field, element_type) + + +class BinaryReader: + def __init__(self, data: bytes): + self.data = data + self.pos = 0 + self.bit_pos = 0 # 0-7, position within current byte + + def _require(self, n: int) -> None: + if self.pos + n > len(self.data): + raise EOFError("Unexpected end of data") + + def _read_struct(self, fmt: str): + self._align_to_byte() + size = struct.calcsize(fmt) + self._require(size) + value = struct.unpack_from(fmt, self.data, self.pos)[0] + self.pos += size + return value + + def read_bytes(self, n: int) -> bytes: + self._align_to_byte() + self._require(n) + result = self.data[self.pos : self.pos + n] + self.pos += n + return result + + def read_bits_int_be(self, n: int) -> int: + result = 0 + bits_remaining = n + while bits_remaining > 0: + if self.pos >= len(self.data): + raise EOFError("Unexpected end of data while reading bits") + bits_in_byte = 8 - self.bit_pos + bits_to_read = min(bits_remaining, bits_in_byte) + byte_val = self.data[self.pos] + shift = bits_in_byte - bits_to_read + mask = (1 << bits_to_read) - 1 + extracted = (byte_val >> shift) & mask + result = (result << bits_to_read) | extracted + self.bit_pos += bits_to_read + bits_remaining -= bits_to_read + if self.bit_pos >= 8: + self.bit_pos = 0 + self.pos += 1 + return result + + def _align_to_byte(self) -> None: + if self.bit_pos > 0: + self.bit_pos = 0 + self.pos += 1 + + +T = TypeVar('T', bound='BinaryStruct') + + +class BinaryStruct: + """Base class for binary struct definitions.""" + + def __init_subclass__(cls, **kwargs) -> None: + super().__init_subclass__(**kwargs) + if cls is BinaryStruct: + return + if not is_dataclass(cls): + dataclass(init=False)(cls) + fields = list(getattr(cls, '__annotations__', {}).items()) + cls.__binary_fields__ = fields # type: ignore[attr-defined] + + @classmethod + def _read(inner_cls, reader: BinaryReader): + obj = inner_cls.__new__(inner_cls) + for name, spec in inner_cls.__binary_fields__: + value = _parse_field(spec, reader, obj) + setattr(obj, name, value) + return obj + + cls._read = _read # type: ignore[attr-defined] + + @classmethod + def from_bytes(cls: type[T], data: bytes) -> T: + """Parse struct from bytes.""" + reader = BinaryReader(data) + return cls._read(reader) + + @classmethod + def _read(cls: type[T], reader: BinaryReader) -> T: + """Override in subclasses to implement parsing.""" + raise NotImplementedError + + +def _resolve_path(obj: Any, path: str) -> Any: + cur = obj + for part in path.split('.'): + cur = getattr(cur, part) + return cur + +def _unwrap_annotated(spec: Any) -> tuple[Any, ...]: + if get_origin(spec) is Annotated: + return get_args(spec)[1:] + return () + +def _field_type_from_spec(spec: Any) -> FieldType | None: + if isinstance(spec, FieldType): + return spec + for item in _unwrap_annotated(spec): + if isinstance(item, FieldType): + return item + return None + + +def _int_format(field_type: IntType) -> str: + if field_type.bits == 8: + return 'b' if field_type.signed else 'B' + endian = '>' if field_type.big_endian else '<' + if field_type.bits == 16: + code = 'h' if field_type.signed else 'H' + elif field_type.bits == 32: + code = 'i' if field_type.signed else 'I' + else: + raise ValueError(f"Unsupported integer size: {field_type.bits}") + return f"{endian}{code}" + +def _float_format(field_type: FloatType) -> str: + if field_type.bits == 32: + return ' Any: + field_type = _field_type_from_spec(spec) + if field_type is not None: + spec = field_type + if isinstance(spec, ConstType): + value = _parse_field(spec.base_type, reader, obj) + if value != spec.expected: + raise ValueError(f"Invalid constant: expected {spec.expected!r}, got {value!r}") + return value + if isinstance(spec, EnumType): + raw = _parse_field(spec.base_type, reader, obj) + try: + return spec.enum_cls(raw) + except ValueError: + return raw + if isinstance(spec, SwitchType): + key = _resolve_path(obj, spec.selector) + target = spec.cases.get(key, spec.default) + if target is None: + return None + return _parse_field(target, reader, obj) + if isinstance(spec, ArrayType): + count = _resolve_path(obj, spec.count_field) + return [_parse_field(spec.element_type, reader, obj) for _ in range(int(count))] + if isinstance(spec, SubstreamType): + length = _resolve_path(obj, spec.length_field) + data = reader.read_bytes(int(length)) + sub_reader = BinaryReader(data) + return _parse_field(spec.element_type, sub_reader, obj) + if isinstance(spec, IntType): + return reader._read_struct(_int_format(spec)) + if isinstance(spec, FloatType): + return reader._read_struct(_float_format(spec)) + if isinstance(spec, BitsType): + value = reader.read_bits_int_be(spec.bits) + return bool(value) if spec.bits == 1 else value + if isinstance(spec, BytesType): + return reader.read_bytes(spec.size) + if isinstance(spec, type) and issubclass(spec, BinaryStruct): + return spec._read(reader) + raise TypeError(f"Unsupported field spec: {spec!r}") diff --git a/system/ubloxd/generated/glonass.py b/system/ubloxd/generated/glonass.py deleted file mode 100644 index 40aa16bb6..000000000 --- a/system/ubloxd/generated/glonass.py +++ /dev/null @@ -1,247 +0,0 @@ -# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild - -import kaitaistruct -from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO - - -if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): - raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) - -class Glonass(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.idle_chip = self._io.read_bits_int_be(1) != 0 - self.string_number = self._io.read_bits_int_be(4) - # workaround for kaitai bit alignment issue (see glonass_fix.patch for C++) - # self._io.align_to_byte() - _on = self.string_number - if _on == 4: - self.data = Glonass.String4(self._io, self, self._root) - elif _on == 1: - self.data = Glonass.String1(self._io, self, self._root) - elif _on == 3: - self.data = Glonass.String3(self._io, self, self._root) - elif _on == 5: - self.data = Glonass.String5(self._io, self, self._root) - elif _on == 2: - self.data = Glonass.String2(self._io, self, self._root) - else: - self.data = Glonass.StringNonImmediate(self._io, self, self._root) - self.hamming_code = self._io.read_bits_int_be(8) - self.pad_1 = self._io.read_bits_int_be(11) - self.superframe_number = self._io.read_bits_int_be(16) - self.pad_2 = self._io.read_bits_int_be(8) - self.frame_number = self._io.read_bits_int_be(8) - - class String4(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.tau_n_sign = self._io.read_bits_int_be(1) != 0 - self.tau_n_value = self._io.read_bits_int_be(21) - self.delta_tau_n_sign = self._io.read_bits_int_be(1) != 0 - self.delta_tau_n_value = self._io.read_bits_int_be(4) - self.e_n = self._io.read_bits_int_be(5) - self.not_used_1 = self._io.read_bits_int_be(14) - self.p4 = self._io.read_bits_int_be(1) != 0 - self.f_t = self._io.read_bits_int_be(4) - self.not_used_2 = self._io.read_bits_int_be(3) - self.n_t = self._io.read_bits_int_be(11) - self.n = self._io.read_bits_int_be(5) - self.m = self._io.read_bits_int_be(2) - - @property - def tau_n(self): - if hasattr(self, '_m_tau_n'): - return self._m_tau_n - - self._m_tau_n = ((self.tau_n_value * -1) if self.tau_n_sign else self.tau_n_value) - return getattr(self, '_m_tau_n', None) - - @property - def delta_tau_n(self): - if hasattr(self, '_m_delta_tau_n'): - return self._m_delta_tau_n - - self._m_delta_tau_n = ((self.delta_tau_n_value * -1) if self.delta_tau_n_sign else self.delta_tau_n_value) - return getattr(self, '_m_delta_tau_n', None) - - - class StringNonImmediate(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.data_1 = self._io.read_bits_int_be(64) - self.data_2 = self._io.read_bits_int_be(8) - - - class String5(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.n_a = self._io.read_bits_int_be(11) - self.tau_c = self._io.read_bits_int_be(32) - self.not_used = self._io.read_bits_int_be(1) != 0 - self.n_4 = self._io.read_bits_int_be(5) - self.tau_gps = self._io.read_bits_int_be(22) - self.l_n = self._io.read_bits_int_be(1) != 0 - - - class String1(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.not_used = self._io.read_bits_int_be(2) - self.p1 = self._io.read_bits_int_be(2) - self.t_k = self._io.read_bits_int_be(12) - self.x_vel_sign = self._io.read_bits_int_be(1) != 0 - self.x_vel_value = self._io.read_bits_int_be(23) - self.x_accel_sign = self._io.read_bits_int_be(1) != 0 - self.x_accel_value = self._io.read_bits_int_be(4) - self.x_sign = self._io.read_bits_int_be(1) != 0 - self.x_value = self._io.read_bits_int_be(26) - - @property - def x_vel(self): - if hasattr(self, '_m_x_vel'): - return self._m_x_vel - - self._m_x_vel = ((self.x_vel_value * -1) if self.x_vel_sign else self.x_vel_value) - return getattr(self, '_m_x_vel', None) - - @property - def x_accel(self): - if hasattr(self, '_m_x_accel'): - return self._m_x_accel - - self._m_x_accel = ((self.x_accel_value * -1) if self.x_accel_sign else self.x_accel_value) - return getattr(self, '_m_x_accel', None) - - @property - def x(self): - if hasattr(self, '_m_x'): - return self._m_x - - self._m_x = ((self.x_value * -1) if self.x_sign else self.x_value) - return getattr(self, '_m_x', None) - - - class String2(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.b_n = self._io.read_bits_int_be(3) - self.p2 = self._io.read_bits_int_be(1) != 0 - self.t_b = self._io.read_bits_int_be(7) - self.not_used = self._io.read_bits_int_be(5) - self.y_vel_sign = self._io.read_bits_int_be(1) != 0 - self.y_vel_value = self._io.read_bits_int_be(23) - self.y_accel_sign = self._io.read_bits_int_be(1) != 0 - self.y_accel_value = self._io.read_bits_int_be(4) - self.y_sign = self._io.read_bits_int_be(1) != 0 - self.y_value = self._io.read_bits_int_be(26) - - @property - def y_vel(self): - if hasattr(self, '_m_y_vel'): - return self._m_y_vel - - self._m_y_vel = ((self.y_vel_value * -1) if self.y_vel_sign else self.y_vel_value) - return getattr(self, '_m_y_vel', None) - - @property - def y_accel(self): - if hasattr(self, '_m_y_accel'): - return self._m_y_accel - - self._m_y_accel = ((self.y_accel_value * -1) if self.y_accel_sign else self.y_accel_value) - return getattr(self, '_m_y_accel', None) - - @property - def y(self): - if hasattr(self, '_m_y'): - return self._m_y - - self._m_y = ((self.y_value * -1) if self.y_sign else self.y_value) - return getattr(self, '_m_y', None) - - - class String3(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.p3 = self._io.read_bits_int_be(1) != 0 - self.gamma_n_sign = self._io.read_bits_int_be(1) != 0 - self.gamma_n_value = self._io.read_bits_int_be(10) - self.not_used = self._io.read_bits_int_be(1) != 0 - self.p = self._io.read_bits_int_be(2) - self.l_n = self._io.read_bits_int_be(1) != 0 - self.z_vel_sign = self._io.read_bits_int_be(1) != 0 - self.z_vel_value = self._io.read_bits_int_be(23) - self.z_accel_sign = self._io.read_bits_int_be(1) != 0 - self.z_accel_value = self._io.read_bits_int_be(4) - self.z_sign = self._io.read_bits_int_be(1) != 0 - self.z_value = self._io.read_bits_int_be(26) - - @property - def gamma_n(self): - if hasattr(self, '_m_gamma_n'): - return self._m_gamma_n - - self._m_gamma_n = ((self.gamma_n_value * -1) if self.gamma_n_sign else self.gamma_n_value) - return getattr(self, '_m_gamma_n', None) - - @property - def z_vel(self): - if hasattr(self, '_m_z_vel'): - return self._m_z_vel - - self._m_z_vel = ((self.z_vel_value * -1) if self.z_vel_sign else self.z_vel_value) - return getattr(self, '_m_z_vel', None) - - @property - def z_accel(self): - if hasattr(self, '_m_z_accel'): - return self._m_z_accel - - self._m_z_accel = ((self.z_accel_value * -1) if self.z_accel_sign else self.z_accel_value) - return getattr(self, '_m_z_accel', None) - - @property - def z(self): - if hasattr(self, '_m_z'): - return self._m_z - - self._m_z = ((self.z_value * -1) if self.z_sign else self.z_value) - return getattr(self, '_m_z', None) - - diff --git a/system/ubloxd/generated/gps.py b/system/ubloxd/generated/gps.py deleted file mode 100644 index a999016f3..000000000 --- a/system/ubloxd/generated/gps.py +++ /dev/null @@ -1,193 +0,0 @@ -# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild - -import kaitaistruct -from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO - - -if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): - raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) - -class Gps(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.tlm = Gps.Tlm(self._io, self, self._root) - self.how = Gps.How(self._io, self, self._root) - _on = self.how.subframe_id - if _on == 1: - self.body = Gps.Subframe1(self._io, self, self._root) - elif _on == 2: - self.body = Gps.Subframe2(self._io, self, self._root) - elif _on == 3: - self.body = Gps.Subframe3(self._io, self, self._root) - elif _on == 4: - self.body = Gps.Subframe4(self._io, self, self._root) - - class Subframe1(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.week_no = self._io.read_bits_int_be(10) - self.code = self._io.read_bits_int_be(2) - self.sv_accuracy = self._io.read_bits_int_be(4) - self.sv_health = self._io.read_bits_int_be(6) - self.iodc_msb = self._io.read_bits_int_be(2) - self.l2_p_data_flag = self._io.read_bits_int_be(1) != 0 - self.reserved1 = self._io.read_bits_int_be(23) - self.reserved2 = self._io.read_bits_int_be(24) - self.reserved3 = self._io.read_bits_int_be(24) - self.reserved4 = self._io.read_bits_int_be(16) - self._io.align_to_byte() - self.t_gd = self._io.read_s1() - self.iodc_lsb = self._io.read_u1() - self.t_oc = self._io.read_u2be() - self.af_2 = self._io.read_s1() - self.af_1 = self._io.read_s2be() - self.af_0_sign = self._io.read_bits_int_be(1) != 0 - self.af_0_value = self._io.read_bits_int_be(21) - self.reserved5 = self._io.read_bits_int_be(2) - - @property - def af_0(self): - if hasattr(self, '_m_af_0'): - return self._m_af_0 - - self._m_af_0 = ((self.af_0_value - (1 << 21)) if self.af_0_sign else self.af_0_value) - return getattr(self, '_m_af_0', None) - - - class Subframe3(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.c_ic = self._io.read_s2be() - self.omega_0 = self._io.read_s4be() - self.c_is = self._io.read_s2be() - self.i_0 = self._io.read_s4be() - self.c_rc = self._io.read_s2be() - self.omega = self._io.read_s4be() - self.omega_dot_sign = self._io.read_bits_int_be(1) != 0 - self.omega_dot_value = self._io.read_bits_int_be(23) - self._io.align_to_byte() - self.iode = self._io.read_u1() - self.idot_sign = self._io.read_bits_int_be(1) != 0 - self.idot_value = self._io.read_bits_int_be(13) - self.reserved = self._io.read_bits_int_be(2) - - @property - def omega_dot(self): - if hasattr(self, '_m_omega_dot'): - return self._m_omega_dot - - self._m_omega_dot = ((self.omega_dot_value - (1 << 23)) if self.omega_dot_sign else self.omega_dot_value) - return getattr(self, '_m_omega_dot', None) - - @property - def idot(self): - if hasattr(self, '_m_idot'): - return self._m_idot - - self._m_idot = ((self.idot_value - (1 << 13)) if self.idot_sign else self.idot_value) - return getattr(self, '_m_idot', None) - - - class Subframe4(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.data_id = self._io.read_bits_int_be(2) - self.page_id = self._io.read_bits_int_be(6) - self._io.align_to_byte() - _on = self.page_id - if _on == 56: - self.body = Gps.Subframe4.IonosphereData(self._io, self, self._root) - - class IonosphereData(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.a0 = self._io.read_s1() - self.a1 = self._io.read_s1() - self.a2 = self._io.read_s1() - self.a3 = self._io.read_s1() - self.b0 = self._io.read_s1() - self.b1 = self._io.read_s1() - self.b2 = self._io.read_s1() - self.b3 = self._io.read_s1() - - - - class How(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.tow_count = self._io.read_bits_int_be(17) - self.alert = self._io.read_bits_int_be(1) != 0 - self.anti_spoof = self._io.read_bits_int_be(1) != 0 - self.subframe_id = self._io.read_bits_int_be(3) - self.reserved = self._io.read_bits_int_be(2) - - - class Tlm(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.preamble = self._io.read_bytes(1) - if not self.preamble == b"\x8B": - raise kaitaistruct.ValidationNotEqualError(b"\x8B", self.preamble, self._io, u"/types/tlm/seq/0") - self.tlm = self._io.read_bits_int_be(14) - self.integrity_status = self._io.read_bits_int_be(1) != 0 - self.reserved = self._io.read_bits_int_be(1) != 0 - - - class Subframe2(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.iode = self._io.read_u1() - self.c_rs = self._io.read_s2be() - self.delta_n = self._io.read_s2be() - self.m_0 = self._io.read_s4be() - self.c_uc = self._io.read_s2be() - self.e = self._io.read_s4be() - self.c_us = self._io.read_s2be() - self.sqrt_a = self._io.read_u4be() - self.t_oe = self._io.read_u2be() - self.fit_interval_flag = self._io.read_bits_int_be(1) != 0 - self.aoda = self._io.read_bits_int_be(5) - self.reserved = self._io.read_bits_int_be(2) - - - diff --git a/system/ubloxd/generated/ubx.py b/system/ubloxd/generated/ubx.py deleted file mode 100644 index 994658438..000000000 --- a/system/ubloxd/generated/ubx.py +++ /dev/null @@ -1,273 +0,0 @@ -# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild - -import kaitaistruct -from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO -from enum import Enum - - -if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): - raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) - -class Ubx(KaitaiStruct): - - class GnssType(Enum): - gps = 0 - sbas = 1 - galileo = 2 - beidou = 3 - imes = 4 - qzss = 5 - glonass = 6 - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.magic = self._io.read_bytes(2) - if not self.magic == b"\xB5\x62": - raise kaitaistruct.ValidationNotEqualError(b"\xB5\x62", self.magic, self._io, u"/seq/0") - self.msg_type = self._io.read_u2be() - self.length = self._io.read_u2le() - _on = self.msg_type - if _on == 2569: - self.body = Ubx.MonHw(self._io, self, self._root) - elif _on == 533: - self.body = Ubx.RxmRawx(self._io, self, self._root) - elif _on == 531: - self.body = Ubx.RxmSfrbx(self._io, self, self._root) - elif _on == 309: - self.body = Ubx.NavSat(self._io, self, self._root) - elif _on == 2571: - self.body = Ubx.MonHw2(self._io, self, self._root) - elif _on == 263: - self.body = Ubx.NavPvt(self._io, self, self._root) - - class RxmRawx(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.rcv_tow = self._io.read_f8le() - self.week = self._io.read_u2le() - self.leap_s = self._io.read_s1() - self.num_meas = self._io.read_u1() - self.rec_stat = self._io.read_u1() - self.reserved1 = self._io.read_bytes(3) - self._raw_meas = [] - self.meas = [] - for i in range(self.num_meas): - self._raw_meas.append(self._io.read_bytes(32)) - _io__raw_meas = KaitaiStream(BytesIO(self._raw_meas[i])) - self.meas.append(Ubx.RxmRawx.Measurement(_io__raw_meas, self, self._root)) - - - class Measurement(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.pr_mes = self._io.read_f8le() - self.cp_mes = self._io.read_f8le() - self.do_mes = self._io.read_f4le() - self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1()) - self.sv_id = self._io.read_u1() - self.reserved2 = self._io.read_bytes(1) - self.freq_id = self._io.read_u1() - self.lock_time = self._io.read_u2le() - self.cno = self._io.read_u1() - self.pr_stdev = self._io.read_u1() - self.cp_stdev = self._io.read_u1() - self.do_stdev = self._io.read_u1() - self.trk_stat = self._io.read_u1() - self.reserved3 = self._io.read_bytes(1) - - - - class RxmSfrbx(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1()) - self.sv_id = self._io.read_u1() - self.reserved1 = self._io.read_bytes(1) - self.freq_id = self._io.read_u1() - self.num_words = self._io.read_u1() - self.reserved2 = self._io.read_bytes(1) - self.version = self._io.read_u1() - self.reserved3 = self._io.read_bytes(1) - self.body = [] - for i in range(self.num_words): - self.body.append(self._io.read_u4le()) - - - - class NavSat(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.itow = self._io.read_u4le() - self.version = self._io.read_u1() - self.num_svs = self._io.read_u1() - self.reserved = self._io.read_bytes(2) - self._raw_svs = [] - self.svs = [] - for i in range(self.num_svs): - self._raw_svs.append(self._io.read_bytes(12)) - _io__raw_svs = KaitaiStream(BytesIO(self._raw_svs[i])) - self.svs.append(Ubx.NavSat.Nav(_io__raw_svs, self, self._root)) - - - class Nav(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1()) - self.sv_id = self._io.read_u1() - self.cno = self._io.read_u1() - self.elev = self._io.read_s1() - self.azim = self._io.read_s2le() - self.pr_res = self._io.read_s2le() - self.flags = self._io.read_u4le() - - - - class NavPvt(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.i_tow = self._io.read_u4le() - self.year = self._io.read_u2le() - self.month = self._io.read_u1() - self.day = self._io.read_u1() - self.hour = self._io.read_u1() - self.min = self._io.read_u1() - self.sec = self._io.read_u1() - self.valid = self._io.read_u1() - self.t_acc = self._io.read_u4le() - self.nano = self._io.read_s4le() - self.fix_type = self._io.read_u1() - self.flags = self._io.read_u1() - self.flags2 = self._io.read_u1() - self.num_sv = self._io.read_u1() - self.lon = self._io.read_s4le() - self.lat = self._io.read_s4le() - self.height = self._io.read_s4le() - self.h_msl = self._io.read_s4le() - self.h_acc = self._io.read_u4le() - self.v_acc = self._io.read_u4le() - self.vel_n = self._io.read_s4le() - self.vel_e = self._io.read_s4le() - self.vel_d = self._io.read_s4le() - self.g_speed = self._io.read_s4le() - self.head_mot = self._io.read_s4le() - self.s_acc = self._io.read_s4le() - self.head_acc = self._io.read_u4le() - self.p_dop = self._io.read_u2le() - self.flags3 = self._io.read_u1() - self.reserved1 = self._io.read_bytes(5) - self.head_veh = self._io.read_s4le() - self.mag_dec = self._io.read_s2le() - self.mag_acc = self._io.read_u2le() - - - class MonHw2(KaitaiStruct): - - class ConfigSource(Enum): - flash = 102 - otp = 111 - config_pins = 112 - rom = 113 - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.ofs_i = self._io.read_s1() - self.mag_i = self._io.read_u1() - self.ofs_q = self._io.read_s1() - self.mag_q = self._io.read_u1() - self.cfg_source = KaitaiStream.resolve_enum(Ubx.MonHw2.ConfigSource, self._io.read_u1()) - self.reserved1 = self._io.read_bytes(3) - self.low_lev_cfg = self._io.read_u4le() - self.reserved2 = self._io.read_bytes(8) - self.post_status = self._io.read_u4le() - self.reserved3 = self._io.read_bytes(4) - - - class MonHw(KaitaiStruct): - - class AntennaStatus(Enum): - init = 0 - dontknow = 1 - ok = 2 - short = 3 - open = 4 - - class AntennaPower(Enum): - false = 0 - true = 1 - dontknow = 2 - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.pin_sel = self._io.read_u4le() - self.pin_bank = self._io.read_u4le() - self.pin_dir = self._io.read_u4le() - self.pin_val = self._io.read_u4le() - self.noise_per_ms = self._io.read_u2le() - self.agc_cnt = self._io.read_u2le() - self.a_status = KaitaiStream.resolve_enum(Ubx.MonHw.AntennaStatus, self._io.read_u1()) - self.a_power = KaitaiStream.resolve_enum(Ubx.MonHw.AntennaPower, self._io.read_u1()) - self.flags = self._io.read_u1() - self.reserved1 = self._io.read_bytes(1) - self.used_mask = self._io.read_u4le() - self.vp = self._io.read_bytes(17) - self.jam_ind = self._io.read_u1() - self.reserved2 = self._io.read_bytes(2) - self.pin_irq = self._io.read_u4le() - self.pull_h = self._io.read_u4le() - self.pull_l = self._io.read_u4le() - - - @property - def checksum(self): - if hasattr(self, '_m_checksum'): - return self._m_checksum - - _pos = self._io.pos() - self._io.seek((self.length + 6)) - self._m_checksum = self._io.read_u2le() - self._io.seek(_pos) - return getattr(self, '_m_checksum', None) - - diff --git a/system/ubloxd/glonass.ksy b/system/ubloxd/glonass.ksy deleted file mode 100644 index be99f6e49..000000000 --- a/system/ubloxd/glonass.ksy +++ /dev/null @@ -1,176 +0,0 @@ -# http://gauss.gge.unb.ca/GLONASS.ICD.pdf -# some variables are misprinted but good in the old doc -# https://www.unavco.org/help/glossary/docs/ICD_GLONASS_4.0_(1998)_en.pdf -meta: - id: glonass - endian: be - bit-endian: be -seq: - - id: idle_chip - type: b1 - - id: string_number - type: b4 - - id: data - type: - switch-on: string_number - cases: - 1: string_1 - 2: string_2 - 3: string_3 - 4: string_4 - 5: string_5 - _: string_non_immediate - - id: hamming_code - type: b8 - - id: pad_1 - type: b11 - - id: superframe_number - type: b16 - - id: pad_2 - type: b8 - - id: frame_number - type: b8 - -types: - string_1: - seq: - - id: not_used - type: b2 - - id: p1 - type: b2 - - id: t_k - type: b12 - - id: x_vel_sign - type: b1 - - id: x_vel_value - type: b23 - - id: x_accel_sign - type: b1 - - id: x_accel_value - type: b4 - - id: x_sign - type: b1 - - id: x_value - type: b26 - instances: - x_vel: - value: 'x_vel_sign ? (x_vel_value * (-1)) : x_vel_value' - x_accel: - value: 'x_accel_sign ? (x_accel_value * (-1)) : x_accel_value' - x: - value: 'x_sign ? (x_value * (-1)) : x_value' - string_2: - seq: - - id: b_n - type: b3 - - id: p2 - type: b1 - - id: t_b - type: b7 - - id: not_used - type: b5 - - id: y_vel_sign - type: b1 - - id: y_vel_value - type: b23 - - id: y_accel_sign - type: b1 - - id: y_accel_value - type: b4 - - id: y_sign - type: b1 - - id: y_value - type: b26 - instances: - y_vel: - value: 'y_vel_sign ? (y_vel_value * (-1)) : y_vel_value' - y_accel: - value: 'y_accel_sign ? (y_accel_value * (-1)) : y_accel_value' - y: - value: 'y_sign ? (y_value * (-1)) : y_value' - string_3: - seq: - - id: p3 - type: b1 - - id: gamma_n_sign - type: b1 - - id: gamma_n_value - type: b10 - - id: not_used - type: b1 - - id: p - type: b2 - - id: l_n - type: b1 - - id: z_vel_sign - type: b1 - - id: z_vel_value - type: b23 - - id: z_accel_sign - type: b1 - - id: z_accel_value - type: b4 - - id: z_sign - type: b1 - - id: z_value - type: b26 - instances: - gamma_n: - value: 'gamma_n_sign ? (gamma_n_value * (-1)) : gamma_n_value' - z_vel: - value: 'z_vel_sign ? (z_vel_value * (-1)) : z_vel_value' - z_accel: - value: 'z_accel_sign ? (z_accel_value * (-1)) : z_accel_value' - z: - value: 'z_sign ? (z_value * (-1)) : z_value' - string_4: - seq: - - id: tau_n_sign - type: b1 - - id: tau_n_value - type: b21 - - id: delta_tau_n_sign - type: b1 - - id: delta_tau_n_value - type: b4 - - id: e_n - type: b5 - - id: not_used_1 - type: b14 - - id: p4 - type: b1 - - id: f_t - type: b4 - - id: not_used_2 - type: b3 - - id: n_t - type: b11 - - id: n - type: b5 - - id: m - type: b2 - instances: - tau_n: - value: 'tau_n_sign ? (tau_n_value * (-1)) : tau_n_value' - delta_tau_n: - value: 'delta_tau_n_sign ? (delta_tau_n_value * (-1)) : delta_tau_n_value' - string_5: - seq: - - id: n_a - type: b11 - - id: tau_c - type: b32 - - id: not_used - type: b1 - - id: n_4 - type: b5 - - id: tau_gps - type: b22 - - id: l_n - type: b1 - string_non_immediate: - seq: - - id: data_1 - type: b64 - - id: data_2 - type: b8 diff --git a/system/ubloxd/glonass.py b/system/ubloxd/glonass.py new file mode 100644 index 000000000..144ccdde6 --- /dev/null +++ b/system/ubloxd/glonass.py @@ -0,0 +1,156 @@ +""" +Parses GLONASS navigation strings per GLONASS ICD specification. +http://gauss.gge.unb.ca/GLONASS.ICD.pdf +https://www.unavco.org/help/glossary/docs/ICD_GLONASS_4.0_(1998)_en.pdf +""" + +from typing import Annotated + +from openpilot.system.ubloxd import binary_struct as bs + + +class Glonass(bs.BinaryStruct): + class String1(bs.BinaryStruct): + not_used: Annotated[int, bs.bits(2)] + p1: Annotated[int, bs.bits(2)] + t_k: Annotated[int, bs.bits(12)] + x_vel_sign: Annotated[bool, bs.bits(1)] + x_vel_value: Annotated[int, bs.bits(23)] + x_accel_sign: Annotated[bool, bs.bits(1)] + x_accel_value: Annotated[int, bs.bits(4)] + x_sign: Annotated[bool, bs.bits(1)] + x_value: Annotated[int, bs.bits(26)] + + @property + def x_vel(self) -> int: + """Computed x_vel from sign-magnitude representation.""" + return (self.x_vel_value * -1) if self.x_vel_sign else self.x_vel_value + + @property + def x_accel(self) -> int: + """Computed x_accel from sign-magnitude representation.""" + return (self.x_accel_value * -1) if self.x_accel_sign else self.x_accel_value + + @property + def x(self) -> int: + """Computed x from sign-magnitude representation.""" + return (self.x_value * -1) if self.x_sign else self.x_value + + class String2(bs.BinaryStruct): + b_n: Annotated[int, bs.bits(3)] + p2: Annotated[bool, bs.bits(1)] + t_b: Annotated[int, bs.bits(7)] + not_used: Annotated[int, bs.bits(5)] + y_vel_sign: Annotated[bool, bs.bits(1)] + y_vel_value: Annotated[int, bs.bits(23)] + y_accel_sign: Annotated[bool, bs.bits(1)] + y_accel_value: Annotated[int, bs.bits(4)] + y_sign: Annotated[bool, bs.bits(1)] + y_value: Annotated[int, bs.bits(26)] + + @property + def y_vel(self) -> int: + """Computed y_vel from sign-magnitude representation.""" + return (self.y_vel_value * -1) if self.y_vel_sign else self.y_vel_value + + @property + def y_accel(self) -> int: + """Computed y_accel from sign-magnitude representation.""" + return (self.y_accel_value * -1) if self.y_accel_sign else self.y_accel_value + + @property + def y(self) -> int: + """Computed y from sign-magnitude representation.""" + return (self.y_value * -1) if self.y_sign else self.y_value + + class String3(bs.BinaryStruct): + p3: Annotated[bool, bs.bits(1)] + gamma_n_sign: Annotated[bool, bs.bits(1)] + gamma_n_value: Annotated[int, bs.bits(10)] + not_used: Annotated[bool, bs.bits(1)] + p: Annotated[int, bs.bits(2)] + l_n: Annotated[bool, bs.bits(1)] + z_vel_sign: Annotated[bool, bs.bits(1)] + z_vel_value: Annotated[int, bs.bits(23)] + z_accel_sign: Annotated[bool, bs.bits(1)] + z_accel_value: Annotated[int, bs.bits(4)] + z_sign: Annotated[bool, bs.bits(1)] + z_value: Annotated[int, bs.bits(26)] + + @property + def gamma_n(self) -> int: + """Computed gamma_n from sign-magnitude representation.""" + return (self.gamma_n_value * -1) if self.gamma_n_sign else self.gamma_n_value + + @property + def z_vel(self) -> int: + """Computed z_vel from sign-magnitude representation.""" + return (self.z_vel_value * -1) if self.z_vel_sign else self.z_vel_value + + @property + def z_accel(self) -> int: + """Computed z_accel from sign-magnitude representation.""" + return (self.z_accel_value * -1) if self.z_accel_sign else self.z_accel_value + + @property + def z(self) -> int: + """Computed z from sign-magnitude representation.""" + return (self.z_value * -1) if self.z_sign else self.z_value + + class String4(bs.BinaryStruct): + tau_n_sign: Annotated[bool, bs.bits(1)] + tau_n_value: Annotated[int, bs.bits(21)] + delta_tau_n_sign: Annotated[bool, bs.bits(1)] + delta_tau_n_value: Annotated[int, bs.bits(4)] + e_n: Annotated[int, bs.bits(5)] + not_used_1: Annotated[int, bs.bits(14)] + p4: Annotated[bool, bs.bits(1)] + f_t: Annotated[int, bs.bits(4)] + not_used_2: Annotated[int, bs.bits(3)] + n_t: Annotated[int, bs.bits(11)] + n: Annotated[int, bs.bits(5)] + m: Annotated[int, bs.bits(2)] + + @property + def tau_n(self) -> int: + """Computed tau_n from sign-magnitude representation.""" + return (self.tau_n_value * -1) if self.tau_n_sign else self.tau_n_value + + @property + def delta_tau_n(self) -> int: + """Computed delta_tau_n from sign-magnitude representation.""" + return (self.delta_tau_n_value * -1) if self.delta_tau_n_sign else self.delta_tau_n_value + + class String5(bs.BinaryStruct): + n_a: Annotated[int, bs.bits(11)] + tau_c: Annotated[int, bs.bits(32)] + not_used: Annotated[bool, bs.bits(1)] + n_4: Annotated[int, bs.bits(5)] + tau_gps: Annotated[int, bs.bits(22)] + l_n: Annotated[bool, bs.bits(1)] + + class StringNonImmediate(bs.BinaryStruct): + data_1: Annotated[int, bs.bits(64)] + data_2: Annotated[int, bs.bits(8)] + + idle_chip: Annotated[bool, bs.bits(1)] + string_number: Annotated[int, bs.bits(4)] + data: Annotated[ + object, + bs.switch( + 'string_number', + { + 1: String1, + 2: String2, + 3: String3, + 4: String4, + 5: String5, + }, + default=StringNonImmediate, + ), + ] + hamming_code: Annotated[int, bs.bits(8)] + pad_1: Annotated[int, bs.bits(11)] + superframe_number: Annotated[int, bs.bits(16)] + pad_2: Annotated[int, bs.bits(8)] + frame_number: Annotated[int, bs.bits(8)] diff --git a/system/ubloxd/gps.ksy b/system/ubloxd/gps.ksy deleted file mode 100644 index 893ad1b25..000000000 --- a/system/ubloxd/gps.ksy +++ /dev/null @@ -1,189 +0,0 @@ -# https://www.gps.gov/technical/icwg/IS-GPS-200E.pdf -meta: - id: gps - endian: be - bit-endian: be -seq: - - id: tlm - type: tlm - - id: how - type: how - - id: body - type: - switch-on: how.subframe_id - cases: - 1: subframe_1 - 2: subframe_2 - 3: subframe_3 - 4: subframe_4 -types: - tlm: - seq: - - id: preamble - contents: [0x8b] - - id: tlm - type: b14 - - id: integrity_status - type: b1 - - id: reserved - type: b1 - how: - seq: - - id: tow_count - type: b17 - - id: alert - type: b1 - - id: anti_spoof - type: b1 - - id: subframe_id - type: b3 - - id: reserved - type: b2 - subframe_1: - seq: - # Word 3 - - id: week_no - type: b10 - - id: code - type: b2 - - id: sv_accuracy - type: b4 - - id: sv_health - type: b6 - - id: iodc_msb - type: b2 - # Word 4 - - id: l2_p_data_flag - type: b1 - - id: reserved1 - type: b23 - # Word 5 - - id: reserved2 - type: b24 - # Word 6 - - id: reserved3 - type: b24 - # Word 7 - - id: reserved4 - type: b16 - - id: t_gd - type: s1 - # Word 8 - - id: iodc_lsb - type: u1 - - id: t_oc - type: u2 - # Word 9 - - id: af_2 - type: s1 - - id: af_1 - type: s2 - # Word 10 - - id: af_0_sign - type: b1 - - id: af_0_value - type: b21 - - id: reserved5 - type: b2 - instances: - af_0: - value: 'af_0_sign ? (af_0_value - (1 << 21)) : af_0_value' - subframe_2: - seq: - # Word 3 - - id: iode - type: u1 - - id: c_rs - type: s2 - # Word 4 & 5 - - id: delta_n - type: s2 - - id: m_0 - type: s4 - # Word 6 & 7 - - id: c_uc - type: s2 - - id: e - type: s4 - # Word 8 & 9 - - id: c_us - type: s2 - - id: sqrt_a - type: u4 - # Word 10 - - id: t_oe - type: u2 - - id: fit_interval_flag - type: b1 - - id: aoda - type: b5 - - id: reserved - type: b2 - subframe_3: - seq: - # Word 3 & 4 - - id: c_ic - type: s2 - - id: omega_0 - type: s4 - # Word 5 & 6 - - id: c_is - type: s2 - - id: i_0 - type: s4 - # Word 7 & 8 - - id: c_rc - type: s2 - - id: omega - type: s4 - # Word 9 - - id: omega_dot_sign - type: b1 - - id: omega_dot_value - type: b23 - # Word 10 - - id: iode - type: u1 - - id: idot_sign - type: b1 - - id: idot_value - type: b13 - - id: reserved - type: b2 - instances: - omega_dot: - value: 'omega_dot_sign ? (omega_dot_value - (1 << 23)) : omega_dot_value' - idot: - value: 'idot_sign ? (idot_value - (1 << 13)) : idot_value' - subframe_4: - seq: - # Word 3 - - id: data_id - type: b2 - - id: page_id - type: b6 - - id: body - type: - switch-on: page_id - cases: - 56: ionosphere_data - types: - ionosphere_data: - seq: - - id: a0 - type: s1 - - id: a1 - type: s1 - - id: a2 - type: s1 - - id: a3 - type: s1 - - id: b0 - type: s1 - - id: b1 - type: s1 - - id: b2 - type: s1 - - id: b3 - type: s1 - diff --git a/system/ubloxd/gps.py b/system/ubloxd/gps.py new file mode 100644 index 000000000..1c0833bd9 --- /dev/null +++ b/system/ubloxd/gps.py @@ -0,0 +1,116 @@ +""" +Parses GPS navigation subframes per IS-GPS-200E specification. +https://www.gps.gov/technical/icwg/IS-GPS-200E.pdf +""" + +from typing import Annotated + +from openpilot.system.ubloxd import binary_struct as bs + + +class Gps(bs.BinaryStruct): + class Tlm(bs.BinaryStruct): + preamble: Annotated[bytes, bs.const(bs.bytes_field(1), b"\x8b")] + tlm: Annotated[int, bs.bits(14)] + integrity_status: Annotated[bool, bs.bits(1)] + reserved: Annotated[bool, bs.bits(1)] + + class How(bs.BinaryStruct): + tow_count: Annotated[int, bs.bits(17)] + alert: Annotated[bool, bs.bits(1)] + anti_spoof: Annotated[bool, bs.bits(1)] + subframe_id: Annotated[int, bs.bits(3)] + reserved: Annotated[int, bs.bits(2)] + + class Subframe1(bs.BinaryStruct): + week_no: Annotated[int, bs.bits(10)] + code: Annotated[int, bs.bits(2)] + sv_accuracy: Annotated[int, bs.bits(4)] + sv_health: Annotated[int, bs.bits(6)] + iodc_msb: Annotated[int, bs.bits(2)] + l2_p_data_flag: Annotated[bool, bs.bits(1)] + reserved1: Annotated[int, bs.bits(23)] + reserved2: Annotated[int, bs.bits(24)] + reserved3: Annotated[int, bs.bits(24)] + reserved4: Annotated[int, bs.bits(16)] + t_gd: Annotated[int, bs.s8] + iodc_lsb: Annotated[int, bs.u8] + t_oc: Annotated[int, bs.u16be] + af_2: Annotated[int, bs.s8] + af_1: Annotated[int, bs.s16be] + af_0_sign: Annotated[bool, bs.bits(1)] + af_0_value: Annotated[int, bs.bits(21)] + reserved5: Annotated[int, bs.bits(2)] + + @property + def af_0(self) -> int: + """Computed af_0 from sign-magnitude representation.""" + return (self.af_0_value - (1 << 21)) if self.af_0_sign else self.af_0_value + + class Subframe2(bs.BinaryStruct): + iode: Annotated[int, bs.u8] + c_rs: Annotated[int, bs.s16be] + delta_n: Annotated[int, bs.s16be] + m_0: Annotated[int, bs.s32be] + c_uc: Annotated[int, bs.s16be] + e: Annotated[int, bs.s32be] + c_us: Annotated[int, bs.s16be] + sqrt_a: Annotated[int, bs.u32be] + t_oe: Annotated[int, bs.u16be] + fit_interval_flag: Annotated[bool, bs.bits(1)] + aoda: Annotated[int, bs.bits(5)] + reserved: Annotated[int, bs.bits(2)] + + class Subframe3(bs.BinaryStruct): + c_ic: Annotated[int, bs.s16be] + omega_0: Annotated[int, bs.s32be] + c_is: Annotated[int, bs.s16be] + i_0: Annotated[int, bs.s32be] + c_rc: Annotated[int, bs.s16be] + omega: Annotated[int, bs.s32be] + omega_dot_sign: Annotated[bool, bs.bits(1)] + omega_dot_value: Annotated[int, bs.bits(23)] + iode: Annotated[int, bs.u8] + idot_sign: Annotated[bool, bs.bits(1)] + idot_value: Annotated[int, bs.bits(13)] + reserved: Annotated[int, bs.bits(2)] + + @property + def omega_dot(self) -> int: + """Computed omega_dot from sign-magnitude representation.""" + return (self.omega_dot_value - (1 << 23)) if self.omega_dot_sign else self.omega_dot_value + + @property + def idot(self) -> int: + """Computed idot from sign-magnitude representation.""" + return (self.idot_value - (1 << 13)) if self.idot_sign else self.idot_value + + class Subframe4(bs.BinaryStruct): + class IonosphereData(bs.BinaryStruct): + a0: Annotated[int, bs.s8] + a1: Annotated[int, bs.s8] + a2: Annotated[int, bs.s8] + a3: Annotated[int, bs.s8] + b0: Annotated[int, bs.s8] + b1: Annotated[int, bs.s8] + b2: Annotated[int, bs.s8] + b3: Annotated[int, bs.s8] + + data_id: Annotated[int, bs.bits(2)] + page_id: Annotated[int, bs.bits(6)] + body: Annotated[object, bs.switch('page_id', {56: IonosphereData})] + + tlm: Tlm + how: How + body: Annotated[ + object, + bs.switch( + 'how.subframe_id', + { + 1: Subframe1, + 2: Subframe2, + 3: Subframe3, + 4: Subframe4, + }, + ), + ] diff --git a/system/ubloxd/ubloxd.py b/system/ubloxd/ubloxd.py index 6882ad095..e55cadcf7 100755 --- a/system/ubloxd/ubloxd.py +++ b/system/ubloxd/ubloxd.py @@ -8,9 +8,9 @@ from dataclasses import dataclass from cereal import log from cereal import messaging -from openpilot.system.ubloxd.generated.ubx import Ubx -from openpilot.system.ubloxd.generated.gps import Gps -from openpilot.system.ubloxd.generated.glonass import Glonass +from openpilot.system.ubloxd.ubx import Ubx +from openpilot.system.ubloxd.gps import Gps +from openpilot.system.ubloxd.glonass import Glonass SECS_IN_MIN = 60 @@ -52,7 +52,7 @@ class UbxFramer: # find preamble if len(self.buf) < 2: break - start = self.buf.find(b"\xB5\x62") + start = self.buf.find(b"\xb5\x62") if start < 0: # no preamble in buffer self.buf.clear() @@ -98,9 +98,22 @@ class UbloxMsgParser: # user range accuracy in meters glonass_URA_lookup: dict[int, float] = { - 0: 1, 1: 2, 2: 2.5, 3: 4, 4: 5, 5: 7, - 6: 10, 7: 12, 8: 14, 9: 16, 10: 32, - 11: 64, 12: 128, 13: 256, 14: 512, 15: 1024, + 0: 1, + 1: 2, + 2: 2.5, + 3: 4, + 4: 5, + 5: 7, + 6: 10, + 7: 12, + 8: 14, + 9: 16, + 10: 32, + 11: 64, + 12: 128, + 13: 256, + 14: 512, + 15: 1024, } def __init__(self) -> None: @@ -121,7 +134,7 @@ class UbloxMsgParser: body = Ubx.NavPvt.from_bytes(payload) return self._gen_nav_pvt(body) if msg_type == 0x0213: - # Manually parse RXM-SFRBX to avoid Kaitai EOF on some frames + # Manually parse RXM-SFRBX to avoid EOF on some frames if len(payload) < 8: return None gnss_id = payload[0] @@ -134,7 +147,7 @@ class UbloxMsgParser: words: list[int] = [] off = 8 for _ in range(num_words): - words.append(int.from_bytes(payload[off:off+4], 'little')) + words.append(int.from_bytes(payload[off : off + 4], 'little')) off += 4 class _SfrbxView: @@ -143,6 +156,7 @@ class UbloxMsgParser: self.sv_id = sid self.freq_id = fid self.body = body + view = _SfrbxView(gnss_id, sv_id, freq_id, words) return self._gen_rxm_sfrbx(view) if msg_type == 0x0215: @@ -515,5 +529,6 @@ def main(): service, dat = res pm.send(service, dat) + if __name__ == '__main__': main() diff --git a/system/ubloxd/ubx.ksy b/system/ubloxd/ubx.ksy deleted file mode 100644 index 02c757fe7..000000000 --- a/system/ubloxd/ubx.ksy +++ /dev/null @@ -1,293 +0,0 @@ -meta: - id: ubx - endian: le -seq: - - id: magic - contents: [0xb5, 0x62] - - id: msg_type - type: u2be - - id: length - type: u2 - - id: body - type: - switch-on: msg_type - cases: - 0x0107: nav_pvt - 0x0213: rxm_sfrbx - 0x0215: rxm_rawx - 0x0a09: mon_hw - 0x0a0b: mon_hw2 - 0x0135: nav_sat -instances: - checksum: - pos: length + 6 - type: u2 - -types: - mon_hw: - seq: - - id: pin_sel - type: u4 - - id: pin_bank - type: u4 - - id: pin_dir - type: u4 - - id: pin_val - type: u4 - - id: noise_per_ms - type: u2 - - id: agc_cnt - type: u2 - - id: a_status - type: u1 - enum: antenna_status - - id: a_power - type: u1 - enum: antenna_power - - id: flags - type: u1 - - id: reserved1 - size: 1 - - id: used_mask - type: u4 - - id: vp - size: 17 - - id: jam_ind - type: u1 - - id: reserved2 - size: 2 - - id: pin_irq - type: u4 - - id: pull_h - type: u4 - - id: pull_l - type: u4 - enums: - antenna_status: - 0: init - 1: dontknow - 2: ok - 3: short - 4: open - antenna_power: - 0: off - 1: on - 2: dontknow - - mon_hw2: - seq: - - id: ofs_i - type: s1 - - id: mag_i - type: u1 - - id: ofs_q - type: s1 - - id: mag_q - type: u1 - - id: cfg_source - type: u1 - enum: config_source - - id: reserved1 - size: 3 - - id: low_lev_cfg - type: u4 - - id: reserved2 - size: 8 - - id: post_status - type: u4 - - id: reserved3 - size: 4 - - enums: - config_source: - 113: rom - 111: otp - 112: config_pins - 102: flash - - rxm_sfrbx: - seq: - - id: gnss_id - type: u1 - enum: gnss_type - - id: sv_id - type: u1 - - id: reserved1 - size: 1 - - id: freq_id - type: u1 - - id: num_words - type: u1 - - id: reserved2 - size: 1 - - id: version - type: u1 - - id: reserved3 - size: 1 - - id: body - type: u4 - repeat: expr - repeat-expr: num_words - - rxm_rawx: - seq: - - id: rcv_tow - type: f8 - - id: week - type: u2 - - id: leap_s - type: s1 - - id: num_meas - type: u1 - - id: rec_stat - type: u1 - - id: reserved1 - size: 3 - - id: meas - type: measurement - size: 32 - repeat: expr - repeat-expr: num_meas - types: - measurement: - seq: - - id: pr_mes - type: f8 - - id: cp_mes - type: f8 - - id: do_mes - type: f4 - - id: gnss_id - type: u1 - enum: gnss_type - - id: sv_id - type: u1 - - id: reserved2 - size: 1 - - id: freq_id - type: u1 - - id: lock_time - type: u2 - - id: cno - type: u1 - - id: pr_stdev - type: u1 - - id: cp_stdev - type: u1 - - id: do_stdev - type: u1 - - id: trk_stat - type: u1 - - id: reserved3 - size: 1 - nav_sat: - seq: - - id: itow - type: u4 - - id: version - type: u1 - - id: num_svs - type: u1 - - id: reserved - size: 2 - - id: svs - type: nav - size: 12 - repeat: expr - repeat-expr: num_svs - types: - nav: - seq: - - id: gnss_id - type: u1 - enum: gnss_type - - id: sv_id - type: u1 - - id: cno - type: u1 - - id: elev - type: s1 - - id: azim - type: s2 - - id: pr_res - type: s2 - - id: flags - type: u4 - - nav_pvt: - seq: - - id: i_tow - type: u4 - - id: year - type: u2 - - id: month - type: u1 - - id: day - type: u1 - - id: hour - type: u1 - - id: min - type: u1 - - id: sec - type: u1 - - id: valid - type: u1 - - id: t_acc - type: u4 - - id: nano - type: s4 - - id: fix_type - type: u1 - - id: flags - type: u1 - - id: flags2 - type: u1 - - id: num_sv - type: u1 - - id: lon - type: s4 - - id: lat - type: s4 - - id: height - type: s4 - - id: h_msl - type: s4 - - id: h_acc - type: u4 - - id: v_acc - type: u4 - - id: vel_n - type: s4 - - id: vel_e - type: s4 - - id: vel_d - type: s4 - - id: g_speed - type: s4 - - id: head_mot - type: s4 - - id: s_acc - type: s4 - - id: head_acc - type: u4 - - id: p_dop - type: u2 - - id: flags3 - type: u1 - - id: reserved1 - size: 5 - - id: head_veh - type: s4 - - id: mag_dec - type: s2 - - id: mag_acc - type: u2 -enums: - gnss_type: - 0: gps - 1: sbas - 2: galileo - 3: beidou - 4: imes - 5: qzss - 6: glonass diff --git a/system/ubloxd/ubx.py b/system/ubloxd/ubx.py new file mode 100644 index 000000000..857498ebf --- /dev/null +++ b/system/ubloxd/ubx.py @@ -0,0 +1,180 @@ +""" +UBX protocol parser +""" + +from enum import IntEnum +from typing import Annotated + +from openpilot.system.ubloxd import binary_struct as bs + + +class GnssType(IntEnum): + gps = 0 + sbas = 1 + galileo = 2 + beidou = 3 + imes = 4 + qzss = 5 + glonass = 6 + + +class Ubx(bs.BinaryStruct): + GnssType = GnssType + + class RxmRawx(bs.BinaryStruct): + class Measurement(bs.BinaryStruct): + pr_mes: Annotated[float, bs.f64] + cp_mes: Annotated[float, bs.f64] + do_mes: Annotated[float, bs.f32] + gnss_id: Annotated[GnssType | int, bs.enum(bs.u8, GnssType)] + sv_id: Annotated[int, bs.u8] + reserved2: Annotated[bytes, bs.bytes_field(1)] + freq_id: Annotated[int, bs.u8] + lock_time: Annotated[int, bs.u16] + cno: Annotated[int, bs.u8] + pr_stdev: Annotated[int, bs.u8] + cp_stdev: Annotated[int, bs.u8] + do_stdev: Annotated[int, bs.u8] + trk_stat: Annotated[int, bs.u8] + reserved3: Annotated[bytes, bs.bytes_field(1)] + + rcv_tow: Annotated[float, bs.f64] + week: Annotated[int, bs.u16] + leap_s: Annotated[int, bs.s8] + num_meas: Annotated[int, bs.u8] + rec_stat: Annotated[int, bs.u8] + reserved1: Annotated[bytes, bs.bytes_field(3)] + meas: Annotated[list[Measurement], bs.array(Measurement, count_field='num_meas')] + + class RxmSfrbx(bs.BinaryStruct): + gnss_id: Annotated[GnssType | int, bs.enum(bs.u8, GnssType)] + sv_id: Annotated[int, bs.u8] + reserved1: Annotated[bytes, bs.bytes_field(1)] + freq_id: Annotated[int, bs.u8] + num_words: Annotated[int, bs.u8] + reserved2: Annotated[bytes, bs.bytes_field(1)] + version: Annotated[int, bs.u8] + reserved3: Annotated[bytes, bs.bytes_field(1)] + body: Annotated[list[int], bs.array(bs.u32, count_field='num_words')] + + class NavSat(bs.BinaryStruct): + class Nav(bs.BinaryStruct): + gnss_id: Annotated[GnssType | int, bs.enum(bs.u8, GnssType)] + sv_id: Annotated[int, bs.u8] + cno: Annotated[int, bs.u8] + elev: Annotated[int, bs.s8] + azim: Annotated[int, bs.s16] + pr_res: Annotated[int, bs.s16] + flags: Annotated[int, bs.u32] + + itow: Annotated[int, bs.u32] + version: Annotated[int, bs.u8] + num_svs: Annotated[int, bs.u8] + reserved: Annotated[bytes, bs.bytes_field(2)] + svs: Annotated[list[Nav], bs.array(Nav, count_field='num_svs')] + + class NavPvt(bs.BinaryStruct): + i_tow: Annotated[int, bs.u32] + year: Annotated[int, bs.u16] + month: Annotated[int, bs.u8] + day: Annotated[int, bs.u8] + hour: Annotated[int, bs.u8] + min: Annotated[int, bs.u8] + sec: Annotated[int, bs.u8] + valid: Annotated[int, bs.u8] + t_acc: Annotated[int, bs.u32] + nano: Annotated[int, bs.s32] + fix_type: Annotated[int, bs.u8] + flags: Annotated[int, bs.u8] + flags2: Annotated[int, bs.u8] + num_sv: Annotated[int, bs.u8] + lon: Annotated[int, bs.s32] + lat: Annotated[int, bs.s32] + height: Annotated[int, bs.s32] + h_msl: Annotated[int, bs.s32] + h_acc: Annotated[int, bs.u32] + v_acc: Annotated[int, bs.u32] + vel_n: Annotated[int, bs.s32] + vel_e: Annotated[int, bs.s32] + vel_d: Annotated[int, bs.s32] + g_speed: Annotated[int, bs.s32] + head_mot: Annotated[int, bs.s32] + s_acc: Annotated[int, bs.s32] + head_acc: Annotated[int, bs.u32] + p_dop: Annotated[int, bs.u16] + flags3: Annotated[int, bs.u8] + reserved1: Annotated[bytes, bs.bytes_field(5)] + head_veh: Annotated[int, bs.s32] + mag_dec: Annotated[int, bs.s16] + mag_acc: Annotated[int, bs.u16] + + class MonHw2(bs.BinaryStruct): + class ConfigSource(IntEnum): + flash = 102 + otp = 111 + config_pins = 112 + rom = 113 + + ofs_i: Annotated[int, bs.s8] + mag_i: Annotated[int, bs.u8] + ofs_q: Annotated[int, bs.s8] + mag_q: Annotated[int, bs.u8] + cfg_source: Annotated[ConfigSource | int, bs.enum(bs.u8, ConfigSource)] + reserved1: Annotated[bytes, bs.bytes_field(3)] + low_lev_cfg: Annotated[int, bs.u32] + reserved2: Annotated[bytes, bs.bytes_field(8)] + post_status: Annotated[int, bs.u32] + reserved3: Annotated[bytes, bs.bytes_field(4)] + + class MonHw(bs.BinaryStruct): + class AntennaStatus(IntEnum): + init = 0 + dontknow = 1 + ok = 2 + short = 3 + open = 4 + + class AntennaPower(IntEnum): + false = 0 + true = 1 + dontknow = 2 + + pin_sel: Annotated[int, bs.u32] + pin_bank: Annotated[int, bs.u32] + pin_dir: Annotated[int, bs.u32] + pin_val: Annotated[int, bs.u32] + noise_per_ms: Annotated[int, bs.u16] + agc_cnt: Annotated[int, bs.u16] + a_status: Annotated[AntennaStatus | int, bs.enum(bs.u8, AntennaStatus)] + a_power: Annotated[AntennaPower | int, bs.enum(bs.u8, AntennaPower)] + flags: Annotated[int, bs.u8] + reserved1: Annotated[bytes, bs.bytes_field(1)] + used_mask: Annotated[int, bs.u32] + vp: Annotated[bytes, bs.bytes_field(17)] + jam_ind: Annotated[int, bs.u8] + reserved2: Annotated[bytes, bs.bytes_field(2)] + pin_irq: Annotated[int, bs.u32] + pull_h: Annotated[int, bs.u32] + pull_l: Annotated[int, bs.u32] + + magic: Annotated[bytes, bs.const(bs.bytes_field(2), b"\xb5\x62")] + msg_type: Annotated[int, bs.u16be] + length: Annotated[int, bs.u16] + body: Annotated[ + object, + bs.substream( + 'length', + bs.switch( + 'msg_type', + { + 0x0107: NavPvt, + 0x0213: RxmSfrbx, + 0x0215: RxmRawx, + 0x0A09: MonHw, + 0x0A0B: MonHw2, + 0x0135: NavSat, + }, + ), + ), + ] + checksum: Annotated[int, bs.u16] diff --git a/uv.lock b/uv.lock index 2c5f32ec7..572305f08 100644 --- a/uv.lock +++ b/uv.lock @@ -768,15 +768,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/94/9e/820c4b086ad01ba7d77369fb8b11470a01fac9b4977f02e18659cf378b6b/json_rpc-1.15.0-py2.py3-none-any.whl", hash = "sha256:4a4668bbbe7116feb4abbd0f54e64a4adcf4b8f648f19ffa0848ad0f6606a9bf", size = 39450, upload-time = "2023-06-11T09:45:47.136Z" }, ] -[[package]] -name = "kaitaistruct" -version = "0.11" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/27/b8/ca7319556912f68832daa4b81425314857ec08dfccd8dbc8c0f65c992108/kaitaistruct-0.11.tar.gz", hash = "sha256:053ee764288e78b8e53acf748e9733268acbd579b8d82a427b1805453625d74b", size = 11519, upload-time = "2025-09-08T15:46:25.037Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/4a/4a/cf14bf3b1f5ffb13c69cf5f0ea78031247790558ee88984a8bdd22fae60d/kaitaistruct-0.11-py2.py3-none-any.whl", hash = "sha256:5c6ce79177b4e193a577ecd359e26516d1d6d000a0bffd6e1010f2a46a62a561", size = 11372, upload-time = "2025-09-08T15:46:23.635Z" }, -] - [[package]] name = "kiwisolver" version = "1.4.9" @@ -1296,7 +1287,6 @@ dependencies = [ { name = "inputs" }, { name = "jeepney" }, { name = "json-rpc" }, - { name = "kaitaistruct" }, { name = "libusb1" }, { name = "mapbox-earcut" }, { name = "numpy" }, @@ -1384,7 +1374,6 @@ requires-dist = [ { name = "jeepney" }, { name = "jinja2", marker = "extra == 'docs'" }, { name = "json-rpc" }, - { name = "kaitaistruct" }, { name = "libusb1" }, { name = "mapbox-earcut" }, { name = "matplotlib", marker = "extra == 'dev'" },