mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-18 20:03:53 +08:00
athenad: small tests cleanup (#26037)
* test helpers
* create file helper
* clearer
* type
* fix default create path
* static methods
old-commit-hash: 9e2a1121ea
This commit is contained in:
@@ -9,6 +9,7 @@ import threading
|
||||
import queue
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from multiprocessing import Process
|
||||
from pathlib import Path
|
||||
@@ -46,12 +47,26 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
else:
|
||||
os.unlink(p)
|
||||
|
||||
def wait_for_upload(self):
|
||||
|
||||
# *** test helpers ***
|
||||
|
||||
@staticmethod
|
||||
def _wait_for_upload():
|
||||
now = time.time()
|
||||
while time.time() - now < 5:
|
||||
if athenad.upload_queue.qsize() == 0:
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def _create_file(file: str, parent: Optional[str] = None) -> str:
|
||||
fn = os.path.join(athenad.ROOT if parent is None else parent, file)
|
||||
os.makedirs(os.path.dirname(fn), exist_ok=True)
|
||||
Path(fn).touch()
|
||||
return fn
|
||||
|
||||
|
||||
# *** test cases ***
|
||||
|
||||
def test_echo(self):
|
||||
assert dispatcher["echo"]("bob") == "bob"
|
||||
|
||||
@@ -85,9 +100,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
|
||||
files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
|
||||
for file in files:
|
||||
fn = os.path.join(athenad.ROOT, file)
|
||||
os.makedirs(os.path.dirname(fn), exist_ok=True)
|
||||
Path(fn).touch()
|
||||
self._create_file(file)
|
||||
|
||||
resp = dispatcher["listDataDirectory"]()
|
||||
self.assertTrue(resp, 'list empty!')
|
||||
@@ -121,16 +134,14 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
self.assertCountEqual(resp, expected)
|
||||
|
||||
def test_strip_bz2_extension(self):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
if fn.endswith('.bz2'):
|
||||
self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4])
|
||||
|
||||
|
||||
@with_http_server
|
||||
def test_do_upload(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
|
||||
item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
|
||||
with self.assertRaises(requests.exceptions.ConnectionError):
|
||||
@@ -142,8 +153,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
@with_http_server
|
||||
def test_uploadFileToUrl(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
|
||||
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
|
||||
self.assertEqual(resp['enqueued'], 1)
|
||||
@@ -154,8 +164,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
@with_http_server
|
||||
def test_uploadFileToUrl_duplicate(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
self._create_file('qlog.bz2')
|
||||
|
||||
url1 = f"{host}/qlog.bz2?sig=sig1"
|
||||
dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {})
|
||||
@@ -172,8 +181,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
@with_http_server
|
||||
def test_upload_handler(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
|
||||
end_event = threading.Event()
|
||||
@@ -182,7 +190,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
try:
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
# TODO: verify that upload actually succeeded
|
||||
@@ -195,8 +203,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
def test_upload_handler_retry(self, host, mock_put):
|
||||
for status, retry in ((500, True), (412, False)):
|
||||
mock_put.return_value.status_code = status
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
|
||||
end_event = threading.Event()
|
||||
@@ -205,7 +212,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
try:
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
|
||||
@@ -217,8 +224,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
def test_upload_handler_timeout(self):
|
||||
"""When an upload times out or fails to connect it should be placed back in the queue"""
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT)
|
||||
|
||||
@@ -228,14 +234,14 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
try:
|
||||
athenad.upload_queue.put_nowait(item_no_retry)
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
# Check that upload with retry count exceeded is not put back
|
||||
self.assertEqual(athenad.upload_queue.qsize(), 0)
|
||||
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
# Check that upload item was put back in the queue with incremented retry count
|
||||
@@ -256,7 +262,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
|
||||
thread.start()
|
||||
try:
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
self.assertEqual(athenad.upload_queue.qsize(), 0)
|
||||
@@ -269,8 +275,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
ts = int(t_future.strftime("%s")) * 1000
|
||||
|
||||
# Item that would time out if actually uploaded
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
|
||||
|
||||
|
||||
@@ -279,7 +284,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
thread.start()
|
||||
try:
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
time.sleep(0.1)
|
||||
|
||||
self.assertEqual(athenad.upload_queue.qsize(), 0)
|
||||
@@ -292,8 +297,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
@with_http_server
|
||||
def test_listUploadQueueCurrent(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
fn = self._create_file('qlog.bz2')
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
|
||||
end_event = threading.Event()
|
||||
@@ -302,7 +306,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
|
||||
try:
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
self.wait_for_upload()
|
||||
self._wait_for_upload()
|
||||
|
||||
items = dispatcher["listUploadQueue"]()
|
||||
self.assertEqual(len(items), 1)
|
||||
@@ -405,9 +409,9 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
def test_get_logs_to_send_sorted(self):
|
||||
fl = list()
|
||||
for i in range(10):
|
||||
fn = os.path.join(swaglog.SWAGLOG_DIR, f'swaglog.{i:010}')
|
||||
Path(fn).touch()
|
||||
fl.append(os.path.basename(fn))
|
||||
file = f'swaglog.{i:010}'
|
||||
self._create_file(file, athenad.SWAGLOG_DIR)
|
||||
fl.append(file)
|
||||
|
||||
# ensure the list is all logs except most recent
|
||||
sl = athenad.get_logs_to_send_sorted()
|
||||
|
||||
Reference in New Issue
Block a user