From f8487cae23416baf563b468b0565286b519569e7 Mon Sep 17 00:00:00 2001 From: Nayan Date: Fri, 19 Dec 2025 15:49:24 -0500 Subject: [PATCH] sunnylink: elliptic curve keys support and improve key path handling (#1566) * support ecdsa for mici * lint * ugh * ugh ughain * more * symmetrical AES key derivation and some missing key handling * cleanup --------- Co-authored-by: Jason Wen --- sunnypilot/sunnylink/backups/manager.py | 4 +- sunnypilot/sunnylink/backups/utils.py | 84 +++++++++++++++---------- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/sunnypilot/sunnylink/backups/manager.py b/sunnypilot/sunnylink/backups/manager.py index 1b3c623fc4..cc38476041 100644 --- a/sunnypilot/sunnylink/backups/manager.py +++ b/sunnypilot/sunnylink/backups/manager.py @@ -19,7 +19,7 @@ from openpilot.system.version import get_version from cereal import messaging, custom from openpilot.sunnypilot.sunnylink.api import SunnylinkApi -from openpilot.sunnypilot.sunnylink.backups.utils import decrypt_compressed_data, encrypt_compress_data, SnakeCaseEncoder +from openpilot.sunnypilot.sunnylink.backups.utils import decrypt_compressed_data, encrypt_compressed_data, SnakeCaseEncoder from openpilot.sunnypilot.sunnylink.utils import get_param_as_byte, save_param_from_base64_encoded_string @@ -95,7 +95,7 @@ class BackupManagerSP: # Serialize and encrypt config data config_json = json.dumps(config_data) - encrypted_config = encrypt_compress_data(config_json, use_aes_256=True) + encrypted_config = encrypt_compressed_data(config_json, use_aes_256=True) self._update_progress(50.0, OperationType.BACKUP) backup_info = custom.BackupManagerSP.BackupInfo() diff --git a/sunnypilot/sunnylink/backups/utils.py b/sunnypilot/sunnylink/backups/utils.py index 1734a7efcf..a81a13b2c7 100644 --- a/sunnypilot/sunnylink/backups/utils.py +++ b/sunnypilot/sunnylink/backups/utils.py @@ -4,9 +4,9 @@ Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors. This file is part of sunnypilot and is licensed under the MIT License. See the LICENSE.md file in the root directory for more details. """ - import base64 import hashlib +import os import zlib import re import json @@ -14,8 +14,9 @@ from pathlib import Path from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import rsa, ec +from openpilot.common.api.base import KEYS from openpilot.sunnypilot.sunnylink.backups.AESCipher import AESCipher from openpilot.system.hardware.hw import Paths @@ -27,37 +28,43 @@ class KeyDerivation: return f.read() @staticmethod - def derive_aes_key_iv_from_rsa(key_path: str, use_aes_256: bool) -> tuple[bytes, bytes]: - rsa_key_pem: bytes = KeyDerivation._load_key(key_path) - key_plain = rsa_key_pem.decode(errors="ignore") + def derive_aes_key_iv(key_path: str, use_aes_256: bool) -> tuple[bytes, bytes]: + key_pem: bytes = KeyDerivation._load_key(key_path) + key_plain = key_pem.decode(errors="ignore") if "private" in key_plain.lower(): - private_key = serialization.load_pem_private_key(rsa_key_pem, password=None, backend=default_backend()) - if not isinstance(private_key, rsa.RSAPrivateKey): - raise ValueError("Invalid RSA key format: Unable to determine if key is public or private.") - - der_data = private_key.private_bytes( - encoding=serialization.Encoding.DER, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption() - ) + private_key = serialization.load_pem_private_key(key_pem, password=None, backend=default_backend()) + if isinstance(private_key, (rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey)): + public_key = private_key.public_key() + else: + raise ValueError("Invalid key format: Unable to determine if key is public or private.") elif "public" in key_plain.lower(): - public_key = serialization.load_pem_public_key(rsa_key_pem, backend=default_backend()) - if not isinstance(public_key, rsa.RSAPublicKey): - raise ValueError("Invalid RSA key format: Unable to determine if key is public or private.") - - der_data = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.PKCS1) + public_key = serialization.load_pem_public_key(key_pem, backend=default_backend()) # type: ignore[assignment] + if not isinstance(public_key, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)): + raise ValueError("Invalid key format: Unable to determine if key is public or private.") else: - raise ValueError("Unknown key format: Unable to determine if key is public or private.") + raise ValueError("Invalid key format: Unable to determine if key is public or private.") - sha256_hash = hashlib.sha256(der_data).digest() - aes_key = sha256_hash[:32] if use_aes_256 else sha256_hash[:16] - aes_iv = sha256_hash[16:32] + if isinstance(public_key, rsa.RSAPublicKey): + der_data = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.PKCS1) + elif isinstance(public_key, ec.EllipticCurvePublicKey): + der_data = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo) + else: + raise ValueError("Unsupported key type.") - return aes_key, aes_iv + if use_aes_256: + # AES-256-CBC + key = hashlib.sha256(der_data).digest() + iv = hashlib.md5(der_data).digest() + else: + # AES-128-CBC + key = hashlib.md5(der_data).digest() + iv = hashlib.md5(der_data).digest() # Insecure IV reuse, kept for compatibility + + return key, iv -def qUncompress(data): +def uncompress_dat(data): """ Decompress data using zlib. @@ -71,7 +78,7 @@ def qUncompress(data): return zlib.decompress(data_stripped_4) -def qCompress(data): +def compress_dat(data): """ Compress data using zlib. @@ -85,6 +92,19 @@ def qCompress(data): return b"ZLIB" + compressed_data +def get_key_path(use_aes_256=False) -> str: + key_path = "" + for key in KEYS: + if os.path.isfile(Paths.persist_root() + f'/comma/{key}') and os.path.isfile(Paths.persist_root() + f'/comma/{key}.pub'): + key_path = str(Path(Paths.persist_root() + f'/comma/{key}') if use_aes_256 else Path(Paths.persist_root() + f'/comma/{key}.pub')) + break + + if not key_path: + raise FileNotFoundError("No valid key pair found in persist storage.") + + return key_path + + def decrypt_compressed_data(encrypted_base64, use_aes_256=False): """ Decrypt and decompress data from base64 string. @@ -96,18 +116,17 @@ def decrypt_compressed_data(encrypted_base64, use_aes_256=False): Returns: str: Decrypted and decompressed string """ - key_path = Path(f"{Paths.persist_root()}/comma/id_rsa") if use_aes_256 else Path(f"{Paths.persist_root()}/comma/id_rsa.pub") try: # Decode base64 encrypted_data = base64.b64decode(encrypted_base64) # Decrypt - key, iv = KeyDerivation.derive_aes_key_iv_from_rsa(str(key_path), use_aes_256) + key, iv = KeyDerivation.derive_aes_key_iv(get_key_path(use_aes_256), use_aes_256) cipher = AESCipher(key, iv) decrypted_data = cipher.decrypt(encrypted_data) # Decompress - decompressed_data = qUncompress(decrypted_data) + decompressed_data = uncompress_dat(decrypted_data) # Decode UTF-8 result = decompressed_data.decode('utf-8') @@ -117,7 +136,7 @@ def decrypt_compressed_data(encrypted_base64, use_aes_256=False): return "" -def encrypt_compress_data(text, use_aes_256=True): +def encrypt_compressed_data(text, use_aes_256=True): """ Compress and encrypt string data to base64. @@ -128,16 +147,15 @@ def encrypt_compress_data(text, use_aes_256=True): Returns: str: Base64 encoded encrypted data """ - key_path = Path(f"{Paths.persist_root()}/comma/id_rsa") if use_aes_256 else Path(f"{Paths.persist_root()}/comma/id_rsa.pub") try: # Encode to UTF-8 text_bytes = text.encode('utf-8') # Compress - compressed_data = qCompress(text_bytes) + compressed_data = compress_dat(text_bytes) # Encrypt - key, iv = KeyDerivation.derive_aes_key_iv_from_rsa(str(key_path), use_aes_256) + key, iv = KeyDerivation.derive_aes_key_iv(get_key_path(use_aes_256), use_aes_256) cipher = AESCipher(key, iv) encrypted_data = cipher.encrypt(compressed_data)