Files
onepilot/system/ui/mici_setup.py
Shane Smiskol a7de971334 mici setup: use nav stack (#37507)
* pressable

* slow

* fast and looks great

* 0.075

* clean up

* fix missing

* clean up

* mici setup use nav stack!

* remove flat state!

* todo

* clean up

* clean up ordering

* clean up

* reset progress on show, don't mutate nav stack from thread

* reset text on show too

* rename

* clean up
2026-03-01 02:41:51 -08:00

722 lines
27 KiB
Python
Executable File

#!/usr/bin/env python3
from abc import abstractmethod
import os
import re
import threading
import time
import urllib.request
import urllib.error
from urllib.parse import urlparse
import shutil
from collections.abc import Callable
import pyray as rl
from cereal import log
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.realtime import config_realtime_process, set_core_affinity
from openpilot.common.swaglog import cloudlog
from openpilot.common.utils import run_cmd
from openpilot.system.hardware import HARDWARE, TICI
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.wifi_manager import WifiManager
from openpilot.system.ui.lib.scroll_panel2 import GuiScrollPanel2
from openpilot.system.ui.widgets import Widget
from openpilot.system.ui.widgets.nav_widget import NavWidget
from openpilot.system.ui.widgets.button import (IconButton, SmallButton, WideRoundedButton, SmallerRoundedButton,
SmallCircleIconButton, WidishRoundedButton, FullRoundedButton)
from openpilot.system.ui.widgets.label import UnifiedLabel
from openpilot.system.ui.widgets.slider import LargerSlider, SmallSlider
from openpilot.selfdrive.ui.mici.layouts.settings.network import WifiUIMici
from openpilot.selfdrive.ui.mici.widgets.dialog import BigInputDialog
# Shorthand for cereal's DeviceState.NetworkType; compared against HARDWARE.get_network_type() in this file.
NetworkType = log.DeviceState.NetworkType
# Probed with a HEAD request to detect internet access, and used as the download URL for stock openpilot.
OPENPILOT_URL = "https://openpilot.comma.ai"
# Sent as the User-Agent header on installer download requests.
USER_AGENT = f"AGNOSSetup-{HARDWARE.get_os_version()}"
# CONTINUE is written to TMP_CONTINUE_PATH first and then moved onto CONTINUE_PATH.
CONTINUE_PATH = "/data/continue.sh"
TMP_CONTINUE_PATH = "/data/continue.sh.new"
# When INSTALL_PATH is a directory and VALID_CACHE_PATH is a file, setup skips the download
# and removes VALID_CACHE_PATH.
INSTALL_PATH = "/data/openpilot"
VALID_CACHE_PATH = "/data/.openpilot_cache"
# The bundled installer is copied from SOURCE to DESTINATION; a downloaded installer is renamed onto DESTINATION.
INSTALLER_SOURCE_PATH = "/usr/comma/installer"
INSTALLER_DESTINATION_PATH = "/tmp/installer"
# The URL a downloaded installer came from is written to this file.
INSTALLER_URL_PATH = "/tmp/installer_url"
# Script body written to the continue script when the existing install is reused.
CONTINUE = """#!/usr/bin/env bash
cd /data/openpilot
exec ./launch_openpilot.sh
"""
class NetworkConnectivityMonitor:
def __init__(self, should_check: Callable[[], bool] | None = None):
self.network_connected = threading.Event()
self.wifi_connected = threading.Event()
self._should_check = should_check or (lambda: True)
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
def start(self):
self._stop_event.clear()
if self._thread is None or not self._thread.is_alive():
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
if self._thread is not None:
self._stop_event.set()
self._thread.join()
self._thread = None
def reset(self):
self.network_connected.clear()
self.wifi_connected.clear()
def _run(self):
while not self._stop_event.is_set():
if self._should_check():
try:
request = urllib.request.Request(OPENPILOT_URL, method="HEAD")
urllib.request.urlopen(request, timeout=2.0)
self.network_connected.set()
if HARDWARE.get_network_type() == NetworkType.wifi:
self.wifi_connected.set()
except Exception:
self.reset()
else:
self.reset()
if self._stop_event.wait(timeout=1.0):
break
class StartPage(Widget):
  """Landing page: a centered "start" button texture that grows slightly while pressed."""

  def __init__(self):
    super().__init__()
    # not read in this class; presumably consumed by the Widget base class -- verify there
    self._click_delay = 0.075

    # Smooths the jump between the idle (1.0) and pressed (1.07) scale targets.
    self._scale_filter = FirstOrderFilter(1.0, 0.1, 1 / gui_app.target_fps)

    self._start_bg_txt = gui_app.texture("icons_mici/setup/start_button.png", 500, 224, keep_aspect_ratio=False)
    self._start_bg_pressed_txt = gui_app.texture("icons_mici/setup/start_button_pressed.png", 500, 224, keep_aspect_ratio=False)

    title_color = rl.Color(255, 255, 255, int(255 * 0.9))
    self._title = UnifiedLabel("start", 64, text_color=title_color,
                               font_weight=FontWeight.DISPLAY, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER,
                               alignment_vertical=rl.GuiTextAlignmentVertical.TEXT_ALIGN_MIDDLE)

  def _render(self, rect: rl.Rectangle):
    pressed = self.is_pressed
    if pressed:
      scale = self._scale_filter.update(1.07)
      texture = self._start_bg_pressed_txt
    else:
      scale = self._scale_filter.update(1.0)
      texture = self._start_bg_txt

    tex_w = self._start_bg_txt.width
    tex_h = self._start_bg_txt.height

    # Top-left corner of the unscaled texture when centered in rect.
    centered_x = rect.x + (rect.width - tex_w) / 2
    centered_y = rect.y + (rect.height - tex_h) / 2

    # Shift by half of the size change so the texture scales about its center.
    draw_x = centered_x + (tex_w * (1 - scale)) / 2
    draw_y = centered_y + (tex_h * (1 - scale)) / 2
    rl.draw_texture_ex(texture, (draw_x, draw_y), 0, scale, rl.WHITE)

    # The label is offset vertically by the same amount the texture moved.
    title_rect = rl.Rectangle(rect.x, rect.y + (draw_y - centered_y), rect.width, rect.height)
    self._title.render(title_rect)
class SoftwareSelectionPage(NavWidget):
  """Page with two stacked sliders: one for openpilot, one for custom software.

  Args:
    use_openpilot_callback: passed to the openpilot slider as its callback.
    use_custom_software_callback: passed to the custom software slider as its callback.
  """

  def __init__(self, use_openpilot_callback: Callable,
               use_custom_software_callback: Callable):
    super().__init__()

    self._openpilot_slider = LargerSlider("slide to use\nopenpilot", use_openpilot_callback)
    self._custom_software_slider = LargerSlider("slide to use\ncustom software", use_custom_software_callback, green=False)

    # Each slider is only usable while the page is enabled and not being dismissed.
    for slider in (self._openpilot_slider, self._custom_software_slider):
      slider.set_enabled(lambda: self.enabled and not self.is_dismissing)

  def show_event(self):
    super().show_event()
    # zero the nav bar's alpha every time this page is shown
    self._nav_bar._alpha = 0.0

  def _update_state(self):
    super()._update_state()
    if not self.is_dismissing:
      return
    self.reset()

  def reset(self):
    """Reset both sliders."""
    for slider in (self._openpilot_slider, self._custom_software_slider):
      slider.reset()

  def _render(self, rect: rl.Rectangle):
    top_slider = self._openpilot_slider
    bottom_slider = self._custom_software_slider

    # Each slider fades out as the other one is dragged.
    top_slider.set_opacity(1.0 - bottom_slider.slider_percentage)
    bottom_slider.set_opacity(1.0 - top_slider.slider_percentage)

    # openpilot takes the top half, custom software the bottom half; both centered horizontally.
    placements = (
      (top_slider, rect.y),
      (bottom_slider, rect.y + rect.height / 2),
    )
    for slider, top in placements:
      slider_width = slider.rect.width
      slider.render(rl.Rectangle(
        rect.x + (rect.width - slider_width) / 2,
        top,
        slider_width,
        rect.height / 2,
      ))
class TermsHeader(Widget):
  """Header row: an icon on the left with a bold title to its right.

  Args:
    text: initial title text.
    icon_texture: texture drawn at the widget's top-left corner.
  """

  def __init__(self, text: str, icon_texture: rl.Texture):
    super().__init__()

    self._icon_texture = icon_texture
    self._title = UnifiedLabel(text, 36, text_color=rl.Color(255, 255, 255, int(255 * 0.9)),
                               font_weight=FontWeight.BOLD, alignment_vertical=rl.GuiTextAlignmentVertical.TEXT_ALIGN_MIDDLE,
                               line_height=0.8)

    # Default size: app width minus a 16px margin per side, as tall as the icon.
    default_rect = rl.Rectangle(0, 0, gui_app.width - 16 * 2, self._icon_texture.height)
    self.set_rect(default_rect)

  def set_title(self, text: str):
    """Replace the title text."""
    self._title.set_text(text)

  def set_icon(self, icon_texture: rl.Texture):
    """Replace the icon texture."""
    self._icon_texture = icon_texture

  def _render(self, _):
    bounds = self._rect
    icon = self._icon_texture

    rl.draw_texture_ex(icon, rl.Vector2(bounds.x, bounds.y),
                       0.0, 1.0, rl.WHITE)

    # May expand outside parent rect
    title_width = bounds.width - icon.width - 16
    title_height = self._title.get_content_height(int(title_width))

    # Title starts 16px right of the icon and is centered vertically on the widget.
    self._title.render(rl.Rectangle(
      bounds.x + icon.width + 16,
      bounds.y + (bounds.height - title_height) / 2,
      title_width,
      title_height,
    ))
class TermsPage(Widget):
  """Scrollable page whose action buttons are enabled only after scrolling to the end.

  Subclasses provide the scrollable body via ``_content_height`` and
  ``_render_content``.

  Args:
    continue_callback: invoked when the continue control is confirmed/clicked.
    back_callback: click callback for the back button; when None the back
      button is never enabled and the continue button is a FullRoundedButton.
    back_text: label of the back button.
    continue_text: label of the continue control. "reboot" and "power off"
      turn the control into a slider instead of a button.
  """
  ITEM_SPACING = 20

  def __init__(self, continue_callback: Callable, back_callback: Callable | None = None,
               back_text: str = "back", continue_text: str = "accept"):
    super().__init__()

    has_back = back_callback is not None
    uses_slider = continue_text in ("reboot", "power off")

    # TODO: use Scroller
    self._scroll_panel = GuiScrollPanel2(horizontal=False)

    self._continue_text = continue_text
    self._continue_slider: bool = uses_slider

    self._continue_button: WideRoundedButton | FullRoundedButton | SmallSlider
    if uses_slider:
      self._continue_button = SmallSlider(continue_text, confirm_callback=continue_callback)
      # scrolling is disabled while the slider is pressed
      self._scroll_panel.set_enabled(lambda: not self._continue_button.is_pressed)
    else:
      button_cls = WideRoundedButton if has_back else FullRoundedButton
      self._continue_button = button_cls(continue_text)

    # Starts hidden and disabled; _render reveals it at the end of the content.
    self._continue_button.set_enabled(False)
    self._continue_button.set_opacity(0.0)
    self._continue_button.set_touch_valid_callback(self._scroll_panel.is_touch_valid)
    if not uses_slider:
      self._continue_button.set_click_callback(continue_callback)

    self._enable_back = has_back
    self._back_button = SmallButton(back_text)
    self._back_button.set_opacity(0.0)
    self._back_button.set_touch_valid_callback(self._scroll_panel.is_touch_valid)
    self._back_button.set_click_callback(back_callback)

    indicator_txt = gui_app.texture("icons_mici/setup/scroll_down_indicator.png", 64, 78)
    self._scroll_down_indicator = IconButton(indicator_txt)
    self._scroll_down_indicator.set_enabled(False)

  def reset(self):
    """Scroll back to the top, hide and disable both buttons, and show the scroll indicator."""
    self._scroll_panel.set_offset(0)
    for button in (self._continue_button, self._back_button):
      button.set_enabled(False)
      button.set_opacity(0.0)
    self._scroll_down_indicator.set_opacity(1.0)

  def show_event(self):
    super().show_event()
    self.reset()

  @property
  @abstractmethod
  def _content_height(self):
    """Height of the scrollable content; supplied by subclasses."""
    pass

  @property
  def _scrolled_down_offset(self):
    """Scroll offset at or below which the page counts as scrolled to the end."""
    button_allowance = self._continue_button.rect.height + 16 + 30
    return button_allowance - self._content_height

  @abstractmethod
  def _render_content(self, scroll_offset):
    """Draw the scrollable content at the given offset; supplied by subclasses."""
    pass

  def _render(self, _):
    rl.draw_rectangle_rec(self._rect, rl.BLACK)

    scrollable_height = self._content_height + self._continue_button.rect.height + 16
    scroll_offset = round(self._scroll_panel.update(self._rect, scrollable_height))

    reached_end = scroll_offset <= self._scrolled_down_offset
    if not reached_end:
      self._back_button.set_enabled(False)
      self._back_button.set_opacity(0.0, smooth=True)
    elif self._enable_back:
      # don't show back if not enabled
      self._back_button.set_enabled(True)
      self._back_button.set_opacity(1.0, smooth=True)

    self._continue_button.set_enabled(reached_end)
    self._continue_button.set_opacity(1.0 if reached_end else 0.0, smooth=True)
    self._scroll_down_indicator.set_opacity(0.0 if reached_end else 1.0, smooth=True)

    # Render content
    self._render_content(scroll_offset)

    # black gradient at top and bottom for scrolling content
    fade_height = 20
    left = int(self._rect.x)
    full_width = int(self._rect.width)
    rl.draw_rectangle_gradient_v(left, int(self._rect.y),
                                 full_width, fade_height, rl.BLACK, rl.BLANK)
    rl.draw_rectangle_gradient_v(left, int(self._rect.y + self._rect.height - 20),
                                 full_width, fade_height, rl.BLANK, rl.BLACK)

    # fade out back button as slider is moved
    # (the threshold is read again on purpose: it derives from _content_height, which
    # _render_content above may have changed)
    if self._continue_slider and scroll_offset <= self._scrolled_down_offset:
      self._back_button.set_opacity(1.0 - self._continue_button.slider_percentage)
      self._back_button.set_visible(self._continue_button.slider_percentage < 0.99)

    bottom_edge = self._rect.y + self._rect.height
    right_edge = self._rect.x + self._rect.width

    back_size = self._back_button.rect
    self._back_button.render(rl.Rectangle(
      self._rect.x + 8,
      bottom_edge - back_size.height,
      back_size.width,
      back_size.height,
    ))

    # Continue sits on the right when there is a back button, otherwise on the left.
    continue_size = self._continue_button.rect
    if self._enable_back:
      continue_x = right_edge - continue_size.width - 8
      if self._continue_slider:
        continue_x += 8
    else:
      continue_x = self._rect.x + 8

    self._continue_button.render(rl.Rectangle(
      continue_x,
      bottom_edge - continue_size.height,
      continue_size.width,
      continue_size.height,
    ))

    indicator_size = self._scroll_down_indicator.rect
    self._scroll_down_indicator.render(rl.Rectangle(
      right_edge - indicator_size.width - 8,
      bottom_edge - indicator_size.height - 8,
      indicator_size.width,
      indicator_size.height,
    ))
class CustomSoftwareWarningPage(TermsPage):
  """Terms page warning about third-party software, followed by restore instructions."""

  def __init__(self, continue_callback: Callable, back_callback: Callable):
    super().__init__(continue_callback, back_callback)

    body_color = rl.Color(255, 255, 255, int(255 * 0.9))

    warning_icon = gui_app.texture("icons_mici/setup/warning.png", 66, 60)
    self._title_header = TermsHeader("use caution installing\n3rd party software", warning_icon)
    self._body = UnifiedLabel("• It has not been tested by comma.\n"
                              "• It may not comply with relevant safety standards.\n"
                              "• It may cause damage to your device and/or vehicle.\n", 36, text_color=body_color,
                              font_weight=FontWeight.ROMAN)

    restore_icon = gui_app.texture("icons_mici/setup/restore.png", 60, 60)
    self._restore_header = TermsHeader("how to backup &\nrestore", restore_icon)
    self._restore_body = UnifiedLabel("To restore your device to a factory state later, use https://flash.comma.ai",
                                      36, text_color=rl.Color(255, 255, 255, int(255 * 0.9)),
                                      font_weight=FontWeight.ROMAN)

  @property
  def _content_height(self):
    # Bottom edge of the last element, with the current scroll offset subtracted.
    last = self._restore_body.rect
    return last.y + last.height - self._scroll_panel.get_offset()

  def _render_content(self, scroll_offset):
    """Lay the four elements out top to bottom, each ITEM_SPACING below the previous one."""
    gap = self.ITEM_SPACING
    text_x = self._rect.x + 8
    header_x = self._rect.x + 16

    self._title_header.set_position(header_x, self._rect.y + 8 + scroll_offset)
    self._title_header.render()

    title_rect = self._title_header.rect
    self._body.render(rl.Rectangle(
      text_x,
      title_rect.y + title_rect.height + gap,
      self._rect.width - 50,
      self._body.get_content_height(int(self._rect.width - 50)),
    ))

    self._restore_header.set_position(header_x, self._body.rect.y + self._body.rect.height + gap)
    self._restore_header.render()

    restore_rect = self._restore_header.rect
    self._restore_body.render(rl.Rectangle(
      text_x,
      restore_rect.y + restore_rect.height + gap,
      self._rect.width - 50,
      self._restore_body.get_content_height(int(self._rect.width - 50)),
    ))
class DownloadingPage(Widget):
  """Black page with a "downloading..." title and a large percentage label."""

  def __init__(self):
    super().__init__()
    self._progress = 0

    title_color = rl.Color(255, 255, 255, int(255 * 0.9))
    progress_color = rl.Color(255, 255, 255, int(255 * 0.9 * 0.65))

    self._title_label = UnifiedLabel("downloading...", 64, text_color=title_color,
                                     font_weight=FontWeight.DISPLAY)
    self._progress_label = UnifiedLabel("", 132, text_color=progress_color,
                                        font_weight=FontWeight.ROMAN, alignment_vertical=rl.GuiTextAlignmentVertical.TEXT_ALIGN_BOTTOM)

  def show_event(self):
    super().show_event()
    # start from 0% every time the page is shown
    self.set_progress(0)

  def set_progress(self, progress: int):
    """Store the progress value and update the label to ``"<progress>%"``."""
    self._progress = progress
    text = f"{progress}%"
    self._progress_label.set_text(text)

  def _render(self, rect: rl.Rectangle):
    rl.draw_rectangle_rec(rect, rl.BLACK)

    left = rect.x + 12

    title_rect = rl.Rectangle(left, rect.y + 2, rect.width, 64)
    self._title_label.render(title_rect)

    progress_rect = rl.Rectangle(left, rect.y + 18, rect.width, rect.height)
    self._progress_label.render(progress_rect)
class FailedPage(NavWidget):
  """Failure page with a title, a reason line, a retry button and a reboot slider.

  Args:
    reboot_callback: passed to the "reboot" slider.
    retry_callback: used both as the nav back callback and as the retry button's click callback.
    title: heading text.
  """

  def __init__(self, reboot_callback: Callable, retry_callback: Callable, title: str = "download failed"):
    super().__init__()
    self.set_back_callback(retry_callback)

    title_color = rl.Color(255, 255, 255, int(255 * 0.9))
    reason_color = rl.Color(255, 255, 255, int(255 * 0.9 * 0.65))

    self._title_label = UnifiedLabel(title, 64, text_color=title_color,
                                     font_weight=FontWeight.DISPLAY)
    self._reason_label = UnifiedLabel("", 36, text_color=reason_color,
                                      font_weight=FontWeight.ROMAN)

    self._reboot_slider = SmallSlider("reboot", reboot_callback)
    self._reboot_slider.set_enabled(lambda: self.enabled)  # for nav stack

    self._retry_button = SmallButton("retry")
    self._retry_button.set_click_callback(retry_callback)
    self._retry_button.set_enabled(lambda: self.enabled)  # for nav stack

  def set_reason(self, reason: str):
    """Set the text shown under the title."""
    self._reason_label.set_text(reason)

  def show_event(self):
    super().show_event()
    self._reboot_slider.reset()

  def _render(self, rect: rl.Rectangle):
    text_x = rect.x + 8
    title_top = rect.y + 10

    self._title_label.render(rl.Rectangle(text_x, title_top, rect.width, 64))
    # the reason line sits directly below the 64px title row
    self._reason_label.render(rl.Rectangle(text_x, rect.y + 10 + 64, rect.width, 36))

    # the retry button fades out as the reboot slider is dragged
    self._retry_button.set_opacity(1 - self._reboot_slider.slider_percentage)

    bottom_edge = self._rect.y + self._rect.height

    retry_size = self._retry_button.rect
    self._retry_button.render(rl.Rectangle(
      self._rect.x + 8,
      bottom_edge - retry_size.height,
      retry_size.width,
      retry_size.height,
    ))

    slider_size = self._reboot_slider.rect
    self._reboot_slider.render(rl.Rectangle(
      self._rect.x + self._rect.width - slider_size.width,
      bottom_edge - slider_size.height,
      slider_size.width,
      slider_size.height,
    ))
class NetworkSetupPage(NavWidget):
  """Page that waits for internet access and offers the wifi UI.

  Args:
    network_monitor: source of the ``network_connected`` event polled each tick.
    continue_callback: called with the current custom-software flag when "continue" is clicked.
    back_callback: used as the nav back callback and as the back button's click callback.
  """

  def __init__(self, network_monitor: NetworkConnectivityMonitor, continue_callback: Callable[[bool], None],
               back_callback: Callable[[], None] | None):
    super().__init__()
    self.set_back_callback(back_callback)

    self._network_monitor = network_monitor
    self._custom_software = False
    self._prev_has_internet = False

    self._wifi_manager = WifiManager()
    self._wifi_manager.set_active(True)
    self._wifi_ui = WifiUIMici(self._wifi_manager)

    wifi_icon_dir = "icons_mici/settings/network"
    self._no_wifi_txt = gui_app.texture(f"{wifi_icon_dir}/wifi_strength_slash.png", 58, 50)
    self._wifi_full_txt = gui_app.texture(f"{wifi_icon_dir}/wifi_strength_full.png", 58, 50)

    self._waiting_text = "waiting for internet..."
    self._network_header = TermsHeader(self._waiting_text, self._no_wifi_txt)

    self._back_button = SmallCircleIconButton(gui_app.texture("icons_mici/setup/back_new.png", 37, 32))
    self._back_button.set_click_callback(back_callback)
    self._back_button.set_enabled(lambda: self.enabled)  # for nav stack

    def open_wifi_ui():
      gui_app.push_widget(self._wifi_ui)

    self._wifi_button = SmallerRoundedButton("wifi")
    self._wifi_button.set_click_callback(open_wifi_ui)
    self._wifi_button.set_enabled(lambda: self.enabled)

    def on_continue():
      continue_callback(self._custom_software)

    self._continue_button = WidishRoundedButton("continue")
    self._continue_button.set_enabled(False)
    self._continue_button.set_click_callback(on_continue)

    gui_app.add_nav_stack_tick(self._nav_stack_tick)

  def show_event(self):
    super().show_event()
    # start every visit in the "waiting" state
    self._prev_has_internet = False
    self._network_monitor.reset()
    self._set_has_internet(False)

  def _nav_stack_tick(self):
    """Per-tick hook: pump wifi callbacks and react to connectivity changes."""
    self._wifi_manager.process_callbacks()

    has_internet = self._network_monitor.network_connected.is_set()
    if has_internet == self._prev_has_internet:
      return

    self._prev_has_internet = has_internet
    self._set_has_internet(has_internet)
    if has_internet:
      # on gaining internet, pop whatever is stacked above this page
      gui_app.pop_widgets_to(self)

  def _set_has_internet(self, has_internet: bool):
    """Update header text/icon and the continue button for the given connectivity state."""
    title = "connected to internet" if has_internet else self._waiting_text
    icon = self._wifi_full_txt if has_internet else self._no_wifi_txt
    can_continue = self.enabled if has_internet else False

    self._network_header.set_title(title)
    self._network_header.set_icon(icon)
    self._continue_button.set_enabled(can_continue)

  def set_custom_software(self, custom_software: bool):
    """Store the flag that is later forwarded to the continue callback."""
    self._custom_software = custom_software

  def _render(self, _):
    bottom_edge = self._rect.y + self._rect.height

    self._network_header.render(rl.Rectangle(
      self._rect.x + 16,
      self._rect.y + 16,
      self._rect.width - 32,
      self._network_header.rect.height,
    ))

    # bottom row, left to right: back, wifi ... continue (right-aligned)
    back_size = self._back_button.rect
    self._back_button.render(rl.Rectangle(
      self._rect.x + 8,
      bottom_edge - back_size.height,
      back_size.width,
      back_size.height,
    ))

    wifi_size = self._wifi_button.rect
    self._wifi_button.render(rl.Rectangle(
      self._rect.x + 8 + self._back_button.rect.width + 10,
      bottom_edge - wifi_size.height,
      wifi_size.width,
      wifi_size.height,
    ))

    continue_size = self._continue_button.rect
    self._continue_button.render(rl.Rectangle(
      self._rect.x + self._rect.width - continue_size.width - 8,
      bottom_edge - continue_size.height,
      continue_size.width,
      continue_size.height,
    ))
class Setup(Widget):
  """Root widget of the setup flow.

  Renders the start page itself and pushes the other pages (software
  selection, custom software warning, network setup, downloading, failed)
  onto the gui_app nav stack. Installer downloads run on a background thread
  that only writes plain attributes (``download_progress``,
  ``_download_failed_reason``); ``_nav_stack_tick`` reads them and is the one
  that touches the nav stack.
  """

  def __init__(self):
    super().__init__()
    self.download_url = ""
    self.download_progress = 0
    self.download_thread = None
    # Set by the download thread on failure; consumed and cleared in _nav_stack_tick.
    self._download_failed_reason: str | None = None

    self._network_monitor = NetworkConnectivityMonitor()
    self._network_monitor.start()

    def getting_started_button_callback():
      self._software_selection_page.reset()
      gui_app.push_widget(self._software_selection_page)

    self._start_page = StartPage()
    self._start_page.set_click_callback(getting_started_button_callback)
    self._start_page.set_enabled(lambda: self.enabled)  # for nav stack

    self._network_setup_page = NetworkSetupPage(self._network_monitor, self._network_setup_continue_button_callback,
                                                self._pop_to_software_selection)
    self._software_selection_page = SoftwareSelectionPage(self._use_openpilot, lambda: gui_app.push_widget(self._custom_software_warning_page))
    self._download_failed_page = FailedPage(HARDWARE.reboot, self._pop_to_software_selection)
    self._custom_software_warning_page = CustomSoftwareWarningPage(self._software_selection_custom_software_continue, self._pop_to_software_selection)
    self._downloading_page = DownloadingPage()

    gui_app.add_nav_stack_tick(self._nav_stack_tick)

  def _nav_stack_tick(self):
    """Per-tick hook: mirror download progress and surface a pending download failure."""
    self._downloading_page.set_progress(self.download_progress)

    if self._download_failed_reason is not None:
      reason = self._download_failed_reason
      self._download_failed_reason = None
      self._download_failed_page.set_reason(reason)
      gui_app.pop_widgets_to(self._software_selection_page, instant=True)  # don't reset sliders
      gui_app.push_widget(self._download_failed_page)

  def _render(self, rect: rl.Rectangle):
    self._start_page.render(rect)

  def close(self):
    """Stop the network monitor thread."""
    self._network_monitor.stop()

  def _pop_to_software_selection(self):
    # reset sliders after dismiss completes
    gui_app.pop_widgets_to(self._software_selection_page, self._software_selection_page.reset)

  def _use_openpilot(self):
    """Use the existing install when it is present and marked valid, else go to network setup."""
    if os.path.isdir(INSTALL_PATH) and os.path.isfile(VALID_CACHE_PATH):
      os.remove(VALID_CACHE_PATH)
      # Write to a temporary name first, then move it onto the final path.
      with open(TMP_CONTINUE_PATH, "w") as f:
        f.write(CONTINUE)
      run_cmd(["chmod", "+x", TMP_CONTINUE_PATH])
      shutil.move(TMP_CONTINUE_PATH, CONTINUE_PATH)
      shutil.copyfile(INSTALLER_SOURCE_PATH, INSTALLER_DESTINATION_PATH)

      # give time for installer UI to take over
      time.sleep(0.1)
      gui_app.request_close()
    else:
      self._push_network_setup(custom_software=False)

  def _push_network_setup(self, custom_software: bool):
    self._network_setup_page.set_custom_software(custom_software)
    gui_app.push_widget(self._network_setup_page)

  def _software_selection_custom_software_continue(self):
    gui_app.pop_widgets_to(self._software_selection_page, instant=True)  # don't reset sliders
    self._push_network_setup(custom_software=True)

  def _network_setup_continue_button_callback(self, custom_software):
    """Start the stock download, or ask for a URL first when custom software was chosen."""
    if not custom_software:
      gui_app.pop_widgets_to(self._software_selection_page, instant=True)  # don't reset sliders
      self._download(OPENPILOT_URL)
    else:
      def handle_keyboard_result(text):
        url = text.strip()
        if url:
          gui_app.pop_widgets_to(self._software_selection_page, instant=True)  # don't reset sliders
          self._download(url)

      keyboard = BigInputDialog("custom software URL", "openpilot.comma.ai", confirm_callback=handle_keyboard_result)
      gui_app.push_widget(keyboard)

  def _download(self, url: str):
    """Normalize ``url``, show the downloading page and start the download thread."""
    # autocomplete incomplete URLs
    if re.match(r"^([^/.]+)/([^/]+)$", url):
      url = f"https://installer.comma.ai/{url}"

    # Inputs without a network location (e.g. "example.com/x") get an https:// prefix.
    parsed = urlparse(url, scheme='https')
    self.download_url = (urlparse(f"https://{url}") if not parsed.netloc else parsed).geturl()
    self.download_progress = 0

    gui_app.push_widget(self._downloading_page)

    self.download_thread = threading.Thread(target=self._download_thread, daemon=True)
    self.download_thread.start()

  def _download_thread(self):
    """Download the installer from ``download_url`` and hand over to it.

    Runs on a background thread. On success the file is moved to
    INSTALLER_DESTINATION_PATH, the URL is written to INSTALLER_URL_PATH and
    the app is asked to close. On any failure ``_download_failed_reason`` is
    set, which ``_nav_stack_tick`` turns into the failed page, and the
    partially downloaded temp file is removed.
    """
    tmpfile: str | None = None
    try:
      import tempfile

      fd, tmpfile = tempfile.mkstemp(prefix="installer_")

      headers = {"User-Agent": USER_AGENT,
                 "X-openpilot-serial": HARDWARE.get_serial(),
                 "X-openpilot-device-type": HARDWARE.get_device_type()}
      req = urllib.request.Request(self.download_url, headers=headers)

      # os.fdopen takes ownership of the mkstemp descriptor, so it is closed when the
      # with-block exits, on success and on error alike.
      with os.fdopen(fd, 'wb') as f, urllib.request.urlopen(req, timeout=30) as response:
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        block_size = 8192

        while True:
          buffer = response.read(block_size)
          if not buffer:
            break

          downloaded += len(buffer)
          f.write(buffer)

          # Without a content-length there is nothing to compute a percentage from.
          if total_size:
            self.download_progress = int(downloaded * 100 / total_size)

      # A valid installer is an ELF binary; anything else (e.g. an HTML page) is rejected.
      with open(tmpfile, 'rb') as f:
        is_elf = f.read(4) == b'\x7fELF'

      if not is_elf:
        self._download_failed_reason = "No custom software found at this URL."
        return

      # AGNOS might try to execute the installer before this process exits.
      # Therefore no descriptor on the file may still be open when it is renamed;
      # both handles above have been closed by their with-blocks.
      os.rename(tmpfile, INSTALLER_DESTINATION_PATH)
      tmpfile = None  # the file is now the installer; exclude it from cleanup

      with open(INSTALLER_URL_PATH, "w") as f:
        f.write(self.download_url)

      # give time for installer UI to take over
      time.sleep(0.1)
      gui_app.request_close()

    except urllib.error.HTTPError as e:
      if e.code == 409:
        self._download_failed_reason = "Incompatible openpilot version"
      else:
        # Every other HTTP status must also report a failure, otherwise the UI
        # stays on the downloading page forever.
        self._download_failed_reason = "Invalid URL"
    except Exception:
      self._download_failed_reason = "Invalid URL"
    finally:
      # Don't leave partial or rejected downloads behind.
      if tmpfile is not None:
        try:
          os.remove(tmpfile)
        except OSError:
          pass
def main():
  """Entry point: configure the process, run the setup UI, and always clean up.

  Errors raised while creating or rendering the UI are reported and
  swallowed so that the cleanup in ``finally`` still runs.
  """
  config_realtime_process(0, 51)
  # attempt to affine. AGNOS will start setup with all cores, should only fail when manually launching with screen off
  if TICI:
    try:
      set_core_affinity([5])
    except OSError:
      cloudlog.exception("Failed to set core affinity for setup process")

  setup = None
  try:
    gui_app.init_window("Setup")
    setup = Setup()
    gui_app.push_widget(setup)
    for _ in gui_app.render():
      pass
  except Exception as e:
    print(f"Setup error: {e}")
    # also record the traceback, matching the error reporting used above
    cloudlog.exception("Setup error")
  finally:
    # Stop the network monitor on the error path too, and make sure the window
    # is closed even if that fails.
    try:
      if setup is not None:
        setup.close()
    finally:
      gui_app.close()


if __name__ == "__main__":
  main()