version: sunnypilot v2025.003.000 (dev) date: 2026-02-09T02:04:38 master commit: 254f55ac15a40343d7255f2f098de3442e0c4a6f
66 lines
2.0 KiB
Python
66 lines
2.0 KiB
Python
"""
|
|
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
|
|
|
|
This file is part of sunnypilot and is licensed under the MIT License.
|
|
See the LICENSE.md file in the root directory for more details.
|
|
"""
|
|
from abc import abstractmethod, ABC
|
|
|
|
import cereal.messaging as messaging
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.constants import CV
|
|
from openpilot.selfdrive.car.cruise import V_CRUISE_UNSET
|
|
from openpilot.sunnypilot.navd.helpers import coordinate_from_param
|
|
|
|
# Exclusive upper bound used by BaseMapData.publish() when flagging a speed
# limit as valid. It is the V_CRUISE_UNSET value scaled by CV.KPH_TO_MS
# (presumably a km/h -> m/s conversion factor; confirm in the constants module).
MAX_SPEED_LIMIT = V_CRUISE_UNSET * CV.KPH_TO_MS
|
|
|
|
|
|
class BaseMapData(ABC):
  """Abstract base for objects that publish ``liveMapDataSP`` messages.

  The base class owns the messaging plumbing: it subscribes to
  ``liveLocationKalman`` and publishes ``liveMapDataSP``. Concrete subclasses
  provide the location handling and the map lookups through the abstract
  methods declared below.
  """

  def __init__(self):
    """Create the params handle, the messaging endpoints and location state."""
    self.params = Params()

    self.sm = messaging.SubMaster(['liveLocationKalman'])
    self.pm = messaging.PubMaster(['liveMapDataSP'])

    # Location state. Nothing in this base class modifies these after
    # construction; presumably update_location() implementations maintain
    # them (verify against the subclasses).
    self.localizer_valid = False
    self.last_bearing = None
    # Initial position is read from the "LastGPSPositionLLK" param.
    self.last_position = coordinate_from_param("LastGPSPositionLLK", self.params)

  @abstractmethod
  def update_location(self) -> None:
    """Update location state; invoked by tick() right after the SubMaster update."""

  @abstractmethod
  def get_current_speed_limit(self) -> float:
    """Return the current speed limit (published as ``speedLimit``)."""

  @abstractmethod
  def get_next_speed_limit_and_distance(self) -> tuple[float, float]:
    """Return ``(next_speed_limit, distance)`` for the upcoming speed limit."""

  @abstractmethod
  def get_current_road_name(self) -> str:
    """Return the current road name (published as ``roadName``)."""

  def publish(self) -> None:
    """Build one ``liveMapDataSP`` message from the subclass data and send it.

    A speed limit is flagged valid only when it lies strictly between 0 and
    MAX_SPEED_LIMIT. The message-level ``valid`` flag copies ``gpsOK`` from
    the latest ``liveLocationKalman`` sample.
    """
    current_limit = self.get_current_speed_limit()
    upcoming_limit, upcoming_distance = self.get_next_speed_limit_and_distance()

    msg = messaging.new_message('liveMapDataSP')
    msg.valid = self.sm['liveLocationKalman'].gpsOK
    payload = msg.liveMapDataSP

    # Current speed limit and its validity flag.
    payload.speedLimitValid = bool(0 < current_limit < MAX_SPEED_LIMIT)
    payload.speedLimit = current_limit

    # Upcoming speed limit, its validity flag and the distance to it.
    payload.speedLimitAheadValid = bool(0 < upcoming_limit < MAX_SPEED_LIMIT)
    payload.speedLimitAhead = upcoming_limit
    payload.speedLimitAheadDistance = upcoming_distance

    # The road name is queried last, after both speed-limit lookups.
    payload.roadName = self.get_current_road_name()

    self.pm.send('liveMapDataSP', msg)

  def tick(self) -> None:
    """Run one cycle: update the SubMaster, update location, then publish."""
    self.sm.update(0)  # called with a timeout argument of 0
    self.update_location()
    self.publish()
|