logreader.py: concurrent file existence checks (#34875)
* concurrent file existence checks
* upper bound on parallel queries
---------
Co-authored-by: Shane Smiskol <shane@smiskol.com>
This commit is contained in:
@@ -13,6 +13,7 @@ import warnings
|
||||
import zstandard as zstd
|
||||
|
||||
from collections.abc import Callable, Iterable, Iterator
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from cereal import log as capnp_log
|
||||
@@ -201,9 +202,15 @@ def direct_source(file_or_url: str) -> list[LogPath]:
|
||||
|
||||
|
||||
def get_invalid_files(files):
  """Yield every entry of *files* that is missing or fails the existence check.

  ``None`` entries are reported as invalid directly and are never passed to
  ``file_exists``. The remaining entries are checked concurrently on a
  thread pool capped at 32 workers, so large inputs cannot open an unbounded
  number of parallel queries.

  Args:
    files: iterable of file paths/URLs; entries may be ``None``. A falsy
      value (``None`` or an empty container) yields nothing.

  Yields:
    First all ``None`` entries, then each non-``None`` entry for which
    ``file_exists`` returned a falsy value, in completion order (not
    necessarily input order).

  Raises:
    Whatever ``file_exists`` raises for an entry; it is re-raised by
    ``future.result()`` when that entry's result is consumed.
  """
  if not files:
    return

  to_check = []
  for file in files:
    if file is None:
      # A missing entry is invalid by definition; report it without querying,
      # as the sequential implementation did (`f is None or not file_exists(f)`).
      yield file
    else:
      to_check.append(file)

  if not to_check:
    return

  with ThreadPoolExecutor(max_workers=32) as executor:
    # Keyed by future, so duplicate paths in the input are each checked and reported.
    future_to_file = {executor.submit(file_exists, file): file for file in to_check}
    for future in as_completed(future_to_file):
      if not future.result():
        yield future_to_file[future]
|
||||
|
||||
|
||||
def check_source(source: Source, *args) -> list[LogPath]:
|
||||
|
||||
Reference in New Issue
Block a user