sunnylink: elliptic curve keys support and improve key path handling (#1566)

* support ecdsa for mici

* lint

* ugh

* ugh ughain

* more

* symmetrical AES key derivation and some missing key handling

* cleanup

---------

Co-authored-by: Jason Wen <haibin.wen3@gmail.com>
This commit is contained in:
Nayan
2025-12-19 15:49:24 -05:00
committed by GitHub
parent 2e576178cb
commit f8487cae23
2 changed files with 53 additions and 35 deletions

View File

@@ -19,7 +19,7 @@ from openpilot.system.version import get_version
from cereal import messaging, custom
from openpilot.sunnypilot.sunnylink.api import SunnylinkApi
from openpilot.sunnypilot.sunnylink.backups.utils import decrypt_compressed_data, encrypt_compress_data, SnakeCaseEncoder
from openpilot.sunnypilot.sunnylink.backups.utils import decrypt_compressed_data, encrypt_compressed_data, SnakeCaseEncoder
from openpilot.sunnypilot.sunnylink.utils import get_param_as_byte, save_param_from_base64_encoded_string
@@ -95,7 +95,7 @@ class BackupManagerSP:
# Serialize and encrypt config data
config_json = json.dumps(config_data)
encrypted_config = encrypt_compress_data(config_json, use_aes_256=True)
encrypted_config = encrypt_compressed_data(config_json, use_aes_256=True)
self._update_progress(50.0, OperationType.BACKUP)
backup_info = custom.BackupManagerSP.BackupInfo()

View File

@@ -4,9 +4,9 @@ Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import base64
import hashlib
import os
import zlib
import re
import json
@@ -14,8 +14,9 @@ from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from openpilot.common.api.base import KEYS
from openpilot.sunnypilot.sunnylink.backups.AESCipher import AESCipher
from openpilot.system.hardware.hw import Paths
@@ -27,37 +28,43 @@ class KeyDerivation:
return f.read()
@staticmethod
def derive_aes_key_iv_from_rsa(key_path: str, use_aes_256: bool) -> tuple[bytes, bytes]:
rsa_key_pem: bytes = KeyDerivation._load_key(key_path)
key_plain = rsa_key_pem.decode(errors="ignore")
def derive_aes_key_iv(key_path: str, use_aes_256: bool) -> tuple[bytes, bytes]:
key_pem: bytes = KeyDerivation._load_key(key_path)
key_plain = key_pem.decode(errors="ignore")
if "private" in key_plain.lower():
private_key = serialization.load_pem_private_key(rsa_key_pem, password=None, backend=default_backend())
if not isinstance(private_key, rsa.RSAPrivateKey):
raise ValueError("Invalid RSA key format: Unable to determine if key is public or private.")
der_data = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
private_key = serialization.load_pem_private_key(key_pem, password=None, backend=default_backend())
if isinstance(private_key, (rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey)):
public_key = private_key.public_key()
else:
raise ValueError("Invalid key format: Unable to determine if key is public or private.")
elif "public" in key_plain.lower():
public_key = serialization.load_pem_public_key(rsa_key_pem, backend=default_backend())
if not isinstance(public_key, rsa.RSAPublicKey):
raise ValueError("Invalid RSA key format: Unable to determine if key is public or private.")
der_data = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.PKCS1)
public_key = serialization.load_pem_public_key(key_pem, backend=default_backend()) # type: ignore[assignment]
if not isinstance(public_key, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)):
raise ValueError("Invalid key format: Unable to determine if key is public or private.")
else:
raise ValueError("Unknown key format: Unable to determine if key is public or private.")
raise ValueError("Invalid key format: Unable to determine if key is public or private.")
sha256_hash = hashlib.sha256(der_data).digest()
aes_key = sha256_hash[:32] if use_aes_256 else sha256_hash[:16]
aes_iv = sha256_hash[16:32]
if isinstance(public_key, rsa.RSAPublicKey):
der_data = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.PKCS1)
elif isinstance(public_key, ec.EllipticCurvePublicKey):
der_data = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)
else:
raise ValueError("Unsupported key type.")
return aes_key, aes_iv
if use_aes_256:
# AES-256-CBC
key = hashlib.sha256(der_data).digest()
iv = hashlib.md5(der_data).digest()
else:
# AES-128-CBC
key = hashlib.md5(der_data).digest()
iv = hashlib.md5(der_data).digest() # Insecure IV reuse, kept for compatibility
return key, iv
def qUncompress(data):
def uncompress_dat(data):
"""
Decompress data using zlib.
@@ -71,7 +78,7 @@ def qUncompress(data):
return zlib.decompress(data_stripped_4)
def qCompress(data):
def compress_dat(data):
"""
Compress data using zlib.
@@ -85,6 +92,19 @@ def qCompress(data):
return b"ZLIB" + compressed_data
def get_key_path(use_aes_256=False) -> str:
key_path = ""
for key in KEYS:
if os.path.isfile(Paths.persist_root() + f'/comma/{key}') and os.path.isfile(Paths.persist_root() + f'/comma/{key}.pub'):
key_path = str(Path(Paths.persist_root() + f'/comma/{key}') if use_aes_256 else Path(Paths.persist_root() + f'/comma/{key}.pub'))
break
if not key_path:
raise FileNotFoundError("No valid key pair found in persist storage.")
return key_path
def decrypt_compressed_data(encrypted_base64, use_aes_256=False):
"""
Decrypt and decompress data from base64 string.
@@ -96,18 +116,17 @@ def decrypt_compressed_data(encrypted_base64, use_aes_256=False):
Returns:
str: Decrypted and decompressed string
"""
key_path = Path(f"{Paths.persist_root()}/comma/id_rsa") if use_aes_256 else Path(f"{Paths.persist_root()}/comma/id_rsa.pub")
try:
# Decode base64
encrypted_data = base64.b64decode(encrypted_base64)
# Decrypt
key, iv = KeyDerivation.derive_aes_key_iv_from_rsa(str(key_path), use_aes_256)
key, iv = KeyDerivation.derive_aes_key_iv(get_key_path(use_aes_256), use_aes_256)
cipher = AESCipher(key, iv)
decrypted_data = cipher.decrypt(encrypted_data)
# Decompress
decompressed_data = qUncompress(decrypted_data)
decompressed_data = uncompress_dat(decrypted_data)
# Decode UTF-8
result = decompressed_data.decode('utf-8')
@@ -117,7 +136,7 @@ def decrypt_compressed_data(encrypted_base64, use_aes_256=False):
return ""
def encrypt_compress_data(text, use_aes_256=True):
def encrypt_compressed_data(text, use_aes_256=True):
"""
Compress and encrypt string data to base64.
@@ -128,16 +147,15 @@ def encrypt_compress_data(text, use_aes_256=True):
Returns:
str: Base64 encoded encrypted data
"""
key_path = Path(f"{Paths.persist_root()}/comma/id_rsa") if use_aes_256 else Path(f"{Paths.persist_root()}/comma/id_rsa.pub")
try:
# Encode to UTF-8
text_bytes = text.encode('utf-8')
# Compress
compressed_data = qCompress(text_bytes)
compressed_data = compress_dat(text_bytes)
# Encrypt
key, iv = KeyDerivation.derive_aes_key_iv_from_rsa(str(key_path), use_aes_256)
key, iv = KeyDerivation.derive_aes_key_iv(get_key_path(use_aes_256), use_aes_256)
cipher = AESCipher(key, iv)
encrypted_data = cipher.encrypt(compressed_data)