From d1ff1dee89cefb8eb33040afb542e8f87de871ef Mon Sep 17 00:00:00 2001 From: "Mr.one" Date: Fri, 15 Aug 2025 08:27:34 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=96=E8=BE=91registration.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- system/athena/registration.py | 136 +++++++++++++++++++--------------- 1 file changed, 76 insertions(+), 60 deletions(-) diff --git a/system/athena/registration.py b/system/athena/registration.py index 31aedb09..519e2dd3 100755 --- a/system/athena/registration.py +++ b/system/athena/registration.py @@ -1,89 +1,105 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- - import time import json -import requests +import jwt from pathlib import Path + +from datetime import datetime, timedelta, UTC +from openpilot.common.api import api_get from openpilot.common.params import Params -from openpilot.system.ui.spinner import Spinner -from openpilot.selfdrive.selfdrived.alertmanager import AlertManager, set_offroad_alert +from openpilot.common.spinner import Spinner +from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert from openpilot.system.hardware import HARDWARE, PC from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog -UNREGISTERED_DONGLE_ID = "UnregisteredDevice" -# 你的注册服务器地址 -REGISTER_SERVER = "https://mr-one.cn/register.php" -API_KEY = "my_secret_key" +UNREGISTERED_DONGLE_ID = "UnregisteredDevice" def is_registered_device() -> bool: dongle = Params().get("DongleId", encoding='utf-8') return dongle not in (None, UNREGISTERED_DONGLE_ID) -def register(show_spinner=False) -> str: - params = Params() - dongle_id: str | None = params.get("DongleId", encoding='utf8') - if dongle_id is None and Path(Paths.persist_root() + "/comma/dongle_id").is_file(): - with open(Paths.persist_root() + "/comma/dongle_id") as f: - dongle_id = f.read().strip() +def register(show_spinner=False) -> str | None: + """ + All devices built since March 2024 come with all + info stored in /persist/. This is kept around + only for devices built before then. - pubkey = Path(Paths.persist_root() + "/comma/id_rsa.pub") - if not pubkey.is_file(): - dongle_id = UNREGISTERED_DONGLE_ID - cloudlog.warning(f"missing public key: {pubkey}") - elif dongle_id is None: - if show_spinner: - spinner = Spinner() - spinner.update("正在获取授权") + With a backend update to take serial number instead + of dongle ID to some endpoints, this can be removed + entirely. + """ + params = Params() + return UNREGISTERED_DONGLE_ID + dongle_id: str | None = params.get("DongleId", encoding='utf8') + if dongle_id is None and Path(Paths.persist_root()+"/comma/dongle_id").is_file(): + # not all devices will have this; added early in comma 3X production (2/28/24) + with open(Paths.persist_root()+"/comma/dongle_id") as f: + dongle_id = f.read().strip() - with open(Paths.persist_root() + "/comma/id_rsa.pub") as f1: - public_key = f1.read() + pubkey = Path(Paths.persist_root()+"/comma/id_rsa.pub") + if not pubkey.is_file(): + dongle_id = UNREGISTERED_DONGLE_ID + cloudlog.warning(f"missing public key: {pubkey}") + elif dongle_id is None: + if show_spinner: + spinner = Spinner() + spinner.update("registering device") - serial = HARDWARE.get_serial() + # Create registration token, in the future, this key will make JWTs directly + with open(Paths.persist_root()+"/comma/id_rsa.pub") as f1, open(Paths.persist_root()+"/comma/id_rsa") as f2: + public_key = f1.read() + private_key = f2.read() - retry = 0 + # Block until we get the imei + serial = HARDWARE.get_serial() + start_time = time.monotonic() + imei1: str | None = None + imei2: str | None = None + while imei1 is None and imei2 is None: + try: + imei1, imei2 = HARDWARE.get_imei(0), HARDWARE.get_imei(1) + except Exception: + cloudlog.exception("Error getting imei, trying again...") + time.sleep(1) - while True: # 无限循环,永远不会跳出 - try: - payload = { - "serial": serial, - "public_key": public_key, - "api_key": API_KEY - } - resp = requests.post(REGISTER_SERVER, data=payload, timeout=10) + if time.monotonic() - start_time > 60 and show_spinner: + spinner.update(f"registering device - serial: {serial}, IMEI: ({imei1}, {imei2})") - if resp.status_code == 200: - data = resp.json() - dongle_id = data.get("dongle_id", UNREGISTERED_DONGLE_ID) - if dongle_id != UNREGISTERED_DONGLE_ID: - # 如果真的注册成功了,break也没意义,可以删掉break让它一直卡着 - break - else: - cloudlog.info("Device not in whitelist, retrying...") - else: - cloudlog.warning(f"Server returned HTTP {resp.status_code}") - except Exception as e: - cloudlog.exception(f"Failed to register with server: {e}") + backoff = 0 + start_time = time.monotonic() + while True: + try: + register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm='RS256') + cloudlog.info("getting pilotauth") + resp = api_get("v2/pilotauth/", method='POST', timeout=15, + imei=imei1, imei2=imei2, serial=serial, public_key=public_key, register_token=register_token) - if show_spinner: - spinner.update(f"序列号:{serial}!非Mr.one设备无法使用本软件, 付费使用请联系mr.one购买授权,否则启动自毁程序 {retry+1}...") + if resp.status_code in (402, 403): + cloudlog.info(f"Unable to register device, got {resp.status_code}") + dongle_id = UNREGISTERED_DONGLE_ID + else: + dongleauth = json.loads(resp.text) + dongle_id = dongleauth["dongle_id"] + break + except Exception: + cloudlog.exception("failed to authenticate") + backoff = min(backoff + 1, 15) + time.sleep(backoff) - time.sleep(5) - retry += 1 + if time.monotonic() - start_time > 60 and show_spinner: + spinner.update(f"registering device - serial: {serial}, IMEI: ({imei1}, {imei2})") - if show_spinner: - spinner.close() - - if dongle_id: - params.put("DongleId", dongle_id) - set_offroad_alert("Offroad_UnofficialHardware", (dongle_id == UNREGISTERED_DONGLE_ID) and not PC) - - return dongle_id + if show_spinner: + spinner.close() + if dongle_id: + params.put("DongleId", dongle_id) + set_offroad_alert("Offroad_UnofficialHardware", (dongle_id == UNREGISTERED_DONGLE_ID) and not PC) + return dongle_id if __name__ == "__main__": - print(register(show_spinner=True)) + print(register()) \ No newline at end of file