Files
sunnypilot/system/loggerd/tests/loggerd_tests_common.py
Adeeb Shihadeh d6a8213c80 uploader cleanup (#31035)
* Reapply "uploader cleanup" (#31033)

This reverts commit a2723989bced503a4506684a4a6fdc6c8101f7b2.

* always sleep

* more cleanup

* little more

* fix linter

* little more
old-commit-hash: 0b5fd7287e
2024-01-17 14:24:09 -08:00

93 lines
2.5 KiB
Python

import os
import random
import unittest
from pathlib import Path
from typing import Optional
import openpilot.system.loggerd.deleter as deleter
import openpilot.system.loggerd.uploader as uploader
from openpilot.common.params import Params
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import setxattr
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: Optional[bytes] = None) -> None:
    """Create a file filled with random bytes for use as a test fixture.

    Args:
        file_path: Destination path; missing parent directories are created.
        size_mb: Requested size, where 1 "MB" is 1024 * 1024 bytes. The file
            ends up exactly ``int(size_mb * 1024 * 1024)`` bytes long.
        lock: When true, an empty ``<file_path>.lock`` file is created first.
        upload_xattr: When not None, this value is stored on the file under
            the ``uploader.UPLOAD_ATTR_NAME`` extended attribute.

    Raises:
        FileExistsError: If ``lock`` is true and the lock file already exists
            (it is opened with ``O_CREAT | O_EXCL``).
        ValueError: If ``size_mb`` is negative enough to yield a negative
            chunk size, which ``os.urandom`` rejects.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)

    if lock:
        lock_path = str(file_path) + ".lock"
        # O_EXCL makes creation fail instead of silently reusing an old lock.
        os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL))

    chunks = 128
    total_bytes = int(size_mb * 1024 * 1024)
    chunk_bytes = int(size_mb * 1024 * 1024 / chunks)
    # Bytes lost to per-chunk truncation; without them the file would be
    # rounded down to a multiple of `chunks` bytes (and empty below 128 bytes).
    tail_bytes = total_bytes - chunk_bytes * chunks

    # One random chunk is generated and repeated rather than drawing fresh
    # randomness for every chunk.
    data = os.urandom(chunk_bytes)

    with open(file_path, "wb") as f:
        for _ in range(chunks):
            f.write(data)
        if tail_bytes > 0:
            f.write(os.urandom(tail_bytes))

    if upload_xattr is not None:
        setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr)
class MockResponse:
    """Bare response object exposing only ``text`` and ``status_code``."""

    def __init__(self, text, status_code):
        # Store both values unchanged; no parsing or validation is done.
        self.status_code = status_code
        self.text = text
class MockApi:
    """Test double installed as ``uploader.Api``; every ``get`` succeeds."""

    def __init__(self, dongle_id):
        """Accept ``dongle_id`` for signature compatibility; it is unused."""

    def get(self, *args, **kwargs):
        """Return a 200 response whose JSON body carries a URL and empty headers.

        All arguments are ignored.
        """
        body = '{"url": "http://localhost/does/not/exist", "headers": {}}'
        return MockResponse(body, 200)

    def get_token(self):
        """Return a fixed placeholder token."""
        return "fake-token"
class MockApiIgnore:
    """Test double for ``uploader.Api`` whose ``get`` always answers 412."""

    def __init__(self, dongle_id):
        """Accept ``dongle_id`` for signature compatibility; it is unused."""

    def get(self, *args, **kwargs):
        """Return an empty-bodied response with status 412, ignoring all arguments."""
        status = 412
        return MockResponse('', status)

    def get_token(self):
        """Return a fixed placeholder token."""
        return "fake-token"
class UploaderTestCase(unittest.TestCase):
    """Shared fixture base class for tests that exercise the uploader.

    ``setUp`` points ``uploader.Api`` at ``MockApi``, sets the uploader module
    flags, picks a random segment number and writes two ``Params`` entries.
    None of these module-level changes are undone by this class.
    """

    # Name of the file under test; presumably overridden by subclasses.
    f_type = "UNKNOWN"

    root: Path
    seg_num: int
    seg_format: str
    seg_format2: str
    seg_dir: str

    def set_ignore(self):
        """Swap the uploader's API class for ``MockApiIgnore``."""
        uploader.Api = MockApiIgnore

    def setUp(self):
        """Install the mock API, set uploader flags and seed per-test state."""
        # Two directory-name templates plus one random segment index in [1, 300].
        self.seg_format = "2019-04-18--12-52-54--{}"
        self.seg_format2 = "2019-05-18--11-22-33--{}"
        self.seg_num = random.randint(1, 300)
        self.seg_dir = self.seg_format.format(self.seg_num)

        # Module-level uploader configuration used while the tests run.
        uploader.Api = MockApi
        uploader.fake_upload = True
        uploader.force_wifi = True
        uploader.allow_sleep = False

        self.params = Params()
        self.params.put("IsOffroad", "1")
        self.params.put("DongleId", "0000000000000000")

    def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
                            upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
        """Create a random-content file under the log root and return its path.

        Args:
            f_dir: Directory name, relative to ``Paths.log_root()``.
            fn: File name inside ``f_dir``.
            size_mb: Size forwarded to ``create_random_file``.
            lock: Forwarded to ``create_random_file``.
            upload_xattr: Forwarded to ``create_random_file``.
            preserve_xattr: When not None, stored on the file's parent
                directory under ``deleter.PRESERVE_ATTR_NAME``.
        """
        target = Path(Paths.log_root(), f_dir, fn)
        create_random_file(target, size_mb, lock, upload_xattr)

        if preserve_xattr is None:
            return target

        # The preserve marker goes on the containing directory, not the file.
        setxattr(str(target.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr)
        return target