diff --git a/system/hardware/esim.py b/system/hardware/esim.py index 58ead6593..9b7d4f9ec 100755 --- a/system/hardware/esim.py +++ b/system/hardware/esim.py @@ -7,7 +7,6 @@ from openpilot.system.hardware import HARDWARE if __name__ == '__main__': parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai') - parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi') parser.add_argument('--switch', metavar='iccid', help='switch to profile') parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)') parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)') diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py deleted file mode 100644 index b489286f5..000000000 --- a/system/hardware/tici/esim.py +++ /dev/null @@ -1,106 +0,0 @@ -import json -import os -import shutil -import subprocess -from typing import Literal - -from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile - -class TiciLPA(LPABase): - def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): - self.env = os.environ.copy() - self.env['LPAC_APDU'] = interface - self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' - self.env['AT_DEVICE'] = '/dev/ttyUSB2' - - self.timeout_sec = 45 - - if shutil.which('lpac') is None: - raise LPAError('lpac not found, must be installed!') - - def list_profiles(self) -> list[Profile]: - msgs = self._invoke('profile', 'list') - self._validate_successful(msgs) - return [Profile( - iccid=p['iccid'], - nickname=p['profileNickname'], - enabled=p['profileState'] == 'enabled', - provider=p['serviceProviderName'] - ) for p in msgs[-1]['payload']['data']] - - def get_active_profile(self) -> Profile | None: - return next((p for p in self.list_profiles() if p.enabled), None) - - def delete_profile(self, iccid: str) -> None: - self._validate_profile_exists(iccid) - latest = self.get_active_profile() - if latest is not None and latest.iccid == iccid: - raise LPAError('cannot delete active profile, switch to another profile first') - self._validate_successful(self._invoke('profile', 'delete', iccid)) - self._process_notifications() - - def download_profile(self, qr: str, nickname: str | None = None) -> None: - msgs = self._invoke('profile', 'download', '-a', qr) - self._validate_successful(msgs) - new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) - if new_profile is None: - raise LPAError('no new profile found') - if nickname: - self.nickname_profile(new_profile['payload']['data']['iccid'], nickname) - self._process_notifications() - - def nickname_profile(self, iccid: str, nickname: str) -> None: - self._validate_profile_exists(iccid) - self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) - - def switch_profile(self, iccid: str) -> None: - self._validate_profile_exists(iccid) - latest = self.get_active_profile() - if latest and latest.iccid == iccid: - return - self._validate_successful(self._invoke('profile', 'enable', iccid)) - self._process_notifications() - - def _invoke(self, *cmd: str): - proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) - try: - out, err = proc.communicate(timeout=self.timeout_sec) - except subprocess.TimeoutExpired as e: - proc.kill() - raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e - - messages = [] - for line in out.decode().strip().splitlines(): - if line.startswith('{'): - message = json.loads(line) - - # lpac response format validations - assert 'type' in message, 'expected type in message' - assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' - assert 'payload' in message, 'expected payload in message' - assert 'code' in message['payload'], 'expected code in message payload' - assert 'data' in message['payload'], 'expected data in message payload' - - msg_ret_code = message['payload']['code'] - if msg_ret_code != 0: - raise LPAError(f"lpac {' '.join(cmd)} failed with code {msg_ret_code}: <{message['payload']['message']}> {message['payload']['data']}") - - messages.append(message) - - if len(messages) == 0: - raise LPAError(f"lpac {cmd} returned no messages") - - return messages - - def _process_notifications(self) -> None: - """ - Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. - """ - self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) - - def _validate_profile_exists(self, iccid: str) -> None: - if not any(p.iccid == iccid for p in self.list_profiles()): - raise LPAProfileNotFoundError(f'profile {iccid} does not exist') - - def _validate_successful(self, msgs: list[dict]) -> None: - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' diff --git a/system/hardware/tici/hardware.py b/system/hardware/tici/hardware.py index 5a84afce0..2295ca3cb 100644 --- a/system/hardware/tici/hardware.py +++ b/system/hardware/tici/hardware.py @@ -12,7 +12,7 @@ from openpilot.common.utils import sudo_read, sudo_write from openpilot.common.gpio import gpio_set, gpio_init, get_irqs_for_action from openpilot.system.hardware.base import HardwareBase, LPABase, ThermalConfig, ThermalZone from openpilot.system.hardware.tici import iwlist -from openpilot.system.hardware.tici.esim import TiciLPA +from openpilot.system.hardware.tici.lpa import TiciLPA from openpilot.system.hardware.tici.pins import GPIO from openpilot.system.hardware.tici.amplifier import Amplifier diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py new file mode 100644 index 000000000..9bd9d8c7b --- /dev/null +++ b/system/hardware/tici/lpa.py @@ -0,0 +1,24 @@ +from openpilot.system.hardware.base import LPABase, Profile + + +class TiciLPA(LPABase): + def __init__(self): + pass + + def list_profiles(self) -> list[Profile]: + return [] + + def get_active_profile(self) -> Profile | None: + return None + + def delete_profile(self, iccid: str) -> None: + return None + + def download_profile(self, qr: str, nickname: str | None = None) -> None: + return None + + def nickname_profile(self, iccid: str, nickname: str) -> None: + return None + + def switch_profile(self, iccid: str) -> None: + return None diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py deleted file mode 100644 index 6fab931cc..000000000 --- a/system/hardware/tici/tests/test_esim.py +++ /dev/null @@ -1,51 +0,0 @@ -import pytest - -from openpilot.system.hardware import HARDWARE, TICI -from openpilot.system.hardware.base import LPAProfileNotFoundError - -# https://euicc-manual.osmocom.org/docs/rsp/known-test-profile -# iccid is always the same for the given activation code -TEST_ACTIVATION_CODE = 'LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5' -TEST_ICCID = '8944476500001944011' - -TEST_NICKNAME = 'test_profile' - -def cleanup(): - lpa = HARDWARE.get_sim_lpa() - try: - lpa.delete_profile(TEST_ICCID) - except LPAProfileNotFoundError: - pass - lpa.process_notifications() - -class TestEsim: - - @classmethod - def setup_class(cls): - if not TICI: - pytest.skip() - cleanup() - - @classmethod - def teardown_class(cls): - cleanup() - - def test_provision_enable_disable(self): - lpa = HARDWARE.get_sim_lpa() - current_active = lpa.get_active_profile() - - lpa.download_profile(TEST_ACTIVATION_CODE, TEST_NICKNAME) - assert any(p.iccid == TEST_ICCID and p.nickname == TEST_NICKNAME for p in lpa.list_profiles()) - - lpa.enable_profile(TEST_ICCID) - new_active = lpa.get_active_profile() - assert new_active is not None - assert new_active.iccid == TEST_ICCID - assert new_active.nickname == TEST_NICKNAME - - lpa.disable_profile(TEST_ICCID) - new_active = lpa.get_active_profile() - assert new_active is None - - if current_active: - lpa.enable_profile(current_active.iccid)