From 0b1d97fc723be3a2353f2e7bf66bdc98d1b3b998 Mon Sep 17 00:00:00 2001 From: DevTekVE Date: Sun, 5 Jan 2025 08:27:44 +0100 Subject: [PATCH] sunnylink support (#499) * Add Sunnylink integration for improved device communication This commit introduces Sunnylink support, including modules for API interactions, device registration, logging, and uploader processes. Key changes involve adding Sunnylink-related components, such as sunnylinkd, manage_sunnylinkd, and associated utilities, along with seamless integration into process management. * Refactor Sunnylink modules and update import paths Standardize parameter handling in Sunnylink functions by initializing Params within functions as needed. Update imports to use fully-qualified paths for better clarity and consistency. Also, refactor logging messages for improved readability and maintainability. * Add Sunnylink support and improve log handling Introduced Sunnylink-specific functionality, including compression for oversized logs and platform-specific socket handling for macOS. Improved logging mechanisms, refactored log queue management, and fixed exception handling in sunnylinkd. * Refactor and fix minor coding style inconsistencies Remove unnecessary string concatenation, adjust spacing for better readability, and ensure cleaner code in `athenad.py` and `sunnylink.py`. Added a macOS-specific comment for TCP_KEEPALIVE configuration to improve code clarity. * Replace platform system check with sys platform in athenad.py The macOS platform check in athenad.py, which is used when setting socket options, previously called the platform.system() function. It now checks sys.platform instead, for a more consistent syntax. * Apply suggestions from code review Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * Simplify imports and reformat API function. 
Removed unused `platform` import for cleanup in `athenad.py`. Improved readability of `api_get` in `__init__.py` by reformatting the long return statement into multiple lines. * Adjust backoff logic and refactor API call formatting. Introduce random jitter into the backoff calculation in the Sunnylink API so that retrying devices are less likely to synchronize their requests. Minor code refactoring improves readability in the API call logic. * Refactor Sunnylink network check logic. Removed hardware-based network check due to performance concerns and replaced it with a real-time device state monitoring loop. This improves efficiency and ensures accurate online status before proceeding with Sunnylink registration. * Apply suggestions from code review * Refactor saveParams error handling and simplify logic Removed redundant try-except block wrapping the entire method for clarity. Moved error logging directly inside the loop to handle individual parameter exceptions more effectively. Simplified dictionary construction and improved error logging format. * Add BACKUP flag to select persistent parameters This commit introduces a new BACKUP flag and applies it to specific persistent parameters in `params.cc` and `params.h`. The BACKUP flag enhances data retention by designating parameters for inclusion in backups, ensuring crucial information is preserved across sessions. * Simplify Sunnypilot params formatting Removed unnecessary blank lines and adjusted the Sunnypilot comment format for better readability and consistency. No functional changes were made. * SP: Move Sunnypilot-related code to sunnypilot/sunnylink (#504) * Refactor and relocate sunnylink-related modules sunnylink components have been reorganized for better modularity and clarity, with files moved under `sunnypilot/sunnylink`. Unused code was removed, and reusable utilities were separated for easier maintenance. Adjusted references across the project to reflect these changes. 
* Permissions * adding init py * more --------- Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> Co-authored-by: Jason Wen --- common/api/__init__.py | 51 +-- common/api/base.py | 56 ++++ common/api/comma_connect.py | 11 + common/params.cc | 55 ++-- common/params.h | 1 + sunnypilot/sunnylink/__init__.py | 0 sunnypilot/sunnylink/api.py | 158 +++++++++ sunnypilot/sunnylink/athena/__init__.py | 0 .../sunnylink/athena/manage_sunnylinkd.py | 5 + sunnypilot/sunnylink/athena/sunnylinkd.py | 256 +++++++++++++++ sunnypilot/sunnylink/registration_manager.py | 29 ++ sunnypilot/sunnylink/uploader.py | 310 ++++++++++++++++++ sunnypilot/sunnylink/utils.py | 48 +++ system/athena/athenad.py | 121 +++++-- system/athena/manage_athenad.py | 17 +- system/manager/process_config.py | 20 ++ 16 files changed, 1045 insertions(+), 93 deletions(-) create mode 100644 common/api/base.py create mode 100644 common/api/comma_connect.py create mode 100644 sunnypilot/sunnylink/__init__.py create mode 100644 sunnypilot/sunnylink/api.py create mode 100644 sunnypilot/sunnylink/athena/__init__.py create mode 100755 sunnypilot/sunnylink/athena/manage_sunnylinkd.py create mode 100755 sunnypilot/sunnylink/athena/sunnylinkd.py create mode 100755 sunnypilot/sunnylink/registration_manager.py create mode 100755 sunnypilot/sunnylink/uploader.py create mode 100644 sunnypilot/sunnylink/utils.py diff --git a/common/api/__init__.py b/common/api/__init__.py index ac231400a..554276080 100644 --- a/common/api/__init__.py +++ b/common/api/__init__.py @@ -1,46 +1,27 @@ -import jwt -import os -import requests -from datetime import datetime, timedelta, UTC -from openpilot.system.hardware.hw import Paths -from openpilot.system.version import get_version +from openpilot.common.api.comma_connect import CommaConnectApi +from sunnypilot.sunnylink.api import SunnylinkApi -API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com') class Api: - def __init__(self, dongle_id): - 
self.dongle_id = dongle_id - with open(Paths.persist_root()+'/comma/id_rsa') as f: - self.private_key = f.read() + def __init__(self, dongle_id, use_sunnylink=False): + if use_sunnylink: + self.service = SunnylinkApi(dongle_id) + else: + self.service = CommaConnectApi(dongle_id) + + def request(self, method, endpoint, **params): + return self.service.request(method, endpoint, **params) def get(self, *args, **kwargs): - return self.request('GET', *args, **kwargs) + return self.service.get(*args, **kwargs) def post(self, *args, **kwargs): - return self.request('POST', *args, **kwargs) - - def request(self, method, endpoint, timeout=None, access_token=None, **params): - return api_get(endpoint, method=method, timeout=timeout, access_token=access_token, **params) + return self.service.post(*args, **kwargs) def get_token(self, expiry_hours=1): - now = datetime.now(UTC).replace(tzinfo=None) - payload = { - 'identity': self.dongle_id, - 'nbf': now, - 'iat': now, - 'exp': now + timedelta(hours=expiry_hours) - } - token = jwt.encode(payload, self.private_key, algorithm='RS256') - if isinstance(token, bytes): - token = token.decode('utf8') - return token + return self.service.get_token(expiry_hours) -def api_get(endpoint, method='GET', timeout=None, access_token=None, **params): - headers = {} - if access_token is not None: - headers['Authorization'] = "JWT " + access_token - - headers['User-Agent'] = "openpilot-" + get_version() - - return requests.request(method, API_HOST + "/" + endpoint, timeout=timeout, headers=headers, params=params) +def api_get(endpoint, method='GET', timeout=None, access_token=None, use_sunnylink=False, **params): + return SunnylinkApi(None).api_get(endpoint, method, timeout, access_token, **params) if use_sunnylink \ + else CommaConnectApi(None).api_get(endpoint, method, timeout, access_token, **params) diff --git a/common/api/base.py b/common/api/base.py new file mode 100644 index 000000000..0707c1cca --- /dev/null +++ b/common/api/base.py @@ -0,0 
+1,56 @@ +import jwt +import requests +import unicodedata +from datetime import datetime, timedelta, UTC +from openpilot.system.hardware.hw import Paths +from openpilot.system.version import get_version + + +class BaseApi: + def __init__(self, dongle_id, api_host, user_agent="openpilot-"): + self.dongle_id = dongle_id + self.api_host = api_host + self.user_agent = user_agent + with open(f'{Paths.persist_root()}/comma/id_rsa') as f: + self.private_key = f.read() + + def get(self, *args, **kwargs): + return self.request('GET', *args, **kwargs) + + def post(self, *args, **kwargs): + return self.request('POST', *args, **kwargs) + + def request(self, method, endpoint, timeout=None, access_token=None, **params): + return self.api_get(endpoint, method=method, timeout=timeout, access_token=access_token, **params) + + def _get_token(self, expiry_hours=1, **extra_payload): + now = datetime.now(UTC).replace(tzinfo=None) + payload = { + 'identity': self.dongle_id, + 'nbf': now, + 'iat': now, + 'exp': now + timedelta(hours=expiry_hours), + **extra_payload + } + token = jwt.encode(payload, self.private_key, algorithm='RS256') + if isinstance(token, bytes): + token = token.decode('utf8') + return token + + def get_token(self, expiry_hours=1): + return self._get_token(expiry_hours) + + def remove_non_ascii_chars(self, text): + normalized_text = unicodedata.normalize('NFD', text) + ascii_encoded_text = normalized_text.encode('ascii', 'ignore') + return ascii_encoded_text.decode() + + def api_get(self, endpoint, method='GET', timeout=None, access_token=None, **params): + headers = {} + if access_token is not None: + headers['Authorization'] = "JWT " + access_token + + version = self.remove_non_ascii_chars(get_version()) + headers['User-Agent'] = self.user_agent + version + + return requests.request(method, f"{self.api_host}/{endpoint}", timeout=timeout, headers=headers, params=params) diff --git a/common/api/comma_connect.py b/common/api/comma_connect.py new file mode 100644 index 
000000000..1c705f372 --- /dev/null +++ b/common/api/comma_connect.py @@ -0,0 +1,11 @@ +import os + +from openpilot.common.api.base import BaseApi + +API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com') + + +class CommaConnectApi(BaseApi): + def __init__(self, dongle_id): + super().__init__(dongle_id, API_HOST) + self.user_agent = "openpilot-" diff --git a/common/params.cc b/common/params.cc index 3eccd937e..85dc56463 100644 --- a/common/params.cc +++ b/common/params.cc @@ -109,36 +109,36 @@ std::unordered_map keys = { {"CurrentBootlog", PERSISTENT}, {"CurrentRoute", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, {"DisableLogging", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, - {"DisablePowerDown", PERSISTENT}, - {"DisableUpdates", PERSISTENT}, - {"DisengageOnAccelerator", PERSISTENT}, + {"DisablePowerDown", PERSISTENT | BACKUP}, + {"DisableUpdates", PERSISTENT | BACKUP}, + {"DisengageOnAccelerator", PERSISTENT | BACKUP}, {"DongleId", PERSISTENT}, {"DoReboot", CLEAR_ON_MANAGER_START}, {"DoShutdown", CLEAR_ON_MANAGER_START}, {"DoUninstall", CLEAR_ON_MANAGER_START}, - {"ExperimentalLongitudinalEnabled", PERSISTENT | DEVELOPMENT_ONLY}, - {"ExperimentalMode", PERSISTENT}, - {"ExperimentalModeConfirmed", PERSISTENT}, + {"ExperimentalLongitudinalEnabled", PERSISTENT | DEVELOPMENT_ONLY | BACKUP}, + {"ExperimentalMode", PERSISTENT | BACKUP}, + {"ExperimentalModeConfirmed", PERSISTENT | BACKUP}, {"FirmwareQueryDone", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, {"ForcePowerDown", PERSISTENT}, {"GitBranch", PERSISTENT}, {"GitCommit", PERSISTENT}, {"GitCommitDate", PERSISTENT}, {"GitDiff", PERSISTENT}, - {"GithubSshKeys", PERSISTENT}, - {"GithubUsername", PERSISTENT}, + {"GithubSshKeys", PERSISTENT | BACKUP}, + {"GithubUsername", PERSISTENT | BACKUP}, {"GitRemote", PERSISTENT}, - {"GsmApn", PERSISTENT}, - {"GsmMetered", PERSISTENT}, - {"GsmRoaming", PERSISTENT}, + {"GsmApn", PERSISTENT | BACKUP}, + {"GsmMetered", PERSISTENT | BACKUP}, + 
{"GsmRoaming", PERSISTENT | BACKUP}, {"HardwareSerial", PERSISTENT}, {"HasAcceptedTerms", PERSISTENT}, {"IMEI", PERSISTENT}, {"InstallDate", PERSISTENT}, {"IsDriverViewEnabled", CLEAR_ON_MANAGER_START}, {"IsEngaged", PERSISTENT}, - {"IsLdwEnabled", PERSISTENT}, - {"IsMetric", PERSISTENT}, + {"IsLdwEnabled", PERSISTENT | BACKUP}, + {"IsMetric", PERSISTENT | BACKUP}, {"IsOffroad", CLEAR_ON_MANAGER_START}, {"IsOnroad", PERSISTENT}, {"IsRhdDetected", PERSISTENT}, @@ -146,7 +146,7 @@ std::unordered_map keys = { {"IsTakingSnapshot", CLEAR_ON_MANAGER_START}, {"IsTestedBranch", CLEAR_ON_MANAGER_START}, {"JoystickDebugMode", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"LanguageSetting", PERSISTENT}, + {"LanguageSetting", PERSISTENT | BACKUP}, {"LastAthenaPingTime", CLEAR_ON_MANAGER_START}, {"LastGPSPosition", PERSISTENT}, {"LastManagerExitReason", CLEAR_ON_MANAGER_START}, @@ -158,7 +158,7 @@ std::unordered_map keys = { {"LiveTorqueParameters", PERSISTENT | DONT_LOG}, {"LocationFilterInitialState", PERSISTENT}, {"LongitudinalManeuverMode", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"LongitudinalPersonality", PERSISTENT}, + {"LongitudinalPersonality", PERSISTENT | BACKUP}, {"NetworkMetered", PERSISTENT}, {"ObdMultiplexingChanged", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, {"ObdMultiplexingEnabled", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, @@ -174,17 +174,17 @@ std::unordered_map keys = { {"Offroad_TemperatureTooHigh", CLEAR_ON_MANAGER_START}, {"Offroad_UnofficialHardware", CLEAR_ON_MANAGER_START}, {"Offroad_UpdateFailed", CLEAR_ON_MANAGER_START}, - {"OpenpilotEnabledToggle", PERSISTENT}, + {"OpenpilotEnabledToggle", PERSISTENT | BACKUP}, {"PandaHeartbeatLost", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, {"PandaSomResetTriggered", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, {"PandaSignatures", CLEAR_ON_MANAGER_START}, {"PrimeType", PERSISTENT}, - {"RecordFront", PERSISTENT}, + {"RecordFront", 
PERSISTENT | BACKUP}, {"RecordFrontLock", PERSISTENT}, // for the internal fleet - {"SecOCKey", PERSISTENT | DONT_LOG}, + {"SecOCKey", PERSISTENT | DONT_LOG}, // Candidate for | BACKUP {"RouteCount", PERSISTENT}, {"SnoozeUpdate", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"SshEnabled", PERSISTENT}, + {"SshEnabled", PERSISTENT | BACKUP}, {"TermsVersion", PERSISTENT}, {"TrainingVersion", PERSISTENT}, {"UbloxAvailable", PERSISTENT}, @@ -202,15 +202,20 @@ std::unordered_map keys = { {"Version", PERSISTENT}, // sunnypilot params - {"EnableGithubRunner", PERSISTENT}, - {"Mads", PERSISTENT}, - {"MadsMainCruiseAllowed", PERSISTENT}, - {"MadsPauseLateralOnBrake", PERSISTENT}, - {"MadsUnifiedEngagementMode", PERSISTENT}, + {"EnableGithubRunner", PERSISTENT | BACKUP}, + {"EnableSunnylinkUploader", PERSISTENT | BACKUP}, + {"LastSunnylinkPingTime", CLEAR_ON_MANAGER_START}, + {"Mads", PERSISTENT | BACKUP}, + {"MadsMainCruiseAllowed", PERSISTENT | BACKUP}, + {"MadsPauseLateralOnBrake", PERSISTENT | BACKUP}, + {"MadsUnifiedEngagementMode", PERSISTENT | BACKUP}, {"ModelManager_ActiveBundle", PERSISTENT}, {"ModelManager_DownloadIndex", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION | CLEAR_ON_ONROAD_TRANSITION}, {"ModelManager_LastSyncTime", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"ModelManager_ModelsCache", PERSISTENT}, + {"ModelManager_ModelsCache", PERSISTENT | BACKUP}, + {"SunnylinkDongleId", PERSISTENT}, + {"SunnylinkdPid", PERSISTENT}, + {"SunnylinkEnabled", PERSISTENT}, }; } // namespace diff --git a/common/params.h b/common/params.h index d726a6185..b40371f97 100644 --- a/common/params.h +++ b/common/params.h @@ -16,6 +16,7 @@ enum ParamKeyType { CLEAR_ON_OFFROAD_TRANSITION = 0x10, DONT_LOG = 0x20, DEVELOPMENT_ONLY = 0x40, + BACKUP = 0x80, ALL = 0xFFFFFFFF }; diff --git a/sunnypilot/sunnylink/__init__.py b/sunnypilot/sunnylink/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sunnypilot/sunnylink/api.py 
b/sunnypilot/sunnylink/api.py new file mode 100644 index 000000000..558a4e8d4 --- /dev/null +++ b/sunnypilot/sunnylink/api.py @@ -0,0 +1,158 @@ +import json +import os +import random +import time +from datetime import datetime, timedelta +from pathlib import Path + +import jwt +from openpilot.common.api.base import BaseApi +from openpilot.common.params import Params +from openpilot.system.hardware import HARDWARE +from openpilot.system.hardware.hw import Paths + +API_HOST = os.getenv('SUNNYLINK_API_HOST', 'https://stg.api.sunnypilot.ai') +UNREGISTERED_SUNNYLINK_DONGLE_ID = "UnregisteredDevice" +MAX_RETRIES = 6 +CRASH_LOG_DIR = '/data/community/crashes' + + +class SunnylinkApi(BaseApi): + def __init__(self, dongle_id): + super().__init__(dongle_id, API_HOST) + self.user_agent = "sunnypilot-" + self.spinner = None + self.params = Params() + + def api_get(self, endpoint, method='GET', timeout=10, access_token=None, **kwargs): + if not self.params.get_bool("SunnylinkEnabled"): + return None + + return super().api_get(endpoint, method, timeout, access_token, **kwargs) + + def resume_queued(self, timeout=10, **kwargs): + sunnylinkId, commaId = self._resolve_dongle_ids() + return self.api_get(f"ws/{sunnylinkId}/resume_queued", "POST", timeout, access_token=self.get_token(), **kwargs) + + def get_token(self, expiry_hours=1): + # Add your additional data here + additional_data = {} + return super()._get_token(expiry_hours, **additional_data) + + def _status_update(self, message): + print(message) + if self.spinner: + self.spinner.update(message) + time.sleep(0.5) + + def _resolve_dongle_ids(self): + sunnylink_dongle_id = self.params.get("SunnylinkDongleId", encoding='utf-8') + comma_dongle_id = self.dongle_id or self.params.get("DongleId", encoding='utf-8') + return sunnylink_dongle_id, comma_dongle_id + + def _resolve_imeis(self): + imei1, imei2 = None, None + imei_try = 0 + while imei1 is None and imei2 is None and imei_try < MAX_RETRIES: + try: + imei1, imei2 = 
self.params.get("IMEI", encoding='utf8') or HARDWARE.get_imei(0), HARDWARE.get_imei(1) + except Exception: + self._status_update(f"Error getting imei, trying again... [{imei_try + 1}/{MAX_RETRIES}]") + time.sleep(1) + imei_try += 1 + return imei1, imei2 + + def _resolve_serial(self): + return (self.params.get("HardwareSerial", encoding='utf8') + or HARDWARE.get_serial()) + + def register_device(self, spinner=None, timeout=60, verbose=False): + self.spinner = spinner + + sunnylink_dongle_id, comma_dongle_id = self._resolve_dongle_ids() + + if comma_dongle_id is None: + self._status_update("Comma dongle ID not found, deferring sunnylink's registration to comma's registration process.") + return None + + imei1, imei2 = self._resolve_imeis() + serial = self._resolve_serial() + + if sunnylink_dongle_id not in (None, UNREGISTERED_SUNNYLINK_DONGLE_ID): + return sunnylink_dongle_id + + privkey_path = Path(f"{Paths.persist_root()}/comma/id_rsa") + pubkey_path = Path(f"{Paths.persist_root()}/comma/id_rsa.pub") + + start_time = time.monotonic() + successful_registration = False + if not pubkey_path.is_file(): + sunnylink_dongle_id = UNREGISTERED_SUNNYLINK_DONGLE_ID + self._status_update("Public key not found, setting dongle ID to unregistered.") + else: + Params().put("LastSunnylinkPingTime", "0") # Reset the last ping time to 0 if we are trying to register + with pubkey_path.open() as f1, privkey_path.open() as f2: + public_key = f1.read() + private_key = f2.read() + + backoff = 1 + while True: + register_token = jwt.encode({'register': True, 'exp': datetime.utcnow() + timedelta(hours=1)}, private_key, algorithm='RS256') + try: + if verbose or time.monotonic() - start_time < timeout / 2: + self._status_update("Registering device to sunnylink...") + elif time.monotonic() - start_time >= timeout / 2: + self._status_update("Still registering device to sunnylink...") + + resp = self.api_get("v2/pilotauth/", method='POST', timeout=15, imei=imei1, imei2=imei2, serial=serial, + 
comma_dongle_id=comma_dongle_id, public_key=public_key, register_token=register_token) + + if resp is None: + raise Exception("Unable to register device, request was None") + + if resp.status_code in (409, 412): + timeout = time.monotonic() - start_time # Don't retry if the public key is already in use + key_in_use = "Public key is already in use, is your key unique? Contact your vendor for a new key." + unsafe_key = "Public key is known to not be unique and it's unsafe. Contact your vendor for a new key." + error_message = key_in_use if resp.status_code == 409 else unsafe_key + raise Exception(error_message) + + if resp.status_code != 200: + raise Exception(f"Failed to register with sunnylink. Status code: {resp.status_code}\nData\n:{resp.text}") + + dongleauth = json.loads(resp.text) + sunnylink_dongle_id = dongleauth["device_id"] + if sunnylink_dongle_id: + self._status_update("Device registered successfully.") + successful_registration = True + break + except Exception as e: + if verbose: + self._status_update(f"Waiting {backoff}s before retry, Exception occurred during registration: [{str(e)}]") + + if not os.path.exists(CRASH_LOG_DIR): + os.makedirs(CRASH_LOG_DIR) + + with open(f'{CRASH_LOG_DIR}/error.txt', 'a') as f: + f.write(f"[{datetime.now()}] sunnylink: {str(e)}\n") + + backoff = min(backoff * 2 * (0.5 + random.random()), 60) + time.sleep(backoff) + + if time.monotonic() - start_time > timeout: + self._status_update(f"Giving up on sunnylink's registration after {timeout}s. 
Will retry on next boot.") + time.sleep(3) + break + + self.params.put("SunnylinkDongleId", sunnylink_dongle_id or UNREGISTERED_SUNNYLINK_DONGLE_ID) + + # Set the last ping time to the current time since we were just talking to the API + last_ping = int(time.monotonic() * 1e9) if successful_registration else start_time + Params().put("LastSunnylinkPingTime", str(last_ping)) + + # Disable sunnylink if registration was not successful + if not successful_registration: + Params().put_bool("SunnylinkEnabled", False) + + self.spinner = None + return sunnylink_dongle_id diff --git a/sunnypilot/sunnylink/athena/__init__.py b/sunnypilot/sunnylink/athena/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sunnypilot/sunnylink/athena/manage_sunnylinkd.py b/sunnypilot/sunnylink/athena/manage_sunnylinkd.py new file mode 100755 index 000000000..377b6990f --- /dev/null +++ b/sunnypilot/sunnylink/athena/manage_sunnylinkd.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from openpilot.system.athena.manage_athenad import manage_athenad + +if __name__ == '__main__': + manage_athenad("SunnylinkDongleId", "SunnylinkdPid", 'sunnylinkd', 'sunnypilot.sunnylink.athena.sunnylinkd') diff --git a/sunnypilot/sunnylink/athena/sunnylinkd.py b/sunnypilot/sunnylink/athena/sunnylinkd.py new file mode 100755 index 000000000..0fb7ea6d9 --- /dev/null +++ b/sunnypilot/sunnylink/athena/sunnylinkd.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import base64 +import gzip +import os +import threading +import time + +from jsonrpc import dispatcher +from openpilot.common.params import Params +from openpilot.common.realtime import set_core_affinity +from openpilot.common.swaglog import cloudlog +from openpilot.system.athena.athenad import ws_send, jsonrpc_handler, \ + recv_queue, UploadQueueCache, upload_queue, cur_upload_items, backoff, ws_manage, log_handler +from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException, + 
create_connection) + +import cereal.messaging as messaging +from sunnypilot.sunnylink.api import SunnylinkApi +from sunnypilot.sunnylink.utils import sunnylink_need_register, sunnylink_ready + +SUNNYLINK_ATHENA_HOST = os.getenv('SUNNYLINK_ATHENA_HOST', 'wss://ws.stg.api.sunnypilot.ai') +HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) +LOCAL_PORT_WHITELIST = {8022} +SUNNYLINK_LOG_ATTR_NAME = "user.sunny.upload" +SUNNYLINK_RECONNECT_TIMEOUT_S = 70 # FYI changing this will also would require a change on sidebar.cc +DISALLOW_LOG_UPLOAD = threading.Event() + +params = Params() +sunnylink_api = SunnylinkApi(params.get("SunnylinkDongleId", encoding='utf-8')) + + +def handle_long_poll(ws: WebSocket, exit_event: threading.Event | None) -> None: + cloudlog.info("sunnylinkd.handle_long_poll started") + sm = messaging.SubMaster(['deviceState']) + end_event = threading.Event() + comma_prime_cellular_end_event = threading.Event() + + threads = [ + threading.Thread(target=ws_manage, args=(ws, end_event), name='ws_manage'), + threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'), + threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'), + threading.Thread(target=ws_ping, args=(ws, end_event), name='ws_ping'), + threading.Thread(target=ws_queue, args=(end_event,), name='ws_queue'), + # threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'), + # threading.Thread(target=sunny_log_handler, args=(end_event, comma_prime_cellular_end_event), name='log_handler'), + # threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'), + ] + [ + threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}') + for x in range(HANDLER_THREADS) + ] + + for thread in threads: + thread.start() + try: + while not end_event.wait(0.1): + if not sunnylink_ready(params): + cloudlog.warning("Exiting sunnylinkd.handle_long_poll as SunnylinkEnabled is False") + break + + sm.update(0) + if exit_event is not 
None and exit_event.is_set(): + end_event.set() + comma_prime_cellular_end_event.set() + + prime_type = params.get("PrimeType", encoding='utf-8') or 0 + metered = sm['deviceState'].networkMetered + + if DISALLOW_LOG_UPLOAD.is_set() and not comma_prime_cellular_end_event.is_set(): + cloudlog.debug("sunnylinkd.handle_long_poll: DISALLOW_LOG_UPLOAD, setting comma_prime_cellular_end_event") + comma_prime_cellular_end_event.set() + elif metered and int(prime_type) > 2: + cloudlog.debug(f"sunnylinkd.handle_long_poll: PrimeType({prime_type}) > 2 and networkMetered({metered})") + comma_prime_cellular_end_event.set() + elif comma_prime_cellular_end_event.is_set() and not DISALLOW_LOG_UPLOAD.is_set(): + cloudlog.debug( + f"sunnylinkd.handle_long_poll: comma_prime_cellular_end_event is set and not PrimeType({prime_type}) > 2 or not networkMetered({metered})") + comma_prime_cellular_end_event.clear() + finally: + end_event.set() + comma_prime_cellular_end_event.set() + for thread in threads: + cloudlog.debug(f"sunnylinkd athena.joining {thread.name}") + thread.join() + cloudlog.debug(f"sunnylinkd athena.joined {thread.name}") + + +def ws_recv(ws: WebSocket, end_event: threading.Event) -> None: + last_ping = int(time.monotonic() * 1e9) + while not end_event.is_set(): + try: + opcode, data = ws.recv_data(control_frame=True) + if opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY): + if opcode == ABNF.OPCODE_TEXT: + data = data.decode("utf-8") + recv_queue.put_nowait(data) + cloudlog.debug(f"sunnylinkd.ws_recv.recv {data}") + elif opcode in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG): + cloudlog.debug("sunnylinkd.ws_recv.pong") + last_ping = int(time.monotonic() * 1e9) + Params().put("LastSunnylinkPingTime", str(last_ping)) + except WebSocketTimeoutException: + ns_since_last_ping = int(time.monotonic() * 1e9) - last_ping + if ns_since_last_ping > SUNNYLINK_RECONNECT_TIMEOUT_S * 1e9: + cloudlog.exception("sunnylinkd.ws_recv.timeout") + end_event.set() + except Exception: + 
cloudlog.exception("sunnylinkd.ws_recv.exception") + end_event.set() + + +def ws_ping(ws: WebSocket, end_event: threading.Event) -> None: + ws.ping() # Send the first ping + while not end_event.wait(SUNNYLINK_RECONNECT_TIMEOUT_S * 0.7): # Sleep about 70% before a timeout + try: + ws.ping() + cloudlog.debug("sunnylinkd.ws_recv.ws_ping: Pinging") + except Exception: + cloudlog.exception("sunnylinkd.ws_ping.exception") + end_event.set() + cloudlog.debug("sunnylinkd.ws_ping.end_event is set, exiting ws_ping thread") + + +def ws_queue(end_event: threading.Event) -> None: + resume_requested = False + tries = 0 + + while not end_event.is_set() and not resume_requested: + try: + if not resume_requested: + cloudlog.debug("sunnylinkd.ws_queue.resume_queued") + sunnylink_api.resume_queued(timeout=29) + resume_requested = True + tries = 0 + except Exception: + cloudlog.exception("sunnylinkd.ws_queue.resume_queued.exception") + resume_requested = False + tries += 1 + time.sleep(backoff(tries)) # Wait for the backoff time before the next attempt + + if end_event.is_set(): + cloudlog.debug("end_event is set, exiting ws_queue thread") + elif resume_requested: + cloudlog.debug(f"Resume requested to server after {tries} tries") + else: + cloudlog.error(f"Reached end of ws_queue while end_event is not set and resume_requested is {resume_requested}") + + +def sunny_log_handler(end_event: threading.Event, comma_prime_cellular_end_event: threading.Event) -> None: + while not end_event.wait(0.1): + if not comma_prime_cellular_end_event.is_set(): + log_handler(comma_prime_cellular_end_event, SUNNYLINK_LOG_ATTR_NAME) + comma_prime_cellular_end_event.set() + + +@dispatcher.add_method +def toggleLogUpload(enabled: bool): + DISALLOW_LOG_UPLOAD.clear() if enabled and DISALLOW_LOG_UPLOAD.is_set() else DISALLOW_LOG_UPLOAD.set() + + +@dispatcher.add_method +def getParamsAllKeys() -> list[str]: + keys: list[str] = [k.decode('utf-8') for k in Params().all_keys()] + return keys + + 
+@dispatcher.add_method +def getParams(params_keys: list[str], compression: bool = False) -> str | dict[str, str]: + try: + params = Params() + params_dict: dict[str, bytes] = {key: params.get(key) or b'' for key in params_keys} + + # Compress the values before encoding to base64 as output from params.get is bytes and same for compression + if compression: + params_dict = {key: gzip.compress(value) for key, value in params_dict.items()} + + # Last step is to encode the values to base64 and decode to utf-8 for JSON serialization + return {key: base64.b64encode(value).decode('utf-8') for key, value in params_dict.items()} + + except Exception as e: + cloudlog.exception("sunnylinkd.getParams.exception", e) + raise + + +@dispatcher.add_method +def saveParams(params_to_update: dict[str, str], compression: bool = False) -> None: + params = Params() + params_dict = {key: base64.b64decode(value) for key, value in params_to_update.items()} + + if compression: + params_dict = {key: gzip.decompress(value) for key, value in params_dict.items()} + + for key, value in params_dict.items(): + try: + params.put(key, value) + except Exception as e: + cloudlog.error(f"sunnylinkd.saveParams.exception {e}") + + +def main(exit_event: threading.Event = None): + try: + set_core_affinity([0, 1, 2, 3]) + except Exception: + cloudlog.exception("failed to set core affinity") + + while sunnylink_need_register(params): + cloudlog.info("Waiting for sunnylink registration to complete") + time.sleep(10) + + UploadQueueCache.initialize(upload_queue) + + ws_uri = SUNNYLINK_ATHENA_HOST + conn_start = None + conn_retries = 0 + while (exit_event is None or not exit_event.is_set()) and sunnylink_ready(params): + try: + if conn_start is None: + conn_start = time.monotonic() + + cloudlog.event("sunnylinkd.main.connecting_ws", ws_uri=ws_uri, retries=conn_retries) + ws = create_connection( + ws_uri, + cookie=f"jwt={sunnylink_api.get_token()}", + enable_multithread=True, + 
timeout=SUNNYLINK_RECONNECT_TIMEOUT_S, + ) + cloudlog.event("sunnylinkd.main.connected_ws", ws_uri=ws_uri, retries=conn_retries, + duration=time.monotonic() - conn_start) + conn_start = None + + conn_retries = 0 + cur_upload_items.clear() + + handle_long_poll(ws, exit_event) + except (KeyboardInterrupt, SystemExit): + break + except (ConnectionError, TimeoutError, WebSocketException): + conn_retries += 1 + params.remove("LastSunnylinkPingTime") + except Exception: + cloudlog.exception("sunnylinkd.main.exception") + + conn_retries += 1 + params.remove("LastSunnylinkPingTime") + + time.sleep(backoff(conn_retries)) + + if not sunnylink_ready(params): + cloudlog.debug("Reached end of sunnylinkd.main while sunnylink is not ready. Waiting 60s before retrying") + time.sleep(60) + + +if __name__ == "__main__": + main() diff --git a/sunnypilot/sunnylink/registration_manager.py b/sunnypilot/sunnylink/registration_manager.py new file mode 100755 index 000000000..34323c278 --- /dev/null +++ b/sunnypilot/sunnylink/registration_manager.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +import time + +from openpilot.common.realtime import Ratekeeper +from openpilot.common.swaglog import cloudlog + +from cereal import log, messaging +from sunnypilot.sunnylink.utils import register_sunnylink + +NetworkType = log.DeviceState.NetworkType + + +def main(): + """The main method is expected to be called by the manager when the device boots up.""" + rk = Ratekeeper(.5) + sm = messaging.SubMaster(['deviceState'], poll='deviceState') + while True: + sm.update(1000) + if sm['deviceState'].networkType != NetworkType.none: + break + + cloudlog.info(f"Waiting to become online... 
#!/usr/bin/env python3
# sunnypilot/sunnylink/uploader.py
import bz2
import datetime
import io
import json
import os
import random
import threading
import time
import traceback
from collections.abc import Iterator
from typing import BinaryIO

import requests
from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity
from openpilot.common.swaglog import cloudlog
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr

import cereal.messaging as messaging
from cereal import log
from sunnypilot.sunnylink.api import SunnylinkApi

NetworkType = log.DeviceState.NetworkType

# extended attribute (name and value) used to tag a file as already uploaded
UPLOAD_ATTR_NAME = 'user.sunny.upload'
UPLOAD_ATTR_VALUE = b'1'

UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6  # bytes (5 MB)

# Sleeping is on unless UPLOADER_SLEEP is set to an empty string or "0".
# bool() of the raw string was True for every non-empty value, "0" included.
allow_sleep = os.getenv("UPLOADER_SLEEP", "1") not in ("", "0")
force_wifi = os.getenv("FORCEWIFI") is not None
fake_upload = os.getenv("FAKEUPLOAD") is not None

# seconds (15 minutes) compared against the time since the offroad transition
OFFROAD_TRANSITION_TIMEOUT = 900.
# wait until offroad for 15 minutes before allowing uploads + + +class FakeRequest: + def __init__(self): + self.headers = {"Content-Length": "0"} + + +class FakeResponse: + def __init__(self): + self.status_code = 200 + self.request = FakeRequest() + + +def get_directory_sort(d: str) -> list[str]: + # ensure old format is sorted sooner + o = ["0", ] if d.startswith("2024-") else ["1", ] + return o + [s.rjust(10, '0') for s in d.rsplit('--', 1)] + + +def listdir_by_creation(d: str) -> list[str]: + if not os.path.isdir(d): + return [] + + try: + paths = [f for f in os.listdir(d) if os.path.isdir(os.path.join(d, f))] + paths = sorted(paths, key=get_directory_sort) + return paths + except OSError: + cloudlog.exception("listdir_by_creation failed") + return [] + + +def clear_locks(root: str) -> None: + for logdir in os.listdir(root): + path = os.path.join(root, logdir) + try: + for fname in os.listdir(path): + if fname.endswith(".lock"): + os.unlink(os.path.join(path, fname)) + except OSError: + cloudlog.exception("clear_locks failed") + + +class Uploader: + def __init__(self, dongle_id: str, root: str): + self.dongle_id = dongle_id + self.api = SunnylinkApi(dongle_id) + self.root = root + + self.params = Params() + + # stats for last successfully uploaded file + self.last_filename = "" + + self.immediate_folders = ["crash/", "boot/"] + self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} + + def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]: + r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8") + requested_routes = [] if r is None else r.split(",") + + for logdir in listdir_by_creation(self.root): + path = os.path.join(self.root, logdir) + try: + names = os.listdir(path) + except OSError: + continue + + if any(name.endswith(".lock") for name in names): + continue + + for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): + key = os.path.join(logdir, name) + fn = os.path.join(path, 
name) + # skip files already uploaded + try: + ctime = os.path.getctime(fn) + is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE + except OSError: + cloudlog.event("uploader_getxattr_failed", key=key, fn=fn) + # deleter could have deleted, so skip + continue + if is_uploaded: + continue + + # limit uploading on metered connections + if metered: + dt = datetime.timedelta(hours=12) + if logdir in self.immediate_folders and (datetime.datetime.now() - datetime.datetime.fromtimestamp(ctime)) < dt: + continue + + if name == "qcamera.ts" and not any(logdir.startswith(r.split('|')[-1]) for r in requested_routes): + continue + + yield name, key, fn + + def next_file_to_upload(self, metered: bool) -> tuple[str, str, str] | None: + upload_files = list(self.list_upload_files(metered)) + + for name, key, fn in upload_files: + if any(f in fn for f in self.immediate_folders): + return name, key, fn + + return next( + ((name, key, fn) + for name, key, fn in upload_files if name in self.immediate_priority), + None, + ) + + def do_upload(self, key: str, fn: str): + url_resp = self.api.get( + f"device/{self.dongle_id}/upload_url/", + timeout=10, + path=key, + access_token=self.api.get_token(), + ) + if url_resp.status_code == 412: + return url_resp + + url_resp_json = json.loads(url_resp.text) + url = url_resp_json['url'] + headers = url_resp_json['headers'] + cloudlog.debug("sunnylink upload_url %s | Headers: %s", url, headers) + + if fake_upload: + return FakeResponse() + + with open(fn, "rb") as f: + data: BinaryIO + if key.endswith('.bz2') and not fn.endswith('.bz2'): + compressed = bz2.compress(f.read()) + data = io.BytesIO(compressed) + else: + data = f + + return requests.put(url, data=data, headers=headers, timeout=10) + + def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: + try: + sz = os.path.getsize(fn) + except OSError: + cloudlog.exception("upload: getsize failed") + return False + + cloudlog.event("upload_start", 
key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) + + if sz == 0: + # tag files of 0 size as uploaded + success = True + elif name in self.immediate_priority and sz > UPLOAD_QLOG_QCAM_MAX_SIZE: + cloudlog.event("uploader_too_large", key=key, fn=fn, sz=sz) + success = True + else: + start_time = time.monotonic() + + stat = None + last_exc = None + try: + stat = self.do_upload(key, fn) + except Exception as e: + last_exc = (e, traceback.format_exc()) + + if stat is not None and stat.status_code in (200, 201, 412): + self.last_filename = fn + dt = time.monotonic() - start_time + if stat.status_code == 412: + cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) + else: + content_length = int(stat.request.headers.get("Content-Length", 0)) + speed = (content_length / 1e6) / dt + cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length, + network_type=network_type, metered=metered, speed=speed) + success = True + elif stat is not None: # 401, 403... 
def main(exit_event: threading.Event | None = None) -> None:
  """Run the sunnylink upload loop until ``exit_event`` is set.

  Each pass reads IsOffroad and the deviceState network type, skips
  uploading when there is no network or when DisableOnroadUploads holds it
  back, and otherwise uploads one file through Uploader.step, backing off
  after failures.

  Args:
    exit_event: event that stops the loop; a private one is created when
      omitted. Waits use ``exit_event.wait`` so a stop request is not delayed
      by a pending sleep.

  Raises:
    Exception: when the SunnylinkDongleId param is not set.
  """
  if exit_event is None:
    exit_event = threading.Event()

  try:
    set_core_affinity([0, 1, 2, 3])
  except Exception:
    cloudlog.exception("failed to set core affinity")

  clear_locks(Paths.log_root())

  params = Params()
  dongle_id = params.get("SunnylinkDongleId", encoding='utf8')

  offroad_transition_prev = 0.
  offroad_last = False

  if dongle_id is None:
    cloudlog.info("uploader missing dongle_id")
    raise Exception("uploader can't start without dongle id")

  sm = messaging.SubMaster(['deviceState'])
  uploader = Uploader(dongle_id, Paths.log_root())

  backoff = 0.1
  while not exit_event.is_set():
    sm.update(0)

    offroad = params.get_bool("IsOffroad")
    t = time.monotonic()
    # record the onroad -> offroad transition using the same timestamp the checks below use
    if offroad and not offroad_last and t > 300.:
      offroad_transition_prev = t
    offroad_last = offroad

    network_type = NetworkType.wifi if force_wifi else sm['deviceState'].networkType
    if network_type == NetworkType.none:
      if allow_sleep:
        exit_event.wait(60 if offroad else 5)
      continue

    if params.get_bool("DisableOnroadUploads"):
      if not offroad or (offroad_transition_prev > 0. and t - offroad_transition_prev < OFFROAD_TRANSITION_TIMEOUT):
        if not offroad:
          cloudlog.info("not uploading: onroad uploads disabled")
        else:
          wait_minutes = int(OFFROAD_TRANSITION_TIMEOUT / 60)
          time_left = OFFROAD_TRANSITION_TIMEOUT - (t - offroad_transition_prev)
          if time_left > 2.0 * 60.0:
            time_left_str = f"{int(time_left / 60)} minute(s)"
          else:
            time_left_str = f"{int(time_left)} second(s)"
          cloudlog.info(f"not uploading: waiting until offroad for {wait_minutes} minutes; {time_left_str} left")
        if allow_sleep:
          exit_event.wait(60)
        continue

    success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
    if success is None:
      # nothing to upload: check again later
      backoff = 60 if offroad else 5
    elif success:
      backoff = 0.1
    else:
      cloudlog.info("upload backoff %r", backoff)
      backoff = min(backoff * 2, 120)
    if allow_sleep:
      # jittered wait that returns early once exit_event is set
      exit_event.wait(backoff + random.uniform(0, backoff))


if __name__ == "__main__":
  main()
# sunnypilot/sunnylink/utils.py
from sunnypilot.sunnylink.api import SunnylinkApi, UNREGISTERED_SUNNYLINK_DONGLE_ID
from openpilot.common.params import Params
from openpilot.system.version import is_prebuilt


def get_sunnylink_status(params=None) -> tuple[bool, bool]:
  """Get the status of Sunnylink on the device. Returns a tuple of (is_sunnylink_enabled, is_registered).

  ``is_registered`` is True when SunnylinkDongleId is set to something other
  than UNREGISTERED_SUNNYLINK_DONGLE_ID. A new Params() is used when
  ``params`` is not given.
  """
  params = params or Params()
  is_sunnylink_enabled = params.get_bool("SunnylinkEnabled")
  is_registered = params.get("SunnylinkDongleId", encoding='utf-8') not in (None, UNREGISTERED_SUNNYLINK_DONGLE_ID)
  return is_sunnylink_enabled, is_registered


def sunnylink_ready(params=None) -> bool:
  """Check if the device is ready to communicate with Sunnylink. That means it is enabled and registered."""
  params = params or Params()
  is_sunnylink_enabled, is_registered = get_sunnylink_status(params)
  return is_sunnylink_enabled and is_registered


def use_sunnylink_uploader(params=None) -> bool:
  """Check if the device is ready to use Sunnylink and the uploader is enabled.

  ``params`` is optional, like in the sibling helpers; a new Params() is used
  when it is omitted.
  """
  params = params or Params()
  return sunnylink_ready(params) and params.get_bool("EnableSunnylinkUploader")


def sunnylink_need_register(params=None) -> bool:
  """Check if the device needs to be registered with Sunnylink."""
  params = params or Params()
  is_sunnylink_enabled, is_registered = get_sunnylink_status(params)
  return is_sunnylink_enabled and not is_registered


def register_sunnylink():
  """Register the device with Sunnylink if it is enabled.

  Raises:
    SystemExit: with code 0 when SunnylinkEnabled is not set.
  """
  extra_args = {}

  if not Params().get_bool("SunnylinkEnabled"):
    print("Sunnylink is not enabled. Exiting.")
    # raise directly: the exit() builtin is injected by the site module and is
    # not guaranteed to exist in every interpreter setup
    raise SystemExit(0)

  # builds that are not prebuilt register verbosely and with a longer timeout
  if not is_prebuilt():
    extra_args = {
      "verbose": True,
      "timeout": 60
    }

  sunnylink_id = SunnylinkApi(None).register_device(None, **extra_args)
  print(f"SunnyLinkId: {sunnylink_id}")
Exiting.") + exit(0) + + if not is_prebuilt(): + extra_args = { + "verbose": True, + "timeout": 60 + } + + sunnylink_id = SunnylinkApi(None).register_device(None, **extra_args) + print(f"SunnyLinkId: {sunnylink_id}") diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 2f455981c..f35394102 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -14,6 +14,7 @@ import sys import tempfile import threading import time +import gzip import zstandard as zstd from dataclasses import asdict, dataclass, replace from datetime import datetime @@ -105,6 +106,7 @@ cancelled_uploads: set[str] = set() cur_upload_items: dict[int, UploadItem | None] = {} +# TODO-SP: adapt zst for sunnylink def strip_zst_extension(fn: str) -> str: if fn.endswith('.zst'): return fn[:-4] @@ -334,6 +336,19 @@ def getVersion() -> dict[str, str]: } +@dispatcher.add_method +def setNavDestination(latitude: int = 0, longitude: int = 0, place_name: str = None, place_details: str = None) -> dict[str, int]: + destination = { + "latitude": latitude, + "longitude": longitude, + "place_name": place_name, + "place_details": place_details, + } + Params().put("NavDestination", json.dumps(destination)) + + return {"success": 1} + + def scan_dir(path: str, prefix: str) -> list[str]: files = [] # only walk directories that match the prefix @@ -547,7 +562,7 @@ def takeSnapshot() -> str | dict[str, str] | None: raise Exception("not available while camerad is started") -def get_logs_to_send_sorted() -> list[str]: +def get_logs_to_send_sorted(log_attr_name=LOG_ATTR_NAME) -> list[str]: # TODO: scan once then use inotify to detect file creation/deletion curr_time = int(time.time()) logs = [] @@ -555,7 +570,7 @@ def get_logs_to_send_sorted() -> list[str]: log_path = os.path.join(Paths.swaglog_root(), log_entry) time_sent = 0 try: - value = getxattr(log_path, LOG_ATTR_NAME) + value = getxattr(log_path, log_attr_name) if value is not None: time_sent = int.from_bytes(value, sys.byteorder) except 
(ValueError, TypeError): @@ -567,8 +582,69 @@ def get_logs_to_send_sorted() -> list[str]: return sorted(logs)[:-1] -def log_handler(end_event: threading.Event) -> None: +def add_log_to_queue(log_path, log_id, is_sunnylink=False): + MAX_SIZE_KB = 32 + MAX_SIZE_BYTES = MAX_SIZE_KB * 1024 + + with open(log_path) as f: + data = f.read() + + # Check if the file is empty + if not data: + cloudlog.warning(f"Log file {log_path} is empty.") + return + + # Initialize variables for encoding + payload = data + is_compressed = False + + # Log the current size of the file + current_size = len(json.dumps(payload).encode("utf-8")) + len(log_id.encode("utf-8")) + 100 # Add 100 bytes to account for encoding overhead + cloudlog.debug(f"Current size of log file {log_path}: {current_size} bytes") + + if is_sunnylink and current_size > MAX_SIZE_BYTES: + # Compress and encode the data if it exceeds the maximum size + compressed_data = gzip.compress(data.encode()) + payload = base64.b64encode(compressed_data).decode() + is_compressed = True + + # Log the size after compression and encoding + compressed_size = len(compressed_data) + encoded_size = len(payload) + cloudlog.debug(f"Size of log file {log_path} " + + f"after compression: {compressed_size} bytes, " + + f"after encoding: {encoded_size} bytes") + + jsonrpc = { + "method": "forwardLogs", + "params": { + "logs": payload + }, + "jsonrpc": "2.0", + "id": log_id + } + + if is_sunnylink and is_compressed: + jsonrpc["params"]["compressed"] = is_compressed + + jsonrpc_str = json.dumps(jsonrpc) + size_in_bytes = len(jsonrpc_str.encode('utf-8')) + + if is_sunnylink and size_in_bytes <= MAX_SIZE_BYTES: + cloudlog.debug(f"Target is sunnylink and log file {log_path} is small enough to send in one request ({size_in_bytes} bytes).") + low_priority_send_queue.put_nowait(jsonrpc_str) + elif is_sunnylink: + cloudlog.warning(f"Target is sunnylink and log file {log_path} is too large to send in one request.") + else: + cloudlog.debug(f"Target is not 
sunnylink, proceeding to send log file {log_path} in one request ({size_in_bytes} bytes).") + low_priority_send_queue.put_nowait(jsonrpc_str) + + +def log_handler(end_event: threading.Event, log_attr_name=LOG_ATTR_NAME) -> None: + is_sunnylink = log_attr_name != LOG_ATTR_NAME if PC: + cloudlog.debug("athena.log_handler: Not supported on PC") + time.sleep(1) return log_files = [] @@ -577,7 +653,7 @@ def log_handler(end_event: threading.Event) -> None: try: curr_scan = time.monotonic() if curr_scan - last_scan > 10: - log_files = get_logs_to_send_sorted() + log_files = get_logs_to_send_sorted(log_attr_name) last_scan = curr_scan # send one log @@ -588,18 +664,10 @@ def log_handler(end_event: threading.Event) -> None: try: curr_time = int(time.time()) log_path = os.path.join(Paths.swaglog_root(), log_entry) - setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) - with open(log_path) as f: - jsonrpc = { - "method": "forwardLogs", - "params": { - "logs": f.read() - }, - "jsonrpc": "2.0", - "id": log_entry - } - low_priority_send_queue.put_nowait(json.dumps(jsonrpc)) - curr_log = log_entry + setxattr(log_path, log_attr_name, int.to_bytes(curr_time, 4, sys.byteorder)) + + add_log_to_queue(log_path, log_entry, is_sunnylink) + curr_log = log_entry except OSError: pass # file could be deleted by log rotation @@ -616,7 +684,7 @@ def log_handler(end_event: threading.Event) -> None: if log_entry and log_success: log_path = os.path.join(Paths.swaglog_root(), log_entry) try: - setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) + setxattr(log_path, log_attr_name, LOG_ATTR_VALUE_MAX_UNIX_TIME) except OSError: pass # file could be deleted by log rotation if curr_log == log_entry: @@ -752,23 +820,24 @@ def ws_manage(ws: WebSocket, end_event: threading.Event) -> None: onroad_prev = None sock = ws.sock - while True: + while not end_event.wait(5): onroad = params.get_bool("IsOnroad") if onroad != onroad_prev: onroad_prev = onroad if sock is not None: 
- # While not sending data, onroad, we can expect to time out in 7 + (7 * 2) = 21s - # offroad, we can expect to time out in 30 + (10 * 3) = 60s - # FIXME: TCP_USER_TIMEOUT is effectively 2x for some reason (32s), so it's mostly unused - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, 16000 if onroad else 0) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 7 if onroad else 30) + if sys.platform == 'darwin': # macOS + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 7 if onroad else 30) + else: + # While not sending data, onroad, we can expect to time out in 7 + (7 * 2) = 21s + # offroad, we can expect to time out in 30 + (10 * 3) = 60s + # FIXME: TCP_USER_TIMEOUT is effectively 2x for some reason (32s), so it's mostly unused + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, 16000 if onroad else 0) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 7 if onroad else 30) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 7 if onroad else 10) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 if onroad else 3) - if end_event.wait(5): - break - def backoff(retries: int) -> int: return random.randrange(0, min(128, int(2 ** retries))) diff --git a/system/athena/manage_athenad.py b/system/athena/manage_athenad.py index f5ab81720..7158cd922 100755 --- a/system/athena/manage_athenad.py +++ b/system/athena/manage_athenad.py @@ -13,8 +13,12 @@ ATHENA_MGR_PID_PARAM = "AthenadPid" def main(): + manage_athenad("DongleId", ATHENA_MGR_PID_PARAM, 'athenad', 'system.athena.athenad') + + +def manage_athenad(dongle_id_param, pid_param, process_name, target): params = Params() - dongle_id = params.get("DongleId").decode('utf-8') + dongle_id = params.get(dongle_id_param, encoding='utf-8') build_metadata = get_build_metadata() cloudlog.bind_global(dongle_id=dongle_id, @@ -27,17 +31,16 @@ def main(): try: while 1: - cloudlog.info("starting athena daemon") - proc = 
Process(name='athenad', target=launcher, args=('system.athena.athenad', 'athenad')) + cloudlog.info(f"starting {process_name} daemon") + proc = Process(name=process_name, target=launcher, args=(target, process_name)) proc.start() proc.join() - cloudlog.event("athenad exited", exitcode=proc.exitcode) + cloudlog.event(f"{process_name} exited", exitcode=proc.exitcode) time.sleep(5) except Exception: - cloudlog.exception("manage_athenad.exception") + cloudlog.exception(f"manage_{process_name}.exception") finally: - params.remove(ATHENA_MGR_PID_PARAM) - + params.remove(pid_param) if __name__ == '__main__': main() diff --git a/system/manager/process_config.py b/system/manager/process_config.py index 9d94f6423..9712b63e0 100644 --- a/system/manager/process_config.py +++ b/system/manager/process_config.py @@ -5,6 +5,7 @@ from cereal import car from openpilot.common.params import Params from openpilot.system.hardware import PC, TICI from openpilot.system.manager.process import PythonProcess, NativeProcess, DaemonProcess +from sunnypilot.sunnylink.utils import sunnylink_need_register, sunnylink_ready, use_sunnylink_uploader WEBCAM = os.getenv("USE_WEBCAM") is not None @@ -57,6 +58,18 @@ def only_offroad(started: bool, params: Params, CP: car.CarParams) -> bool: def use_github_runner(started, params, CP: car.CarParams) -> bool: return not PC and params.get_bool("EnableGithubRunner") and not params.get_bool("NetworkMetered") +def sunnylink_ready_shim(started, params, CP: car.CarParams) -> bool: + """Shim for sunnylink_ready to match the process manager signature.""" + return sunnylink_ready(params) + +def sunnylink_need_register_shim(started, params, CP: car.CarParams) -> bool: + """Shim for sunnylink_need_register to match the process manager signature.""" + return sunnylink_need_register(params) + +def use_sunnylink_uploader_shim(started, params, CP: car.CarParams) -> bool: + """Shim for use_sunnylink_uploader to match the process manager signature.""" + return 
use_sunnylink_uploader(params) + def or_(*fns): return lambda *args: operator.or_(*(fn(*args) for fn in fns)) @@ -112,6 +125,10 @@ procs = [ PythonProcess("webrtcd", "system.webrtc.webrtcd", notcar), PythonProcess("webjoystick", "tools.bodyteleop.web", notcar), PythonProcess("joystick", "tools.joystick.joystick_control", and_(joystick, iscar)), + + # sunnylink <3 + DaemonProcess("manage_sunnylinkd", "sunnypilot.sunnylink.athena.manage_sunnylinkd", "SunnylinkdPid"), + PythonProcess("sunnylink_registration_manager", "sunnypilot.sunnylink.registration_manager", sunnylink_need_register_shim), ] # sunnypilot @@ -122,4 +139,7 @@ procs += [ if os.path.exists("./github_runner.sh"): procs += [NativeProcess("github_runner_start", "system/manager", ["./github_runner.sh", "start"], and_(only_offroad, use_github_runner), sigkill=False)] +if os.path.exists("../sunnypilot/sunnylink/uploader.py"): + procs += [PythonProcess("sunnylink_uploader", "sunnypilot.sunnylink.uploader", use_sunnylink_uploader_shim)] + managed_processes = {p.name: p for p in procs}