uploader: compress with zstd (#32736)
* zstd uploader
* fix that
* fix name of function
* comment
* log failed
* fix comma_api_source for routes with both bz2 and zst rlogs
* TODO
* 10-14 achieves almost no benefit on qlogs in a few cases, but takes 2x the time
* these aren't written out
* regen: specify any list of sources
ooh this is pretty nice
* regen and process replay
* damn, actually we don't need all this (cool tho)
Revert "regen: specify any list of sources"
This reverts commit ceb0b4abed9ad463a9fe98d9b98a05875a52806f.
* just let it auto resolve
* fix athenad/uploader tests
* zst here too
* TODOs
* yes
* Revert "TODOs"
This reverts commit 8c7da1dbd0340c72290b5eb5563b642080ddc131.
* Revert "zst here too"
This reverts commit 23b0023ddfd22c8090be7a7caa09e7026a12aa5c.
* Revert "just let it auto resolve"
This reverts commit f296d62424227ad05facc62abc18a6f81b474e84.
* Revert "regen and process replay"
This reverts commit 0768330e96974a42616d229d159780619d049cd0.
* revert readme
* not in save_log either
* lfg
* Revert "lfg"
This reverts commit 3718559c6c4de7d1f0c80dc9f1a1d335fe679a89.
old-commit-hash: 7dec7c39be
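
The "10-14 achieves almost no benefit on qlogs in a few cases, but takes 2x the time" note above is the reasoning behind the compression level this change settles on. Below is a rough benchmark sketch of that trade-off. It is not part of the commit: it assumes the same `zstd` package the diff imports (`zstd.compress(data, level)`) and uses a synthetic blob rather than a real qlog, so absolute numbers will differ.

#!/usr/bin/env python3
# Rough sketch only: compare bz2 against a few zstd levels on synthetic data.
# Real qlogs (capnp log data) compress differently than this stand-in blob.
import bz2
import os
import time

import zstd  # same package the uploader imports

data = os.urandom(64 * 1024) * 160  # ~10 MB, repetitive enough to compress

t0 = time.monotonic()
out = bz2.compress(data)
print(f"bz2      {len(out):>9} B  {time.monotonic() - t0:.2f}s")

for level in (1, 10, 14, 17):
  t0 = time.monotonic()
  out = zstd.compress(data, level)
  print(f"zstd -{level:<2} {len(out):>9} B  {time.monotonic() - t0:.2f}s")

On real route logs, the pattern described in the commit message is what to look for: the compression ratio flattens out after roughly level 10 while the compression time keeps climbing.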
@@ -2,7 +2,6 @@
 from __future__ import annotations
 import base64
-import bz2
 import hashlib
 import io
 import json
@@ -15,6 +14,7 @@ import sys
 import tempfile
 import threading
 import time
+import zstd
 from dataclasses import asdict, dataclass, replace
 from datetime import datetime
 from functools import partial
@@ -35,6 +35,7 @@ from openpilot.common.file_helpers import CallbackReader
 from openpilot.common.params import Params
 from openpilot.common.realtime import set_core_affinity
 from openpilot.system.hardware import HARDWARE, PC
+from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL
 from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
 from openpilot.common.swaglog import cloudlog
 from openpilot.system.version import get_build_metadata
@@ -103,8 +104,8 @@ cancelled_uploads: set[str] = set()
 cur_upload_items: dict[int, UploadItem | None] = {}


-def strip_bz2_extension(fn: str) -> str:
-  if fn.endswith('.bz2'):
+def strip_zst_extension(fn: str) -> str:
+  if fn.endswith('.zst'):
     return fn[:-4]
   return fn

@@ -283,16 +284,16 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R
   path = upload_item.path
   compress = False

-  # If file does not exist, but does exist without the .bz2 extension we will compress on the fly
-  if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
-    path = strip_bz2_extension(path)
+  # If file does not exist, but does exist without the .zst extension we will compress on the fly
+  if not os.path.exists(path) and os.path.exists(strip_zst_extension(path)):
+    path = strip_zst_extension(path)
     compress = True

   with open(path, "rb") as f:
     content = f.read()
     if compress:
       cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
-      content = bz2.compress(content)
+      content = zstd.compress(content, LOG_COMPRESSION_LEVEL)

   with io.BytesIO(content) as data:
     return requests.put(upload_item.url,
@@ -375,7 +376,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
       continue

     path = os.path.join(Paths.log_root(), file.fn)
-    if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
+    if not os.path.exists(path) and not os.path.exists(strip_zst_extension(path)):
       failed.append(file.fn)
       continue

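The athenad hunks above keep the existing compress-on-the-fly behavior and only swap the codec: when a queued path ends in .zst but only the raw log exists on disk, the raw file is read and compressed at LOG_COMPRESSION_LEVEL before the PUT. Below is a minimal standalone sketch of that path; strip_zst_extension and LOG_COMPRESSION_LEVEL mirror the diff, while read_upload_bytes is an illustrative stand-in for the start of _do_upload, not the shipped function.

# Minimal sketch (not the shipped code) of athenad's compress-on-the-fly path:
# if the queued "<log>.zst" is missing but the raw log exists, compress it in
# memory with zstd before uploading.
import os

import zstd

LOG_COMPRESSION_LEVEL = 10


def strip_zst_extension(fn: str) -> str:
  if fn.endswith('.zst'):
    return fn[:-4]
  return fn


def read_upload_bytes(path: str) -> bytes:
  compress = False
  # If the .zst file does not exist but the uncompressed log does, compress on the fly
  if not os.path.exists(path) and os.path.exists(strip_zst_extension(path)):
    path = strip_zst_extension(path)
    compress = True

  with open(path, "rb") as f:
    content = f.read()
  if compress:
    content = zstd.compress(content, LOG_COMPRESSION_LEVEL)
  return content

The real function then wraps the resulting bytes in io.BytesIO and hands them to requests.put, as shown at the end of the hunk.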
@@ -29,7 +29,7 @@ def seed_athena_server(host, port):
   with Timeout(2, 'HTTP Server seeding failed'):
     while True:
       try:
-        requests.put(f'http://{host}:{port}/qlog.bz2', data='', timeout=10)
+        requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10)
         break
       except requests.exceptions.ConnectionError:
         time.sleep(0.1)
@@ -174,54 +174,59 @@ class TestAthenadMethods:
     assert resp, 'list empty!'
     assert len(resp) == len(expected)

-  def test_strip_bz2_extension(self):
-    fn = self._create_file('qlog.bz2')
-    if fn.endswith('.bz2'):
-      assert athenad.strip_bz2_extension(fn) == fn[:-4]
+  def test_strip_extension(self):
+    # any requested log file with an invalid extension won't return as existing
+    fn = self._create_file('qlog.bz2')
+    assert athenad.strip_zst_extension(fn) == fn
+
+    fn = self._create_file('qlog.zst')
+    if fn.endswith('.zst'):
+      assert athenad.strip_zst_extension(fn) == fn[:-4]

   @pytest.mark.parametrize("compress", [True, False])
   def test_do_upload(self, host, compress):
     # random bytes to ensure rather large object post-compression
     fn = self._create_file('qlog', data=os.urandom(10000 * 1024))

-    upload_fn = fn + ('.bz2' if compress else '')
+    upload_fn = fn + ('.zst' if compress else '')
     item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
     with pytest.raises(requests.exceptions.ConnectionError):
       athenad._do_upload(item)

-    item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
+    item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='')
     resp = athenad._do_upload(item)
     assert resp.status_code == 201

   def test_upload_file_to_url(self, host):
-    fn = self._create_file('qlog.bz2')
+    fn = self._create_file('qlog.zst')

-    resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
+    resp = dispatcher["uploadFileToUrl"]("qlog.zst", f"{host}/qlog.zst", {})
     assert resp['enqueued'] == 1
     assert 'failed' not in resp
-    assert {"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}.items() <= resp['items'][0].items()
+    assert {"path": fn, "url": f"{host}/qlog.zst", "headers": {}}.items() <= resp['items'][0].items()
     assert resp['items'][0].get('id') is not None
     assert athenad.upload_queue.qsize() == 1

   def test_upload_file_to_url_duplicate(self, host):
-    self._create_file('qlog.bz2')
+    self._create_file('qlog.zst')

-    url1 = f"{host}/qlog.bz2?sig=sig1"
-    dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {})
+    url1 = f"{host}/qlog.zst?sig=sig1"
+    dispatcher["uploadFileToUrl"]("qlog.zst", url1, {})

     # Upload same file again, but with different signature
-    url2 = f"{host}/qlog.bz2?sig=sig2"
-    resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {})
+    url2 = f"{host}/qlog.zst?sig=sig2"
+    resp = dispatcher["uploadFileToUrl"]("qlog.zst", url2, {})
     assert resp == {'enqueued': 0, 'items': []}

   def test_upload_file_to_url_does_not_exist(self, host):
-    not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {})
-    assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']}
+    not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.zst", "http://localhost:1238", {})
+    assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.zst']}

   @with_upload_handler
   def test_upload_handler(self, host):
-    fn = self._create_file('qlog.bz2')
-    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
+    fn = self._create_file('qlog.zst')
+    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

     athenad.upload_queue.put_nowait(item)
     self._wait_for_upload()
@@ -236,8 +241,8 @@ class TestAthenadMethods:
   def test_upload_handler_retry(self, mocker, host, status, retry):
     mock_put = mocker.patch('requests.put')
     mock_put.return_value.status_code = status
-    fn = self._create_file('qlog.bz2')
-    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
+    fn = self._create_file('qlog.zst')
+    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

     athenad.upload_queue.put_nowait(item)
     self._wait_for_upload()
@@ -251,8 +256,8 @@ class TestAthenadMethods:
   @with_upload_handler
   def test_upload_handler_timeout(self):
     """When an upload times out or fails to connect it should be placed back in the queue"""
-    fn = self._create_file('qlog.bz2')
-    item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
+    fn = self._create_file('qlog.zst')
+    item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
     item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)

     athenad.upload_queue.put_nowait(item_no_retry)
@@ -272,7 +277,7 @@ class TestAthenadMethods:

   @with_upload_handler
   def test_cancel_upload(self):
-    item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
+    item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
                               created_at=int(time.time()*1000), id='id', allow_cellular=True)
     athenad.upload_queue.put_nowait(item)
     dispatcher["cancelUpload"](item.id)
@@ -291,8 +296,8 @@ class TestAthenadMethods:
     ts = int(t_future.strftime("%s")) * 1000

     # Item that would time out if actually uploaded
-    fn = self._create_file('qlog.bz2')
-    item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
+    fn = self._create_file('qlog.zst')
+    item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=ts, id='', allow_cellular=True)

     athenad.upload_queue.put_nowait(item)
     self._wait_for_upload()
@@ -306,8 +311,8 @@ class TestAthenadMethods:

   @with_upload_handler
   def test_list_upload_queue_current(self, host: str):
-    fn = self._create_file('qlog.bz2')
-    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
+    fn = self._create_file('qlog.zst')
+    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

     athenad.upload_queue.put_nowait(item)
     self._wait_for_upload()
@@ -317,7 +322,7 @@ class TestAthenadMethods:
     assert items[0]['current']

   def test_list_upload_queue(self):
-    item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
+    item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
                               created_at=int(time.time()*1000), id='id', allow_cellular=True)
     athenad.upload_queue.put_nowait(item)

@@ -62,10 +62,10 @@ class TestUploader(UploaderTestCase):
   def gen_order(self, seg1: list[int], seg2: list[int], boot=True) -> list[str]:
     keys = []
     if boot:
-      keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1]
-      keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2]
-    keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1]
-    keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2]
+      keys += [f"boot/{self.seg_format.format(i)}.zst" for i in seg1]
+      keys += [f"boot/{self.seg_format2.format(i)}.zst" for i in seg2]
+    keys += [f"{self.seg_format.format(i)}/qlog.zst" for i in seg1]
+    keys += [f"{self.seg_format2.format(i)}/qlog.zst" for i in seg2]
     return keys

   def test_upload(self):
@@ -159,7 +159,7 @@ class TestUploader(UploaderTestCase):
     self.join_thread()

     for f_path in f_paths:
-      fn = f_path.with_suffix(f_path.suffix.replace(".bz2", ""))
+      fn = f_path.with_suffix(f_path.suffix.replace(".zst", ""))
       uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
       assert not uploaded, "File upload when locked"

@@ -1,5 +1,4 @@
 #!/usr/bin/env python3
-import bz2
 import io
 import json
 import os
@@ -9,6 +8,7 @@ import threading
 import time
 import traceback
 import datetime
+import zstd
 from typing import BinaryIO
 from collections.abc import Iterator

@@ -26,6 +26,7 @@ UPLOAD_ATTR_NAME = 'user.upload'
 UPLOAD_ATTR_VALUE = b'1'

 UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6  # MB
+LOG_COMPRESSION_LEVEL = 10  # little benefit up to level 15. level ~17 is a small step change

 allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
 force_wifi = os.getenv("FORCEWIFI") is not None
@@ -83,7 +84,7 @@ class Uploader:
     self.last_filename = ""

     self.immediate_folders = ["crash/", "boot/"]
-    self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
+    self.immediate_priority = {"qlog": 0, "qlog.zst": 0, "qcamera.ts": 1}

   def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]:
     r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8")
@@ -152,8 +153,8 @@ class Uploader:

     with open(fn, "rb") as f:
       data: BinaryIO
-      if key.endswith('.bz2') and not fn.endswith('.bz2'):
-        compressed = bz2.compress(f.read())
+      if key.endswith('.zst') and not fn.endswith('.zst'):
+        compressed = zstd.compress(f.read(), LOG_COMPRESSION_LEVEL)
         data = io.BytesIO(compressed)
       else:
         data = f
@@ -218,8 +219,8 @@ class Uploader:
     name, key, fn = d

     # qlogs and bootlogs need to be compressed before uploading
-    if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
-      key += ".bz2"
+    if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zst')):
+      key += ".zst"

     return self.upload(name, key, fn, network_type, metered)
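
On the loggerd side, the only naming rule that changes is the key suffix: qlogs, rlogs, and bootlogs still sit uncompressed on disk, but their upload key now gets ".zst" appended, which is what later triggers the zstd compression in the earlier hunk (key ends with .zst, file on disk does not). Below is a small sketch of that rule; the upload_key helper and the segment names are illustrative, not part of uploader.py.

# Sketch of the key-naming rule from the hunk above; upload_key() is an
# illustrative helper with hypothetical segment names, not a function in the module.
def upload_key(key: str) -> str:
  # qlogs and bootlogs need to be compressed before uploading
  if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zst')):
    key += ".zst"
  return key

assert upload_key("2024-01-01--00-00-00--0/qlog") == "2024-01-01--00-00-00--0/qlog.zst"
assert upload_key("boot/2024-01-01--00-00-00") == "boot/2024-01-01--00-00-00.zst"
assert upload_key("2024-01-01--00-00-00--0/qcamera.ts") == "2024-01-01--00-00-00--0/qcamera.ts"

Keys like qcamera.ts fall through unchanged, since those files are uploaded as-is.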