mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-21 19:23:53 +08:00
* segment range reader * rename that * revert that * cleanup * revert this for now * revert this for now * Fix + test * rm that * rm that * use for auto_fingerprint * simpler * for notebook too * match numpy indexing * just use numpy directly * remove that * spacing * spacing * use qlog for auto fingerprint * add 'read mode' * pass in read mode * add test for modes * numpy indexing * fix that case * more examples * fix the notebook * cleanup the notebook * cleaner * fix those
82 lines
2.3 KiB
Python
82 lines
2.3 KiB
Python
import enum
|
|
import re
|
|
import numpy as np
|
|
from openpilot.selfdrive.test.openpilotci import get_url
|
|
from openpilot.tools.lib.helpers import RE
|
|
from openpilot.tools.lib.logreader import LogReader
|
|
from openpilot.tools.lib.route import Route, SegmentRange
|
|
|
|
class ReadMode(enum.Enum):
|
|
RLOG = 0 # only read rlogs
|
|
QLOG = 1 # only read qlogs
|
|
#AUTO = 2 # default to rlogs, fallback to qlogs, not supported yet
|
|
|
|
|
|
def create_slice_from_string(s: str):
|
|
m = re.fullmatch(RE.SLICE, s)
|
|
assert m is not None, f"Invalid slice: {s}"
|
|
start, end, step = m.groups()
|
|
start = int(start) if start is not None else None
|
|
end = int(end) if end is not None else None
|
|
step = int(step) if step is not None else None
|
|
|
|
if start is not None and ":" not in s and end is None and step is None:
|
|
return start
|
|
return slice(start, end, step)
|
|
|
|
|
|
def parse_slice(sr: SegmentRange):
|
|
route = Route(sr.route_name)
|
|
segs = np.arange(route.max_seg_number+1)
|
|
s = create_slice_from_string(sr._slice)
|
|
return segs[s] if isinstance(s, slice) else [segs[s]]
|
|
|
|
def comma_api_source(sr: SegmentRange, mode=ReadMode.RLOG):
|
|
segs = parse_slice(sr)
|
|
route = Route(sr.route_name)
|
|
|
|
log_paths = route.log_paths() if mode == ReadMode.RLOG else route.qlog_paths()
|
|
|
|
for seg in segs:
|
|
yield LogReader(log_paths[seg])
|
|
|
|
def internal_source(sr: SegmentRange, mode=ReadMode.RLOG):
|
|
segs = parse_slice(sr)
|
|
|
|
for seg in segs:
|
|
yield LogReader(f"cd:/{sr.dongle_id}/{sr.timestamp}/{seg}/{'rlog' if mode == ReadMode.RLOG else 'qlog'}.bz2")
|
|
|
|
def openpilotci_source(sr: SegmentRange, mode=ReadMode.RLOG):
|
|
segs = parse_slice(sr)
|
|
|
|
for seg in segs:
|
|
yield LogReader(get_url(sr.route_name, seg, 'rlog' if mode == ReadMode.RLOG else 'qlog'))
|
|
|
|
def auto_source(sr: SegmentRange, mode=ReadMode.RLOG):
|
|
# Automatically determine viable source
|
|
|
|
try:
|
|
next(internal_source(sr, mode))
|
|
return internal_source(sr, mode)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
next(openpilotci_source(sr, mode))
|
|
return openpilotci_source(sr, mode)
|
|
except Exception:
|
|
pass
|
|
|
|
return comma_api_source(sr, mode)
|
|
|
|
|
|
class SegmentRangeReader:
|
|
def __init__(self, segment_range: str, mode=ReadMode.RLOG, source=auto_source):
|
|
sr = SegmentRange(segment_range)
|
|
self.lrs = source(sr, mode)
|
|
|
|
def __iter__(self):
|
|
for lr in self.lrs:
|
|
for m in lr:
|
|
yield m
|