Files
onepilot/sunnypilot/mapd/mapd_manager.py
github-actions[bot] 7fa972be6a sunnypilot v2026.02.09-4080
version: sunnypilot v2025.003.000 (dev)
date: 2026-02-09T02:04:38
master commit: 254f55ac15a40343d7255f2f098de3442e0c4a6f
2026-02-09 02:04:38 +00:00

145 lines
5.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import json
import platform
import os
import glob
import shutil
from datetime import datetime
from openpilot.common.params import Params
from openpilot.common.realtime import Ratekeeper, config_realtime_process
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
from openpilot.sunnypilot.mapd.live_map_data.osm_map_data import OsmMapData
from openpilot.system.hardware.hw import Paths
from openpilot.sunnypilot.mapd import MAPD_PATH
from openpilot.sunnypilot.mapd.mapd_installer import VERSION, update_installed_version
# PFEIFER - MAPD {{
params = Params()
mem_params = Params("/dev/shm/params") if platform.system() != "Darwin" else params
# }} PFEIFER - MAPD
def get_files_for_cleanup() -> list[str]:
paths = [
f"{Paths.mapd_root()}/db",
f"{Paths.mapd_root()}/v*"
]
files_to_remove = []
for path in paths:
if os.path.exists(path):
files = glob.glob(path + '/**', recursive=True)
files_to_remove.extend(files)
# check for version and mapd files
if not os.path.isfile(MAPD_PATH):
files_to_remove.append(MAPD_PATH)
return files_to_remove
def cleanup_old_osm_data(files_to_remove: list[str]) -> None:
for file in files_to_remove:
# Remove trailing slash if path is file
if file.endswith('/') and os.path.isfile(file[:-1]):
file = file[:-1]
# Try to remove as file or symbolic link first
if os.path.islink(file) or os.path.isfile(file):
os.remove(file)
elif os.path.isdir(file): # If it's a directory
shutil.rmtree(file, ignore_errors=False)
def request_refresh_osm_location_data(nations: list[str], states: list[str] | None = None) -> None:
params.put("OsmDownloadedDate", str(datetime.now().timestamp()))
params.put_bool("OsmDbUpdatesCheck", False)
osm_download_locations = {
"nations": nations,
"states": states or []
}
print(f"Downloading maps for {json.dumps(osm_download_locations)}")
mem_params.put("OSMDownloadLocations", osm_download_locations)
def filter_nations_and_states(nations: list[str], states: list[str] | None = None) -> tuple[list[str], list[str]]:
"""Filters and prepares nation and state data for OSM map download.
If the nation is 'US' and a specific state is provided, the nation 'US' is removed from the list.
If the nation is 'US' and the state is 'All', the 'All' is removed from the list.
The idea behind these filters is that if a specific state in the US is provided,
there's no need to download map data for the entire US. Conversely,
if the state is unspecified (i.e., 'All'), we intend to download map data for the whole US,
and 'All' isn't a valid state name, so it's removed.
Parameters:
nations (list): A list of nations for which the map data is to be downloaded.
states (list, optional): A list of states for which the map data is to be downloaded. Defaults to None.
Returns:
tuple: Two lists. The first list is filtered nations and the second list is filtered states.
"""
if "US" in nations and states and not any(x.lower() == "all" for x in states):
# If a specific state in the US is provided, remove 'US' from nations
nations.remove("US")
elif "US" in nations and states and any(x.lower() == "all" for x in states):
# If 'All' is provided as a state (case invariant), remove those instances from states
states = [x for x in states if x.lower() != "all"]
elif "US" not in nations and states and any(x.lower() == "all" for x in states):
states.remove("All")
return nations, states or []
def update_osm_db() -> None:
if params.get_bool("OsmDbUpdatesCheck"):
cleanup_old_osm_data(get_files_for_cleanup())
country = params.get("OsmLocationName", return_default=True)
state = params.get("OsmStateName", return_default=True)
filtered_nations, filtered_states = filter_nations_and_states([country], [state])
request_refresh_osm_location_data(filtered_nations, filtered_states)
if not mem_params.get("OSMDownloadBounds"):
mem_params.put("OSMDownloadBounds", "")
if not mem_params.get("LastGPSPosition"):
mem_params.put("LastGPSPosition", "{}")
def main_thread():
update_installed_version(VERSION, params)
config_realtime_process([0, 1, 2, 3], 5)
rk = Ratekeeper(1, print_delay_threshold=None)
live_map_sp = OsmMapData()
# Create folder needed for OSM
try:
os.mkdir(Paths.mapd_root())
except FileExistsError:
pass
except PermissionError:
cloudlog.exception(f"mapd: failed to make {Paths.mapd_root()}")
while True:
show_alert = get_files_for_cleanup() and params.get_bool("OsmLocal")
set_offroad_alert("Offroad_OSMUpdateRequired", show_alert, "This alert will be cleared when new maps are downloaded.")
update_osm_db()
live_map_sp.tick()
rk.keep_time()
def main():
main_thread()
if __name__ == "__main__":
main()