446 lines
18 KiB
Python
446 lines
18 KiB
Python
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
import requests
|
|
import shutil
|
|
import zipfile
|
|
|
|
from datetime import date, timedelta
|
|
from dateutil import easter
|
|
|
|
from openpilot.common.basedir import BASEDIR
|
|
from openpilot.common.params import Params
|
|
|
|
from openpilot.selfdrive.frogpilot.assets.download_functions import GITHUB_URL, GITLAB_URL, download_file, get_repository_url, handle_error, handle_request_error, verify_download
|
|
from openpilot.selfdrive.frogpilot.frogpilot_functions import ACTIVE_THEME_PATH, THEME_SAVE_PATH
|
|
from openpilot.selfdrive.frogpilot.frogpilot_utilities import update_frogpilot_toggles
|
|
|
|
CANCEL_DOWNLOAD_PARAM = "CancelThemeDownload"
|
|
DOWNLOAD_PROGRESS_PARAM = "ThemeDownloadProgress"
|
|
|
|
|
|
def update_theme_asset(asset_type, theme, holiday_theme, params):
|
|
save_location = os.path.join(ACTIVE_THEME_PATH, asset_type)
|
|
|
|
if holiday_theme:
|
|
asset_location = os.path.join(BASEDIR, "selfdrive", "frogpilot", "assets", "holiday_themes", holiday_theme, asset_type)
|
|
elif asset_type == "distance_icons":
|
|
asset_location = os.path.join(THEME_SAVE_PATH, "distance_icons", theme)
|
|
else:
|
|
asset_location = os.path.join(THEME_SAVE_PATH, "theme_packs", theme, asset_type)
|
|
|
|
if not os.path.exists(asset_location):
|
|
if asset_type == "colors":
|
|
params.put_bool("UseStockColors", True)
|
|
print("Using the stock color scheme instead")
|
|
return
|
|
elif asset_type in ("distance_icons", "icons"):
|
|
asset_location = os.path.join(BASEDIR, "selfdrive", "frogpilot", "assets", "stock_theme", asset_type)
|
|
print("Using the stock icon pack instead")
|
|
else:
|
|
if os.path.exists(save_location):
|
|
shutil.rmtree(save_location)
|
|
print(f"Using the stock {asset_type[:-1]} instead")
|
|
return
|
|
elif asset_type == "colors":
|
|
params.put_bool("UseStockColors", False)
|
|
|
|
if os.path.exists(save_location):
|
|
shutil.rmtree(save_location)
|
|
|
|
shutil.copytree(asset_location, save_location)
|
|
print(f"Copied {asset_location} to {save_location}")
|
|
|
|
|
|
def update_wheel_image(image, holiday_theme=None, random_event=True):
|
|
wheel_save_location = os.path.join(ACTIVE_THEME_PATH, "steering_wheel")
|
|
|
|
if holiday_theme:
|
|
wheel_location = os.path.join(BASEDIR, "selfdrive", "frogpilot", "assets", "holiday_themes", holiday_theme, "steering_wheel")
|
|
elif random_event:
|
|
wheel_location = os.path.join(BASEDIR, "selfdrive", "frogpilot", "assets", "random_events", "icons")
|
|
elif image == "stock":
|
|
wheel_location = os.path.join(BASEDIR, "selfdrive", "frogpilot", "assets", "stock_theme", "steering_wheel")
|
|
else:
|
|
wheel_location = os.path.join(THEME_SAVE_PATH, "steering_wheels")
|
|
|
|
if os.path.exists(wheel_save_location):
|
|
shutil.rmtree(wheel_save_location)
|
|
os.makedirs(wheel_save_location, exist_ok=True)
|
|
|
|
image_name = image.replace(" ", "_").lower()
|
|
for filename in os.listdir(wheel_location):
|
|
if os.path.splitext(filename)[0].lower() in (image_name, "wheel"):
|
|
source_file = os.path.join(wheel_location, filename)
|
|
destination_file = os.path.join(wheel_save_location, f"wheel{os.path.splitext(filename)[1]}")
|
|
shutil.copy2(source_file, destination_file)
|
|
print(f"Copied {source_file} to {destination_file}")
|
|
break
|
|
|
|
|
|
class ThemeManager:
|
|
def __init__(self):
|
|
self.params = Params()
|
|
self.params_memory = Params("/dev/shm/params")
|
|
|
|
self.previous_assets = {}
|
|
|
|
@staticmethod
|
|
def calculate_thanksgiving(year):
|
|
november_first = date(year, 11, 1)
|
|
day_of_week = november_first.weekday()
|
|
return november_first + timedelta(days=(3 - day_of_week + 21) % 7 + 21)
|
|
|
|
@staticmethod
|
|
def is_within_week_of(target_date, now):
|
|
start_of_week = target_date - timedelta(days=target_date.weekday())
|
|
return start_of_week <= now < target_date
|
|
|
|
@staticmethod
|
|
def fetch_files(url):
|
|
try:
|
|
response = requests.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
return [name for name in re.findall(r'href="[^"]*\/blob\/[^"]*\/([^"]*)"', response.text) if name.lower() != "license"]
|
|
except Exception as error:
|
|
handle_request_error(error, None, None, None, None)
|
|
return []
|
|
|
|
@staticmethod
|
|
def fetch_assets(repo_url):
|
|
repo = "FrogAi/FrogPilot-Resources"
|
|
branches = ["Themes", "Distance-Icons", "Steering-Wheels"]
|
|
|
|
assets = {
|
|
"themes": {},
|
|
"distance_icons": [],
|
|
"wheels": []
|
|
}
|
|
|
|
if "github" in repo_url:
|
|
base_api_url = "https://api.github.com/repos"
|
|
elif "gitlab" in repo_url:
|
|
base_api_url = "https://gitlab.com/api/v4/projects"
|
|
repo = repo.replace("/", "%2F")
|
|
else:
|
|
print(f"Unsupported repository URL: {repo_url}")
|
|
return assets
|
|
|
|
for branch in branches:
|
|
if "github" in repo_url:
|
|
api_url = f"{base_api_url}/{repo}/git/trees/{branch}?recursive=1"
|
|
elif "gitlab" in repo_url:
|
|
api_url = f"{base_api_url}/{repo}/repository/tree?ref={branch}&recursive=true"
|
|
|
|
try:
|
|
print(f"Fetching assets from branch '{branch}': {api_url}")
|
|
response = requests.get(api_url, timeout=10)
|
|
response.raise_for_status()
|
|
content = response.json()
|
|
|
|
if "github" in repo_url:
|
|
items = content.get('tree', [])
|
|
elif "gitlab" in repo_url:
|
|
items = content
|
|
|
|
for item in items:
|
|
if item["type"] != "blob":
|
|
continue
|
|
|
|
item_path = item["path"].lower()
|
|
if branch == "Themes":
|
|
theme_name = item["path"].split('/')[0]
|
|
assets["themes"].setdefault(theme_name, set())
|
|
if "icons" in item_path:
|
|
assets["themes"][theme_name].add("icons")
|
|
elif "signals" in item_path:
|
|
assets["themes"][theme_name].add("signals")
|
|
elif "sounds" in item_path:
|
|
assets["themes"][theme_name].add("sounds")
|
|
else:
|
|
assets["themes"][theme_name].add("colors")
|
|
|
|
elif branch == "Distance-Icons":
|
|
assets["distance_icons"].append(item["path"])
|
|
|
|
elif branch == "Steering-Wheels":
|
|
assets["wheels"].append(item["path"])
|
|
|
|
except requests.exceptions.RequestException as error:
|
|
print(f"Error occurred when fetching from branch '{branch}': {error}")
|
|
|
|
assets["themes"] = {k: list(v) for k, v in assets["themes"].items()}
|
|
return assets
|
|
|
|
def update_holiday(self):
|
|
now = date.today()
|
|
year = now.year
|
|
|
|
holidays = {
|
|
"new_years": date(year, 1, 1),
|
|
"valentines": date(year, 2, 14),
|
|
"st_patricks": date(year, 3, 17),
|
|
"world_frog_day": date(year, 3, 20),
|
|
"april_fools": date(year, 4, 1),
|
|
"easter_week": easter.easter(year),
|
|
"may_the_fourth": date(year, 5, 4),
|
|
"cinco_de_mayo": date(year, 5, 5),
|
|
"fourth_of_july": date(year, 7, 4),
|
|
"halloween_week": date(year, 10, 31),
|
|
"thanksgiving_week": self.calculate_thanksgiving(year),
|
|
"christmas_week": date(year, 12, 25)
|
|
}
|
|
|
|
for holiday, holiday_date in holidays.items():
|
|
if (holiday.endswith("_week") and self.is_within_week_of(holiday_date, now)) or (now == holiday_date):
|
|
if holiday != self.previous_assets.get("holiday_theme"):
|
|
self.params.put("CurrentHolidayTheme", holiday)
|
|
self.params_memory.put_bool("UpdateTheme", True)
|
|
return
|
|
|
|
if "holiday_theme" in self.previous_assets:
|
|
self.params.remove("CurrentHolidayTheme")
|
|
self.params_memory.put_bool("UpdateTheme", True)
|
|
self.previous_assets.pop("holiday_theme")
|
|
|
|
def update_active_theme(self):
|
|
if not os.path.exists(THEME_SAVE_PATH):
|
|
return
|
|
|
|
holiday_themes = self.params.get_bool("HolidayThemes")
|
|
current_holiday_theme = self.params.get("CurrentHolidayTheme", encoding='utf-8') if holiday_themes else None
|
|
|
|
if not current_holiday_theme and self.params.get_bool("PersonalizeOpenpilot"):
|
|
asset_mappings = {
|
|
"color_scheme": ("colors", self.params.get("CustomColors", encoding='utf-8')),
|
|
"distance_icons": ("distance_icons", self.params.get("CustomDistanceIcons", encoding='utf-8')),
|
|
"icon_pack": ("icons", self.params.get("CustomIcons", encoding='utf-8')),
|
|
"sound_pack": ("sounds", self.params.get("CustomSounds", encoding='utf-8')),
|
|
"turn_signal_pack": ("signals", self.params.get("CustomSignals", encoding='utf-8')),
|
|
"wheel_image": ("wheel_image", self.params.get("WheelIcon", encoding='utf-8'))
|
|
}
|
|
else:
|
|
asset_mappings = {
|
|
"color_scheme": ("colors", "stock"),
|
|
"distance_icons": ("distance_icons", "stock"),
|
|
"icon_pack": ("icons", "stock"),
|
|
"sound_pack": ("sounds", "stock"),
|
|
"turn_signal_pack": ("signals", "stock"),
|
|
"wheel_image": ("wheel_image", "stock")
|
|
}
|
|
|
|
theme_changed = False
|
|
for asset, (asset_type, current_value) in asset_mappings.items():
|
|
if current_value != self.previous_assets.get(asset) or current_holiday_theme != self.previous_assets.get("holiday_theme"):
|
|
print(f"Updating {asset}: {asset_type} with value {current_holiday_theme if current_holiday_theme else current_value}")
|
|
|
|
if asset_type == "wheel_image":
|
|
update_wheel_image(current_value, current_holiday_theme, random_event=False)
|
|
else:
|
|
update_theme_asset(asset_type, current_value, current_holiday_theme, self.params)
|
|
|
|
self.previous_assets[asset] = current_value
|
|
theme_changed = True
|
|
|
|
if theme_changed:
|
|
if current_holiday_theme:
|
|
self.previous_assets["holiday_theme"] = current_holiday_theme
|
|
self.params_memory.put_bool("ThemeUpdated", True)
|
|
update_frogpilot_toggles()
|
|
|
|
def extract_zip(self, zip_file, extract_path):
|
|
print(f"Extracting {zip_file} to {extract_path}")
|
|
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
|
|
zip_ref.extractall(extract_path)
|
|
os.remove(zip_file)
|
|
print(f"Extraction completed and zip file deleted.")
|
|
|
|
def handle_existing_theme(self, theme_name, theme_param):
|
|
print(f"Theme {theme_name} already exists, skipping download...")
|
|
self.params_memory.put(DOWNLOAD_PROGRESS_PARAM, "Theme already exists...")
|
|
self.params_memory.remove(theme_param)
|
|
|
|
def handle_verification_failure(self, extensions, theme_component, theme_name, theme_param, download_path):
|
|
if theme_component == "distance_icons":
|
|
download_link = f"{GITLAB_URL}Distance-Icons/{theme_name}"
|
|
elif theme_component == "steering_wheels":
|
|
download_link = f"{GITLAB_URL}Steering-Wheels/{theme_name}"
|
|
else:
|
|
download_link = f"{GITLAB_URL}Themes/{theme_name}/{theme_component}"
|
|
|
|
for ext in extensions:
|
|
theme_path = download_path + ext
|
|
temp_theme_path = f"{os.path.splitext(theme_path)[0]}_temp{ext}"
|
|
theme_url = download_link + ext
|
|
print(f"Downloading theme from GitLab: {theme_name}")
|
|
download_file(CANCEL_DOWNLOAD_PARAM, theme_path, temp_theme_path, DOWNLOAD_PROGRESS_PARAM, theme_url, theme_param, self.params_memory)
|
|
|
|
if verify_download(theme_path, temp_theme_path, theme_url):
|
|
print(f"Theme {theme_name} downloaded and verified successfully from GitLab!")
|
|
if ext == ".zip":
|
|
self.params_memory.put(DOWNLOAD_PROGRESS_PARAM, "Unpacking theme...")
|
|
self.extract_zip(theme_path, os.path.join(THEME_SAVE_PATH, theme_name))
|
|
self.params_memory.put(DOWNLOAD_PROGRESS_PARAM, "Downloaded!")
|
|
self.params_memory.remove(theme_param)
|
|
return True
|
|
|
|
handle_error(download_path, "GitLab verification failed", "Verification failed", theme_param, DOWNLOAD_PROGRESS_PARAM, self.params_memory)
|
|
return False
|
|
|
|
def download_theme(self, theme_component, theme_name, theme_param):
|
|
repo_url = get_repository_url()
|
|
if not repo_url:
|
|
handle_error(None, "GitHub and GitLab are offline...", "Repository unavailable", theme_param, DOWNLOAD_PROGRESS_PARAM, self.params_memory)
|
|
return
|
|
|
|
if theme_component == "distance_icons":
|
|
download_link = f"{repo_url}Distance-Icons/{theme_name}"
|
|
download_path = os.path.join(THEME_SAVE_PATH, theme_component, theme_name)
|
|
extensions = [".zip"]
|
|
elif theme_component == "steering_wheels":
|
|
download_link = f"{repo_url}Steering-Wheels/{theme_name}"
|
|
download_path = os.path.join(THEME_SAVE_PATH, theme_component, theme_name)
|
|
extensions = [".gif", ".png"]
|
|
else:
|
|
download_link = f"{repo_url}Themes/{theme_name}/{theme_component}"
|
|
download_path = os.path.join(THEME_SAVE_PATH, "theme_packs", theme_name, theme_component)
|
|
extensions = [".zip"]
|
|
|
|
for ext in extensions:
|
|
theme_path = download_path + ext
|
|
temp_theme_path = f"{os.path.splitext(theme_path)[0]}_temp{ext}"
|
|
|
|
if os.path.isfile(theme_path):
|
|
handle_error(theme_path, "Theme already exists...", "Theme already exists...", theme_param, DOWNLOAD_PROGRESS_PARAM, self.params_memory)
|
|
return
|
|
|
|
theme_url = download_link + ext
|
|
print(f"Downloading theme from GitHub: {theme_name}")
|
|
download_file(CANCEL_DOWNLOAD_PARAM, theme_path, temp_theme_path, DOWNLOAD_PROGRESS_PARAM, theme_url, theme_param, self.params_memory)
|
|
|
|
if verify_download(theme_path, temp_theme_path, theme_url):
|
|
print(f"Theme {theme_name} downloaded and verified successfully from GitHub!")
|
|
if ext == ".zip":
|
|
self.params_memory.put(DOWNLOAD_PROGRESS_PARAM, "Unpacking theme...")
|
|
self.extract_zip(theme_path, download_path)
|
|
self.params_memory.put(DOWNLOAD_PROGRESS_PARAM, "Downloaded!")
|
|
self.params_memory.remove(theme_param)
|
|
return
|
|
|
|
self.handle_verification_failure(extensions, theme_component, theme_name, theme_param, download_path)
|
|
|
|
def update_theme_params(self, downloadable_colors, downloadable_distance_icons, downloadable_icons, downloadable_signals, downloadable_sounds, downloadable_wheels):
|
|
def filter_existing_assets(assets, subfolder):
|
|
existing_themes = {
|
|
theme.replace('_', ' ').title()
|
|
for theme in os.listdir(os.path.join(THEME_SAVE_PATH, "theme_packs"))
|
|
if os.path.isdir(os.path.join(THEME_SAVE_PATH, "theme_packs", theme, subfolder))
|
|
}
|
|
return sorted(set(assets) - existing_themes)
|
|
|
|
self.params.put("DownloadableColors", ','.join(filter_existing_assets(downloadable_colors, "colors")))
|
|
print("Colors list updated successfully.")
|
|
|
|
distance_icons_directory = os.path.join(THEME_SAVE_PATH, "distance_icons")
|
|
self.params.put("DownloadableDistanceIcons", ','.join(sorted(set(downloadable_distance_icons) - {
|
|
distance_icons.replace('_', ' ').split('.')[0].title()
|
|
for distance_icons in os.listdir(distance_icons_directory)
|
|
}))
|
|
)
|
|
|
|
self.params.put("DownloadableIcons", ','.join(filter_existing_assets(downloadable_icons, "icons")))
|
|
print("Icons list updated successfully.")
|
|
|
|
self.params.put("DownloadableSignals", ','.join(filter_existing_assets(downloadable_signals, "signals")))
|
|
print("Signals list updated successfully.")
|
|
|
|
self.params.put("DownloadableSounds", ','.join(filter_existing_assets(downloadable_sounds, "sounds")))
|
|
print("Sounds list updated successfully.")
|
|
|
|
wheel_directory = os.path.join(THEME_SAVE_PATH, "steering_wheels")
|
|
self.params.put("DownloadableWheels", ','.join(sorted(set(downloadable_wheels) - {
|
|
wheel.replace('_', ' ').split('.')[0].title()
|
|
for wheel in os.listdir(wheel_directory) if wheel != "img_chffr_wheel.png"
|
|
}))
|
|
)
|
|
|
|
def validate_themes(self):
|
|
asset_mappings = {
|
|
"CustomColors": "colors",
|
|
"CustomDistanceIcons": "distance_icons",
|
|
"CustomIcons": "icons",
|
|
"CustomSounds": "sounds",
|
|
"CustomSignals": "signals",
|
|
"WheelIcon": "steering_wheels"
|
|
}
|
|
|
|
for theme_param, theme_component in asset_mappings.items():
|
|
theme_name = self.params.get(theme_param, encoding='utf-8')
|
|
if not theme_name or theme_name == "stock":
|
|
continue
|
|
|
|
if theme_component == "distance_icons":
|
|
theme_path = os.path.join(THEME_SAVE_PATH, theme_component, theme_name)
|
|
elif theme_component == "steering_wheels":
|
|
pattern = os.path.join(THEME_SAVE_PATH, theme_component, theme_name + ".*")
|
|
matching_files = glob.glob(pattern)
|
|
|
|
if matching_files:
|
|
theme_path = matching_files[0]
|
|
else:
|
|
theme_path = None
|
|
else:
|
|
theme_path = os.path.join(THEME_SAVE_PATH, "theme_packs", theme_name, theme_component)
|
|
|
|
if theme_path is None or not os.path.exists(theme_path):
|
|
print(f"{theme_name} for {theme_component} not found. Downloading...")
|
|
self.download_theme(theme_component, theme_name, theme_param)
|
|
self.previous_assets = {}
|
|
self.update_active_theme()
|
|
|
|
def update_themes(self, boot_run=False):
|
|
if not os.path.exists(THEME_SAVE_PATH):
|
|
print(f"Theme save path does not exist: {THEME_SAVE_PATH}")
|
|
return
|
|
|
|
repo_url = get_repository_url()
|
|
if repo_url is None:
|
|
print("GitHub and GitLab are offline...")
|
|
return
|
|
|
|
if boot_run:
|
|
self.validate_themes()
|
|
|
|
assets = self.fetch_assets(repo_url)
|
|
|
|
downloadable_colors = []
|
|
downloadable_icons = []
|
|
downloadable_signals = []
|
|
downloadable_sounds = []
|
|
|
|
for theme, available_assets in assets["themes"].items():
|
|
theme_name = theme.replace('_', ' ').split('.')[0].title()
|
|
print(f"Checking theme: {theme_name}")
|
|
|
|
if "colors" in available_assets:
|
|
downloadable_colors.append(theme_name)
|
|
if "icons" in available_assets:
|
|
downloadable_icons.append(theme_name)
|
|
if "signals" in available_assets:
|
|
downloadable_signals.append(theme_name)
|
|
if "sounds" in available_assets:
|
|
downloadable_sounds.append(theme_name)
|
|
|
|
downloadable_distance_icons = [distance_icon.replace('_', ' ').split('.')[0].title() for distance_icon in assets["distance_icons"]]
|
|
downloadable_wheels = [wheel.replace('_', ' ').split('.')[0].title() for wheel in assets["wheels"]]
|
|
|
|
print(f"Downloadable Colors: {downloadable_colors}")
|
|
print(f"Downloadable Icons: {downloadable_icons}")
|
|
print(f"Downloadable Signals: {downloadable_signals}")
|
|
print(f"Downloadable Sounds: {downloadable_sounds}")
|
|
print(f"Downloadable Distance Icons: {downloadable_distance_icons}")
|
|
print(f"Downloadable Wheels: {downloadable_wheels}")
|
|
|
|
self.update_theme_params(downloadable_colors, downloadable_distance_icons, downloadable_icons, downloadable_signals, downloadable_sounds, downloadable_wheels)
|