tools: add LRU eviction for log cache (#36959)
* tools: add LRU for log cache * lil more * cleanup: * less syscall * manifest * cleanup * cleanup * lil more * cleanup * lil more * simpler * lil more
This commit is contained in:
@@ -2,11 +2,13 @@ import http.server
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import tempfile
|
||||
import pytest
|
||||
|
||||
from openpilot.selfdrive.test.helpers import http_server_context
|
||||
from openpilot.system.hardware.hw import Paths
|
||||
from openpilot.tools.lib.url_file import URLFile
|
||||
from openpilot.tools.lib.url_file import URLFile, prune_cache
|
||||
import openpilot.tools.lib.url_file as url_file_module
|
||||
|
||||
|
||||
class CachingTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
@@ -128,3 +130,35 @@ class TestFileDownload:
|
||||
CachingTestRequestHandler.FILE_EXISTS = True
|
||||
length = URLFile(file_url).get_length()
|
||||
assert length == 4
|
||||
|
||||
|
||||
class TestCache:
  def test_prune_cache(self, monkeypatch):
    """prune_cache must be a no-op under the size limit and evict LRU entries above it."""
    with tempfile.TemporaryDirectory() as tmpdir:
      monkeypatch.setattr(Paths, 'download_cache_root', staticmethod(lambda: tmpdir + "/"))

      # create three fake cached chunks and a manifest recording their last-use times
      entries = []
      for idx in range(3):
        name = f"hash_{idx}"
        with open(tmpdir + "/" + name, "wb") as f:
          f.truncate(1000)
        entries.append(f"{name} {1000 + idx}")
      with open(tmpdir + "/manifest.txt", "w") as f:
        f.write('\n'.join(entries))

      # below the default limit, nothing should be removed
      assert len(os.listdir(tmpdir)) == 4
      prune_cache()
      assert len(os.listdir(tmpdir)) == 4

      # shrink the limit to 1.5 chunks' worth so eviction must trigger
      monkeypatch.setattr(url_file_module, 'CACHE_SIZE', url_file_module.CHUNK_SIZE + url_file_module.CHUNK_SIZE // 2)

      # now prune_cache should drop the least-recently-used files
      prune_cache()
      remaining = os.listdir(tmpdir)
      assert len(remaining) < 4
      # the entry with the newest timestamp must survive eviction
      assert entries[2].split()[0] in remaining
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import re
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
from hashlib import sha256
|
||||
import time
|
||||
from hashlib import md5
|
||||
from urllib3 import PoolManager, Retry
|
||||
from urllib3.response import BaseHTTPResponse
|
||||
from urllib3.util import Timeout
|
||||
@@ -14,14 +15,41 @@ from urllib3.exceptions import MaxRetryError
|
||||
# Cache chunk size
K = 1000
CHUNK_SIZE = 1000 * K
CACHE_SIZE = 10 * 1024 * 1024 * 1024  # total cache size in bytes (10 GiB)
|
||||
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
|
||||
|
||||
def hash_256(link: str) -> str:
  """Return the SHA-256 hex digest of *link*, ignoring any query string."""
  base_url = link.partition("?")[0]
  return sha256(base_url.encode('utf-8')).hexdigest()
|
||||
def hash_url(link: str) -> str:
  """Return the MD5 hex digest of *link*, ignoring any query string."""
  base_url = link.partition("?")[0]
  return md5(base_url.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def prune_cache(new_entry: str | None = None) -> None:
  """Evicts oldest cache files (LRU) until cache is under the size limit.

  If *new_entry* is given, it is recorded in the manifest with the current
  time as its last-use timestamp before eviction runs.
  """
  # A manifest of "<filename> <timestamp>" lines tracks last use, so we can
  # order entries without an os.stat syscall per cached chunk (slow).
  cache_root = Paths.download_cache_root()
  manifest_path = cache_root + "manifest.txt"

  manifest: dict[str, int] = {}
  if os.path.exists(manifest_path):
    with open(manifest_path) as f:
      for line in f:
        fields = line.strip().split()
        if len(fields) == 2:
          manifest[fields[0]] = int(fields[1])

  if new_entry:
    manifest[new_entry] = int(time.time())  # noqa: TID251

  # drop least-recently-used entries until the estimated size fits the limit
  # (size is estimated as one full CHUNK_SIZE per manifest entry)
  for key in sorted(manifest, key=manifest.get):
    if len(manifest) * CHUNK_SIZE <= CACHE_SIZE:
      break
    try:
      os.remove(cache_root + key)
    except OSError:
      pass  # best-effort: file may already be gone
    del manifest[key]

  # persist the updated manifest atomically
  with atomic_write(manifest_path, mode="w", overwrite=True) as f:
    f.write('\n'.join(f"{k} {v}" for k, v in manifest.items()))
|
||||
|
||||
class URLFileException(Exception):
|
||||
pass
|
||||
|
||||
@@ -77,7 +105,7 @@ class URLFile:
|
||||
if self._length is not None:
|
||||
return self._length
|
||||
|
||||
file_length_path = os.path.join(Paths.download_cache_root(), hash_256(self._url) + "_length")
|
||||
file_length_path = os.path.join(Paths.download_cache_root(), hash_url(self._url) + "_length")
|
||||
if not self._force_download and os.path.exists(file_length_path):
|
||||
with open(file_length_path) as file_length:
|
||||
content = file_length.read()
|
||||
@@ -103,7 +131,7 @@ class URLFile:
|
||||
while True:
|
||||
self._pos = position
|
||||
chunk_number = self._pos / CHUNK_SIZE
|
||||
file_name = hash_256(self._url) + "_" + str(chunk_number)
|
||||
file_name = hash_url(self._url) + "_" + str(chunk_number)
|
||||
full_path = os.path.join(Paths.download_cache_root(), str(file_name))
|
||||
data = None
|
||||
# If we don't have a file, download it
|
||||
@@ -111,6 +139,7 @@ class URLFile:
|
||||
data = self.read_aux(ll=CHUNK_SIZE)
|
||||
with atomic_write(full_path, mode="wb", overwrite=True) as new_cached_file:
|
||||
new_cached_file.write(data)
|
||||
prune_cache(file_name)
|
||||
else:
|
||||
with open(full_path, "rb") as cached_file:
|
||||
data = cached_file.read()
|
||||
|
||||
Reference in New Issue
Block a user