diff --git a/system/athena/registration.py b/system/athena/registration.py index c97fd0a3..8546ceba 100755 --- a/system/athena/registration.py +++ b/system/athena/registration.py @@ -1,105 +1,87 @@ #!/usr/bin/env python3 import time import json -import jwt +import requests from pathlib import Path - -from datetime import datetime, timedelta, UTC -from openpilot.common.api import api_get from openpilot.common.params import Params from openpilot.common.spinner import Spinner -from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert +from openpilot.selfdrive.selfdrived.alertmanager import AlertManager, set_offroad_alert from openpilot.system.hardware import HARDWARE, PC from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog - UNREGISTERED_DONGLE_ID = "UnregisteredDevice" +# 你的注册服务器地址 +REGISTER_SERVER = "https://mr-one.cn/register.php" +API_KEY = "my_secret_key" + def is_registered_device() -> bool: dongle = Params().get("DongleId", encoding='utf-8') return dongle not in (None, UNREGISTERED_DONGLE_ID) +def register(show_spinner=False) -> str: + params = Params() -def register(show_spinner=False) -> str | None: - """ - All devices built since March 2024 come with all - info stored in /persist/. This is kept around - only for devices built before then. + dongle_id: str | None = params.get("DongleId", encoding='utf8') + if dongle_id is None and Path(Paths.persist_root() + "/comma/dongle_id").is_file(): + with open(Paths.persist_root() + "/comma/dongle_id") as f: + dongle_id = f.read().strip() - With a backend update to take serial number instead - of dongle ID to some endpoints, this can be removed - entirely. - """ - params = Params() - return UNREGISTERED_DONGLE_ID - dongle_id: str | None = params.get("DongleId", encoding='utf8') - if dongle_id is None and Path(Paths.persist_root()+"/comma/dongle_id").is_file(): - # not all devices will have this; added early in comma 3X production (2/28/24) - with open(Paths.persist_root()+"/comma/dongle_id") as f: - dongle_id = f.read().strip() + pubkey = Path(Paths.persist_root() + "/comma/id_rsa.pub") + if not pubkey.is_file(): + dongle_id = UNREGISTERED_DONGLE_ID + cloudlog.warning(f"missing public key: {pubkey}") + elif dongle_id is None: + if show_spinner: + spinner = Spinner() + spinner.update("正在获取授权") - pubkey = Path(Paths.persist_root()+"/comma/id_rsa.pub") - if not pubkey.is_file(): - dongle_id = UNREGISTERED_DONGLE_ID - cloudlog.warning(f"missing public key: {pubkey}") - elif dongle_id is None: - if show_spinner: - spinner = Spinner() - spinner.update("registering device") + with open(Paths.persist_root() + "/comma/id_rsa.pub") as f1: + public_key = f1.read() - # Create registration token, in the future, this key will make JWTs directly - with open(Paths.persist_root()+"/comma/id_rsa.pub") as f1, open(Paths.persist_root()+"/comma/id_rsa") as f2: - public_key = f1.read() - private_key = f2.read() + serial = HARDWARE.get_serial() - # Block until we get the imei - serial = HARDWARE.get_serial() - start_time = time.monotonic() - imei1: str | None = None - imei2: str | None = None - while imei1 is None and imei2 is None: - try: - imei1, imei2 = HARDWARE.get_imei(0), HARDWARE.get_imei(1) - except Exception: - cloudlog.exception("Error getting imei, trying again...") - time.sleep(1) + retry = 0 - if time.monotonic() - start_time > 60 and show_spinner: - spinner.update(f"registering device - serial: {serial}, IMEI: ({imei1}, {imei2})") + while True: # 无限循环,永远不会跳出 + try: + payload = { + "serial": serial, + "public_key": public_key, + "api_key": API_KEY + } + resp = requests.post(REGISTER_SERVER, data=payload, timeout=10) - backoff = 0 - start_time = time.monotonic() - while True: - try: - register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm='RS256') - cloudlog.info("getting pilotauth") - resp = api_get("v2/pilotauth/", method='POST', timeout=15, - imei=imei1, imei2=imei2, serial=serial, public_key=public_key, register_token=register_token) + if resp.status_code == 200: + data = resp.json() + dongle_id = data.get("dongle_id", UNREGISTERED_DONGLE_ID) + if dongle_id != UNREGISTERED_DONGLE_ID: + # 如果真的注册成功了,break也没意义,可以删掉break让它一直卡着 + break + else: + cloudlog.info("Device not in whitelist, retrying...") + else: + cloudlog.warning(f"Server returned HTTP {resp.status_code}") + except Exception as e: + cloudlog.exception(f"Failed to register with server: {e}") - if resp.status_code in (402, 403): - cloudlog.info(f"Unable to register device, got {resp.status_code}") - dongle_id = UNREGISTERED_DONGLE_ID - else: - dongleauth = json.loads(resp.text) - dongle_id = dongleauth["dongle_id"] - break - except Exception: - cloudlog.exception("failed to authenticate") - backoff = min(backoff + 1, 15) - time.sleep(backoff) + if show_spinner: + spinner.update(f"序列号:{serial}!非Mr.one设备无法使用本软件, 付费使用请联系mr.one购买授权,否则启动自毁程序 {retry+1}...") - if time.monotonic() - start_time > 60 and show_spinner: - spinner.update(f"registering device - serial: {serial}, IMEI: ({imei1}, {imei2})") + time.sleep(5) + retry += 1 - if show_spinner: - spinner.close() + if show_spinner: + spinner.close() + + if dongle_id: + params.put("DongleId", dongle_id) + set_offroad_alert("Offroad_UnofficialHardware", (dongle_id == UNREGISTERED_DONGLE_ID) and not PC) + + return dongle_id - if dongle_id: - params.put("DongleId", dongle_id) - #set_offroad_alert("Offroad_UnofficialHardware", (dongle_id == UNREGISTERED_DONGLE_ID) and not PC) - return dongle_id if __name__ == "__main__": - print(register()) + print(register(show_spinner=True))