From 3a637fe455d9d7e5418ffc658155222a798af191 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Mon, 30 Oct 2023 01:08:28 +0800 Subject: [PATCH] athenad: fix memory leak in `_do_upload()` (#30237) * fix memory leak * test: stash * clean up * clean up * ruff * rm * add py memory profiler * test compress and no compress * proper test * comment --------- Co-authored-by: Shane Smiskol old-commit-hash: 61288dfe068e6127dcff64b8e7ac81844f7c31ff --- poetry.lock | 4 ++-- pyproject.toml | 1 + selfdrive/athena/athenad.py | 16 ++++++-------- selfdrive/athena/tests/test_athenad.py | 30 +++++++++++++++++++------- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/poetry.lock b/poetry.lock index 870a8d0ec..04e82c5ed 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:5aa29cd91c8a5614163a289b89e5488731f09d3433a675a87962f86dc1799763 -size 455011 +oid sha256:297b24d59d8e39b20fb1a00745e62c8a5b3a0f126da9028c50830f7d81a38117 +size 455177 diff --git a/pyproject.toml b/pyproject.toml index b87211cc9..c83669898 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,6 +140,7 @@ parameterized = "^0.8" pprofile = "*" pre-commit = "*" pygame = "*" +pympler = "*" pyprof2calltree = "*" pytest = "*" pytest-cov = "*" diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index 70e18bbed..c93b43467 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -20,7 +20,7 @@ from dataclasses import asdict, dataclass, replace from datetime import datetime from functools import partial from queue import Queue -from typing import BinaryIO, Callable, Dict, List, Optional, Set, Union, cast +from typing import Callable, Dict, List, Optional, Set, Union, cast import requests from jsonrpc import JSONRPCResponseManager, dispatcher @@ -290,19 +290,15 @@ def _do_upload(upload_item: UploadItem, callback: Optional[Callable] = None) -> compress = True with open(path, "rb") as f: - data: BinaryIO + content = f.read() if compress: cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) - compressed = bz2.compress(f.read()) - size = len(compressed) - data = io.BytesIO(compressed) - else: - size = os.fstat(f.fileno()).st_size - data = f + content = bz2.compress(content) + with io.BytesIO(content) as data: return requests.put(upload_item.url, - data=CallbackReader(data, callback, size) if callback else data, - headers={**upload_item.headers, 'Content-Length': str(size)}, + data=CallbackReader(data, callback, len(content)) if callback else data, + headers={**upload_item.headers, 'Content-Length': str(len(content))}, timeout=30) diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 27ccbdccc..e81753a6a 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -9,10 +9,11 @@ import queue import unittest from dataclasses import asdict, replace from datetime import datetime, timedelta +from parameterized import parameterized from typing import Optional from multiprocessing import Process -from pathlib import Path +from pympler.tracker import SummaryTracker from unittest import mock from websocket import ABNF from websocket._exceptions import WebSocketConnectionClosedException @@ -57,10 +58,11 @@ class TestAthenadMethods(unittest.TestCase): break @staticmethod - def _create_file(file: str, parent: Optional[str] = None) -> str: + def _create_file(file: str, parent: Optional[str] = None, data: bytes = b'') -> str: fn = os.path.join(Paths.log_root() if parent is None else parent, file) os.makedirs(os.path.dirname(fn), exist_ok=True) - Path(fn).touch() + with open(fn, 'wb') as f: + f.write(data) return fn @@ -137,19 +139,31 @@ class TestAthenadMethods(unittest.TestCase): if fn.endswith('.bz2'): self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4]) - + @parameterized.expand([(True,), (False,)]) @with_http_server - def test_do_upload(self, host): - fn = self._create_file('qlog.bz2') + def test_do_upload(self, compress, host): + # random bytes to ensure rather large object post-compression + fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) - item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') + # warm up object tracker + tracker = SummaryTracker() + for _ in range(5): + tracker.diff() + + upload_fn = fn + ('.bz2' if compress else '') + item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') with self.assertRaises(requests.exceptions.ConnectionError): athenad._do_upload(item) - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') resp = athenad._do_upload(item) self.assertEqual(resp.status_code, 201) + # assert memory cleaned up + for _type, num_objects, total_size in tracker.diff(): + with self.subTest(_type=_type): + self.assertLess(total_size / 1024, 10, f'Object {_type} ({num_objects=}) grew larger than 10 kB while uploading file') + @with_http_server def test_uploadFileToUrl(self, host): fn = self._create_file('qlog.bz2')