mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-18 16:33:57 +08:00
athena: specify network type for file uploads (#23687)
* athena: specify network type for file uploads
* add comment
* catch abort transfer
* fix tests
* put athena upload args in dict
* fix defaults
Co-authored-by: Joost Wooning <jwooning@gmail.com>
old-commit-hash: e9153fdb4b
This commit is contained in:
@@ -22,6 +22,7 @@ from jsonrpc import JSONRPCResponseManager, dispatcher
|
||||
from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal import log
|
||||
from cereal.services import service_list
|
||||
from common.api import Api
|
||||
from common.file_helpers import CallbackReader
|
||||
@@ -47,6 +48,8 @@ RETRY_DELAY = 10 # seconds
|
||||
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
|
||||
WS_FRAME_SIZE = 4096
|
||||
|
||||
NetworkType = log.DeviceState.NetworkType
|
||||
|
||||
dispatcher["echo"] = lambda s: s
|
||||
recv_queue: Any = queue.Queue()
|
||||
send_queue: Any = queue.Queue()
|
||||
@@ -54,10 +57,13 @@ upload_queue: Any = queue.Queue()
|
||||
low_priority_send_queue: Any = queue.Queue()
|
||||
log_recv_queue: Any = queue.Queue()
|
||||
cancelled_uploads: Any = set()
|
||||
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0))
|
||||
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress', 'allow_cellular'], defaults=(0, False, 0, False))
|
||||
|
||||
cur_upload_items: Dict[int, Any] = {}
|
||||
|
||||
class AbortTransferException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class UploadQueueCache():
|
||||
params = Params()
|
||||
@@ -129,11 +135,13 @@ def jsonrpc_handler(end_event):
|
||||
send_queue.put_nowait(json.dumps({"error": str(e)}))
|
||||
|
||||
|
||||
def retry_upload(tid: int, end_event: threading.Event) -> None:
|
||||
def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = True) -> None:
|
||||
if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT:
|
||||
item = cur_upload_items[tid]
|
||||
new_retry_count = item.retry_count + 1 if increase_count else item.retry_count
|
||||
|
||||
item = item._replace(
|
||||
retry_count=item.retry_count + 1,
|
||||
retry_count=new_retry_count,
|
||||
progress=0,
|
||||
current=False
|
||||
)
|
||||
@@ -149,6 +157,7 @@ def retry_upload(tid: int, end_event: threading.Event) -> None:
|
||||
|
||||
|
||||
def upload_handler(end_event: threading.Event) -> None:
|
||||
sm = messaging.SubMaster(['deviceState'])
|
||||
tid = threading.get_ident()
|
||||
|
||||
while not end_event.is_set():
|
||||
@@ -161,8 +170,23 @@ def upload_handler(end_event: threading.Event) -> None:
|
||||
cancelled_uploads.remove(cur_upload_items[tid].id)
|
||||
continue
|
||||
|
||||
# TODO: remove item if too old
|
||||
|
||||
# Check if uploading over cell is allowed
|
||||
sm.update(0)
|
||||
cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet]
|
||||
if cell and (not cur_upload_items[tid].allow_cellular):
|
||||
retry_upload(tid, end_event, False)
|
||||
continue
|
||||
|
||||
try:
|
||||
def cb(sz, cur):
|
||||
# Abort transfer if connection changed to cell after starting upload
|
||||
sm.update(0)
|
||||
cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet]
|
||||
if cell and (not cur_upload_items[tid].allow_cellular):
|
||||
raise AbortTransferException
|
||||
|
||||
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)
|
||||
|
||||
response = _do_upload(cur_upload_items[tid], cb)
|
||||
@@ -172,8 +196,10 @@ def upload_handler(end_event: threading.Event) -> None:
|
||||
UploadQueueCache.cache(upload_queue)
|
||||
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
|
||||
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")
|
||||
|
||||
retry_upload(tid, end_event)
|
||||
except AbortTransferException:
|
||||
cloudlog.warning(f"athena.upload_handler.abort {cur_upload_items[tid]}")
|
||||
retry_upload(tid, end_event, False)
|
||||
|
||||
except queue.Empty:
|
||||
pass
|
||||
@@ -274,15 +300,20 @@ def reboot():
|
||||
|
||||
@dispatcher.add_method
|
||||
def uploadFileToUrl(fn, url, headers):
|
||||
return uploadFilesToUrls([[fn, url, headers]])
|
||||
return uploadFilesToUrls([{
|
||||
"fn": fn,
|
||||
"url": url,
|
||||
"headers": headers,
|
||||
}])
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
def uploadFilesToUrls(files_data):
|
||||
items = []
|
||||
failed = []
|
||||
for fn, url, headers in files_data:
|
||||
if len(fn) == 0 or fn[0] == '/' or '..' in fn:
|
||||
for file in files_data:
|
||||
fn = file.get('fn', '')
|
||||
if len(fn) == 0 or fn[0] == '/' or '..' in fn or 'url' not in file:
|
||||
failed.append(fn)
|
||||
continue
|
||||
path = os.path.join(ROOT, fn)
|
||||
@@ -290,7 +321,14 @@ def uploadFilesToUrls(files_data):
|
||||
failed.append(fn)
|
||||
continue
|
||||
|
||||
item = UploadItem(path=path, url=url, headers=headers, created_at=int(time.time() * 1000), id=None)
|
||||
item = UploadItem(
|
||||
path=path,
|
||||
url=file['url'],
|
||||
headers=file.get('headers', {}),
|
||||
created_at=int(time.time() * 1000),
|
||||
id=None,
|
||||
allow_cellular=file.get('allow_cellular', False),
|
||||
)
|
||||
upload_id = hashlib.sha1(str(item).encode()).hexdigest()
|
||||
item = item._replace(id=upload_id)
|
||||
upload_queue.put_nowait(item)
|
||||
|
||||
@@ -150,7 +150,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
def test_upload_handler(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
|
||||
end_event = threading.Event()
|
||||
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
|
||||
@@ -173,7 +173,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
mock_put.return_value.status_code = status
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
|
||||
end_event = threading.Event()
|
||||
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
|
||||
@@ -187,7 +187,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
|
||||
finally:
|
||||
end_event.set()
|
||||
|
||||
|
||||
if retry:
|
||||
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
|
||||
|
||||
@@ -195,7 +195,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
"""When an upload times out or fails to connect it should be placed back in the queue"""
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
|
||||
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT)
|
||||
|
||||
end_event = threading.Event()
|
||||
@@ -222,7 +222,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
end_event.set()
|
||||
|
||||
def test_cancelUpload(self):
|
||||
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')
|
||||
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True)
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
dispatcher["cancelUpload"](item.id)
|
||||
|
||||
@@ -248,7 +248,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
def test_listUploadQueueCurrent(self, host):
|
||||
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
|
||||
Path(fn).touch()
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
|
||||
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
|
||||
|
||||
end_event = threading.Event()
|
||||
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
|
||||
@@ -266,7 +266,7 @@ class TestAthenadMethods(unittest.TestCase):
|
||||
end_event.set()
|
||||
|
||||
def test_listUploadQueue(self):
|
||||
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')
|
||||
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True)
|
||||
athenad.upload_queue.put_nowait(item)
|
||||
|
||||
items = dispatcher["listUploadQueue"]()
|
||||
|
||||
Reference in New Issue
Block a user