mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-19 05:24:06 +08:00
# Conflicts: # .github/workflows/ci_weekly_run.yaml # .github/workflows/raylib_ui_preview.yaml # .github/workflows/tests.yaml # .gitmodules # README.md # SConstruct # common/api.py # common/params_keys.h # docs/CARS.md # msgq_repo # opendbc_repo # panda # selfdrive/car/tests/test_car_interfaces.py # selfdrive/controls/controlsd.py # selfdrive/controls/lib/latcontrol.py # selfdrive/controls/lib/latcontrol_angle.py # selfdrive/controls/lib/latcontrol_pid.py # selfdrive/controls/lib/latcontrol_torque.py # selfdrive/controls/tests/test_latcontrol.py # selfdrive/monitoring/helpers.py # selfdrive/ui/SConscript # selfdrive/ui/main.cc # selfdrive/ui/qt/body.h # selfdrive/ui/qt/home.cc # selfdrive/ui/qt/home.h # selfdrive/ui/qt/network/networking.cc # selfdrive/ui/qt/network/networking.h # selfdrive/ui/qt/network/wifi_manager.cc # selfdrive/ui/qt/offroad/developer_panel.cc # selfdrive/ui/qt/offroad/developer_panel.h # selfdrive/ui/qt/offroad/experimental_mode.cc # selfdrive/ui/qt/offroad/firehose.cc # selfdrive/ui/qt/offroad/firehose.h # selfdrive/ui/qt/offroad/onboarding.cc # selfdrive/ui/qt/offroad/onboarding.h # selfdrive/ui/qt/offroad/settings.cc # selfdrive/ui/qt/offroad/settings.h # selfdrive/ui/qt/offroad/software_settings.cc # selfdrive/ui/qt/onroad/alerts.cc # selfdrive/ui/qt/onroad/annotated_camera.h # selfdrive/ui/qt/onroad/buttons.cc # selfdrive/ui/qt/onroad/buttons.h # selfdrive/ui/qt/onroad/driver_monitoring.cc # selfdrive/ui/qt/onroad/hud.cc # selfdrive/ui/qt/onroad/hud.h # selfdrive/ui/qt/onroad/model.cc # selfdrive/ui/qt/onroad/model.h # selfdrive/ui/qt/onroad/onroad_home.cc # selfdrive/ui/qt/onroad/onroad_home.h # selfdrive/ui/qt/request_repeater.h # selfdrive/ui/qt/sidebar.cc # selfdrive/ui/qt/sidebar.h # selfdrive/ui/qt/util.cc # selfdrive/ui/qt/widgets/cameraview.h # selfdrive/ui/qt/widgets/controls.cc # selfdrive/ui/qt/widgets/controls.h # selfdrive/ui/qt/widgets/input.cc # selfdrive/ui/qt/widgets/input.h # selfdrive/ui/qt/widgets/prime.cc # 
selfdrive/ui/qt/widgets/prime.h # selfdrive/ui/qt/widgets/ssh_keys.h # selfdrive/ui/qt/widgets/toggle.h # selfdrive/ui/qt/widgets/wifi.cc # selfdrive/ui/qt/widgets/wifi.h # selfdrive/ui/qt/window.cc # selfdrive/ui/qt/window.h # selfdrive/ui/tests/cycle_offroad_alerts.py # selfdrive/ui/tests/test_ui/run.py # selfdrive/ui/translations/main_ar.ts # selfdrive/ui/translations/main_de.ts # selfdrive/ui/translations/main_es.ts # selfdrive/ui/translations/main_fr.ts # selfdrive/ui/translations/main_ja.ts # selfdrive/ui/translations/main_ko.ts # selfdrive/ui/translations/main_nl.ts # selfdrive/ui/translations/main_pl.ts # selfdrive/ui/translations/main_pt-BR.ts # selfdrive/ui/translations/main_th.ts # selfdrive/ui/translations/main_tr.ts # selfdrive/ui/translations/main_zh-CHS.ts # selfdrive/ui/translations/main_zh-CHT.ts # selfdrive/ui/ui.cc # selfdrive/ui/ui.h # system/manager/build.py # system/version.py
71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
import jwt
|
|
import os
|
|
import requests
|
|
import unicodedata
|
|
from datetime import datetime, timedelta, UTC
|
|
from openpilot.system.hardware.hw import Paths
|
|
from openpilot.system.version import get_version
|
|
|
|
# Private-key file name -> JWT signature algorithm used with that key.
# Insertion order matters: it is the order in which key files are probed.
KEYS = {
  "id_rsa": "RS256",
  "id_ecdsa": "ES256",
}
|
|
|
|
|
|
class BaseApi:
|
|
def __init__(self, dongle_id, api_host, user_agent="openpilot-"):
|
|
self.dongle_id = dongle_id
|
|
self.api_host = api_host
|
|
self.user_agent = user_agent
|
|
self.jwt_algorithm, self.private_key, _ = self.get_key_pair()
|
|
|
|
def get(self, *args, **kwargs):
|
|
return self.request('GET', *args, **kwargs)
|
|
|
|
def post(self, *args, **kwargs):
|
|
return self.request('POST', *args, **kwargs)
|
|
|
|
def request(self, method, endpoint, timeout=None, access_token=None, **params):
|
|
return self.api_get(endpoint, method=method, timeout=timeout, access_token=access_token, **params)
|
|
|
|
def _get_token(self, payload_extra=None, expiry_hours=1, **extra_payload):
|
|
now = datetime.now(UTC).replace(tzinfo=None)
|
|
payload = {
|
|
'identity': self.dongle_id,
|
|
'nbf': now,
|
|
'iat': now,
|
|
'exp': now + timedelta(hours=expiry_hours),
|
|
**extra_payload
|
|
}
|
|
if payload_extra is not None:
|
|
payload.update(payload_extra)
|
|
token = jwt.encode(payload, self.private_key, algorithm=self.jwt_algorithm)
|
|
if isinstance(token, bytes):
|
|
token = token.decode('utf8')
|
|
return token
|
|
|
|
def get_token(self, payload_extra=None, expiry_hours=1):
|
|
return self._get_token(payload_extra, expiry_hours)
|
|
|
|
def remove_non_ascii_chars(self, text):
|
|
normalized_text = unicodedata.normalize('NFD', text)
|
|
ascii_encoded_text = normalized_text.encode('ascii', 'ignore')
|
|
return ascii_encoded_text.decode()
|
|
|
|
def api_get(self, endpoint, method='GET', timeout=None, access_token=None, json=None, **params):
|
|
headers = {}
|
|
if access_token is not None:
|
|
headers['Authorization'] = "JWT " + access_token
|
|
|
|
version = self.remove_non_ascii_chars(get_version())
|
|
headers['User-Agent'] = self.user_agent + version
|
|
|
|
return requests.request(method, f"{self.api_host}/{endpoint}", timeout=timeout, headers=headers, json=json, params=params)
|
|
|
|
@staticmethod
|
|
def get_key_pair():
|
|
for key in KEYS:
|
|
if os.path.isfile(Paths.persist_root() + f'/comma/{key}') and os.path.isfile(Paths.persist_root() + f'/comma/{key}.pub'):
|
|
with open(Paths.persist_root() + f'/comma/{key}') as private, open(Paths.persist_root() + f'/comma/{key}.pub') as public:
|
|
return KEYS[key], private.read(), public.read()
|
|
return None, None, None
|