mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-03-01 00:33:55 +08:00
car helpers file (#33379)
* helpers file
* clean up
old-commit-hash: 6d392a2e7b
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
import capnp
|
||||
import os
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import cereal.messaging as messaging
|
||||
|
||||
@@ -21,10 +19,10 @@ from opendbc.car.car_helpers import get_car
|
||||
from opendbc.car.interfaces import CarInterfaceBase
|
||||
from openpilot.selfdrive.pandad import can_capnp_to_list, can_list_to_can_capnp
|
||||
from openpilot.selfdrive.car.car_specific import CarSpecificEvents, MockCarState
|
||||
from openpilot.selfdrive.car.helpers import convert_carControl, convert_to_capnp
|
||||
from openpilot.selfdrive.controls.lib.events import Events
|
||||
|
||||
REPLAY = "REPLAY" in os.environ
|
||||
_FIELDS = '__dataclass_fields__' # copy of dataclasses._FIELDS
|
||||
|
||||
EventName = car.CarEvent.EventName
|
||||
|
||||
@@ -61,71 +59,6 @@ def can_comm_callbacks(logcan: messaging.SubSocket, sendcan: messaging.PubSocket
|
||||
return can_recv, can_send
|
||||
|
||||
|
||||
def is_dataclass(obj):
|
||||
"""Similar to dataclasses.is_dataclass without instance type check checking"""
|
||||
return hasattr(obj, _FIELDS)
|
||||
|
||||
|
||||
def _asdictref_inner(obj) -> dict[str, Any] | Any:
|
||||
if is_dataclass(obj):
|
||||
ret = {}
|
||||
for field in getattr(obj, _FIELDS): # similar to dataclasses.fields()
|
||||
ret[field] = _asdictref_inner(getattr(obj, field))
|
||||
return ret
|
||||
elif isinstance(obj, (tuple, list)):
|
||||
return type(obj)(_asdictref_inner(v) for v in obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
|
||||
def asdictref(obj) -> dict[str, Any]:
|
||||
"""
|
||||
Similar to dataclasses.asdict without recursive type checking and copy.deepcopy
|
||||
Note that the resulting dict will contain references to the original struct as a result
|
||||
"""
|
||||
if not is_dataclass(obj):
|
||||
raise TypeError("asdictref() should be called on dataclass instances")
|
||||
|
||||
return _asdictref_inner(obj)
|
||||
|
||||
|
||||
def convert_to_capnp(struct: structs.CarParams | structs.CarState | structs.CarControl.Actuators) -> capnp.lib.capnp._DynamicStructBuilder:
|
||||
struct_dict = asdictref(struct)
|
||||
|
||||
if isinstance(struct, structs.CarParams):
|
||||
del struct_dict['lateralTuning']
|
||||
struct_capnp = car.CarParams.new_message(**struct_dict)
|
||||
|
||||
# this is the only union, special handling
|
||||
which = struct.lateralTuning.which()
|
||||
struct_capnp.lateralTuning.init(which)
|
||||
lateralTuning_dict = asdictref(getattr(struct.lateralTuning, which))
|
||||
setattr(struct_capnp.lateralTuning, which, lateralTuning_dict)
|
||||
elif isinstance(struct, structs.CarState):
|
||||
struct_capnp = car.CarState.new_message(**struct_dict)
|
||||
elif isinstance(struct, structs.CarControl.Actuators):
|
||||
struct_capnp = car.CarControl.Actuators.new_message(**struct_dict)
|
||||
else:
|
||||
raise ValueError(f"Unsupported struct type: {type(struct)}")
|
||||
|
||||
return struct_capnp
|
||||
|
||||
|
||||
def convert_carControl(struct: capnp.lib.capnp._DynamicStructReader) -> structs.CarControl:
|
||||
# TODO: recursively handle any car struct as needed
|
||||
def remove_deprecated(s: dict) -> dict:
|
||||
return {k: v for k, v in s.items() if not k.endswith('DEPRECATED')}
|
||||
|
||||
struct_dict = struct.to_dict()
|
||||
struct_dataclass = structs.CarControl(**remove_deprecated({k: v for k, v in struct_dict.items() if not isinstance(k, dict)}))
|
||||
|
||||
struct_dataclass.actuators = structs.CarControl.Actuators(**remove_deprecated(struct_dict.get('actuators', {})))
|
||||
struct_dataclass.cruiseControl = structs.CarControl.CruiseControl(**remove_deprecated(struct_dict.get('cruiseControl', {})))
|
||||
struct_dataclass.hudControl = structs.CarControl.HUDControl(**remove_deprecated(struct_dict.get('hudControl', {})))
|
||||
|
||||
return struct_dataclass
|
||||
|
||||
|
||||
class Car:
|
||||
CI: CarInterfaceBase
|
||||
CP: structs.CarParams
|
||||
|
||||
72
selfdrive/car/helpers.py
Normal file
72
selfdrive/car/helpers.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import capnp
|
||||
from typing import Any
|
||||
|
||||
from cereal import car
|
||||
from opendbc.car import structs
|
||||
|
||||
_FIELDS = '__dataclass_fields__' # copy of dataclasses._FIELDS
|
||||
|
||||
|
||||
def is_dataclass(obj):
|
||||
"""Similar to dataclasses.is_dataclass without instance type check checking"""
|
||||
return hasattr(obj, _FIELDS)
|
||||
|
||||
|
||||
def _asdictref_inner(obj) -> dict[str, Any] | Any:
|
||||
if is_dataclass(obj):
|
||||
ret = {}
|
||||
for field in getattr(obj, _FIELDS): # similar to dataclasses.fields()
|
||||
ret[field] = _asdictref_inner(getattr(obj, field))
|
||||
return ret
|
||||
elif isinstance(obj, (tuple, list)):
|
||||
return type(obj)(_asdictref_inner(v) for v in obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
|
||||
def asdictref(obj) -> dict[str, Any]:
|
||||
"""
|
||||
Similar to dataclasses.asdict without recursive type checking and copy.deepcopy
|
||||
Note that the resulting dict will contain references to the original struct as a result
|
||||
"""
|
||||
if not is_dataclass(obj):
|
||||
raise TypeError("asdictref() should be called on dataclass instances")
|
||||
|
||||
return _asdictref_inner(obj)
|
||||
|
||||
|
||||
def convert_to_capnp(struct: structs.CarParams | structs.CarState | structs.CarControl.Actuators) -> capnp.lib.capnp._DynamicStructBuilder:
|
||||
struct_dict = asdictref(struct)
|
||||
|
||||
if isinstance(struct, structs.CarParams):
|
||||
del struct_dict['lateralTuning']
|
||||
struct_capnp = car.CarParams.new_message(**struct_dict)
|
||||
|
||||
# this is the only union, special handling
|
||||
which = struct.lateralTuning.which()
|
||||
struct_capnp.lateralTuning.init(which)
|
||||
lateralTuning_dict = asdictref(getattr(struct.lateralTuning, which))
|
||||
setattr(struct_capnp.lateralTuning, which, lateralTuning_dict)
|
||||
elif isinstance(struct, structs.CarState):
|
||||
struct_capnp = car.CarState.new_message(**struct_dict)
|
||||
elif isinstance(struct, structs.CarControl.Actuators):
|
||||
struct_capnp = car.CarControl.Actuators.new_message(**struct_dict)
|
||||
else:
|
||||
raise ValueError(f"Unsupported struct type: {type(struct)}")
|
||||
|
||||
return struct_capnp
|
||||
|
||||
|
||||
def convert_carControl(struct: capnp.lib.capnp._DynamicStructReader) -> structs.CarControl:
|
||||
# TODO: recursively handle any car struct as needed
|
||||
def remove_deprecated(s: dict) -> dict:
|
||||
return {k: v for k, v in s.items() if not k.endswith('DEPRECATED')}
|
||||
|
||||
struct_dict = struct.to_dict()
|
||||
struct_dataclass = structs.CarControl(**remove_deprecated({k: v for k, v in struct_dict.items() if not isinstance(k, dict)}))
|
||||
|
||||
struct_dataclass.actuators = structs.CarControl.Actuators(**remove_deprecated(struct_dict.get('actuators', {})))
|
||||
struct_dataclass.cruiseControl = structs.CarControl.CruiseControl(**remove_deprecated(struct_dict.get('cruiseControl', {})))
|
||||
struct_dataclass.hudControl = structs.CarControl.HUDControl(**remove_deprecated(struct_dict.get('hudControl', {})))
|
||||
|
||||
return struct_dataclass
|
||||
Reference in New Issue
Block a user