diff --git a/common/api/__init__.py b/common/api/__init__.py index ac231400a..554276080 100644 --- a/common/api/__init__.py +++ b/common/api/__init__.py @@ -1,46 +1,27 @@ -import jwt -import os -import requests -from datetime import datetime, timedelta, UTC -from openpilot.system.hardware.hw import Paths -from openpilot.system.version import get_version +from openpilot.common.api.comma_connect import CommaConnectApi +from sunnypilot.sunnylink.api import SunnylinkApi -API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com') class Api: - def __init__(self, dongle_id): - self.dongle_id = dongle_id - with open(Paths.persist_root()+'/comma/id_rsa') as f: - self.private_key = f.read() + def __init__(self, dongle_id, use_sunnylink=False): + if use_sunnylink: + self.service = SunnylinkApi(dongle_id) + else: + self.service = CommaConnectApi(dongle_id) + + def request(self, method, endpoint, **params): + return self.service.request(method, endpoint, **params) def get(self, *args, **kwargs): - return self.request('GET', *args, **kwargs) + return self.service.get(*args, **kwargs) def post(self, *args, **kwargs): - return self.request('POST', *args, **kwargs) - - def request(self, method, endpoint, timeout=None, access_token=None, **params): - return api_get(endpoint, method=method, timeout=timeout, access_token=access_token, **params) + return self.service.post(*args, **kwargs) def get_token(self, expiry_hours=1): - now = datetime.now(UTC).replace(tzinfo=None) - payload = { - 'identity': self.dongle_id, - 'nbf': now, - 'iat': now, - 'exp': now + timedelta(hours=expiry_hours) - } - token = jwt.encode(payload, self.private_key, algorithm='RS256') - if isinstance(token, bytes): - token = token.decode('utf8') - return token + return self.service.get_token(expiry_hours) -def api_get(endpoint, method='GET', timeout=None, access_token=None, **params): - headers = {} - if access_token is not None: - headers['Authorization'] = "JWT " + access_token - - headers['User-Agent'] = 
"openpilot-" + get_version() - - return requests.request(method, API_HOST + "/" + endpoint, timeout=timeout, headers=headers, params=params) +def api_get(endpoint, method='GET', timeout=None, access_token=None, use_sunnylink=False, **params): + return SunnylinkApi(None).api_get(endpoint, method, timeout, access_token, **params) if use_sunnylink \ + else CommaConnectApi(None).api_get(endpoint, method, timeout, access_token, **params) diff --git a/common/api/base.py b/common/api/base.py new file mode 100644 index 000000000..0707c1cca --- /dev/null +++ b/common/api/base.py @@ -0,0 +1,56 @@ +import jwt +import requests +import unicodedata +from datetime import datetime, timedelta, UTC +from openpilot.system.hardware.hw import Paths +from openpilot.system.version import get_version + + +class BaseApi: + def __init__(self, dongle_id, api_host, user_agent="openpilot-"): + self.dongle_id = dongle_id + self.api_host = api_host + self.user_agent = user_agent + with open(f'{Paths.persist_root()}/comma/id_rsa') as f: + self.private_key = f.read() + + def get(self, *args, **kwargs): + return self.request('GET', *args, **kwargs) + + def post(self, *args, **kwargs): + return self.request('POST', *args, **kwargs) + + def request(self, method, endpoint, timeout=None, access_token=None, **params): + return self.api_get(endpoint, method=method, timeout=timeout, access_token=access_token, **params) + + def _get_token(self, expiry_hours=1, **extra_payload): + now = datetime.now(UTC).replace(tzinfo=None) + payload = { + 'identity': self.dongle_id, + 'nbf': now, + 'iat': now, + 'exp': now + timedelta(hours=expiry_hours), + **extra_payload + } + token = jwt.encode(payload, self.private_key, algorithm='RS256') + if isinstance(token, bytes): + token = token.decode('utf8') + return token + + def get_token(self, expiry_hours=1): + return self._get_token(expiry_hours) + + def remove_non_ascii_chars(self, text): + normalized_text = unicodedata.normalize('NFD', text) + ascii_encoded_text = 
normalized_text.encode('ascii', 'ignore') + return ascii_encoded_text.decode() + + def api_get(self, endpoint, method='GET', timeout=None, access_token=None, **params): + headers = {} + if access_token is not None: + headers['Authorization'] = "JWT " + access_token + + version = self.remove_non_ascii_chars(get_version()) + headers['User-Agent'] = self.user_agent + version + + return requests.request(method, f"{self.api_host}/{endpoint}", timeout=timeout, headers=headers, params=params) diff --git a/common/api/comma_connect.py b/common/api/comma_connect.py new file mode 100644 index 000000000..1c705f372 --- /dev/null +++ b/common/api/comma_connect.py @@ -0,0 +1,11 @@ +import os + +from openpilot.common.api.base import BaseApi + +API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com') + + +class CommaConnectApi(BaseApi): + def __init__(self, dongle_id): + super().__init__(dongle_id, API_HOST) + self.user_agent = "openpilot-" diff --git a/common/params.cc b/common/params.cc index 3eccd937e..85dc56463 100644 --- a/common/params.cc +++ b/common/params.cc @@ -109,36 +109,36 @@ std::unordered_map keys = { {"CurrentBootlog", PERSISTENT}, {"CurrentRoute", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, {"DisableLogging", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, - {"DisablePowerDown", PERSISTENT}, - {"DisableUpdates", PERSISTENT}, - {"DisengageOnAccelerator", PERSISTENT}, + {"DisablePowerDown", PERSISTENT | BACKUP}, + {"DisableUpdates", PERSISTENT | BACKUP}, + {"DisengageOnAccelerator", PERSISTENT | BACKUP}, {"DongleId", PERSISTENT}, {"DoReboot", CLEAR_ON_MANAGER_START}, {"DoShutdown", CLEAR_ON_MANAGER_START}, {"DoUninstall", CLEAR_ON_MANAGER_START}, - {"ExperimentalLongitudinalEnabled", PERSISTENT | DEVELOPMENT_ONLY}, - {"ExperimentalMode", PERSISTENT}, - {"ExperimentalModeConfirmed", PERSISTENT}, + {"ExperimentalLongitudinalEnabled", PERSISTENT | DEVELOPMENT_ONLY | BACKUP}, + {"ExperimentalMode", PERSISTENT | BACKUP}, + 
{"ExperimentalModeConfirmed", PERSISTENT | BACKUP}, {"FirmwareQueryDone", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, {"ForcePowerDown", PERSISTENT}, {"GitBranch", PERSISTENT}, {"GitCommit", PERSISTENT}, {"GitCommitDate", PERSISTENT}, {"GitDiff", PERSISTENT}, - {"GithubSshKeys", PERSISTENT}, - {"GithubUsername", PERSISTENT}, + {"GithubSshKeys", PERSISTENT | BACKUP}, + {"GithubUsername", PERSISTENT | BACKUP}, {"GitRemote", PERSISTENT}, - {"GsmApn", PERSISTENT}, - {"GsmMetered", PERSISTENT}, - {"GsmRoaming", PERSISTENT}, + {"GsmApn", PERSISTENT | BACKUP}, + {"GsmMetered", PERSISTENT | BACKUP}, + {"GsmRoaming", PERSISTENT | BACKUP}, {"HardwareSerial", PERSISTENT}, {"HasAcceptedTerms", PERSISTENT}, {"IMEI", PERSISTENT}, {"InstallDate", PERSISTENT}, {"IsDriverViewEnabled", CLEAR_ON_MANAGER_START}, {"IsEngaged", PERSISTENT}, - {"IsLdwEnabled", PERSISTENT}, - {"IsMetric", PERSISTENT}, + {"IsLdwEnabled", PERSISTENT | BACKUP}, + {"IsMetric", PERSISTENT | BACKUP}, {"IsOffroad", CLEAR_ON_MANAGER_START}, {"IsOnroad", PERSISTENT}, {"IsRhdDetected", PERSISTENT}, @@ -146,7 +146,7 @@ std::unordered_map keys = { {"IsTakingSnapshot", CLEAR_ON_MANAGER_START}, {"IsTestedBranch", CLEAR_ON_MANAGER_START}, {"JoystickDebugMode", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"LanguageSetting", PERSISTENT}, + {"LanguageSetting", PERSISTENT | BACKUP}, {"LastAthenaPingTime", CLEAR_ON_MANAGER_START}, {"LastGPSPosition", PERSISTENT}, {"LastManagerExitReason", CLEAR_ON_MANAGER_START}, @@ -158,7 +158,7 @@ std::unordered_map keys = { {"LiveTorqueParameters", PERSISTENT | DONT_LOG}, {"LocationFilterInitialState", PERSISTENT}, {"LongitudinalManeuverMode", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"LongitudinalPersonality", PERSISTENT}, + {"LongitudinalPersonality", PERSISTENT | BACKUP}, {"NetworkMetered", PERSISTENT}, {"ObdMultiplexingChanged", CLEAR_ON_MANAGER_START | CLEAR_ON_ONROAD_TRANSITION}, {"ObdMultiplexingEnabled", CLEAR_ON_MANAGER_START | 
CLEAR_ON_ONROAD_TRANSITION}, @@ -174,17 +174,17 @@ std::unordered_map keys = { {"Offroad_TemperatureTooHigh", CLEAR_ON_MANAGER_START}, {"Offroad_UnofficialHardware", CLEAR_ON_MANAGER_START}, {"Offroad_UpdateFailed", CLEAR_ON_MANAGER_START}, - {"OpenpilotEnabledToggle", PERSISTENT}, + {"OpenpilotEnabledToggle", PERSISTENT | BACKUP}, {"PandaHeartbeatLost", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, {"PandaSomResetTriggered", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, {"PandaSignatures", CLEAR_ON_MANAGER_START}, {"PrimeType", PERSISTENT}, - {"RecordFront", PERSISTENT}, + {"RecordFront", PERSISTENT | BACKUP}, {"RecordFrontLock", PERSISTENT}, // for the internal fleet - {"SecOCKey", PERSISTENT | DONT_LOG}, + {"SecOCKey", PERSISTENT | DONT_LOG}, // Candidate for | BACKUP {"RouteCount", PERSISTENT}, {"SnoozeUpdate", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"SshEnabled", PERSISTENT}, + {"SshEnabled", PERSISTENT | BACKUP}, {"TermsVersion", PERSISTENT}, {"TrainingVersion", PERSISTENT}, {"UbloxAvailable", PERSISTENT}, @@ -202,15 +202,20 @@ std::unordered_map keys = { {"Version", PERSISTENT}, // sunnypilot params - {"EnableGithubRunner", PERSISTENT}, - {"Mads", PERSISTENT}, - {"MadsMainCruiseAllowed", PERSISTENT}, - {"MadsPauseLateralOnBrake", PERSISTENT}, - {"MadsUnifiedEngagementMode", PERSISTENT}, + {"EnableGithubRunner", PERSISTENT | BACKUP}, + {"EnableSunnylinkUploader", PERSISTENT | BACKUP}, + {"LastSunnylinkPingTime", CLEAR_ON_MANAGER_START}, + {"Mads", PERSISTENT | BACKUP}, + {"MadsMainCruiseAllowed", PERSISTENT | BACKUP}, + {"MadsPauseLateralOnBrake", PERSISTENT | BACKUP}, + {"MadsUnifiedEngagementMode", PERSISTENT | BACKUP}, {"ModelManager_ActiveBundle", PERSISTENT}, {"ModelManager_DownloadIndex", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION | CLEAR_ON_ONROAD_TRANSITION}, {"ModelManager_LastSyncTime", CLEAR_ON_MANAGER_START | CLEAR_ON_OFFROAD_TRANSITION}, - {"ModelManager_ModelsCache", PERSISTENT}, + 
{"ModelManager_ModelsCache", PERSISTENT | BACKUP}, + {"SunnylinkDongleId", PERSISTENT}, + {"SunnylinkdPid", PERSISTENT}, + {"SunnylinkEnabled", PERSISTENT}, }; } // namespace diff --git a/common/params.h b/common/params.h index d726a6185..b40371f97 100644 --- a/common/params.h +++ b/common/params.h @@ -16,6 +16,7 @@ enum ParamKeyType { CLEAR_ON_OFFROAD_TRANSITION = 0x10, DONT_LOG = 0x20, DEVELOPMENT_ONLY = 0x40, + BACKUP = 0x80, ALL = 0xFFFFFFFF }; diff --git a/sunnypilot/sunnylink/__init__.py b/sunnypilot/sunnylink/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sunnypilot/sunnylink/api.py b/sunnypilot/sunnylink/api.py new file mode 100644 index 000000000..558a4e8d4 --- /dev/null +++ b/sunnypilot/sunnylink/api.py @@ -0,0 +1,158 @@ +import json +import os +import random +import time +from datetime import datetime, timedelta +from pathlib import Path + +import jwt +from openpilot.common.api.base import BaseApi +from openpilot.common.params import Params +from openpilot.system.hardware import HARDWARE +from openpilot.system.hardware.hw import Paths + +API_HOST = os.getenv('SUNNYLINK_API_HOST', 'https://stg.api.sunnypilot.ai') +UNREGISTERED_SUNNYLINK_DONGLE_ID = "UnregisteredDevice" +MAX_RETRIES = 6 +CRASH_LOG_DIR = '/data/community/crashes' + + +class SunnylinkApi(BaseApi): + def __init__(self, dongle_id): + super().__init__(dongle_id, API_HOST) + self.user_agent = "sunnypilot-" + self.spinner = None + self.params = Params() + + def api_get(self, endpoint, method='GET', timeout=10, access_token=None, **kwargs): + if not self.params.get_bool("SunnylinkEnabled"): + return None + + return super().api_get(endpoint, method, timeout, access_token, **kwargs) + + def resume_queued(self, timeout=10, **kwargs): + sunnylinkId, commaId = self._resolve_dongle_ids() + return self.api_get(f"ws/{sunnylinkId}/resume_queued", "POST", timeout, access_token=self.get_token(), **kwargs) + + def get_token(self, expiry_hours=1): + # Add your additional data here + 
additional_data = {} + return super()._get_token(expiry_hours, **additional_data) + + def _status_update(self, message): + print(message) + if self.spinner: + self.spinner.update(message) + time.sleep(0.5) + + def _resolve_dongle_ids(self): + sunnylink_dongle_id = self.params.get("SunnylinkDongleId", encoding='utf-8') + comma_dongle_id = self.dongle_id or self.params.get("DongleId", encoding='utf-8') + return sunnylink_dongle_id, comma_dongle_id + + def _resolve_imeis(self): + imei1, imei2 = None, None + imei_try = 0 + while imei1 is None and imei2 is None and imei_try < MAX_RETRIES: + try: + imei1, imei2 = self.params.get("IMEI", encoding='utf8') or HARDWARE.get_imei(0), HARDWARE.get_imei(1) + except Exception: + self._status_update(f"Error getting imei, trying again... [{imei_try + 1}/{MAX_RETRIES}]") + time.sleep(1) + imei_try += 1 + return imei1, imei2 + + def _resolve_serial(self): + return (self.params.get("HardwareSerial", encoding='utf8') + or HARDWARE.get_serial()) + + def register_device(self, spinner=None, timeout=60, verbose=False): + self.spinner = spinner + + sunnylink_dongle_id, comma_dongle_id = self._resolve_dongle_ids() + + if comma_dongle_id is None: + self._status_update("Comma dongle ID not found, deferring sunnylink's registration to comma's registration process.") + return None + + imei1, imei2 = self._resolve_imeis() + serial = self._resolve_serial() + + if sunnylink_dongle_id not in (None, UNREGISTERED_SUNNYLINK_DONGLE_ID): + return sunnylink_dongle_id + + privkey_path = Path(f"{Paths.persist_root()}/comma/id_rsa") + pubkey_path = Path(f"{Paths.persist_root()}/comma/id_rsa.pub") + + start_time = time.monotonic() + successful_registration = False + if not pubkey_path.is_file(): + sunnylink_dongle_id = UNREGISTERED_SUNNYLINK_DONGLE_ID + self._status_update("Public key not found, setting dongle ID to unregistered.") + else: + Params().put("LastSunnylinkPingTime", "0") # Reset the last ping time to 0 if we are trying to register + with 
pubkey_path.open() as f1, privkey_path.open() as f2: + public_key = f1.read() + private_key = f2.read() + + backoff = 1 + while True: + register_token = jwt.encode({'register': True, 'exp': datetime.utcnow() + timedelta(hours=1)}, private_key, algorithm='RS256') + try: + if verbose or time.monotonic() - start_time < timeout / 2: + self._status_update("Registering device to sunnylink...") + elif time.monotonic() - start_time >= timeout / 2: + self._status_update("Still registering device to sunnylink...") + + resp = self.api_get("v2/pilotauth/", method='POST', timeout=15, imei=imei1, imei2=imei2, serial=serial, + comma_dongle_id=comma_dongle_id, public_key=public_key, register_token=register_token) + + if resp is None: + raise Exception("Unable to register device, request was None") + + if resp.status_code in (409, 412): + timeout = time.monotonic() - start_time # Don't retry if the public key is already in use + key_in_use = "Public key is already in use, is your key unique? Contact your vendor for a new key." + unsafe_key = "Public key is known to not be unique and it's unsafe. Contact your vendor for a new key." + error_message = key_in_use if resp.status_code == 409 else unsafe_key + raise Exception(error_message) + + if resp.status_code != 200: + raise Exception(f"Failed to register with sunnylink. 
Status code: {resp.status_code}\nData\n:{resp.text}") + + dongleauth = json.loads(resp.text) + sunnylink_dongle_id = dongleauth["device_id"] + if sunnylink_dongle_id: + self._status_update("Device registered successfully.") + successful_registration = True + break + except Exception as e: + if verbose: + self._status_update(f"Waiting {backoff}s before retry, Exception occurred during registration: [{str(e)}]") + + if not os.path.exists(CRASH_LOG_DIR): + os.makedirs(CRASH_LOG_DIR) + + with open(f'{CRASH_LOG_DIR}/error.txt', 'a') as f: + f.write(f"[{datetime.now()}] sunnylink: {str(e)}\n") + + backoff = min(backoff * 2 * (0.5 + random.random()), 60) + time.sleep(backoff) + + if time.monotonic() - start_time > timeout: + self._status_update(f"Giving up on sunnylink's registration after {timeout}s. Will retry on next boot.") + time.sleep(3) + break + + self.params.put("SunnylinkDongleId", sunnylink_dongle_id or UNREGISTERED_SUNNYLINK_DONGLE_ID) + + # Set the last ping time to the current time since we were just talking to the API + last_ping = int(time.monotonic() * 1e9) if successful_registration else start_time + Params().put("LastSunnylinkPingTime", str(last_ping)) + + # Disable sunnylink if registration was not successful + if not successful_registration: + Params().put_bool("SunnylinkEnabled", False) + + self.spinner = None + return sunnylink_dongle_id diff --git a/sunnypilot/sunnylink/athena/__init__.py b/sunnypilot/sunnylink/athena/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sunnypilot/sunnylink/athena/manage_sunnylinkd.py b/sunnypilot/sunnylink/athena/manage_sunnylinkd.py new file mode 100755 index 000000000..377b6990f --- /dev/null +++ b/sunnypilot/sunnylink/athena/manage_sunnylinkd.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from openpilot.system.athena.manage_athenad import manage_athenad + +if __name__ == '__main__': + manage_athenad("SunnylinkDongleId", "SunnylinkdPid", 'sunnylinkd', 'sunnypilot.sunnylink.athena.sunnylinkd') 
diff --git a/sunnypilot/sunnylink/athena/sunnylinkd.py b/sunnypilot/sunnylink/athena/sunnylinkd.py new file mode 100755 index 000000000..0fb7ea6d9 --- /dev/null +++ b/sunnypilot/sunnylink/athena/sunnylinkd.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import base64 +import gzip +import os +import threading +import time + +from jsonrpc import dispatcher +from openpilot.common.params import Params +from openpilot.common.realtime import set_core_affinity +from openpilot.common.swaglog import cloudlog +from openpilot.system.athena.athenad import ws_send, jsonrpc_handler, \ + recv_queue, UploadQueueCache, upload_queue, cur_upload_items, backoff, ws_manage, log_handler +from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException, + create_connection) + +import cereal.messaging as messaging +from sunnypilot.sunnylink.api import SunnylinkApi +from sunnypilot.sunnylink.utils import sunnylink_need_register, sunnylink_ready + +SUNNYLINK_ATHENA_HOST = os.getenv('SUNNYLINK_ATHENA_HOST', 'wss://ws.stg.api.sunnypilot.ai') +HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) +LOCAL_PORT_WHITELIST = {8022} +SUNNYLINK_LOG_ATTR_NAME = "user.sunny.upload" +SUNNYLINK_RECONNECT_TIMEOUT_S = 70 # FYI changing this will also would require a change on sidebar.cc +DISALLOW_LOG_UPLOAD = threading.Event() + +params = Params() +sunnylink_api = SunnylinkApi(params.get("SunnylinkDongleId", encoding='utf-8')) + + +def handle_long_poll(ws: WebSocket, exit_event: threading.Event | None) -> None: + cloudlog.info("sunnylinkd.handle_long_poll started") + sm = messaging.SubMaster(['deviceState']) + end_event = threading.Event() + comma_prime_cellular_end_event = threading.Event() + + threads = [ + threading.Thread(target=ws_manage, args=(ws, end_event), name='ws_manage'), + threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'), + threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'), + 
threading.Thread(target=ws_ping, args=(ws, end_event), name='ws_ping'), + threading.Thread(target=ws_queue, args=(end_event,), name='ws_queue'), + # threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'), + # threading.Thread(target=sunny_log_handler, args=(end_event, comma_prime_cellular_end_event), name='log_handler'), + # threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'), + ] + [ + threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}') + for x in range(HANDLER_THREADS) + ] + + for thread in threads: + thread.start() + try: + while not end_event.wait(0.1): + if not sunnylink_ready(params): + cloudlog.warning("Exiting sunnylinkd.handle_long_poll as SunnylinkEnabled is False") + break + + sm.update(0) + if exit_event is not None and exit_event.is_set(): + end_event.set() + comma_prime_cellular_end_event.set() + + prime_type = params.get("PrimeType", encoding='utf-8') or 0 + metered = sm['deviceState'].networkMetered + + if DISALLOW_LOG_UPLOAD.is_set() and not comma_prime_cellular_end_event.is_set(): + cloudlog.debug("sunnylinkd.handle_long_poll: DISALLOW_LOG_UPLOAD, setting comma_prime_cellular_end_event") + comma_prime_cellular_end_event.set() + elif metered and int(prime_type) > 2: + cloudlog.debug(f"sunnylinkd.handle_long_poll: PrimeType({prime_type}) > 2 and networkMetered({metered})") + comma_prime_cellular_end_event.set() + elif comma_prime_cellular_end_event.is_set() and not DISALLOW_LOG_UPLOAD.is_set(): + cloudlog.debug( + f"sunnylinkd.handle_long_poll: comma_prime_cellular_end_event is set and not PrimeType({prime_type}) > 2 or not networkMetered({metered})") + comma_prime_cellular_end_event.clear() + finally: + end_event.set() + comma_prime_cellular_end_event.set() + for thread in threads: + cloudlog.debug(f"sunnylinkd athena.joining {thread.name}") + thread.join() + cloudlog.debug(f"sunnylinkd athena.joined {thread.name}") + + +def ws_recv(ws: WebSocket, end_event: 
threading.Event) -> None: + last_ping = int(time.monotonic() * 1e9) + while not end_event.is_set(): + try: + opcode, data = ws.recv_data(control_frame=True) + if opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY): + if opcode == ABNF.OPCODE_TEXT: + data = data.decode("utf-8") + recv_queue.put_nowait(data) + cloudlog.debug(f"sunnylinkd.ws_recv.recv {data}") + elif opcode in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG): + cloudlog.debug("sunnylinkd.ws_recv.pong") + last_ping = int(time.monotonic() * 1e9) + Params().put("LastSunnylinkPingTime", str(last_ping)) + except WebSocketTimeoutException: + ns_since_last_ping = int(time.monotonic() * 1e9) - last_ping + if ns_since_last_ping > SUNNYLINK_RECONNECT_TIMEOUT_S * 1e9: + cloudlog.exception("sunnylinkd.ws_recv.timeout") + end_event.set() + except Exception: + cloudlog.exception("sunnylinkd.ws_recv.exception") + end_event.set() + + +def ws_ping(ws: WebSocket, end_event: threading.Event) -> None: + ws.ping() # Send the first ping + while not end_event.wait(SUNNYLINK_RECONNECT_TIMEOUT_S * 0.7): # Sleep about 70% before a timeout + try: + ws.ping() + cloudlog.debug("sunnylinkd.ws_recv.ws_ping: Pinging") + except Exception: + cloudlog.exception("sunnylinkd.ws_ping.exception") + end_event.set() + cloudlog.debug("sunnylinkd.ws_ping.end_event is set, exiting ws_ping thread") + + +def ws_queue(end_event: threading.Event) -> None: + resume_requested = False + tries = 0 + + while not end_event.is_set() and not resume_requested: + try: + if not resume_requested: + cloudlog.debug("sunnylinkd.ws_queue.resume_queued") + sunnylink_api.resume_queued(timeout=29) + resume_requested = True + tries = 0 + except Exception: + cloudlog.exception("sunnylinkd.ws_queue.resume_queued.exception") + resume_requested = False + tries += 1 + time.sleep(backoff(tries)) # Wait for the backoff time before the next attempt + + if end_event.is_set(): + cloudlog.debug("end_event is set, exiting ws_queue thread") + elif resume_requested: + cloudlog.debug(f"Resume 
requested to server after {tries} tries") + else: + cloudlog.error(f"Reached end of ws_queue while end_event is not set and resume_requested is {resume_requested}") + + +def sunny_log_handler(end_event: threading.Event, comma_prime_cellular_end_event: threading.Event) -> None: + while not end_event.wait(0.1): + if not comma_prime_cellular_end_event.is_set(): + log_handler(comma_prime_cellular_end_event, SUNNYLINK_LOG_ATTR_NAME) + comma_prime_cellular_end_event.set() + + +@dispatcher.add_method +def toggleLogUpload(enabled: bool): + DISALLOW_LOG_UPLOAD.clear() if enabled and DISALLOW_LOG_UPLOAD.is_set() else DISALLOW_LOG_UPLOAD.set() + + +@dispatcher.add_method +def getParamsAllKeys() -> list[str]: + keys: list[str] = [k.decode('utf-8') for k in Params().all_keys()] + return keys + + +@dispatcher.add_method +def getParams(params_keys: list[str], compression: bool = False) -> str | dict[str, str]: + try: + params = Params() + params_dict: dict[str, bytes] = {key: params.get(key) or b'' for key in params_keys} + + # Compress the values before encoding to base64 as output from params.get is bytes and same for compression + if compression: + params_dict = {key: gzip.compress(value) for key, value in params_dict.items()} + + # Last step is to encode the values to base64 and decode to utf-8 for JSON serialization + return {key: base64.b64encode(value).decode('utf-8') for key, value in params_dict.items()} + + except Exception as e: + cloudlog.exception("sunnylinkd.getParams.exception", e) + raise + + +@dispatcher.add_method +def saveParams(params_to_update: dict[str, str], compression: bool = False) -> None: + params = Params() + params_dict = {key: base64.b64decode(value) for key, value in params_to_update.items()} + + if compression: + params_dict = {key: gzip.decompress(value) for key, value in params_dict.items()} + + for key, value in params_dict.items(): + try: + params.put(key, value) + except Exception as e: + cloudlog.error(f"sunnylinkd.saveParams.exception {e}") 
+ + +def main(exit_event: threading.Event = None): + try: + set_core_affinity([0, 1, 2, 3]) + except Exception: + cloudlog.exception("failed to set core affinity") + + while sunnylink_need_register(params): + cloudlog.info("Waiting for sunnylink registration to complete") + time.sleep(10) + + UploadQueueCache.initialize(upload_queue) + + ws_uri = SUNNYLINK_ATHENA_HOST + conn_start = None + conn_retries = 0 + while (exit_event is None or not exit_event.is_set()) and sunnylink_ready(params): + try: + if conn_start is None: + conn_start = time.monotonic() + + cloudlog.event("sunnylinkd.main.connecting_ws", ws_uri=ws_uri, retries=conn_retries) + ws = create_connection( + ws_uri, + cookie=f"jwt={sunnylink_api.get_token()}", + enable_multithread=True, + timeout=SUNNYLINK_RECONNECT_TIMEOUT_S, + ) + cloudlog.event("sunnylinkd.main.connected_ws", ws_uri=ws_uri, retries=conn_retries, + duration=time.monotonic() - conn_start) + conn_start = None + + conn_retries = 0 + cur_upload_items.clear() + + handle_long_poll(ws, exit_event) + except (KeyboardInterrupt, SystemExit): + break + except (ConnectionError, TimeoutError, WebSocketException): + conn_retries += 1 + params.remove("LastSunnylinkPingTime") + except Exception: + cloudlog.exception("sunnylinkd.main.exception") + + conn_retries += 1 + params.remove("LastSunnylinkPingTime") + + time.sleep(backoff(conn_retries)) + + if not sunnylink_ready(params): + cloudlog.debug("Reached end of sunnylinkd.main while sunnylink is not ready. 
Waiting 60s before retrying") + time.sleep(60) + + +if __name__ == "__main__": + main() diff --git a/sunnypilot/sunnylink/registration_manager.py b/sunnypilot/sunnylink/registration_manager.py new file mode 100755 index 000000000..34323c278 --- /dev/null +++ b/sunnypilot/sunnylink/registration_manager.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +import time + +from openpilot.common.realtime import Ratekeeper +from openpilot.common.swaglog import cloudlog + +from cereal import log, messaging +from sunnypilot.sunnylink.utils import register_sunnylink + +NetworkType = log.DeviceState.NetworkType + + +def main(): + """The main method is expected to be called by the manager when the device boots up.""" + rk = Ratekeeper(.5) + sm = messaging.SubMaster(['deviceState'], poll='deviceState') + while True: + sm.update(1000) + if sm['deviceState'].networkType != NetworkType.none: + break + + cloudlog.info(f"Waiting to become online... {time.monotonic()}") + rk.keep_time() + + register_sunnylink() + + +if __name__ == "__main__": + main() diff --git a/sunnypilot/sunnylink/uploader.py b/sunnypilot/sunnylink/uploader.py new file mode 100755 index 000000000..69ef2cfd8 --- /dev/null +++ b/sunnypilot/sunnylink/uploader.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +import bz2 +import datetime +import io +import json +import os +import random +import threading +import time +import traceback +from collections.abc import Iterator +from typing import BinaryIO + +import requests +from openpilot.common.params import Params +from openpilot.common.realtime import set_core_affinity +from openpilot.common.swaglog import cloudlog +from openpilot.system.hardware.hw import Paths +from openpilot.system.loggerd.xattr_cache import getxattr, setxattr + +import cereal.messaging as messaging +from cereal import log +from sunnypilot.sunnylink.api import SunnylinkApi + +NetworkType = log.DeviceState.NetworkType +UPLOAD_ATTR_NAME = 'user.sunny.upload' + +UPLOAD_ATTR_VALUE = b'1' + 
+UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB + +allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) +force_wifi = os.getenv("FORCEWIFI") is not None +fake_upload = os.getenv("FAKEUPLOAD") is not None + +OFFROAD_TRANSITION_TIMEOUT = 900. # wait until offroad for 15 minutes before allowing uploads + + +class FakeRequest: + def __init__(self): + self.headers = {"Content-Length": "0"} + + +class FakeResponse: + def __init__(self): + self.status_code = 200 + self.request = FakeRequest() + + +def get_directory_sort(d: str) -> list[str]: + # ensure old format is sorted sooner + o = ["0", ] if d.startswith("2024-") else ["1", ] + return o + [s.rjust(10, '0') for s in d.rsplit('--', 1)] + + +def listdir_by_creation(d: str) -> list[str]: + if not os.path.isdir(d): + return [] + + try: + paths = [f for f in os.listdir(d) if os.path.isdir(os.path.join(d, f))] + paths = sorted(paths, key=get_directory_sort) + return paths + except OSError: + cloudlog.exception("listdir_by_creation failed") + return [] + + +def clear_locks(root: str) -> None: + for logdir in os.listdir(root): + path = os.path.join(root, logdir) + try: + for fname in os.listdir(path): + if fname.endswith(".lock"): + os.unlink(os.path.join(path, fname)) + except OSError: + cloudlog.exception("clear_locks failed") + + +class Uploader: + def __init__(self, dongle_id: str, root: str): + self.dongle_id = dongle_id + self.api = SunnylinkApi(dongle_id) + self.root = root + + self.params = Params() + + # stats for last successfully uploaded file + self.last_filename = "" + + self.immediate_folders = ["crash/", "boot/"] + self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} + + def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]: + r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8") + requested_routes = [] if r is None else r.split(",") + + for logdir in listdir_by_creation(self.root): + path = os.path.join(self.root, logdir) + try: + names = os.listdir(path) + 
except OSError: + continue + + if any(name.endswith(".lock") for name in names): + continue + + for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): + key = os.path.join(logdir, name) + fn = os.path.join(path, name) + # skip files already uploaded + try: + ctime = os.path.getctime(fn) + is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE + except OSError: + cloudlog.event("uploader_getxattr_failed", key=key, fn=fn) + # deleter could have deleted, so skip + continue + if is_uploaded: + continue + + # limit uploading on metered connections + if metered: + dt = datetime.timedelta(hours=12) + if logdir in self.immediate_folders and (datetime.datetime.now() - datetime.datetime.fromtimestamp(ctime)) < dt: + continue + + if name == "qcamera.ts" and not any(logdir.startswith(r.split('|')[-1]) for r in requested_routes): + continue + + yield name, key, fn + + def next_file_to_upload(self, metered: bool) -> tuple[str, str, str] | None: + upload_files = list(self.list_upload_files(metered)) + + for name, key, fn in upload_files: + if any(f in fn for f in self.immediate_folders): + return name, key, fn + + return next( + ((name, key, fn) + for name, key, fn in upload_files if name in self.immediate_priority), + None, + ) + + def do_upload(self, key: str, fn: str): + url_resp = self.api.get( + f"device/{self.dongle_id}/upload_url/", + timeout=10, + path=key, + access_token=self.api.get_token(), + ) + if url_resp.status_code == 412: + return url_resp + + url_resp_json = json.loads(url_resp.text) + url = url_resp_json['url'] + headers = url_resp_json['headers'] + cloudlog.debug("sunnylink upload_url %s | Headers: %s", url, headers) + + if fake_upload: + return FakeResponse() + + with open(fn, "rb") as f: + data: BinaryIO + if key.endswith('.bz2') and not fn.endswith('.bz2'): + compressed = bz2.compress(f.read()) + data = io.BytesIO(compressed) + else: + data = f + + return requests.put(url, data=data, headers=headers, timeout=10) + + def 
upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: + try: + sz = os.path.getsize(fn) + except OSError: + cloudlog.exception("upload: getsize failed") + return False + + cloudlog.event("upload_start", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) + + if sz == 0: + # tag files of 0 size as uploaded + success = True + elif name in self.immediate_priority and sz > UPLOAD_QLOG_QCAM_MAX_SIZE: + cloudlog.event("uploader_too_large", key=key, fn=fn, sz=sz) + success = True + else: + start_time = time.monotonic() + + stat = None + last_exc = None + try: + stat = self.do_upload(key, fn) + except Exception as e: + last_exc = (e, traceback.format_exc()) + + if stat is not None and stat.status_code in (200, 201, 412): + self.last_filename = fn + dt = time.monotonic() - start_time + if stat.status_code == 412: + cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) + else: + content_length = int(stat.request.headers.get("Content-Length", 0)) + speed = (content_length / 1e6) / dt + cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length, + network_type=network_type, metered=metered, speed=speed) + success = True + elif stat is not None: # 401, 403... 
Not sure why they were up to begin with + success = False + cloudlog.event("upload_failed with content", stat=stat, exc=last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered, + error=stat.content.decode("utf-8")) + else: + success = False + cloudlog.event("upload_failed", stat=stat, exc=last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) + + if success: + # tag file as uploaded + try: + setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) + except OSError: + cloudlog.event("uploader_setxattr_failed", exc=last_exc, key=key, fn=fn, sz=sz) + + return success + + def step(self, network_type: int, metered: bool) -> bool | None: + d = self.next_file_to_upload(metered) + if d is None: + return None + + name, key, fn = d + + # qlogs and bootlogs need to be compressed before uploading + if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): + key += ".bz2" + + return self.upload(name, key, fn, network_type, metered) + + +def main(exit_event: threading.Event = None) -> None: + if exit_event is None: + exit_event = threading.Event() + + try: + set_core_affinity([0, 1, 2, 3]) + except Exception: + cloudlog.exception("failed to set core affinity") + + clear_locks(Paths.log_root()) + + params = Params() + dongle_id = params.get("SunnylinkDongleId", encoding='utf8') + + offroad_transition_prev = 0. 
+ offroad_last = False + + if dongle_id is None: + cloudlog.info("uploader missing dongle_id") + raise Exception("uploader can't start without dongle id") + + sm = messaging.SubMaster(['deviceState']) + uploader = Uploader(dongle_id, Paths.log_root()) + + backoff = 0.1 + while not exit_event.is_set(): + sm.update(0) + + offroad = params.get_bool("IsOffroad") + t = time.monotonic() + if offroad and not offroad_last and t > 300.: + offroad_transition_prev = time.monotonic() + offroad_last = offroad + + network_type = NetworkType.wifi if force_wifi else sm['deviceState'].networkType + if network_type == NetworkType.none: + if allow_sleep: + time.sleep(60 if offroad else 5) + continue + + if params.get_bool("DisableOnroadUploads"): + if not offroad or (offroad_transition_prev > 0. and t - offroad_transition_prev < OFFROAD_TRANSITION_TIMEOUT): + if not offroad: + cloudlog.info("not uploading: onroad uploads disabled") + else: + wait_minutes = int(OFFROAD_TRANSITION_TIMEOUT / 60) + time_left = OFFROAD_TRANSITION_TIMEOUT - (t - offroad_transition_prev) + if time_left > 2.0 * 60.0: + time_left_str = f"{int(time_left / 60)} minute(s)" + else: + time_left_str = f"{int(time_left)} seconds(s)" + cloudlog.info(f"not uploading: waiting until offroad for {wait_minutes} minutes; {time_left_str} left") + if allow_sleep: + time.sleep(60) + continue + + success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) + if success is None: + backoff = 60 if offroad else 5 + elif success: + backoff = 0.1 + else: + cloudlog.info("upload backoff %r", backoff) + backoff = min(backoff * 2, 120) + if allow_sleep: + time.sleep(backoff + random.uniform(0, backoff)) + + +if __name__ == "__main__": + main() diff --git a/sunnypilot/sunnylink/utils.py b/sunnypilot/sunnylink/utils.py new file mode 100644 index 000000000..57523d739 --- /dev/null +++ b/sunnypilot/sunnylink/utils.py @@ -0,0 +1,48 @@ +from sunnypilot.sunnylink.api import SunnylinkApi, 
from sunnypilot.sunnylink.api import SunnylinkApi, UNREGISTERED_SUNNYLINK_DONGLE_ID
from openpilot.common.params import Params
from openpilot.system.version import is_prebuilt


def get_sunnylink_status(params=None) -> tuple[bool, bool]:
  """Get the status of Sunnylink on the device.

  Returns a tuple of (is_sunnylink_enabled, is_registered). The device counts as
  registered when "SunnylinkDongleId" is set to something other than
  UNREGISTERED_SUNNYLINK_DONGLE_ID. A fresh Params() is used when *params* is omitted.
  """
  params = params or Params()
  is_sunnylink_enabled = params.get_bool("SunnylinkEnabled")
  is_registered = params.get("SunnylinkDongleId", encoding='utf-8') not in (None, UNREGISTERED_SUNNYLINK_DONGLE_ID)
  return is_sunnylink_enabled, is_registered


def sunnylink_ready(params=None) -> bool:
  """Check if the device is ready to communicate with Sunnylink, i.e. enabled and registered."""
  params = params or Params()
  is_sunnylink_enabled, is_registered = get_sunnylink_status(params)
  return is_sunnylink_enabled and is_registered


def use_sunnylink_uploader(params=None) -> bool:
  """Check if the device is ready to use Sunnylink and the uploader is enabled.

  *params* is optional, matching the sibling helpers in this module.
  """
  params = params or Params()
  return sunnylink_ready(params) and params.get_bool("EnableSunnylinkUploader")


def sunnylink_need_register(params=None) -> bool:
  """Check if the device needs to be registered with Sunnylink (enabled but not yet registered)."""
  params = params or Params()
  is_sunnylink_enabled, is_registered = get_sunnylink_status(params)
  return is_sunnylink_enabled and not is_registered


def register_sunnylink():
  """Register the device with Sunnylink if it is enabled.

  Exits the process with status 0 when "SunnylinkEnabled" is not set. On
  non-prebuilt builds, registration is called with verbose output and a
  60 second timeout. The resulting id is printed.
  """
  # sys.exit is always available; the bare exit() builtin is installed by the
  # site module and can be missing (e.g. when Python is started with -S).
  import sys

  extra_args = {}

  if not Params().get_bool("SunnylinkEnabled"):
    print("Sunnylink is not enabled. Exiting.")
    sys.exit(0)

  if not is_prebuilt():
    extra_args = {
      "verbose": True,
      "timeout": 60
    }

  sunnylink_id = SunnylinkApi(None).register_device(None, **extra_args)
  print(f"SunnyLinkId: {sunnylink_id}")


# system/athena/athenad.py
@dispatcher.add_method
def setNavDestination(latitude: float = 0, longitude: float = 0,
                      place_name: str | None = None, place_details: str | None = None) -> dict[str, int]:
  """Store a navigation destination in the "NavDestination" param as JSON.

  The four arguments are written unchanged under the keys "latitude",
  "longitude", "place_name" and "place_details". Always returns {"success": 1}.
  """
  destination = {
    "latitude": latitude,
    "longitude": longitude,
    "place_name": place_name,
    "place_details": place_details,
  }
  Params().put("NavDestination", json.dumps(destination))

  return {"success": 1}
def add_log_to_queue(log_path, log_id, is_sunnylink=False):
  """Wrap a log file in a "forwardLogs" JSON-RPC request and queue it for sending.

  Empty files are skipped with a warning. For sunnylink targets the request is
  limited to 32 KiB: a file whose estimated request size exceeds the limit is
  gzip-compressed and base64-encoded (flagged with "compressed" in the params),
  and a request that is still too large is dropped with a warning. Other
  targets are queued regardless of size.
  """
  size_limit = 32 * 1024  # bytes

  with open(log_path) as log_file:
    contents = log_file.read()

  if not contents:
    cloudlog.warning(f"Log file {log_path} is empty.")
    return

  # JSON-escaped payload + request id + 100 bytes to account for encoding overhead
  estimated_size = len(json.dumps(contents).encode("utf-8")) + len(log_id.encode("utf-8")) + 100
  cloudlog.debug(f"Current size of log file {log_path}: {estimated_size} bytes")

  compress = is_sunnylink and estimated_size > size_limit
  if compress:
    gzipped = gzip.compress(contents.encode())
    logs_field = base64.b64encode(gzipped).decode()
    cloudlog.debug(f"Size of log file {log_path} after compression: {len(gzipped)} bytes, after encoding: {len(logs_field)} bytes")
  else:
    logs_field = contents

  request_params = {"logs": logs_field}
  if compress:
    request_params["compressed"] = True

  message = json.dumps({
    "method": "forwardLogs",
    "params": request_params,
    "jsonrpc": "2.0",
    "id": log_id,
  })
  message_size = len(message.encode('utf-8'))

  if not is_sunnylink:
    cloudlog.debug(f"Target is not sunnylink, proceeding to send log file {log_path} in one request ({message_size} bytes).")
  elif message_size <= size_limit:
    cloudlog.debug(f"Target is sunnylink and log file {log_path} is small enough to send in one request ({message_size} bytes).")
  else:
    cloudlog.warning(f"Target is sunnylink and log file {log_path} is too large to send in one request.")
    return

  low_priority_send_queue.put_nowait(message)
- # While not sending data, onroad, we can expect to time out in 7 + (7 * 2) = 21s - # offroad, we can expect to time out in 30 + (10 * 3) = 60s - # FIXME: TCP_USER_TIMEOUT is effectively 2x for some reason (32s), so it's mostly unused - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, 16000 if onroad else 0) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 7 if onroad else 30) + if sys.platform == 'darwin': # macOS + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 7 if onroad else 30) + else: + # While not sending data, onroad, we can expect to time out in 7 + (7 * 2) = 21s + # offroad, we can expect to time out in 30 + (10 * 3) = 60s + # FIXME: TCP_USER_TIMEOUT is effectively 2x for some reason (32s), so it's mostly unused + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, 16000 if onroad else 0) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 7 if onroad else 30) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 7 if onroad else 10) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 if onroad else 3) - if end_event.wait(5): - break - def backoff(retries: int) -> int: return random.randrange(0, min(128, int(2 ** retries))) diff --git a/system/athena/manage_athenad.py b/system/athena/manage_athenad.py index f5ab81720..7158cd922 100755 --- a/system/athena/manage_athenad.py +++ b/system/athena/manage_athenad.py @@ -13,8 +13,12 @@ ATHENA_MGR_PID_PARAM = "AthenadPid" def main(): + manage_athenad("DongleId", ATHENA_MGR_PID_PARAM, 'athenad', 'system.athena.athenad') + + +def manage_athenad(dongle_id_param, pid_param, process_name, target): params = Params() - dongle_id = params.get("DongleId").decode('utf-8') + dongle_id = params.get(dongle_id_param, encoding='utf-8') build_metadata = get_build_metadata() cloudlog.bind_global(dongle_id=dongle_id, @@ -27,17 +31,16 @@ def main(): try: while 1: - cloudlog.info("starting athena daemon") - proc = 
def sunnylink_ready_shim(started, params, CP: car.CarParams) -> bool:
  """Process-manager condition: forwards *params* to sunnylink_ready; *started* and *CP* are unused."""
  return sunnylink_ready(params=params)


def sunnylink_need_register_shim(started, params, CP: car.CarParams) -> bool:
  """Process-manager condition: forwards *params* to sunnylink_need_register; *started* and *CP* are unused."""
  return sunnylink_need_register(params=params)


def use_sunnylink_uploader_shim(started, params, CP: car.CarParams) -> bool:
  """Process-manager condition: forwards *params* to use_sunnylink_uploader; *started* and *CP* are unused."""
  return use_sunnylink_uploader(params=params)