ubloxd: remove kaitai (#37055)

* rm kaitai

* lil less

* bs

* lil less

* lil less
This commit is contained in:
Adeeb Shihadeh
2026-02-01 20:00:55 -08:00
committed by GitHub
parent b03e7821d4
commit 5fc4c2b25c
15 changed files with 756 additions and 1407 deletions

View File

@@ -14,7 +14,6 @@ Decider('MD5-timestamp')
SetOption('num_jobs', max(1, int(os.cpu_count()/2)))
AddOption('--kaitai', action='store_true', help='Regenerate kaitai struct parsers')
AddOption('--asan', action='store_true', help='turn on ASAN')
AddOption('--ubsan', action='store_true', help='turn on UBSan')
AddOption('--mutation', action='store_true', help='generate mutation-ready code')
@@ -202,7 +201,6 @@ SConscript(['rednose/SConscript'])
# Build system services
SConscript([
'system/ubloxd/SConscript',
'system/loggerd/SConscript',
])

View File

@@ -33,9 +33,6 @@ dependencies = [
"pyopenssl < 24.3.0",
"pyaudio",
# ubloxd (TODO: just use struct)
"kaitaistruct",
# panda
"libusb1",
"spidev; platform_system == 'Linux'",

View File

@@ -1,11 +0,0 @@
Import('env')
if GetOption('kaitai'):
current_dir = Dir('./generated/').srcnode().abspath
python_cmd = f"kaitai-struct-compiler --target python --outdir {current_dir} $SOURCES"
env.Command(File('./generated/ubx.py'), 'ubx.ksy', python_cmd)
env.Command(File('./generated/gps.py'), 'gps.ksy', python_cmd)
env.Command(File('./generated/glonass.py'), 'glonass.ksy', python_cmd)
# kaitai issue: https://github.com/kaitai-io/kaitai_struct/issues/910
py_glonass_fix = env.Command(None, File('./generated/glonass.py'), "sed -i 's/self._io.align_to_byte()/# self._io.align_to_byte()/' $SOURCES")
env.Depends(py_glonass_fix, File('./generated/glonass.py'))

View File

@@ -0,0 +1,280 @@
"""
Binary struct parsing DSL.
Defines a declarative schema for binary messages using dataclasses
and type annotations.
"""
import struct
from enum import Enum
from dataclasses import dataclass, is_dataclass
from typing import Annotated, Any, TypeVar, get_args, get_origin
class FieldType:
"""Base class for field type descriptors."""
@dataclass(frozen=True)
class IntType(FieldType):
bits: int
signed: bool
big_endian: bool = False
@dataclass(frozen=True)
class FloatType(FieldType):
bits: int
@dataclass(frozen=True)
class BitsType(FieldType):
bits: int
@dataclass(frozen=True)
class BytesType(FieldType):
size: int
@dataclass(frozen=True)
class ArrayType(FieldType):
element_type: Any
count_field: str
@dataclass(frozen=True)
class SwitchType(FieldType):
selector: str
cases: dict[Any, Any]
default: Any = None
@dataclass(frozen=True)
class EnumType(FieldType):
base_type: FieldType
enum_cls: type[Enum]
@dataclass(frozen=True)
class ConstType(FieldType):
base_type: FieldType
expected: Any
@dataclass(frozen=True)
class SubstreamType(FieldType):
length_field: str
element_type: Any
# Common types - little endian
u8 = IntType(8, False)
u16 = IntType(16, False)
u32 = IntType(32, False)
s8 = IntType(8, True)
s16 = IntType(16, True)
s32 = IntType(32, True)
f32 = FloatType(32)
f64 = FloatType(64)
# Big endian variants
u16be = IntType(16, False, big_endian=True)
u32be = IntType(32, False, big_endian=True)
s16be = IntType(16, True, big_endian=True)
s32be = IntType(32, True, big_endian=True)
def bits(n: int) -> BitsType:
"""Create a bit-level field type."""
return BitsType(n)
def bytes_field(size: int) -> BytesType:
"""Create a fixed-size bytes field."""
return BytesType(size)
def array(element_type: Any, count_field: str) -> ArrayType:
"""Create an array/repeated field."""
return ArrayType(element_type, count_field)
def switch(selector: str, cases: dict[Any, Any], default: Any = None) -> SwitchType:
"""Create a switch-on field."""
return SwitchType(selector, cases, default)
def enum(base_type: Any, enum_cls: type[Enum]) -> EnumType:
"""Create an enum-wrapped field."""
field_type = _field_type_from_spec(base_type)
if field_type is None:
raise TypeError(f"Unsupported field type: {base_type!r}")
return EnumType(field_type, enum_cls)
def const(base_type: Any, expected: Any) -> ConstType:
"""Create a constant-value field."""
field_type = _field_type_from_spec(base_type)
if field_type is None:
raise TypeError(f"Unsupported field type: {base_type!r}")
return ConstType(field_type, expected)
def substream(length_field: str, element_type: Any) -> SubstreamType:
"""Parse a fixed-length substream using an inner schema."""
return SubstreamType(length_field, element_type)
class BinaryReader:
def __init__(self, data: bytes):
self.data = data
self.pos = 0
self.bit_pos = 0 # 0-7, position within current byte
def _require(self, n: int) -> None:
if self.pos + n > len(self.data):
raise EOFError("Unexpected end of data")
def _read_struct(self, fmt: str):
self._align_to_byte()
size = struct.calcsize(fmt)
self._require(size)
value = struct.unpack_from(fmt, self.data, self.pos)[0]
self.pos += size
return value
def read_bytes(self, n: int) -> bytes:
self._align_to_byte()
self._require(n)
result = self.data[self.pos : self.pos + n]
self.pos += n
return result
def read_bits_int_be(self, n: int) -> int:
result = 0
bits_remaining = n
while bits_remaining > 0:
if self.pos >= len(self.data):
raise EOFError("Unexpected end of data while reading bits")
bits_in_byte = 8 - self.bit_pos
bits_to_read = min(bits_remaining, bits_in_byte)
byte_val = self.data[self.pos]
shift = bits_in_byte - bits_to_read
mask = (1 << bits_to_read) - 1
extracted = (byte_val >> shift) & mask
result = (result << bits_to_read) | extracted
self.bit_pos += bits_to_read
bits_remaining -= bits_to_read
if self.bit_pos >= 8:
self.bit_pos = 0
self.pos += 1
return result
def _align_to_byte(self) -> None:
if self.bit_pos > 0:
self.bit_pos = 0
self.pos += 1
T = TypeVar('T', bound='BinaryStruct')
class BinaryStruct:
"""Base class for binary struct definitions."""
def __init_subclass__(cls, **kwargs) -> None:
super().__init_subclass__(**kwargs)
if cls is BinaryStruct:
return
if not is_dataclass(cls):
dataclass(init=False)(cls)
fields = list(getattr(cls, '__annotations__', {}).items())
cls.__binary_fields__ = fields # type: ignore[attr-defined]
@classmethod
def _read(inner_cls, reader: BinaryReader):
obj = inner_cls.__new__(inner_cls)
for name, spec in inner_cls.__binary_fields__:
value = _parse_field(spec, reader, obj)
setattr(obj, name, value)
return obj
cls._read = _read # type: ignore[attr-defined]
@classmethod
def from_bytes(cls: type[T], data: bytes) -> T:
"""Parse struct from bytes."""
reader = BinaryReader(data)
return cls._read(reader)
@classmethod
def _read(cls: type[T], reader: BinaryReader) -> T:
"""Override in subclasses to implement parsing."""
raise NotImplementedError
def _resolve_path(obj: Any, path: str) -> Any:
cur = obj
for part in path.split('.'):
cur = getattr(cur, part)
return cur
def _unwrap_annotated(spec: Any) -> tuple[Any, ...]:
if get_origin(spec) is Annotated:
return get_args(spec)[1:]
return ()
def _field_type_from_spec(spec: Any) -> FieldType | None:
if isinstance(spec, FieldType):
return spec
for item in _unwrap_annotated(spec):
if isinstance(item, FieldType):
return item
return None
def _int_format(field_type: IntType) -> str:
if field_type.bits == 8:
return 'b' if field_type.signed else 'B'
endian = '>' if field_type.big_endian else '<'
if field_type.bits == 16:
code = 'h' if field_type.signed else 'H'
elif field_type.bits == 32:
code = 'i' if field_type.signed else 'I'
else:
raise ValueError(f"Unsupported integer size: {field_type.bits}")
return f"{endian}{code}"
def _float_format(field_type: FloatType) -> str:
if field_type.bits == 32:
return '<f'
if field_type.bits == 64:
return '<d'
raise ValueError(f"Unsupported float size: {field_type.bits}")
def _parse_field(spec: Any, reader: BinaryReader, obj: Any) -> Any:
field_type = _field_type_from_spec(spec)
if field_type is not None:
spec = field_type
if isinstance(spec, ConstType):
value = _parse_field(spec.base_type, reader, obj)
if value != spec.expected:
raise ValueError(f"Invalid constant: expected {spec.expected!r}, got {value!r}")
return value
if isinstance(spec, EnumType):
raw = _parse_field(spec.base_type, reader, obj)
try:
return spec.enum_cls(raw)
except ValueError:
return raw
if isinstance(spec, SwitchType):
key = _resolve_path(obj, spec.selector)
target = spec.cases.get(key, spec.default)
if target is None:
return None
return _parse_field(target, reader, obj)
if isinstance(spec, ArrayType):
count = _resolve_path(obj, spec.count_field)
return [_parse_field(spec.element_type, reader, obj) for _ in range(int(count))]
if isinstance(spec, SubstreamType):
length = _resolve_path(obj, spec.length_field)
data = reader.read_bytes(int(length))
sub_reader = BinaryReader(data)
return _parse_field(spec.element_type, sub_reader, obj)
if isinstance(spec, IntType):
return reader._read_struct(_int_format(spec))
if isinstance(spec, FloatType):
return reader._read_struct(_float_format(spec))
if isinstance(spec, BitsType):
value = reader.read_bits_int_be(spec.bits)
return bool(value) if spec.bits == 1 else value
if isinstance(spec, BytesType):
return reader.read_bytes(spec.size)
if isinstance(spec, type) and issubclass(spec, BinaryStruct):
return spec._read(reader)
raise TypeError(f"Unsupported field spec: {spec!r}")

View File

@@ -1,247 +0,0 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))
class Glonass(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.idle_chip = self._io.read_bits_int_be(1) != 0
self.string_number = self._io.read_bits_int_be(4)
# workaround for kaitai bit alignment issue (see glonass_fix.patch for C++)
# self._io.align_to_byte()
_on = self.string_number
if _on == 4:
self.data = Glonass.String4(self._io, self, self._root)
elif _on == 1:
self.data = Glonass.String1(self._io, self, self._root)
elif _on == 3:
self.data = Glonass.String3(self._io, self, self._root)
elif _on == 5:
self.data = Glonass.String5(self._io, self, self._root)
elif _on == 2:
self.data = Glonass.String2(self._io, self, self._root)
else:
self.data = Glonass.StringNonImmediate(self._io, self, self._root)
self.hamming_code = self._io.read_bits_int_be(8)
self.pad_1 = self._io.read_bits_int_be(11)
self.superframe_number = self._io.read_bits_int_be(16)
self.pad_2 = self._io.read_bits_int_be(8)
self.frame_number = self._io.read_bits_int_be(8)
class String4(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.tau_n_sign = self._io.read_bits_int_be(1) != 0
self.tau_n_value = self._io.read_bits_int_be(21)
self.delta_tau_n_sign = self._io.read_bits_int_be(1) != 0
self.delta_tau_n_value = self._io.read_bits_int_be(4)
self.e_n = self._io.read_bits_int_be(5)
self.not_used_1 = self._io.read_bits_int_be(14)
self.p4 = self._io.read_bits_int_be(1) != 0
self.f_t = self._io.read_bits_int_be(4)
self.not_used_2 = self._io.read_bits_int_be(3)
self.n_t = self._io.read_bits_int_be(11)
self.n = self._io.read_bits_int_be(5)
self.m = self._io.read_bits_int_be(2)
@property
def tau_n(self):
if hasattr(self, '_m_tau_n'):
return self._m_tau_n
self._m_tau_n = ((self.tau_n_value * -1) if self.tau_n_sign else self.tau_n_value)
return getattr(self, '_m_tau_n', None)
@property
def delta_tau_n(self):
if hasattr(self, '_m_delta_tau_n'):
return self._m_delta_tau_n
self._m_delta_tau_n = ((self.delta_tau_n_value * -1) if self.delta_tau_n_sign else self.delta_tau_n_value)
return getattr(self, '_m_delta_tau_n', None)
class StringNonImmediate(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.data_1 = self._io.read_bits_int_be(64)
self.data_2 = self._io.read_bits_int_be(8)
class String5(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.n_a = self._io.read_bits_int_be(11)
self.tau_c = self._io.read_bits_int_be(32)
self.not_used = self._io.read_bits_int_be(1) != 0
self.n_4 = self._io.read_bits_int_be(5)
self.tau_gps = self._io.read_bits_int_be(22)
self.l_n = self._io.read_bits_int_be(1) != 0
class String1(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.not_used = self._io.read_bits_int_be(2)
self.p1 = self._io.read_bits_int_be(2)
self.t_k = self._io.read_bits_int_be(12)
self.x_vel_sign = self._io.read_bits_int_be(1) != 0
self.x_vel_value = self._io.read_bits_int_be(23)
self.x_accel_sign = self._io.read_bits_int_be(1) != 0
self.x_accel_value = self._io.read_bits_int_be(4)
self.x_sign = self._io.read_bits_int_be(1) != 0
self.x_value = self._io.read_bits_int_be(26)
@property
def x_vel(self):
if hasattr(self, '_m_x_vel'):
return self._m_x_vel
self._m_x_vel = ((self.x_vel_value * -1) if self.x_vel_sign else self.x_vel_value)
return getattr(self, '_m_x_vel', None)
@property
def x_accel(self):
if hasattr(self, '_m_x_accel'):
return self._m_x_accel
self._m_x_accel = ((self.x_accel_value * -1) if self.x_accel_sign else self.x_accel_value)
return getattr(self, '_m_x_accel', None)
@property
def x(self):
if hasattr(self, '_m_x'):
return self._m_x
self._m_x = ((self.x_value * -1) if self.x_sign else self.x_value)
return getattr(self, '_m_x', None)
class String2(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.b_n = self._io.read_bits_int_be(3)
self.p2 = self._io.read_bits_int_be(1) != 0
self.t_b = self._io.read_bits_int_be(7)
self.not_used = self._io.read_bits_int_be(5)
self.y_vel_sign = self._io.read_bits_int_be(1) != 0
self.y_vel_value = self._io.read_bits_int_be(23)
self.y_accel_sign = self._io.read_bits_int_be(1) != 0
self.y_accel_value = self._io.read_bits_int_be(4)
self.y_sign = self._io.read_bits_int_be(1) != 0
self.y_value = self._io.read_bits_int_be(26)
@property
def y_vel(self):
if hasattr(self, '_m_y_vel'):
return self._m_y_vel
self._m_y_vel = ((self.y_vel_value * -1) if self.y_vel_sign else self.y_vel_value)
return getattr(self, '_m_y_vel', None)
@property
def y_accel(self):
if hasattr(self, '_m_y_accel'):
return self._m_y_accel
self._m_y_accel = ((self.y_accel_value * -1) if self.y_accel_sign else self.y_accel_value)
return getattr(self, '_m_y_accel', None)
@property
def y(self):
if hasattr(self, '_m_y'):
return self._m_y
self._m_y = ((self.y_value * -1) if self.y_sign else self.y_value)
return getattr(self, '_m_y', None)
class String3(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.p3 = self._io.read_bits_int_be(1) != 0
self.gamma_n_sign = self._io.read_bits_int_be(1) != 0
self.gamma_n_value = self._io.read_bits_int_be(10)
self.not_used = self._io.read_bits_int_be(1) != 0
self.p = self._io.read_bits_int_be(2)
self.l_n = self._io.read_bits_int_be(1) != 0
self.z_vel_sign = self._io.read_bits_int_be(1) != 0
self.z_vel_value = self._io.read_bits_int_be(23)
self.z_accel_sign = self._io.read_bits_int_be(1) != 0
self.z_accel_value = self._io.read_bits_int_be(4)
self.z_sign = self._io.read_bits_int_be(1) != 0
self.z_value = self._io.read_bits_int_be(26)
@property
def gamma_n(self):
if hasattr(self, '_m_gamma_n'):
return self._m_gamma_n
self._m_gamma_n = ((self.gamma_n_value * -1) if self.gamma_n_sign else self.gamma_n_value)
return getattr(self, '_m_gamma_n', None)
@property
def z_vel(self):
if hasattr(self, '_m_z_vel'):
return self._m_z_vel
self._m_z_vel = ((self.z_vel_value * -1) if self.z_vel_sign else self.z_vel_value)
return getattr(self, '_m_z_vel', None)
@property
def z_accel(self):
if hasattr(self, '_m_z_accel'):
return self._m_z_accel
self._m_z_accel = ((self.z_accel_value * -1) if self.z_accel_sign else self.z_accel_value)
return getattr(self, '_m_z_accel', None)
@property
def z(self):
if hasattr(self, '_m_z'):
return self._m_z
self._m_z = ((self.z_value * -1) if self.z_sign else self.z_value)
return getattr(self, '_m_z', None)

View File

@@ -1,193 +0,0 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))
class Gps(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.tlm = Gps.Tlm(self._io, self, self._root)
self.how = Gps.How(self._io, self, self._root)
_on = self.how.subframe_id
if _on == 1:
self.body = Gps.Subframe1(self._io, self, self._root)
elif _on == 2:
self.body = Gps.Subframe2(self._io, self, self._root)
elif _on == 3:
self.body = Gps.Subframe3(self._io, self, self._root)
elif _on == 4:
self.body = Gps.Subframe4(self._io, self, self._root)
class Subframe1(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.week_no = self._io.read_bits_int_be(10)
self.code = self._io.read_bits_int_be(2)
self.sv_accuracy = self._io.read_bits_int_be(4)
self.sv_health = self._io.read_bits_int_be(6)
self.iodc_msb = self._io.read_bits_int_be(2)
self.l2_p_data_flag = self._io.read_bits_int_be(1) != 0
self.reserved1 = self._io.read_bits_int_be(23)
self.reserved2 = self._io.read_bits_int_be(24)
self.reserved3 = self._io.read_bits_int_be(24)
self.reserved4 = self._io.read_bits_int_be(16)
self._io.align_to_byte()
self.t_gd = self._io.read_s1()
self.iodc_lsb = self._io.read_u1()
self.t_oc = self._io.read_u2be()
self.af_2 = self._io.read_s1()
self.af_1 = self._io.read_s2be()
self.af_0_sign = self._io.read_bits_int_be(1) != 0
self.af_0_value = self._io.read_bits_int_be(21)
self.reserved5 = self._io.read_bits_int_be(2)
@property
def af_0(self):
if hasattr(self, '_m_af_0'):
return self._m_af_0
self._m_af_0 = ((self.af_0_value - (1 << 21)) if self.af_0_sign else self.af_0_value)
return getattr(self, '_m_af_0', None)
class Subframe3(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.c_ic = self._io.read_s2be()
self.omega_0 = self._io.read_s4be()
self.c_is = self._io.read_s2be()
self.i_0 = self._io.read_s4be()
self.c_rc = self._io.read_s2be()
self.omega = self._io.read_s4be()
self.omega_dot_sign = self._io.read_bits_int_be(1) != 0
self.omega_dot_value = self._io.read_bits_int_be(23)
self._io.align_to_byte()
self.iode = self._io.read_u1()
self.idot_sign = self._io.read_bits_int_be(1) != 0
self.idot_value = self._io.read_bits_int_be(13)
self.reserved = self._io.read_bits_int_be(2)
@property
def omega_dot(self):
if hasattr(self, '_m_omega_dot'):
return self._m_omega_dot
self._m_omega_dot = ((self.omega_dot_value - (1 << 23)) if self.omega_dot_sign else self.omega_dot_value)
return getattr(self, '_m_omega_dot', None)
@property
def idot(self):
if hasattr(self, '_m_idot'):
return self._m_idot
self._m_idot = ((self.idot_value - (1 << 13)) if self.idot_sign else self.idot_value)
return getattr(self, '_m_idot', None)
class Subframe4(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.data_id = self._io.read_bits_int_be(2)
self.page_id = self._io.read_bits_int_be(6)
self._io.align_to_byte()
_on = self.page_id
if _on == 56:
self.body = Gps.Subframe4.IonosphereData(self._io, self, self._root)
class IonosphereData(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.a0 = self._io.read_s1()
self.a1 = self._io.read_s1()
self.a2 = self._io.read_s1()
self.a3 = self._io.read_s1()
self.b0 = self._io.read_s1()
self.b1 = self._io.read_s1()
self.b2 = self._io.read_s1()
self.b3 = self._io.read_s1()
class How(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.tow_count = self._io.read_bits_int_be(17)
self.alert = self._io.read_bits_int_be(1) != 0
self.anti_spoof = self._io.read_bits_int_be(1) != 0
self.subframe_id = self._io.read_bits_int_be(3)
self.reserved = self._io.read_bits_int_be(2)
class Tlm(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.preamble = self._io.read_bytes(1)
if not self.preamble == b"\x8B":
raise kaitaistruct.ValidationNotEqualError(b"\x8B", self.preamble, self._io, u"/types/tlm/seq/0")
self.tlm = self._io.read_bits_int_be(14)
self.integrity_status = self._io.read_bits_int_be(1) != 0
self.reserved = self._io.read_bits_int_be(1) != 0
class Subframe2(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.iode = self._io.read_u1()
self.c_rs = self._io.read_s2be()
self.delta_n = self._io.read_s2be()
self.m_0 = self._io.read_s4be()
self.c_uc = self._io.read_s2be()
self.e = self._io.read_s4be()
self.c_us = self._io.read_s2be()
self.sqrt_a = self._io.read_u4be()
self.t_oe = self._io.read_u2be()
self.fit_interval_flag = self._io.read_bits_int_be(1) != 0
self.aoda = self._io.read_bits_int_be(5)
self.reserved = self._io.read_bits_int_be(2)

View File

@@ -1,273 +0,0 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
from enum import Enum
if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))
class Ubx(KaitaiStruct):
class GnssType(Enum):
gps = 0
sbas = 1
galileo = 2
beidou = 3
imes = 4
qzss = 5
glonass = 6
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.magic = self._io.read_bytes(2)
if not self.magic == b"\xB5\x62":
raise kaitaistruct.ValidationNotEqualError(b"\xB5\x62", self.magic, self._io, u"/seq/0")
self.msg_type = self._io.read_u2be()
self.length = self._io.read_u2le()
_on = self.msg_type
if _on == 2569:
self.body = Ubx.MonHw(self._io, self, self._root)
elif _on == 533:
self.body = Ubx.RxmRawx(self._io, self, self._root)
elif _on == 531:
self.body = Ubx.RxmSfrbx(self._io, self, self._root)
elif _on == 309:
self.body = Ubx.NavSat(self._io, self, self._root)
elif _on == 2571:
self.body = Ubx.MonHw2(self._io, self, self._root)
elif _on == 263:
self.body = Ubx.NavPvt(self._io, self, self._root)
class RxmRawx(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.rcv_tow = self._io.read_f8le()
self.week = self._io.read_u2le()
self.leap_s = self._io.read_s1()
self.num_meas = self._io.read_u1()
self.rec_stat = self._io.read_u1()
self.reserved1 = self._io.read_bytes(3)
self._raw_meas = []
self.meas = []
for i in range(self.num_meas):
self._raw_meas.append(self._io.read_bytes(32))
_io__raw_meas = KaitaiStream(BytesIO(self._raw_meas[i]))
self.meas.append(Ubx.RxmRawx.Measurement(_io__raw_meas, self, self._root))
class Measurement(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.pr_mes = self._io.read_f8le()
self.cp_mes = self._io.read_f8le()
self.do_mes = self._io.read_f4le()
self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1())
self.sv_id = self._io.read_u1()
self.reserved2 = self._io.read_bytes(1)
self.freq_id = self._io.read_u1()
self.lock_time = self._io.read_u2le()
self.cno = self._io.read_u1()
self.pr_stdev = self._io.read_u1()
self.cp_stdev = self._io.read_u1()
self.do_stdev = self._io.read_u1()
self.trk_stat = self._io.read_u1()
self.reserved3 = self._io.read_bytes(1)
class RxmSfrbx(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1())
self.sv_id = self._io.read_u1()
self.reserved1 = self._io.read_bytes(1)
self.freq_id = self._io.read_u1()
self.num_words = self._io.read_u1()
self.reserved2 = self._io.read_bytes(1)
self.version = self._io.read_u1()
self.reserved3 = self._io.read_bytes(1)
self.body = []
for i in range(self.num_words):
self.body.append(self._io.read_u4le())
class NavSat(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.itow = self._io.read_u4le()
self.version = self._io.read_u1()
self.num_svs = self._io.read_u1()
self.reserved = self._io.read_bytes(2)
self._raw_svs = []
self.svs = []
for i in range(self.num_svs):
self._raw_svs.append(self._io.read_bytes(12))
_io__raw_svs = KaitaiStream(BytesIO(self._raw_svs[i]))
self.svs.append(Ubx.NavSat.Nav(_io__raw_svs, self, self._root))
class Nav(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1())
self.sv_id = self._io.read_u1()
self.cno = self._io.read_u1()
self.elev = self._io.read_s1()
self.azim = self._io.read_s2le()
self.pr_res = self._io.read_s2le()
self.flags = self._io.read_u4le()
class NavPvt(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.i_tow = self._io.read_u4le()
self.year = self._io.read_u2le()
self.month = self._io.read_u1()
self.day = self._io.read_u1()
self.hour = self._io.read_u1()
self.min = self._io.read_u1()
self.sec = self._io.read_u1()
self.valid = self._io.read_u1()
self.t_acc = self._io.read_u4le()
self.nano = self._io.read_s4le()
self.fix_type = self._io.read_u1()
self.flags = self._io.read_u1()
self.flags2 = self._io.read_u1()
self.num_sv = self._io.read_u1()
self.lon = self._io.read_s4le()
self.lat = self._io.read_s4le()
self.height = self._io.read_s4le()
self.h_msl = self._io.read_s4le()
self.h_acc = self._io.read_u4le()
self.v_acc = self._io.read_u4le()
self.vel_n = self._io.read_s4le()
self.vel_e = self._io.read_s4le()
self.vel_d = self._io.read_s4le()
self.g_speed = self._io.read_s4le()
self.head_mot = self._io.read_s4le()
self.s_acc = self._io.read_s4le()
self.head_acc = self._io.read_u4le()
self.p_dop = self._io.read_u2le()
self.flags3 = self._io.read_u1()
self.reserved1 = self._io.read_bytes(5)
self.head_veh = self._io.read_s4le()
self.mag_dec = self._io.read_s2le()
self.mag_acc = self._io.read_u2le()
class MonHw2(KaitaiStruct):
class ConfigSource(Enum):
flash = 102
otp = 111
config_pins = 112
rom = 113
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.ofs_i = self._io.read_s1()
self.mag_i = self._io.read_u1()
self.ofs_q = self._io.read_s1()
self.mag_q = self._io.read_u1()
self.cfg_source = KaitaiStream.resolve_enum(Ubx.MonHw2.ConfigSource, self._io.read_u1())
self.reserved1 = self._io.read_bytes(3)
self.low_lev_cfg = self._io.read_u4le()
self.reserved2 = self._io.read_bytes(8)
self.post_status = self._io.read_u4le()
self.reserved3 = self._io.read_bytes(4)
class MonHw(KaitaiStruct):
class AntennaStatus(Enum):
init = 0
dontknow = 1
ok = 2
short = 3
open = 4
class AntennaPower(Enum):
false = 0
true = 1
dontknow = 2
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()
def _read(self):
self.pin_sel = self._io.read_u4le()
self.pin_bank = self._io.read_u4le()
self.pin_dir = self._io.read_u4le()
self.pin_val = self._io.read_u4le()
self.noise_per_ms = self._io.read_u2le()
self.agc_cnt = self._io.read_u2le()
self.a_status = KaitaiStream.resolve_enum(Ubx.MonHw.AntennaStatus, self._io.read_u1())
self.a_power = KaitaiStream.resolve_enum(Ubx.MonHw.AntennaPower, self._io.read_u1())
self.flags = self._io.read_u1()
self.reserved1 = self._io.read_bytes(1)
self.used_mask = self._io.read_u4le()
self.vp = self._io.read_bytes(17)
self.jam_ind = self._io.read_u1()
self.reserved2 = self._io.read_bytes(2)
self.pin_irq = self._io.read_u4le()
self.pull_h = self._io.read_u4le()
self.pull_l = self._io.read_u4le()
@property
def checksum(self):
if hasattr(self, '_m_checksum'):
return self._m_checksum
_pos = self._io.pos()
self._io.seek((self.length + 6))
self._m_checksum = self._io.read_u2le()
self._io.seek(_pos)
return getattr(self, '_m_checksum', None)

View File

@@ -1,176 +0,0 @@
# http://gauss.gge.unb.ca/GLONASS.ICD.pdf
# some variables are misprinted but good in the old doc
# https://www.unavco.org/help/glossary/docs/ICD_GLONASS_4.0_(1998)_en.pdf
meta:
id: glonass
endian: be
bit-endian: be
seq:
- id: idle_chip
type: b1
- id: string_number
type: b4
- id: data
type:
switch-on: string_number
cases:
1: string_1
2: string_2
3: string_3
4: string_4
5: string_5
_: string_non_immediate
- id: hamming_code
type: b8
- id: pad_1
type: b11
- id: superframe_number
type: b16
- id: pad_2
type: b8
- id: frame_number
type: b8
types:
string_1:
seq:
- id: not_used
type: b2
- id: p1
type: b2
- id: t_k
type: b12
- id: x_vel_sign
type: b1
- id: x_vel_value
type: b23
- id: x_accel_sign
type: b1
- id: x_accel_value
type: b4
- id: x_sign
type: b1
- id: x_value
type: b26
instances:
x_vel:
value: 'x_vel_sign ? (x_vel_value * (-1)) : x_vel_value'
x_accel:
value: 'x_accel_sign ? (x_accel_value * (-1)) : x_accel_value'
x:
value: 'x_sign ? (x_value * (-1)) : x_value'
string_2:
seq:
- id: b_n
type: b3
- id: p2
type: b1
- id: t_b
type: b7
- id: not_used
type: b5
- id: y_vel_sign
type: b1
- id: y_vel_value
type: b23
- id: y_accel_sign
type: b1
- id: y_accel_value
type: b4
- id: y_sign
type: b1
- id: y_value
type: b26
instances:
y_vel:
value: 'y_vel_sign ? (y_vel_value * (-1)) : y_vel_value'
y_accel:
value: 'y_accel_sign ? (y_accel_value * (-1)) : y_accel_value'
y:
value: 'y_sign ? (y_value * (-1)) : y_value'
string_3:
seq:
- id: p3
type: b1
- id: gamma_n_sign
type: b1
- id: gamma_n_value
type: b10
- id: not_used
type: b1
- id: p
type: b2
- id: l_n
type: b1
- id: z_vel_sign
type: b1
- id: z_vel_value
type: b23
- id: z_accel_sign
type: b1
- id: z_accel_value
type: b4
- id: z_sign
type: b1
- id: z_value
type: b26
instances:
gamma_n:
value: 'gamma_n_sign ? (gamma_n_value * (-1)) : gamma_n_value'
z_vel:
value: 'z_vel_sign ? (z_vel_value * (-1)) : z_vel_value'
z_accel:
value: 'z_accel_sign ? (z_accel_value * (-1)) : z_accel_value'
z:
value: 'z_sign ? (z_value * (-1)) : z_value'
string_4:
seq:
- id: tau_n_sign
type: b1
- id: tau_n_value
type: b21
- id: delta_tau_n_sign
type: b1
- id: delta_tau_n_value
type: b4
- id: e_n
type: b5
- id: not_used_1
type: b14
- id: p4
type: b1
- id: f_t
type: b4
- id: not_used_2
type: b3
- id: n_t
type: b11
- id: n
type: b5
- id: m
type: b2
instances:
tau_n:
value: 'tau_n_sign ? (tau_n_value * (-1)) : tau_n_value'
delta_tau_n:
value: 'delta_tau_n_sign ? (delta_tau_n_value * (-1)) : delta_tau_n_value'
string_5:
seq:
- id: n_a
type: b11
- id: tau_c
type: b32
- id: not_used
type: b1
- id: n_4
type: b5
- id: tau_gps
type: b22
- id: l_n
type: b1
string_non_immediate:
seq:
- id: data_1
type: b64
- id: data_2
type: b8

156
system/ubloxd/glonass.py Normal file
View File

@@ -0,0 +1,156 @@
"""
Parses GLONASS navigation strings per GLONASS ICD specification.
http://gauss.gge.unb.ca/GLONASS.ICD.pdf
https://www.unavco.org/help/glossary/docs/ICD_GLONASS_4.0_(1998)_en.pdf
"""
from typing import Annotated
from openpilot.system.ubloxd import binary_struct as bs
class Glonass(bs.BinaryStruct):
class String1(bs.BinaryStruct):
not_used: Annotated[int, bs.bits(2)]
p1: Annotated[int, bs.bits(2)]
t_k: Annotated[int, bs.bits(12)]
x_vel_sign: Annotated[bool, bs.bits(1)]
x_vel_value: Annotated[int, bs.bits(23)]
x_accel_sign: Annotated[bool, bs.bits(1)]
x_accel_value: Annotated[int, bs.bits(4)]
x_sign: Annotated[bool, bs.bits(1)]
x_value: Annotated[int, bs.bits(26)]
@property
def x_vel(self) -> int:
"""Computed x_vel from sign-magnitude representation."""
return (self.x_vel_value * -1) if self.x_vel_sign else self.x_vel_value
@property
def x_accel(self) -> int:
"""Computed x_accel from sign-magnitude representation."""
return (self.x_accel_value * -1) if self.x_accel_sign else self.x_accel_value
@property
def x(self) -> int:
"""Computed x from sign-magnitude representation."""
return (self.x_value * -1) if self.x_sign else self.x_value
class String2(bs.BinaryStruct):
b_n: Annotated[int, bs.bits(3)]
p2: Annotated[bool, bs.bits(1)]
t_b: Annotated[int, bs.bits(7)]
not_used: Annotated[int, bs.bits(5)]
y_vel_sign: Annotated[bool, bs.bits(1)]
y_vel_value: Annotated[int, bs.bits(23)]
y_accel_sign: Annotated[bool, bs.bits(1)]
y_accel_value: Annotated[int, bs.bits(4)]
y_sign: Annotated[bool, bs.bits(1)]
y_value: Annotated[int, bs.bits(26)]
@property
def y_vel(self) -> int:
"""Computed y_vel from sign-magnitude representation."""
return (self.y_vel_value * -1) if self.y_vel_sign else self.y_vel_value
@property
def y_accel(self) -> int:
"""Computed y_accel from sign-magnitude representation."""
return (self.y_accel_value * -1) if self.y_accel_sign else self.y_accel_value
@property
def y(self) -> int:
"""Computed y from sign-magnitude representation."""
return (self.y_value * -1) if self.y_sign else self.y_value
class String3(bs.BinaryStruct):
p3: Annotated[bool, bs.bits(1)]
gamma_n_sign: Annotated[bool, bs.bits(1)]
gamma_n_value: Annotated[int, bs.bits(10)]
not_used: Annotated[bool, bs.bits(1)]
p: Annotated[int, bs.bits(2)]
l_n: Annotated[bool, bs.bits(1)]
z_vel_sign: Annotated[bool, bs.bits(1)]
z_vel_value: Annotated[int, bs.bits(23)]
z_accel_sign: Annotated[bool, bs.bits(1)]
z_accel_value: Annotated[int, bs.bits(4)]
z_sign: Annotated[bool, bs.bits(1)]
z_value: Annotated[int, bs.bits(26)]
@property
def gamma_n(self) -> int:
"""Computed gamma_n from sign-magnitude representation."""
return (self.gamma_n_value * -1) if self.gamma_n_sign else self.gamma_n_value
@property
def z_vel(self) -> int:
"""Computed z_vel from sign-magnitude representation."""
return (self.z_vel_value * -1) if self.z_vel_sign else self.z_vel_value
@property
def z_accel(self) -> int:
"""Computed z_accel from sign-magnitude representation."""
return (self.z_accel_value * -1) if self.z_accel_sign else self.z_accel_value
@property
def z(self) -> int:
"""Computed z from sign-magnitude representation."""
return (self.z_value * -1) if self.z_sign else self.z_value
class String4(bs.BinaryStruct):
tau_n_sign: Annotated[bool, bs.bits(1)]
tau_n_value: Annotated[int, bs.bits(21)]
delta_tau_n_sign: Annotated[bool, bs.bits(1)]
delta_tau_n_value: Annotated[int, bs.bits(4)]
e_n: Annotated[int, bs.bits(5)]
not_used_1: Annotated[int, bs.bits(14)]
p4: Annotated[bool, bs.bits(1)]
f_t: Annotated[int, bs.bits(4)]
not_used_2: Annotated[int, bs.bits(3)]
n_t: Annotated[int, bs.bits(11)]
n: Annotated[int, bs.bits(5)]
m: Annotated[int, bs.bits(2)]
@property
def tau_n(self) -> int:
"""Computed tau_n from sign-magnitude representation."""
return (self.tau_n_value * -1) if self.tau_n_sign else self.tau_n_value
@property
def delta_tau_n(self) -> int:
"""Computed delta_tau_n from sign-magnitude representation."""
return (self.delta_tau_n_value * -1) if self.delta_tau_n_sign else self.delta_tau_n_value
class String5(bs.BinaryStruct):
n_a: Annotated[int, bs.bits(11)]
tau_c: Annotated[int, bs.bits(32)]
not_used: Annotated[bool, bs.bits(1)]
n_4: Annotated[int, bs.bits(5)]
tau_gps: Annotated[int, bs.bits(22)]
l_n: Annotated[bool, bs.bits(1)]
class StringNonImmediate(bs.BinaryStruct):
data_1: Annotated[int, bs.bits(64)]
data_2: Annotated[int, bs.bits(8)]
idle_chip: Annotated[bool, bs.bits(1)]
string_number: Annotated[int, bs.bits(4)]
data: Annotated[
object,
bs.switch(
'string_number',
{
1: String1,
2: String2,
3: String3,
4: String4,
5: String5,
},
default=StringNonImmediate,
),
]
hamming_code: Annotated[int, bs.bits(8)]
pad_1: Annotated[int, bs.bits(11)]
superframe_number: Annotated[int, bs.bits(16)]
pad_2: Annotated[int, bs.bits(8)]
frame_number: Annotated[int, bs.bits(8)]

View File

@@ -1,189 +0,0 @@
# https://www.gps.gov/technical/icwg/IS-GPS-200E.pdf
meta:
id: gps
endian: be
bit-endian: be
seq:
- id: tlm
type: tlm
- id: how
type: how
- id: body
type:
switch-on: how.subframe_id
cases:
1: subframe_1
2: subframe_2
3: subframe_3
4: subframe_4
types:
tlm:
seq:
- id: preamble
contents: [0x8b]
- id: tlm
type: b14
- id: integrity_status
type: b1
- id: reserved
type: b1
how:
seq:
- id: tow_count
type: b17
- id: alert
type: b1
- id: anti_spoof
type: b1
- id: subframe_id
type: b3
- id: reserved
type: b2
subframe_1:
seq:
# Word 3
- id: week_no
type: b10
- id: code
type: b2
- id: sv_accuracy
type: b4
- id: sv_health
type: b6
- id: iodc_msb
type: b2
# Word 4
- id: l2_p_data_flag
type: b1
- id: reserved1
type: b23
# Word 5
- id: reserved2
type: b24
# Word 6
- id: reserved3
type: b24
# Word 7
- id: reserved4
type: b16
- id: t_gd
type: s1
# Word 8
- id: iodc_lsb
type: u1
- id: t_oc
type: u2
# Word 9
- id: af_2
type: s1
- id: af_1
type: s2
# Word 10
- id: af_0_sign
type: b1
- id: af_0_value
type: b21
- id: reserved5
type: b2
instances:
af_0:
value: 'af_0_sign ? (af_0_value - (1 << 21)) : af_0_value'
subframe_2:
seq:
# Word 3
- id: iode
type: u1
- id: c_rs
type: s2
# Word 4 & 5
- id: delta_n
type: s2
- id: m_0
type: s4
# Word 6 & 7
- id: c_uc
type: s2
- id: e
type: s4
# Word 8 & 9
- id: c_us
type: s2
- id: sqrt_a
type: u4
# Word 10
- id: t_oe
type: u2
- id: fit_interval_flag
type: b1
- id: aoda
type: b5
- id: reserved
type: b2
subframe_3:
seq:
# Word 3 & 4
- id: c_ic
type: s2
- id: omega_0
type: s4
# Word 5 & 6
- id: c_is
type: s2
- id: i_0
type: s4
# Word 7 & 8
- id: c_rc
type: s2
- id: omega
type: s4
# Word 9
- id: omega_dot_sign
type: b1
- id: omega_dot_value
type: b23
# Word 10
- id: iode
type: u1
- id: idot_sign
type: b1
- id: idot_value
type: b13
- id: reserved
type: b2
instances:
omega_dot:
value: 'omega_dot_sign ? (omega_dot_value - (1 << 23)) : omega_dot_value'
idot:
value: 'idot_sign ? (idot_value - (1 << 13)) : idot_value'
subframe_4:
seq:
# Word 3
- id: data_id
type: b2
- id: page_id
type: b6
- id: body
type:
switch-on: page_id
cases:
56: ionosphere_data
types:
ionosphere_data:
seq:
- id: a0
type: s1
- id: a1
type: s1
- id: a2
type: s1
- id: a3
type: s1
- id: b0
type: s1
- id: b1
type: s1
- id: b2
type: s1
- id: b3
type: s1

116
system/ubloxd/gps.py Normal file
View File

@@ -0,0 +1,116 @@
"""
Parses GPS navigation subframes per IS-GPS-200E specification.
https://www.gps.gov/technical/icwg/IS-GPS-200E.pdf
"""
from typing import Annotated
from openpilot.system.ubloxd import binary_struct as bs
class Gps(bs.BinaryStruct):
class Tlm(bs.BinaryStruct):
preamble: Annotated[bytes, bs.const(bs.bytes_field(1), b"\x8b")]
tlm: Annotated[int, bs.bits(14)]
integrity_status: Annotated[bool, bs.bits(1)]
reserved: Annotated[bool, bs.bits(1)]
class How(bs.BinaryStruct):
tow_count: Annotated[int, bs.bits(17)]
alert: Annotated[bool, bs.bits(1)]
anti_spoof: Annotated[bool, bs.bits(1)]
subframe_id: Annotated[int, bs.bits(3)]
reserved: Annotated[int, bs.bits(2)]
class Subframe1(bs.BinaryStruct):
week_no: Annotated[int, bs.bits(10)]
code: Annotated[int, bs.bits(2)]
sv_accuracy: Annotated[int, bs.bits(4)]
sv_health: Annotated[int, bs.bits(6)]
iodc_msb: Annotated[int, bs.bits(2)]
l2_p_data_flag: Annotated[bool, bs.bits(1)]
reserved1: Annotated[int, bs.bits(23)]
reserved2: Annotated[int, bs.bits(24)]
reserved3: Annotated[int, bs.bits(24)]
reserved4: Annotated[int, bs.bits(16)]
t_gd: Annotated[int, bs.s8]
iodc_lsb: Annotated[int, bs.u8]
t_oc: Annotated[int, bs.u16be]
af_2: Annotated[int, bs.s8]
af_1: Annotated[int, bs.s16be]
af_0_sign: Annotated[bool, bs.bits(1)]
af_0_value: Annotated[int, bs.bits(21)]
reserved5: Annotated[int, bs.bits(2)]
@property
def af_0(self) -> int:
"""Computed af_0 from sign-magnitude representation."""
return (self.af_0_value - (1 << 21)) if self.af_0_sign else self.af_0_value
class Subframe2(bs.BinaryStruct):
iode: Annotated[int, bs.u8]
c_rs: Annotated[int, bs.s16be]
delta_n: Annotated[int, bs.s16be]
m_0: Annotated[int, bs.s32be]
c_uc: Annotated[int, bs.s16be]
e: Annotated[int, bs.s32be]
c_us: Annotated[int, bs.s16be]
sqrt_a: Annotated[int, bs.u32be]
t_oe: Annotated[int, bs.u16be]
fit_interval_flag: Annotated[bool, bs.bits(1)]
aoda: Annotated[int, bs.bits(5)]
reserved: Annotated[int, bs.bits(2)]
class Subframe3(bs.BinaryStruct):
c_ic: Annotated[int, bs.s16be]
omega_0: Annotated[int, bs.s32be]
c_is: Annotated[int, bs.s16be]
i_0: Annotated[int, bs.s32be]
c_rc: Annotated[int, bs.s16be]
omega: Annotated[int, bs.s32be]
omega_dot_sign: Annotated[bool, bs.bits(1)]
omega_dot_value: Annotated[int, bs.bits(23)]
iode: Annotated[int, bs.u8]
idot_sign: Annotated[bool, bs.bits(1)]
idot_value: Annotated[int, bs.bits(13)]
reserved: Annotated[int, bs.bits(2)]
@property
def omega_dot(self) -> int:
"""Computed omega_dot from sign-magnitude representation."""
return (self.omega_dot_value - (1 << 23)) if self.omega_dot_sign else self.omega_dot_value
@property
def idot(self) -> int:
"""Computed idot from sign-magnitude representation."""
return (self.idot_value - (1 << 13)) if self.idot_sign else self.idot_value
class Subframe4(bs.BinaryStruct):
class IonosphereData(bs.BinaryStruct):
a0: Annotated[int, bs.s8]
a1: Annotated[int, bs.s8]
a2: Annotated[int, bs.s8]
a3: Annotated[int, bs.s8]
b0: Annotated[int, bs.s8]
b1: Annotated[int, bs.s8]
b2: Annotated[int, bs.s8]
b3: Annotated[int, bs.s8]
data_id: Annotated[int, bs.bits(2)]
page_id: Annotated[int, bs.bits(6)]
body: Annotated[object, bs.switch('page_id', {56: IonosphereData})]
tlm: Tlm
how: How
body: Annotated[
object,
bs.switch(
'how.subframe_id',
{
1: Subframe1,
2: Subframe2,
3: Subframe3,
4: Subframe4,
},
),
]

View File

@@ -8,9 +8,9 @@ from dataclasses import dataclass
from cereal import log
from cereal import messaging
from openpilot.system.ubloxd.generated.ubx import Ubx
from openpilot.system.ubloxd.generated.gps import Gps
from openpilot.system.ubloxd.generated.glonass import Glonass
from openpilot.system.ubloxd.ubx import Ubx
from openpilot.system.ubloxd.gps import Gps
from openpilot.system.ubloxd.glonass import Glonass
SECS_IN_MIN = 60
@@ -52,7 +52,7 @@ class UbxFramer:
# find preamble
if len(self.buf) < 2:
break
start = self.buf.find(b"\xB5\x62")
start = self.buf.find(b"\xb5\x62")
if start < 0:
# no preamble in buffer
self.buf.clear()
@@ -98,9 +98,22 @@ class UbloxMsgParser:
# user range accuracy in meters
glonass_URA_lookup: dict[int, float] = {
0: 1, 1: 2, 2: 2.5, 3: 4, 4: 5, 5: 7,
6: 10, 7: 12, 8: 14, 9: 16, 10: 32,
11: 64, 12: 128, 13: 256, 14: 512, 15: 1024,
0: 1,
1: 2,
2: 2.5,
3: 4,
4: 5,
5: 7,
6: 10,
7: 12,
8: 14,
9: 16,
10: 32,
11: 64,
12: 128,
13: 256,
14: 512,
15: 1024,
}
def __init__(self) -> None:
@@ -121,7 +134,7 @@ class UbloxMsgParser:
body = Ubx.NavPvt.from_bytes(payload)
return self._gen_nav_pvt(body)
if msg_type == 0x0213:
# Manually parse RXM-SFRBX to avoid Kaitai EOF on some frames
# Manually parse RXM-SFRBX to avoid EOF on some frames
if len(payload) < 8:
return None
gnss_id = payload[0]
@@ -134,7 +147,7 @@ class UbloxMsgParser:
words: list[int] = []
off = 8
for _ in range(num_words):
words.append(int.from_bytes(payload[off:off+4], 'little'))
words.append(int.from_bytes(payload[off : off + 4], 'little'))
off += 4
class _SfrbxView:
@@ -143,6 +156,7 @@ class UbloxMsgParser:
self.sv_id = sid
self.freq_id = fid
self.body = body
view = _SfrbxView(gnss_id, sv_id, freq_id, words)
return self._gen_rxm_sfrbx(view)
if msg_type == 0x0215:
@@ -515,5 +529,6 @@ def main():
service, dat = res
pm.send(service, dat)
if __name__ == '__main__':
main()

View File

@@ -1,293 +0,0 @@
meta:
id: ubx
endian: le
seq:
- id: magic
contents: [0xb5, 0x62]
- id: msg_type
type: u2be
- id: length
type: u2
- id: body
type:
switch-on: msg_type
cases:
0x0107: nav_pvt
0x0213: rxm_sfrbx
0x0215: rxm_rawx
0x0a09: mon_hw
0x0a0b: mon_hw2
0x0135: nav_sat
instances:
checksum:
pos: length + 6
type: u2
types:
mon_hw:
seq:
- id: pin_sel
type: u4
- id: pin_bank
type: u4
- id: pin_dir
type: u4
- id: pin_val
type: u4
- id: noise_per_ms
type: u2
- id: agc_cnt
type: u2
- id: a_status
type: u1
enum: antenna_status
- id: a_power
type: u1
enum: antenna_power
- id: flags
type: u1
- id: reserved1
size: 1
- id: used_mask
type: u4
- id: vp
size: 17
- id: jam_ind
type: u1
- id: reserved2
size: 2
- id: pin_irq
type: u4
- id: pull_h
type: u4
- id: pull_l
type: u4
enums:
antenna_status:
0: init
1: dontknow
2: ok
3: short
4: open
antenna_power:
0: off
1: on
2: dontknow
mon_hw2:
seq:
- id: ofs_i
type: s1
- id: mag_i
type: u1
- id: ofs_q
type: s1
- id: mag_q
type: u1
- id: cfg_source
type: u1
enum: config_source
- id: reserved1
size: 3
- id: low_lev_cfg
type: u4
- id: reserved2
size: 8
- id: post_status
type: u4
- id: reserved3
size: 4
enums:
config_source:
113: rom
111: otp
112: config_pins
102: flash
rxm_sfrbx:
seq:
- id: gnss_id
type: u1
enum: gnss_type
- id: sv_id
type: u1
- id: reserved1
size: 1
- id: freq_id
type: u1
- id: num_words
type: u1
- id: reserved2
size: 1
- id: version
type: u1
- id: reserved3
size: 1
- id: body
type: u4
repeat: expr
repeat-expr: num_words
rxm_rawx:
seq:
- id: rcv_tow
type: f8
- id: week
type: u2
- id: leap_s
type: s1
- id: num_meas
type: u1
- id: rec_stat
type: u1
- id: reserved1
size: 3
- id: meas
type: measurement
size: 32
repeat: expr
repeat-expr: num_meas
types:
measurement:
seq:
- id: pr_mes
type: f8
- id: cp_mes
type: f8
- id: do_mes
type: f4
- id: gnss_id
type: u1
enum: gnss_type
- id: sv_id
type: u1
- id: reserved2
size: 1
- id: freq_id
type: u1
- id: lock_time
type: u2
- id: cno
type: u1
- id: pr_stdev
type: u1
- id: cp_stdev
type: u1
- id: do_stdev
type: u1
- id: trk_stat
type: u1
- id: reserved3
size: 1
nav_sat:
seq:
- id: itow
type: u4
- id: version
type: u1
- id: num_svs
type: u1
- id: reserved
size: 2
- id: svs
type: nav
size: 12
repeat: expr
repeat-expr: num_svs
types:
nav:
seq:
- id: gnss_id
type: u1
enum: gnss_type
- id: sv_id
type: u1
- id: cno
type: u1
- id: elev
type: s1
- id: azim
type: s2
- id: pr_res
type: s2
- id: flags
type: u4
nav_pvt:
seq:
- id: i_tow
type: u4
- id: year
type: u2
- id: month
type: u1
- id: day
type: u1
- id: hour
type: u1
- id: min
type: u1
- id: sec
type: u1
- id: valid
type: u1
- id: t_acc
type: u4
- id: nano
type: s4
- id: fix_type
type: u1
- id: flags
type: u1
- id: flags2
type: u1
- id: num_sv
type: u1
- id: lon
type: s4
- id: lat
type: s4
- id: height
type: s4
- id: h_msl
type: s4
- id: h_acc
type: u4
- id: v_acc
type: u4
- id: vel_n
type: s4
- id: vel_e
type: s4
- id: vel_d
type: s4
- id: g_speed
type: s4
- id: head_mot
type: s4
- id: s_acc
type: s4
- id: head_acc
type: u4
- id: p_dop
type: u2
- id: flags3
type: u1
- id: reserved1
size: 5
- id: head_veh
type: s4
- id: mag_dec
type: s2
- id: mag_acc
type: u2
enums:
gnss_type:
0: gps
1: sbas
2: galileo
3: beidou
4: imes
5: qzss
6: glonass

180
system/ubloxd/ubx.py Normal file
View File

@@ -0,0 +1,180 @@
"""
UBX protocol parser
"""
from enum import IntEnum
from typing import Annotated
from openpilot.system.ubloxd import binary_struct as bs
class GnssType(IntEnum):
gps = 0
sbas = 1
galileo = 2
beidou = 3
imes = 4
qzss = 5
glonass = 6
class Ubx(bs.BinaryStruct):
GnssType = GnssType
class RxmRawx(bs.BinaryStruct):
class Measurement(bs.BinaryStruct):
pr_mes: Annotated[float, bs.f64]
cp_mes: Annotated[float, bs.f64]
do_mes: Annotated[float, bs.f32]
gnss_id: Annotated[GnssType | int, bs.enum(bs.u8, GnssType)]
sv_id: Annotated[int, bs.u8]
reserved2: Annotated[bytes, bs.bytes_field(1)]
freq_id: Annotated[int, bs.u8]
lock_time: Annotated[int, bs.u16]
cno: Annotated[int, bs.u8]
pr_stdev: Annotated[int, bs.u8]
cp_stdev: Annotated[int, bs.u8]
do_stdev: Annotated[int, bs.u8]
trk_stat: Annotated[int, bs.u8]
reserved3: Annotated[bytes, bs.bytes_field(1)]
rcv_tow: Annotated[float, bs.f64]
week: Annotated[int, bs.u16]
leap_s: Annotated[int, bs.s8]
num_meas: Annotated[int, bs.u8]
rec_stat: Annotated[int, bs.u8]
reserved1: Annotated[bytes, bs.bytes_field(3)]
meas: Annotated[list[Measurement], bs.array(Measurement, count_field='num_meas')]
class RxmSfrbx(bs.BinaryStruct):
gnss_id: Annotated[GnssType | int, bs.enum(bs.u8, GnssType)]
sv_id: Annotated[int, bs.u8]
reserved1: Annotated[bytes, bs.bytes_field(1)]
freq_id: Annotated[int, bs.u8]
num_words: Annotated[int, bs.u8]
reserved2: Annotated[bytes, bs.bytes_field(1)]
version: Annotated[int, bs.u8]
reserved3: Annotated[bytes, bs.bytes_field(1)]
body: Annotated[list[int], bs.array(bs.u32, count_field='num_words')]
class NavSat(bs.BinaryStruct):
class Nav(bs.BinaryStruct):
gnss_id: Annotated[GnssType | int, bs.enum(bs.u8, GnssType)]
sv_id: Annotated[int, bs.u8]
cno: Annotated[int, bs.u8]
elev: Annotated[int, bs.s8]
azim: Annotated[int, bs.s16]
pr_res: Annotated[int, bs.s16]
flags: Annotated[int, bs.u32]
itow: Annotated[int, bs.u32]
version: Annotated[int, bs.u8]
num_svs: Annotated[int, bs.u8]
reserved: Annotated[bytes, bs.bytes_field(2)]
svs: Annotated[list[Nav], bs.array(Nav, count_field='num_svs')]
class NavPvt(bs.BinaryStruct):
i_tow: Annotated[int, bs.u32]
year: Annotated[int, bs.u16]
month: Annotated[int, bs.u8]
day: Annotated[int, bs.u8]
hour: Annotated[int, bs.u8]
min: Annotated[int, bs.u8]
sec: Annotated[int, bs.u8]
valid: Annotated[int, bs.u8]
t_acc: Annotated[int, bs.u32]
nano: Annotated[int, bs.s32]
fix_type: Annotated[int, bs.u8]
flags: Annotated[int, bs.u8]
flags2: Annotated[int, bs.u8]
num_sv: Annotated[int, bs.u8]
lon: Annotated[int, bs.s32]
lat: Annotated[int, bs.s32]
height: Annotated[int, bs.s32]
h_msl: Annotated[int, bs.s32]
h_acc: Annotated[int, bs.u32]
v_acc: Annotated[int, bs.u32]
vel_n: Annotated[int, bs.s32]
vel_e: Annotated[int, bs.s32]
vel_d: Annotated[int, bs.s32]
g_speed: Annotated[int, bs.s32]
head_mot: Annotated[int, bs.s32]
s_acc: Annotated[int, bs.s32]
head_acc: Annotated[int, bs.u32]
p_dop: Annotated[int, bs.u16]
flags3: Annotated[int, bs.u8]
reserved1: Annotated[bytes, bs.bytes_field(5)]
head_veh: Annotated[int, bs.s32]
mag_dec: Annotated[int, bs.s16]
mag_acc: Annotated[int, bs.u16]
class MonHw2(bs.BinaryStruct):
class ConfigSource(IntEnum):
flash = 102
otp = 111
config_pins = 112
rom = 113
ofs_i: Annotated[int, bs.s8]
mag_i: Annotated[int, bs.u8]
ofs_q: Annotated[int, bs.s8]
mag_q: Annotated[int, bs.u8]
cfg_source: Annotated[ConfigSource | int, bs.enum(bs.u8, ConfigSource)]
reserved1: Annotated[bytes, bs.bytes_field(3)]
low_lev_cfg: Annotated[int, bs.u32]
reserved2: Annotated[bytes, bs.bytes_field(8)]
post_status: Annotated[int, bs.u32]
reserved3: Annotated[bytes, bs.bytes_field(4)]
class MonHw(bs.BinaryStruct):
class AntennaStatus(IntEnum):
init = 0
dontknow = 1
ok = 2
short = 3
open = 4
class AntennaPower(IntEnum):
false = 0
true = 1
dontknow = 2
pin_sel: Annotated[int, bs.u32]
pin_bank: Annotated[int, bs.u32]
pin_dir: Annotated[int, bs.u32]
pin_val: Annotated[int, bs.u32]
noise_per_ms: Annotated[int, bs.u16]
agc_cnt: Annotated[int, bs.u16]
a_status: Annotated[AntennaStatus | int, bs.enum(bs.u8, AntennaStatus)]
a_power: Annotated[AntennaPower | int, bs.enum(bs.u8, AntennaPower)]
flags: Annotated[int, bs.u8]
reserved1: Annotated[bytes, bs.bytes_field(1)]
used_mask: Annotated[int, bs.u32]
vp: Annotated[bytes, bs.bytes_field(17)]
jam_ind: Annotated[int, bs.u8]
reserved2: Annotated[bytes, bs.bytes_field(2)]
pin_irq: Annotated[int, bs.u32]
pull_h: Annotated[int, bs.u32]
pull_l: Annotated[int, bs.u32]
magic: Annotated[bytes, bs.const(bs.bytes_field(2), b"\xb5\x62")]
msg_type: Annotated[int, bs.u16be]
length: Annotated[int, bs.u16]
body: Annotated[
object,
bs.substream(
'length',
bs.switch(
'msg_type',
{
0x0107: NavPvt,
0x0213: RxmSfrbx,
0x0215: RxmRawx,
0x0A09: MonHw,
0x0A0B: MonHw2,
0x0135: NavSat,
},
),
),
]
checksum: Annotated[int, bs.u16]

11
uv.lock generated
View File

@@ -768,15 +768,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/94/9e/820c4b086ad01ba7d77369fb8b11470a01fac9b4977f02e18659cf378b6b/json_rpc-1.15.0-py2.py3-none-any.whl", hash = "sha256:4a4668bbbe7116feb4abbd0f54e64a4adcf4b8f648f19ffa0848ad0f6606a9bf", size = 39450, upload-time = "2023-06-11T09:45:47.136Z" },
]
[[package]]
name = "kaitaistruct"
version = "0.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/27/b8/ca7319556912f68832daa4b81425314857ec08dfccd8dbc8c0f65c992108/kaitaistruct-0.11.tar.gz", hash = "sha256:053ee764288e78b8e53acf748e9733268acbd579b8d82a427b1805453625d74b", size = 11519, upload-time = "2025-09-08T15:46:25.037Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/4a/cf14bf3b1f5ffb13c69cf5f0ea78031247790558ee88984a8bdd22fae60d/kaitaistruct-0.11-py2.py3-none-any.whl", hash = "sha256:5c6ce79177b4e193a577ecd359e26516d1d6d000a0bffd6e1010f2a46a62a561", size = 11372, upload-time = "2025-09-08T15:46:23.635Z" },
]
[[package]]
name = "kiwisolver"
version = "1.4.9"
@@ -1296,7 +1287,6 @@ dependencies = [
{ name = "inputs" },
{ name = "jeepney" },
{ name = "json-rpc" },
{ name = "kaitaistruct" },
{ name = "libusb1" },
{ name = "mapbox-earcut" },
{ name = "numpy" },
@@ -1384,7 +1374,6 @@ requires-dist = [
{ name = "jeepney" },
{ name = "jinja2", marker = "extra == 'docs'" },
{ name = "json-rpc" },
{ name = "kaitaistruct" },
{ name = "libusb1" },
{ name = "mapbox-earcut" },
{ name = "matplotlib", marker = "extra == 'dev'" },