@@ -7,7 +7,6 @@ from openpilot.system.hardware import HARDWARE
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai')
|
||||
parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi')
|
||||
parser.add_argument('--switch', metavar='iccid', help='switch to profile')
|
||||
parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)')
|
||||
parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)')
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Literal
|
||||
|
||||
from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile
|
||||
|
||||
class TiciLPA(LPABase):
|
||||
def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
|
||||
self.env = os.environ.copy()
|
||||
self.env['LPAC_APDU'] = interface
|
||||
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
|
||||
self.env['AT_DEVICE'] = '/dev/ttyUSB2'
|
||||
|
||||
self.timeout_sec = 45
|
||||
|
||||
if shutil.which('lpac') is None:
|
||||
raise LPAError('lpac not found, must be installed!')
|
||||
|
||||
def list_profiles(self) -> list[Profile]:
|
||||
msgs = self._invoke('profile', 'list')
|
||||
self._validate_successful(msgs)
|
||||
return [Profile(
|
||||
iccid=p['iccid'],
|
||||
nickname=p['profileNickname'],
|
||||
enabled=p['profileState'] == 'enabled',
|
||||
provider=p['serviceProviderName']
|
||||
) for p in msgs[-1]['payload']['data']]
|
||||
|
||||
def get_active_profile(self) -> Profile | None:
|
||||
return next((p for p in self.list_profiles() if p.enabled), None)
|
||||
|
||||
def delete_profile(self, iccid: str) -> None:
|
||||
self._validate_profile_exists(iccid)
|
||||
latest = self.get_active_profile()
|
||||
if latest is not None and latest.iccid == iccid:
|
||||
raise LPAError('cannot delete active profile, switch to another profile first')
|
||||
self._validate_successful(self._invoke('profile', 'delete', iccid))
|
||||
self._process_notifications()
|
||||
|
||||
def download_profile(self, qr: str, nickname: str | None = None) -> None:
|
||||
msgs = self._invoke('profile', 'download', '-a', qr)
|
||||
self._validate_successful(msgs)
|
||||
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
|
||||
if new_profile is None:
|
||||
raise LPAError('no new profile found')
|
||||
if nickname:
|
||||
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
|
||||
self._process_notifications()
|
||||
|
||||
def nickname_profile(self, iccid: str, nickname: str) -> None:
|
||||
self._validate_profile_exists(iccid)
|
||||
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
|
||||
|
||||
def switch_profile(self, iccid: str) -> None:
|
||||
self._validate_profile_exists(iccid)
|
||||
latest = self.get_active_profile()
|
||||
if latest and latest.iccid == iccid:
|
||||
return
|
||||
self._validate_successful(self._invoke('profile', 'enable', iccid))
|
||||
self._process_notifications()
|
||||
|
||||
def _invoke(self, *cmd: str):
|
||||
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
|
||||
try:
|
||||
out, err = proc.communicate(timeout=self.timeout_sec)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
proc.kill()
|
||||
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
|
||||
|
||||
messages = []
|
||||
for line in out.decode().strip().splitlines():
|
||||
if line.startswith('{'):
|
||||
message = json.loads(line)
|
||||
|
||||
# lpac response format validations
|
||||
assert 'type' in message, 'expected type in message'
|
||||
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
|
||||
assert 'payload' in message, 'expected payload in message'
|
||||
assert 'code' in message['payload'], 'expected code in message payload'
|
||||
assert 'data' in message['payload'], 'expected data in message payload'
|
||||
|
||||
msg_ret_code = message['payload']['code']
|
||||
if msg_ret_code != 0:
|
||||
raise LPAError(f"lpac {' '.join(cmd)} failed with code {msg_ret_code}: <{message['payload']['message']}> {message['payload']['data']}")
|
||||
|
||||
messages.append(message)
|
||||
|
||||
if len(messages) == 0:
|
||||
raise LPAError(f"lpac {cmd} returned no messages")
|
||||
|
||||
return messages
|
||||
|
||||
def _process_notifications(self) -> None:
|
||||
"""
|
||||
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
|
||||
"""
|
||||
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
|
||||
|
||||
def _validate_profile_exists(self, iccid: str) -> None:
|
||||
if not any(p.iccid == iccid for p in self.list_profiles()):
|
||||
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
|
||||
|
||||
def _validate_successful(self, msgs: list[dict]) -> None:
|
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
|
||||
@@ -12,7 +12,7 @@ from openpilot.common.utils import sudo_read, sudo_write
|
||||
from openpilot.common.gpio import gpio_set, gpio_init, get_irqs_for_action
|
||||
from openpilot.system.hardware.base import HardwareBase, LPABase, ThermalConfig, ThermalZone
|
||||
from openpilot.system.hardware.tici import iwlist
|
||||
from openpilot.system.hardware.tici.esim import TiciLPA
|
||||
from openpilot.system.hardware.tici.lpa import TiciLPA
|
||||
from openpilot.system.hardware.tici.pins import GPIO
|
||||
from openpilot.system.hardware.tici.amplifier import Amplifier
|
||||
|
||||
|
||||
24
system/hardware/tici/lpa.py
Normal file
24
system/hardware/tici/lpa.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from openpilot.system.hardware.base import LPABase, Profile
|
||||
|
||||
|
||||
class TiciLPA(LPABase):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def list_profiles(self) -> list[Profile]:
|
||||
return []
|
||||
|
||||
def get_active_profile(self) -> Profile | None:
|
||||
return None
|
||||
|
||||
def delete_profile(self, iccid: str) -> None:
|
||||
return None
|
||||
|
||||
def download_profile(self, qr: str, nickname: str | None = None) -> None:
|
||||
return None
|
||||
|
||||
def nickname_profile(self, iccid: str, nickname: str) -> None:
|
||||
return None
|
||||
|
||||
def switch_profile(self, iccid: str) -> None:
|
||||
return None
|
||||
@@ -1,51 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from openpilot.system.hardware import HARDWARE, TICI
|
||||
from openpilot.system.hardware.base import LPAProfileNotFoundError
|
||||
|
||||
# https://euicc-manual.osmocom.org/docs/rsp/known-test-profile
|
||||
# iccid is always the same for the given activation code
|
||||
TEST_ACTIVATION_CODE = 'LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5'
|
||||
TEST_ICCID = '8944476500001944011'
|
||||
|
||||
TEST_NICKNAME = 'test_profile'
|
||||
|
||||
def cleanup():
|
||||
lpa = HARDWARE.get_sim_lpa()
|
||||
try:
|
||||
lpa.delete_profile(TEST_ICCID)
|
||||
except LPAProfileNotFoundError:
|
||||
pass
|
||||
lpa.process_notifications()
|
||||
|
||||
class TestEsim:
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
if not TICI:
|
||||
pytest.skip()
|
||||
cleanup()
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
cleanup()
|
||||
|
||||
def test_provision_enable_disable(self):
|
||||
lpa = HARDWARE.get_sim_lpa()
|
||||
current_active = lpa.get_active_profile()
|
||||
|
||||
lpa.download_profile(TEST_ACTIVATION_CODE, TEST_NICKNAME)
|
||||
assert any(p.iccid == TEST_ICCID and p.nickname == TEST_NICKNAME for p in lpa.list_profiles())
|
||||
|
||||
lpa.enable_profile(TEST_ICCID)
|
||||
new_active = lpa.get_active_profile()
|
||||
assert new_active is not None
|
||||
assert new_active.iccid == TEST_ICCID
|
||||
assert new_active.nickname == TEST_NICKNAME
|
||||
|
||||
lpa.disable_profile(TEST_ICCID)
|
||||
new_active = lpa.get_active_profile()
|
||||
assert new_active is None
|
||||
|
||||
if current_active:
|
||||
lpa.enable_profile(current_active.iccid)
|
||||
Reference in New Issue
Block a user