Files
github-actions[bot] 7fa972be6a sunnypilot v2026.02.09-4080
version: sunnypilot v2025.003.000 (dev)
date: 2026-02-09T02:04:38
master commit: 254f55ac15a40343d7255f2f098de3442e0c4a6f
2026-02-09 02:04:38 +00:00

287 lines
11 KiB
Python

"""
Copyright (c) 2021-, Haibin Wen, sunnypilot, and a number of other contributors.
This file is part of sunnypilot and is licensed under the MIT License.
See the LICENSE.md file in the root directory for more details.
"""
import base64
import json
import requests
import time
from enum import Enum
from typing import Any
from openpilot.common.git import get_branch
from openpilot.common.params import Params, ParamKeyFlag
from openpilot.common.realtime import Ratekeeper
from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_version
from cereal import messaging, custom
from openpilot.sunnypilot.sunnylink.api import SunnylinkApi
from openpilot.sunnypilot.sunnylink.backups.utils import decrypt_compressed_data, encrypt_compressed_data, SnakeCaseEncoder
from openpilot.sunnypilot.sunnylink.utils import get_param_as_byte, save_param_from_base64_encoded_string
class OperationType(Enum):
BACKUP = "backup"
RESTORE = "restore"
class BackupManagerSP:
"""Manages device configuration backups to/from sunnylink"""
def __init__(self):
self.params = Params()
self.device_id = self.params.get("SunnylinkDongleId")
self.api = SunnylinkApi(self.device_id)
self.pm = messaging.PubMaster(["backupManagerSP"])
# Status tracking
self.backup_status = custom.BackupManagerSP.Status.idle
self.restore_status = custom.BackupManagerSP.Status.idle
# Unified progress & operation type (only one operation runs at a time)
self.progress = 0.0
self.operation: OperationType | None = None
self.last_error = ""
self._session = requests.Session() # reuse session to reduce SSL handshake overhead
def _report_status(self) -> None:
"""Reports current backup manager state through the messaging system."""
msg = messaging.new_message('backupManagerSP', valid=True)
backup_state = msg.backupManagerSP
backup_state.backupStatus = self.backup_status
backup_state.restoreStatus = self.restore_status
# Both progress fields use the unified progress value
backup_state.backupProgress = self.progress
backup_state.restoreProgress = self.progress
backup_state.lastError = self.last_error
# Optionally, add a field for operation type if supported:
# backup_state.operationType = self.operation.value if self.operation else "none"
self.pm.send('backupManagerSP', msg)
def _update_progress(self, progress: float, op_type: OperationType) -> None:
"""Updates the unified progress and operation type, then reports status."""
self.progress = progress
self.operation = op_type
self._report_status()
def _collect_config_data(self) -> dict[str, Any]:
"""Collects configuration data to be backed up."""
config_data = {}
params_to_backup = [k.decode('utf-8') for k in self.params.all_keys(ParamKeyFlag.BACKUP)]
for param in params_to_backup:
value = get_param_as_byte(param)
if value is not None:
config_data[param] = base64.b64encode(value).decode('utf-8')
return config_data
def _get_metadata_value(self, metadata_list, key, default_value=None):
return next((entry.get("value") for entry in metadata_list if entry.get("key") == key), default_value)
async def create_backup(self) -> bool:
"""Creates and uploads a new backup to sunnylink."""
try:
self.backup_status = custom.BackupManagerSP.Status.inProgress
self._update_progress(0.0, OperationType.BACKUP)
# Collect configuration data
config_data = self._collect_config_data()
self._update_progress(25.0, OperationType.BACKUP)
# Serialize and encrypt config data
config_json = json.dumps(config_data)
encrypted_config = encrypt_compressed_data(config_json, use_aes_256=True)
self._update_progress(50.0, OperationType.BACKUP)
backup_info = custom.BackupManagerSP.BackupInfo()
backup_info.deviceId = self.device_id
backup_info.config = encrypted_config
backup_info.isEncrypted = True
backup_info.createdAt = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
backup_info.updatedAt = backup_info.createdAt
backup_info.sunnypilotVersion = self._get_current_version()
backup_info.backupMetadata = [
custom.BackupManagerSP.MetadataEntry(key="creator", value="BackupManagerSP"),
custom.BackupManagerSP.MetadataEntry(key="all_values_encoded", value="True"),
custom.BackupManagerSP.MetadataEntry(key="AES", value="256")
]
payload = json.loads(json.dumps(backup_info.to_dict(), cls=SnakeCaseEncoder))
self._update_progress(75.0, OperationType.BACKUP)
cloudlog.debug(f"Uploading backup with payload: {json.dumps(payload)}")
# Upload to sunnylink
result = self.api.api_get(
f"backup/{self.device_id}",
method='PUT',
access_token=self.api.get_token(),
json=payload,
session=self._session
)
if result:
self.backup_status = custom.BackupManagerSP.Status.completed
self._update_progress(100.0, OperationType.BACKUP)
cloudlog.info("Backup successfully created and uploaded")
else:
self.backup_status = custom.BackupManagerSP.Status.failed
self.last_error = "Failed to upload backup"
cloudlog.error(result)
self._report_status()
return bool(self.backup_status == custom.BackupManagerSP.Status.completed)
except Exception as e:
cloudlog.exception(f"Error creating backup: {str(e)}")
self.backup_status = custom.BackupManagerSP.Status.failed
self.last_error = str(e)
self._report_status()
return False
async def restore_backup(self, version: int | None = None) -> bool:
"""Restores a backup from sunnylink."""
try:
self.restore_status = custom.BackupManagerSP.Status.inProgress
self._update_progress(0.0, OperationType.RESTORE)
# Get backup data from API for the specified version
endpoint = f"backup/{self.device_id}" + f"/{version or ''}" + "?api-version=1"
backup_data = self.api.api_get(endpoint, access_token=self.api.get_token(), session=self._session)
if not backup_data:
raise Exception(f"No backup found for device {self.device_id}")
self._update_progress(25.0, OperationType.RESTORE)
data = backup_data.json()
backup_metadata = data.get("backup_metadata", [])
encrypted_config = data.get("config", "")
if not encrypted_config:
raise Exception("Empty backup configuration")
self._update_progress(50.0, OperationType.RESTORE)
# Decrypt config and load data
use_aes_256 = self._get_metadata_value(backup_metadata, "AES", "128") == "256"
config_json = decrypt_compressed_data(encrypted_config, use_aes_256)
if not config_json:
raise Exception("Failed to decrypt backup configuration")
config_data = json.loads(config_json)
self._update_progress(75.0, OperationType.RESTORE)
# Apply configuration
self._apply_config(config_data)
self.restore_status = custom.BackupManagerSP.Status.completed
self._update_progress(100.0, OperationType.RESTORE)
return True
except Exception as e:
cloudlog.exception(f"Error restoring backup: {str(e)}")
self.restore_status = custom.BackupManagerSP.Status.failed
self.last_error = str(e)
self._report_status()
return False
def _apply_config(self, config_data: dict[str, str]) -> None:
"""Applies configuration data from a backup, but only for parameters marked as backupable."""
backupable_params = [k.decode('utf-8') for k in self.params.all_keys(ParamKeyFlag.BACKUP)]
backupable_set_lower = {p.lower() for p in backupable_params}
restored_count = 0
skipped_count = 0
for param, encoded_value in config_data.items():
if param.lower() in backupable_set_lower:
# Find real param name (with correct casing)
real_param = next(p for p in backupable_params if p.lower() == param.lower())
try:
save_param_from_base64_encoded_string(real_param, encoded_value)
restored_count += 1
except Exception as e:
cloudlog.error(f"Failed to restore param {param}: {str(e)}")
else:
skipped_count += 1
cloudlog.info(f"Skipped restoring param {param}: not marked for backup in current version")
cloudlog.info(f"Restore complete: {restored_count} params restored, {skipped_count} params skipped")
def _get_current_version(self) -> custom.BackupManagerSP.Version:
"""Gets current sunnypilot version information."""
version_obj = custom.BackupManagerSP.Version()
version_str = get_version()
version_parts = version_str.split('-') # For when version is like "1.2.3-456"
version_nums = version_parts[0].split('.')
# Extract build number from hyphen format or as 4th version component
build = 0
if len(version_parts) > 1 and version_parts[1].isdigit():
build = int(version_parts[1])
elif len(version_nums) > 3 and version_nums[3].isdigit():
build = int(version_nums[3])
# Set version components with safer defaults
version_obj.major = int(version_nums[0]) if len(version_nums) > 0 and version_nums[0].isdigit() else 0
version_obj.minor = int(version_nums[1]) if len(version_nums) > 1 and version_nums[1].isdigit() else 0
version_obj.patch = int(version_nums[2]) if len(version_nums) > 2 and version_nums[2].isdigit() else 0
version_obj.build = build
version_obj.branch = get_branch()
return version_obj
async def main_thread(self) -> None:
"""Main thread for backup management."""
rk = Ratekeeper(1, print_delay_threshold=None)
reset_progress = False
while True:
try:
if reset_progress:
self.progress = 100.0
self.operation = None
self.restore_status = custom.BackupManagerSP.Status.idle
self.backup_status = custom.BackupManagerSP.Status.idle
# Check for backup command
if self.params.get_bool("BackupManager_CreateBackup"):
try:
if await self.create_backup():
reset_progress = True
finally:
self.params.remove("BackupManager_CreateBackup")
# Check for restore command
restore_version = self.params.get("BackupManager_RestoreVersion")
if restore_version:
try:
version = int(restore_version) if restore_version.isdigit() else None
await self.restore_backup(version)
reset_progress = True
finally:
self.params.remove("BackupManager_RestoreVersion")
self._report_status()
rk.keep_time()
except Exception as e:
cloudlog.exception(f"Error in backup manager main thread: {str(e)}")
self.last_error = str(e)
self._report_status()
rk.keep_time()
def main():
import asyncio
asyncio.run(BackupManagerSP().main_thread())
if __name__ == "__main__":
main()