mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-19 01:53:57 +08:00
# Conflicts: # common/api.py # docs/CARS.md # opendbc_repo # panda # scripts/lint/lint.sh # selfdrive/car/car_specific.py # selfdrive/car/card.py # selfdrive/test/process_replay/ref_commit # system/hardware/hardwared.py # tinygrad_repo
73 lines
2.7 KiB
Python
73 lines
2.7 KiB
Python
import jwt
|
|
import os
|
|
import requests
|
|
import unicodedata
|
|
from datetime import datetime, timedelta, UTC
|
|
from openpilot.system.hardware.hw import Paths
|
|
from openpilot.system.version import get_version
|
|
|
|
# name: jwt signature algorithm
|
|
KEYS = {"id_rsa": "RS256",
|
|
"id_ecdsa": "ES256"}
|
|
|
|
|
|
class BaseApi:
|
|
def __init__(self, dongle_id, api_host, user_agent="openpilot-"):
|
|
self.dongle_id = dongle_id
|
|
self.api_host = api_host
|
|
self.user_agent = user_agent
|
|
self.jwt_algorithm, self.private_key, _ = self.get_key_pair()
|
|
|
|
def get(self, *args, **kwargs):
|
|
return self.request('GET', *args, **kwargs)
|
|
|
|
def post(self, *args, **kwargs):
|
|
return self.request('POST', *args, **kwargs)
|
|
|
|
def request(self, method, endpoint, timeout=None, access_token=None, **params):
|
|
return self.api_get(endpoint, method=method, timeout=timeout, access_token=access_token, **params)
|
|
|
|
def _get_token(self, payload_extra=None, expiry_hours=1, **extra_payload):
|
|
now = datetime.now(UTC).replace(tzinfo=None)
|
|
payload = {
|
|
'identity': self.dongle_id,
|
|
'nbf': now,
|
|
'iat': now,
|
|
'exp': now + timedelta(hours=expiry_hours),
|
|
**extra_payload
|
|
}
|
|
if payload_extra is not None:
|
|
payload.update(payload_extra)
|
|
token = jwt.encode(payload, self.private_key, algorithm=self.jwt_algorithm)
|
|
if isinstance(token, bytes):
|
|
token = token.decode('utf8')
|
|
return token
|
|
|
|
def get_token(self, payload_extra=None, expiry_hours=1):
|
|
return self._get_token(payload_extra, expiry_hours)
|
|
|
|
def remove_non_ascii_chars(self, text):
|
|
normalized_text = unicodedata.normalize('NFD', text)
|
|
ascii_encoded_text = normalized_text.encode('ascii', 'ignore')
|
|
return ascii_encoded_text.decode()
|
|
|
|
def api_get(self, endpoint, method='GET', timeout=None, access_token=None, session=None, json=None, **params):
|
|
headers = {}
|
|
if access_token is not None:
|
|
headers['Authorization'] = "JWT " + access_token
|
|
|
|
version = self.remove_non_ascii_chars(get_version())
|
|
headers['User-Agent'] = self.user_agent + version
|
|
|
|
# TODO: add session to Api
|
|
req = requests if session is None else session
|
|
return req.request(method, f"{self.api_host}/{endpoint}", timeout=timeout, headers=headers, json=json, params=params)
|
|
|
|
@staticmethod
|
|
def get_key_pair() -> tuple[str, str, str] | tuple[None, None, None]:
|
|
for key in KEYS:
|
|
if os.path.isfile(Paths.persist_root() + f'/comma/{key}') and os.path.isfile(Paths.persist_root() + f'/comma/{key}.pub'):
|
|
with open(Paths.persist_root() + f'/comma/{key}') as private, open(Paths.persist_root() + f'/comma/{key}.pub') as public:
|
|
return KEYS[key], private.read(), public.read()
|
|
return None, None, None
|