diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml
index 317b69f8b8..7207229d89 100644
--- a/.github/workflows/docs.yaml
+++ b/.github/workflows/docs.yaml
@@ -19,8 +19,9 @@ jobs:
   docs:
     name: build docs
     runs-on: ubuntu-latest
-    timeout-minutes: 1
     steps:
+    - uses: commaai/timeout@v1
+
     - uses: actions/checkout@v4
       with:
        submodules: true
diff --git a/.github/workflows/setup/action.yaml b/.github/workflows/setup/action.yaml
index 935db34154..818060c3b0 100644
--- a/.github/workflows/setup/action.yaml
+++ b/.github/workflows/setup/action.yaml
@@ -20,11 +20,9 @@ runs:
       name: No retries!
       run: |
         if [ "${{ github.run_attempt }}" -gt 1 ]; then
-          echo -e "\033[31m"
-          echo "##################################################"
-          echo "    Retries not allowed! Fix the flaky test!    "
-          echo "##################################################"
-          echo -e "\033[0m"
+          echo -e "\033[0;31m##################################################"
+          echo -e "\033[0;31m    Retries not allowed! Fix the flaky test!      "
+          echo -e "\033[0;31m##################################################\033[0m"
           exit 1
         fi
diff --git a/SConstruct b/SConstruct
index 8aa46b9359..7cc672cef0 100644
--- a/SConstruct
+++ b/SConstruct
@@ -349,7 +349,7 @@ Export('common', 'gpucommon')
 env_swaglog = env.Clone()
 env_swaglog['CXXFLAGS'].append('-DSWAGLOG="\\"common/swaglog.h\\""')
 SConscript(['msgq_repo/SConscript'], exports={'env': env_swaglog})
-SConscript(['opendbc/can/SConscript'], exports={'env': env_swaglog})
+SConscript(['opendbc_repo/SConscript'], exports={'env': env_swaglog})
 
 SConscript(['cereal/SConscript'])
diff --git a/opendbc_repo b/opendbc_repo
index 7185479f94..f9d7203779 160000
--- a/opendbc_repo
+++ b/opendbc_repo
@@ -1 +1 @@
-Subproject commit 7185479f94aa2269bfa4827444c49b1be2da467b
+Subproject commit f9d72037792da2b0f0fc8fc8ecca0ee82ad81f8d
diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py
index b50955b2a9..ee60c47021 100644
--- a/selfdrive/car/tests/test_models.py
+++ b/selfdrive/car/tests/test_models.py
@@ -394,7 +394,7 @@ class TestCarModelBase(unittest.TestCase):
       for msg in filter(lambda m: m.src in range(64), can.can):
         to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
         ret = self.safety.safety_rx_hook(to_send)
-        self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}")
+        self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}")
 
       # Skip first frame so CS_prev is properly initialized
       if idx == 0:
diff --git a/selfdrive/locationd/locationd.py b/selfdrive/locationd/locationd.py
index 7f5541b8c2..f7780834b0 100755
--- a/selfdrive/locationd/locationd.py
+++ b/selfdrive/locationd/locationd.py
@@ -8,6 +8,7 @@ from enum import Enum
 from collections import defaultdict
 
 from cereal import log, messaging
+from cereal.services import SERVICE_LIST
 from openpilot.common.transformations.orientation import rot_from_euler
 from openpilot.common.realtime import config_realtime_process
 from openpilot.common.params import Params
@@ -23,8 +24,10 @@ MIN_STD_SANITY_CHECK = 1e-5  # m or rad
 MAX_FILTER_REWIND_TIME = 0.8  # s
 MAX_SENSOR_TIME_DIFF = 0.1  # s
 YAWRATE_CROSS_ERR_CHECK_FACTOR = 30
-INPUT_INVALID_THRESHOLD = 0.5
-INPUT_INVALID_DECAY = 0.9993  # ~10 secs to resume after a bad input
+INPUT_INVALID_THRESHOLD = 0.5  # 0 bad inputs ignored
+TIMING_INVALID_THRESHOLD = 2.5  # 2 bad timings ignored
+INPUT_INVALID_DECAY = 0.9993  # ~10 secs to resume after exceeding allowed bad inputs by one (at 100hz)
+TIMING_INVALID_DECAY = 0.9990  # ~2 secs to resume after exceeding allowed bad timings by one (at 100hz)
 POSENET_STD_INITIAL_VALUE = 10.0
 POSENET_STD_HIST_HALF = 20
@@ -265,10 +268,13 @@ def main():
   estimator = LocationEstimator(DEBUG)
 
   filter_initialized = False
-  critcal_services = ["accelerometer", "gyroscope", "liveCalibration", "cameraOdometry"]
-  observation_timing_invalid = False
+  critcal_services = ["accelerometer", "gyroscope", "cameraOdometry"]
+  observation_timing_invalid = defaultdict(int)
   observation_input_invalid = defaultdict(int)
 
+  input_invalid_decay = {s: INPUT_INVALID_DECAY ** (100. / SERVICE_LIST[s].frequency) for s in critcal_services}
+  timing_invalid_decay = {s: TIMING_INVALID_DECAY ** (100. / SERVICE_LIST[s].frequency) for s in critcal_services}
+
   initial_pose = params.get("LocationFilterInitialState")
   if initial_pose is not None:
     initial_pose = json.loads(initial_pose)
@@ -282,8 +288,6 @@ def main():
     acc_msgs, gyro_msgs = (messaging.drain_sock(sock) for sock in sensor_sockets)
 
     if filter_initialized:
-      observation_timing_invalid = False
-
       msgs = []
       for msg in acc_msgs + gyro_msgs:
         t, valid, which, data = msg.logMonoTime, msg.valid, msg.which(), getattr(msg, msg.which())
@@ -298,18 +302,23 @@ def main():
         if valid:
           t = log_mono_time * 1e-9
           res = estimator.handle_log(t, which, msg)
+          if which not in critcal_services:
+            continue
+
           if res == HandleLogResult.TIMING_INVALID:
-            observation_timing_invalid = True
+            observation_timing_invalid[which] += 1
           elif res == HandleLogResult.INPUT_INVALID:
             observation_input_invalid[which] += 1
           else:
-            observation_input_invalid[which] *= INPUT_INVALID_DECAY
+            observation_input_invalid[which] *= input_invalid_decay[which]
+            observation_timing_invalid[which] *= timing_invalid_decay[which]
     else:
       filter_initialized = sm.all_checks() and sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, sensor_alive, SIMULATION)
 
     if sm.updated["cameraOdometry"]:
       critical_service_inputs_valid = all(observation_input_invalid[s] < INPUT_INVALID_THRESHOLD for s in critcal_services)
-      inputs_valid = sm.all_valid() and critical_service_inputs_valid and not observation_timing_invalid
+      critical_service_timing_valid = all(observation_timing_invalid[s] < TIMING_INVALID_THRESHOLD for s in critcal_services)
+      inputs_valid = sm.all_valid() and critical_service_inputs_valid and critical_service_timing_valid
       sensors_valid = sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, sensor_alive, SIMULATION)
 
       msg = estimator.get_msg(sensors_valid, inputs_valid, filter_initialized)
diff --git a/selfdrive/locationd/test/test_locationd_scenarios.py b/selfdrive/locationd/test/test_locationd_scenarios.py
index 166d715c34..e0ff96a362 100644
--- a/selfdrive/locationd/test/test_locationd_scenarios.py
+++ b/selfdrive/locationd/test/test_locationd_scenarios.py
@@ -17,6 +17,7 @@ SELECT_COMPARE_FIELDS = {
   'sensors_flag': ['sensorsOK'],
 }
 JUNK_IDX = 100
+CONSISTENT_SPIKES_COUNT = 10
 
 
 class Scenario(Enum):
@@ -25,6 +26,8 @@ class Scenario(Enum):
   GYRO_SPIKE_MIDWAY = 'gyro_spike_midway'
   ACCEL_OFF = 'accel_off'
   ACCEL_SPIKE_MIDWAY = 'accel_spike_midway'
+  SENSOR_TIMING_SPIKE_MIDWAY = 'timing_spikes'
+  SENSOR_TIMING_CONSISTENT_SPIKES = 'timing_consistent_spikes'
 
 
 def get_select_fields_data(logs):
@@ -43,6 +46,17 @@ def get_select_fields_data(logs):
   return data
 
 
+def modify_logs_midway(logs, which, count, fn):
+  non_which = [x for x in logs if x.which() != which]
+  which = [x for x in logs if x.which() == which]
+  temps = which[len(which) // 2:len(which) // 2 + count]
+  for i, temp in enumerate(temps):
+    temp = temp.as_builder()
+    fn(temp)
+    which[len(which) // 2 + i] = temp.as_reader()
+  return sorted(non_which + which, key=lambda x: x.logMonoTime)
+
+
 def run_scenarios(scenario, logs):
   if scenario == Scenario.BASE:
     pass
@@ -51,23 +65,23 @@ def run_scenarios(scenario, logs):
     logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime)
 
   elif scenario == Scenario.GYRO_SPIKE_MIDWAY:
-    non_gyro = [x for x in logs if x.which() not in 'gyroscope']
-    gyro = [x for x in logs if x.which() in 'gyroscope']
-    temp = gyro[len(gyro) // 2].as_builder()
-    temp.gyroscope.gyroUncalibrated.v[0] += 3.0
-    gyro[len(gyro) // 2] = temp.as_reader()
-    logs = sorted(non_gyro + gyro, key=lambda x: x.logMonoTime)
+    def gyro_spike(msg):
+      msg.gyroscope.gyroUncalibrated.v[0] += 3.0
+    logs = modify_logs_midway(logs, 'gyroscope', 1, gyro_spike)
 
   elif scenario == Scenario.ACCEL_OFF:
     logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime)
 
   elif scenario == Scenario.ACCEL_SPIKE_MIDWAY:
-    non_accel = [x for x in logs if x.which() not in 'accelerometer']
-    accel = [x for x in logs if x.which() in 'accelerometer']
-    temp = accel[len(accel) // 2].as_builder()
-    temp.accelerometer.acceleration.v[0] += 10.0
-    accel[len(accel) // 2] = temp.as_reader()
-    logs = sorted(non_accel + accel, key=lambda x: x.logMonoTime)
+    def acc_spike(msg):
+      msg.accelerometer.acceleration.v[0] += 10.0
+    logs = modify_logs_midway(logs, 'accelerometer', 1, acc_spike)
+
+  elif scenario == Scenario.SENSOR_TIMING_SPIKE_MIDWAY or scenario == Scenario.SENSOR_TIMING_CONSISTENT_SPIKES:
+    def timing_spike(msg):
+      msg.accelerometer.timestamp -= int(0.150 * 1e9)
+    count = 1 if scenario == Scenario.SENSOR_TIMING_SPIKE_MIDWAY else CONSISTENT_SPIKES_COUNT
+    logs = modify_logs_midway(logs, 'accelerometer', count, timing_spike)
 
   replayed_logs = replay_process_with_name(name='locationd', lr=logs)
   return get_select_fields_data(logs), get_select_fields_data(replayed_logs)
@@ -122,7 +136,7 @@ class TestLocationdScenarios:
     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))
     assert np.diff(replayed_data['inputs_flag'])[499] == -1.0
-    assert np.diff(replayed_data['inputs_flag'])[696] == 1.0
+    assert np.diff(replayed_data['inputs_flag'])[704] == 1.0
 
   def test_accel_off(self):
     """
@@ -146,3 +160,21 @@ class TestLocationdScenarios:
     orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY, self.logs)
     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))
+
+  def test_single_timing_spike(self):
+    """
+    Test: timing of 150ms off for the single accelerometer message in the middle of the segment
+    Expected Result: the message is ignored, and inputsOK is False for that time
+    """
+    orig_data, replayed_data = run_scenarios(Scenario.SENSOR_TIMING_SPIKE_MIDWAY, self.logs)
+    assert np.all(replayed_data['inputs_flag'] == orig_data['inputs_flag'])
+    assert np.all(replayed_data['sensors_flag'] == orig_data['sensors_flag'])
+
+  def test_consistent_timing_spikes(self):
+    """
+    Test: consistent timing spikes for N accelerometer messages in the middle of the segment
+    Expected Result: inputsOK becomes False after N of bad measurements
+    """
+    orig_data, replayed_data = run_scenarios(Scenario.SENSOR_TIMING_CONSISTENT_SPIKES, self.logs)
+    assert np.diff(replayed_data['inputs_flag'])[500] == -1.0
+    assert np.diff(replayed_data['inputs_flag'])[787] == 1.0
diff --git a/selfdrive/modeld/models/supercombo.onnx b/selfdrive/modeld/models/supercombo.onnx
index 2a0ddef57b..06b7875362 100644
--- a/selfdrive/modeld/models/supercombo.onnx
+++ b/selfdrive/modeld/models/supercombo.onnx
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:c829d824ebc73d15da82516592c07d9784369ccbf710698e919e06a702e70924
-size 50320138
+oid sha256:663f58026cdf0b5c8e079a8a1591c8e2b5fa7e5c0f29a882011a17c405af10f4
+size 50320584
diff --git a/selfdrive/pandad/spi.cc b/selfdrive/pandad/spi.cc
index d365dd3620..108b11b9dc 100644
--- a/selfdrive/pandad/spi.cc
+++ b/selfdrive/pandad/spi.cc
@@ -239,7 +239,7 @@ int PandaSpiHandle::spi_transfer_retry(uint8_t endpoint, uint8_t *tx_data, uint1
       // due to full TX buffers
       nack_count += 1;
       if (nack_count > 3) {
-        SPILOG(LOGE, "NACK sleep %d", nack_count);
+        SPILOG(LOGD, "NACK sleep %d", nack_count);
         usleep(std::clamp(nack_count*10, 200, 2000));
       }
     }
diff --git a/system/athena/athenad.py b/system/athena/athenad.py
index b5a8f5127f..9c9acf2c13 100755
--- a/system/athena/athenad.py
+++ b/system/athena/athenad.py
@@ -801,6 +801,8 @@ def main(exit_event: threading.Event = None):
       cur_upload_items.clear()
 
       handle_long_poll(ws, exit_event)
+
+      ws.close()
     except (KeyboardInterrupt, SystemExit):
       break
     except (ConnectionError, TimeoutError, WebSocketException):
diff --git a/system/camerad/sensors/os04c10.cc b/system/camerad/sensors/os04c10.cc
index e15b4470e2..5074b08f2c 100644
--- a/system/camerad/sensors/os04c10.cc
+++ b/system/camerad/sensors/os04c10.cc
@@ -50,7 +50,7 @@ OS04C10::OS04C10() {
   dc_gain_on_grey = 0.9;
   dc_gain_off_grey = 1.0;
   exposure_time_min = 2;
-  exposure_time_max = 2352;
+  exposure_time_max = 1684;
   analog_gain_min_idx = 0x0;
   analog_gain_rec_idx = 0x0;  // 1x
   analog_gain_max_idx = 0x28;
diff --git a/system/camerad/sensors/os04c10_registers.h b/system/camerad/sensors/os04c10_registers.h
index b8844379f9..8b1c78c69d 100644
--- a/system/camerad/sensors/os04c10_registers.h
+++ b/system/camerad/sensors/os04c10_registers.h
@@ -278,8 +278,8 @@ const struct i2c_random_wr_payload init_array_os04c10[] = {
   {0x3816, 0x03},
   {0x3817, 0x01},
 
-  {0x380c, 0x08}, {0x380d, 0x5c}, // HTS
-  {0x380e, 0x09}, {0x380f, 0x38}, // VTS
+  {0x380c, 0x0b}, {0x380d, 0xac}, // HTS
+  {0x380e, 0x06}, {0x380f, 0x9c}, // VTS
 
   {0x3820, 0xb3},
   {0x3821, 0x01},
@@ -307,7 +307,6 @@ const struct i2c_random_wr_payload init_array_os04c10[] = {
   // {0x0100, 0x01},
   // {0x320d, 0x00},
   // {0x3208, 0xa0},
-  {0x3822, 0x14},
 
   // initialize exposure
   {0x3503, 0x88},
diff --git a/system/hardware/tici/agnos.json b/system/hardware/tici/agnos.json
index 025202019e..3aa712a6b0 100644
--- a/system/hardware/tici/agnos.json
+++ b/system/hardware/tici/agnos.json
@@ -1,19 +1,19 @@
 [
   {
     "name": "boot",
-    "url": "https://commadist.azureedge.net/agnosupdate-staging/boot-974d920c6b631708bff58a1080c9e48c78213a72e570ae2eeb7a4029e578b85c.img.xz",
-    "hash": "974d920c6b631708bff58a1080c9e48c78213a72e570ae2eeb7a4029e578b85c",
-    "hash_raw": "974d920c6b631708bff58a1080c9e48c78213a72e570ae2eeb7a4029e578b85c",
-    "size": 16422912,
+    "url": "https://commadist.azureedge.net/agnosupdate-staging/boot-184b9edb429167dcc97110134cdeffaa9739a758b3069e3ea7700e6559b79a0a.img.xz",
+    "hash": "184b9edb429167dcc97110134cdeffaa9739a758b3069e3ea7700e6559b79a0a",
+    "hash_raw": "184b9edb429167dcc97110134cdeffaa9739a758b3069e3ea7700e6559b79a0a",
+    "size": 16414720,
     "sparse": false,
     "full_check": true,
     "has_ab": true
   },
   {
     "name": "system",
"https://commadist.azureedge.net/agnosupdate-staging/system-bd5bc0257f8a60ab7239f37d7cbae20860f354805c7fee151310fe3805308cfc.img.xz", - "hash": "bd5bc0257f8a60ab7239f37d7cbae20860f354805c7fee151310fe3805308cfc", - "hash_raw": "bd5bc0257f8a60ab7239f37d7cbae20860f354805c7fee151310fe3805308cfc", + "url": "https://commadist.azureedge.net/agnosupdate-staging/system-93a86656670d6d8d99ea401bd5735cd1060c2355d65f2c14de522c77a80c57ea.img.xz", + "hash": "93a86656670d6d8d99ea401bd5735cd1060c2355d65f2c14de522c77a80c57ea", + "hash_raw": "93a86656670d6d8d99ea401bd5735cd1060c2355d65f2c14de522c77a80c57ea", "size": 4404019200, "sparse": false, "full_check": false, diff --git a/system/hardware/tici/hardware.py b/system/hardware/tici/hardware.py index 21e2c8fe1a..2692fc128a 100644 --- a/system/hardware/tici/hardware.py +++ b/system/hardware/tici/hardware.py @@ -452,7 +452,24 @@ class Tici(HardwareBase): manufacturer = None cmds = [] - if manufacturer == 'Cavli Inc.': + + if self.get_device_type() in ("tici", "tizi"): + # clear out old blue prime initial APN + os.system('mmcli -m any --3gpp-set-initial-eps-bearer-settings="apn="') + + cmds += [ + # configure modem as data-centric + 'AT+QNVW=5280,0,"0102000000000000"', + 'AT+QNVFW="/nv/item_files/ims/IMS_enable",00', + 'AT+QNVFW="/nv/item_files/modem/mmode/ue_usage_setting",01', + ] + if self.get_device_type() == "tizi": + # SIM hot swap, not routed on tici + cmds += [ + 'AT+QSIMDET=1,0', + 'AT+QSIMSTAT=1', + ] + elif manufacturer == 'Cavli Inc.': cmds += [ 'AT^SIMSWAP=1', # use SIM slot, instead of internal eSIM 'AT$QCSIMSLEEP=0', # disable SIM sleep @@ -462,22 +479,16 @@ class Tici(HardwareBase): 'AT$QCPCFG=usbNet,0', 'AT$QCNETDEVCTL=3,1', ] - elif self.get_device_type() in ("tici", "tizi"): + else: cmds += [ - # configure modem as data-centric - 'AT+QNVW=5280,0,"0102000000000000"', - 'AT+QNVFW="/nv/item_files/ims/IMS_enable",00', - 'AT+QNVFW="/nv/item_files/modem/mmode/ue_usage_setting",01', - ] - if self.get_device_type() == "tizi": - cmds += [ - # SIM hot swap - 'AT+QSIMDET=1,0', - 'AT+QSIMSTAT=1', - ] + # SIM sleep disable + 'AT$QCSIMSLEEP=0', + 'AT$QCSIMCFG=SimPowerSave,0', + + # ethernet config + 'AT$QCPCFG=usbNet,1', + ] - # clear out old blue prime initial APN - os.system('mmcli -m any --3gpp-set-initial-eps-bearer-settings="apn="') for cmd in cmds: try: modem.Command(cmd, math.ceil(TIMEOUT), dbus_interface=MM_MODEM, timeout=TIMEOUT) diff --git a/system/loggerd/bootlog.cc b/system/loggerd/bootlog.cc index b8257b6d69..85eeb369a1 100644 --- a/system/loggerd/bootlog.cc +++ b/system/loggerd/bootlog.cc @@ -27,7 +27,7 @@ static kj::Array build_boot_log() { // Gather output of commands std::vector bootlog_commands = { - "[ -x \"$(command -v journalctl)\" ] && journalctl", + "[ -x \"$(command -v journalctl)\" ] && journalctl -o short-monotonic", }; if (Hardware::TICI()) { diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 965d74bef8..0a21712096 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -24,7 +24,11 @@ NetworkType = log.DeviceState.NetworkType UPLOAD_ATTR_NAME = 'user.upload' UPLOAD_ATTR_VALUE = b'1' -UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB +MAX_UPLOAD_SIZES = { + "qlog": 25*1e6, # can't be too restrictive here since we use qlogs to find + # bugs, including ones that can cause massive log sizes + "qcam": 5*1e6, +} LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. 
 
 allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
@@ -170,7 +174,7 @@ class Uploader:
     if sz == 0:
       # tag files of 0 size as uploaded
       success = True
-    elif name in self.immediate_priority and sz > UPLOAD_QLOG_QCAM_MAX_SIZE:
+    elif name in MAX_UPLOAD_SIZES and sz > MAX_UPLOAD_SIZES[name]:
       cloudlog.event("uploader_too_large", key=key, fn=fn, sz=sz)
       success = True
     else:
diff --git a/system/ubloxd/pigeond.py b/system/ubloxd/pigeond.py
index 8809689dcd..2b34ec2ea8 100755
--- a/system/ubloxd/pigeond.py
+++ b/system/ubloxd/pigeond.py
@@ -9,6 +9,7 @@ import urllib.parse
 from datetime import datetime, UTC
 
 from cereal import messaging
+from openpilot.common.time import system_time_valid
 from openpilot.common.params import Params
 from openpilot.common.swaglog import cloudlog
 from openpilot.system.hardware import TICI
@@ -196,8 +197,8 @@ def initialize_pigeon(pigeon: TTYPigeon) -> bool:
       cloudlog.error(f"failed to restore almanac backup, status: {restore_status}")
 
   # sending time to ublox
-  t_now = datetime.now(UTC).replace(tzinfo=None)
-  if t_now >= datetime(2021, 6, 1):
+  if system_time_valid():
+    t_now = datetime.now(UTC).replace(tzinfo=None)
     cloudlog.warning("Sending current time to ublox")
 
     # UBX-MGA-INI-TIME_UTC
diff --git a/system/ugpsd.py b/system/ugpsd.py
deleted file mode 100755
index 34b20b01c8..0000000000
--- a/system/ugpsd.py
+++ /dev/null
@@ -1,165 +0,0 @@
-#!/usr/bin/env python3
-import os
-import time
-import traceback
-import serial
-import datetime
-import numpy as np
-from collections import defaultdict
-
-from cereal import log
-import cereal.messaging as messaging
-from openpilot.common.retry import retry
-from openpilot.common.swaglog import cloudlog
-from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem
-
-
-def sfloat(n: str):
-  return float(n) if len(n) > 0 else 0
-
-def checksum(s: str):
-  ret = 0
-  for c in s[1:-3]:
-    ret ^= ord(c)
-  return format(ret, '02X')
-
-class Unicore:
-  def __init__(self):
-    self.s = serial.Serial('/dev/ttyHS0', 115200)
-    self.s.timeout = 1
-    self.s.writeTimeout = 1
-    self.s.newline = b'\r\n'
-
-    self.s.flush()
-    self.s.reset_input_buffer()
-    self.s.reset_output_buffer()
-    self.s.read(2048)
-
-  def send(self, cmd):
-    self.s.write(cmd.encode('utf8') + b'\r')
-    resp = self.s.read(2048)
-    print(len(resp), cmd, "\n", resp)
-    assert b"OK" in resp
-
-  def recv(self):
-    return self.s.readline()
-
-def build_msg(state):
-  """
-  NMEA sentences:
-    https://campar.in.tum.de/twiki/pub/Chair/NaviGpsDemon/nmea.html#RMC
-  NAV messages:
-    https://www.unicorecomm.com/assets/upload/file/UFirebird_Standard_Positioning_Products_Protocol_Specification_CH.pdf
-  """
-
-  msg = messaging.new_message('gpsLocation', valid=True)
-  gps = msg.gpsLocation
-
-  gnrmc = state['$GNRMC']
-  gps.hasFix = gnrmc[1] == 'A'
-  gps.source = log.GpsLocationData.SensorSource.unicore
-  gps.latitude = (sfloat(gnrmc[3][:2]) + (sfloat(gnrmc[3][2:]) / 60)) * (1 if gnrmc[4] == "N" else -1)
-  gps.longitude = (sfloat(gnrmc[5][:3]) + (sfloat(gnrmc[5][3:]) / 60)) * (1 if gnrmc[6] == "E" else -1)
-
-  try:
-    date = gnrmc[9][:6]
-    dt = datetime.datetime.strptime(f"{date} {gnrmc[1]}", '%d%m%y %H%M%S.%f')
-    gps.unixTimestampMillis = dt.timestamp()*1e3
-  except Exception:
-    pass
-
-  gps.bearingDeg = sfloat(gnrmc[8])
-
-  if len(state['$GNGGA']):
-    gngga = state['$GNGGA']
-    if gngga[10] == 'M':
-      gps.altitude = sfloat(gngga[9])
-
-  if len(state['$GNGSA']):
-    gngsa = state['$GNGSA']
-    gps.horizontalAccuracy = sfloat(gngsa[4])
-    gps.verticalAccuracy = sfloat(gngsa[5])
-
-  #if len(state['$NAVACC']):
-  #  # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A
-  #  navacc = state['$NAVACC']
-  #  gps.horizontalAccuracy = sfloat(navacc[3])
-  #  gps.speedAccuracy = sfloat(navacc[4])
-  #  gps.bearingAccuracyDeg = sfloat(navacc[5])
-
-  if len(state['$NAVVEL']):
-    # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A
-    navvel = state['$NAVVEL']
-    vECEF = [
-      sfloat(navvel[4]),
-      sfloat(navvel[5]),
-      sfloat(navvel[6]),
-    ]
-
-    lat = np.radians(gps.latitude)
-    lon = np.radians(gps.longitude)
-    R = np.array([
-      [-np.sin(lat) * np.cos(lon), -np.sin(lon), -np.cos(lat) * np.cos(lon)],
-      [-np.sin(lat) * np.sin(lon), np.cos(lon), -np.cos(lat) * np.sin(lon)],
-      [np.cos(lat), 0, -np.sin(lat)]
-    ])
-
-    vNED = [float(x) for x in R.dot(vECEF)]
-    gps.vNED = vNED
-    gps.speed = np.linalg.norm(vNED)
-
-  # TODO: set these from the module
-  gps.bearingAccuracyDeg = 5.
-  gps.speedAccuracy = 3.
-
-  return msg
-
-
-@retry(attempts=10, delay=0.1)
-def setup(u):
-  at_cmd('AT+CGPS=0')
-  at_cmd('AT+CGPS=1')
-  time.sleep(1.0)
-
-  # setup NAVXXX outputs
-  for i in range(4):
-    u.send(f"$CFGMSG,1,{i},1")
-  for i in (1, 3):
-    u.send(f"$CFGMSG,3,{i},1")
-
-  # 10Hz NAV outputs
-  u.send("$CFGNAV,100,100,1000")
-
-
-def main():
-  wait_for_modem("AT+CGPS?")
-
-  u = Unicore()
-  setup(u)
-
-  state = defaultdict(list)
-  pm = messaging.PubMaster(['gpsLocation'])
-  while True:
-    try:
-      msg = u.recv().decode('utf8').strip()
-      if "DEBUG" in os.environ:
-        print(repr(msg))
-
-      if len(msg) > 0:
-        if checksum(msg) != msg.split('*')[1]:
-          cloudlog.error(f"invalid checksum: {repr(msg)}")
-          continue
-
-        k = msg.split(',')[0]
-        state[k] = msg.split(',')
-        if '$GNRMC' not in msg:
-          continue
-
-        pm.send('gpsLocation', build_msg(state))
-    except Exception:
-      traceback.print_exc()
-      cloudlog.exception("gps.issue")
-
-
-if __name__ == "__main__":
-  main()
diff --git a/tools/cabana/SConscript b/tools/cabana/SConscript
index 74dc4d9f50..1cacaba4a2 100644
--- a/tools/cabana/SConscript
+++ b/tools/cabana/SConscript
@@ -39,4 +39,4 @@ output_json_file = 'tools/cabana/dbc/car_fingerprint_to_dbc.json'
 generate_dbc = cabana_env.Command('#' + output_json_file, ['dbc/generate_dbc_json.py'],
                                   "python3 tools/cabana/dbc/generate_dbc_json.py --out " + output_json_file)
-cabana_env.Depends(generate_dbc, ["#common", "#selfdrive/pandad", '#opendbc', "#cereal", "#msgq_repo", Glob("#opendbc/*.dbc")])
+cabana_env.Depends(generate_dbc, ["#common", "#selfdrive/pandad", '#opendbc_repo', "#cereal", "#msgq_repo"])
diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc
index 9bf80deb98..08acba9dd8 100644
--- a/tools/cabana/streams/abstractstream.cc
+++ b/tools/cabana/streams/abstractstream.cc
@@ -126,9 +126,8 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) const {
   return it != last_msgs.end() ? it->second : empty_data;
 }
 
-// it is thread safe to update data in updateLastMsgsTo.
-// updateLastMsgsTo is always called in UI thread.
 void AbstractStream::updateLastMsgsTo(double sec) {
+  std::lock_guard lk(mutex_);
   current_sec_ = sec;
   uint64_t last_ts = toMonoTime(sec);
   std::unordered_map msgs;
@@ -160,7 +159,10 @@ void AbstractStream::updateLastMsgsTo(double sec) {
                     std::any_of(messages_.cbegin(), messages_.cend(),
                                 [this](const auto &m) { return !last_msgs.count(m.first); });
   last_msgs = messages_;
+  mutex_.unlock();
+
   emit msgsReceived(nullptr, id_changed);
+  resumeStream();
 }
 
 const CanEvent *AbstractStream::newEvent(uint64_t mono_time, const cereal::CanData::Reader &c) {
diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h
index 7ae119bcf0..5ecf086c95 100644
--- a/tools/cabana/streams/abstractstream.h
+++ b/tools/cabana/streams/abstractstream.h
@@ -108,7 +108,7 @@ protected:
   void mergeEvents(const std::vector &events);
   const CanEvent *newEvent(uint64_t mono_time, const cereal::CanData::Reader &c);
   void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
-
+  virtual void resumeStream() {}
   std::vector all_events_;
   double current_sec_ = 0;
   std::optional> time_range_;
diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc
index bd48d97d52..bfe5ca74da 100644
--- a/tools/cabana/streams/replaystream.cc
+++ b/tools/cabana/streams/replaystream.cc
@@ -23,13 +23,10 @@ ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent) {
   });
 }
 
-static bool event_filter(const Event *e, void *opaque) {
-  return ((ReplayStream *)opaque)->eventFilter(e);
-}
-
 void ReplayStream::mergeSegments() {
-  for (auto &[n, seg] : replay->segments()) {
-    if (seg && seg->isLoaded() && !processed_segments.count(n)) {
+  auto event_data = replay->getEventData();
+  for (const auto &[n, seg] : event_data->segments) {
+    if (!processed_segments.count(n)) {
       processed_segments.insert(n);
 
       std::vector new_events;
@@ -50,16 +47,16 @@ void ReplayStream::mergeSegments() {
 
 bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) {
   replay.reset(new Replay(route.toStdString(), {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"},
-                          {}, nullptr, replay_flags, data_dir.toStdString(), this));
+                          {}, nullptr, replay_flags, data_dir.toStdString()));
   replay->setSegmentCacheLimit(settings.max_cached_minutes);
-  replay->installEventFilter(event_filter, this);
+  replay->installEventFilter([this](const Event *event) { return eventFilter(event); });
 
+  // Forward replay callbacks to corresponding Qt signals.
+  replay->onSeeking = [this](double sec) { emit seeking(sec); };
+  replay->onSeekedTo = [this](double sec) { emit seekedTo(sec); };
   replay->onQLogLoaded = [this](std::shared_ptr qlog) { emit qLogLoaded(qlog); };
+  replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::QueuedConnection); };
 
-  QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking);
-  QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
-  QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments);
   bool success = replay->load();
   if (!success) {
     if (replay->lastRouteError() == RouteLoadError::Unauthorized) {
diff --git a/tools/cabana/streams/replaystream.h b/tools/cabana/streams/replaystream.h
index 1d1cdaec9e..2d7f335193 100644
--- a/tools/cabana/streams/replaystream.h
+++ b/tools/cabana/streams/replaystream.h
@@ -22,7 +22,7 @@ public:
   bool eventFilter(const Event *event);
   void seekTo(double ts) override { replay->seekTo(std::max(double(0), ts), false); }
   bool liveStreaming() const override { return false; }
-  inline QString routeName() const override { return QString::fromStdString(replay->route()->name()); }
+  inline QString routeName() const override { return QString::fromStdString(replay->route().name()); }
   inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
   double minSeconds() const override { return replay->minSeconds(); }
   double maxSeconds() const { return replay->maxSeconds(); }
@@ -32,6 +32,7 @@ public:
   inline float getSpeed() const { return replay->getSpeed(); }
   inline Replay *getReplay() const { return replay.get(); }
   inline bool isPaused() const override { return replay->isPaused(); }
+  void resumeStream() override { return replay->resumeStream(); }
   void pause(bool pause) override;
 
 signals:
diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc
index 3dddc0fb64..0b2beb1dd6 100644
--- a/tools/cabana/videowidget.cc
+++ b/tools/cabana/videowidget.cc
@@ -247,8 +247,9 @@ void Slider::paintEvent(QPaintEvent *ev) {
     QColor empty_color = palette().color(QPalette::Window);
     empty_color.setAlpha(160);
 
-    for (const auto &[n, seg] : replay->segments()) {
-      if (!(seg && seg->isLoaded()))
+    const auto event_data = replay->getEventData();
+    for (const auto &[n, _] : replay->route().segments()) {
+      if (!event_data->isSegmentLoaded(n))
         fillRange(n * 60.0, (n + 1) * 60.0, empty_color);
     }
   }
@@ -341,7 +342,9 @@ void StreamCameraView::drawThumbnail(QPainter &p) {
     p.drawPixmap(x, y, thumb);
     p.setPen(QPen(palette().color(QPalette::BrightText), 2));
-    p.drawText(x, y, thumb.width(), thumb.height() - THUMBNAIL_MARGIN, Qt::AlignHCenter | Qt::AlignBottom, QString::number(seconds));
+    p.setFont(QFont(font().family(), 10));
+    p.drawText(x, y, thumb.width(), thumb.height() - THUMBNAIL_MARGIN,
+               Qt::AlignHCenter | Qt::AlignBottom, QString::number(seconds, 'f', 3));
   }
 }
diff --git a/tools/lib/filereader.py b/tools/lib/filereader.py
index 8206ad2228..6773d5a599 100644
--- a/tools/lib/filereader.py
+++ b/tools/lib/filereader.py
@@ -5,12 +5,15 @@ from urllib.parse import urlparse
 from openpilot.tools.lib.url_file import URLFile
 
 DATA_ENDPOINT = os.getenv("DATA_ENDPOINT", "http://data-raw.comma.internal/")
+LOCAL_IPS = ["10.", "192.168.", *[f"172.{i}" for i in range(16, 32)]]
 
 def internal_source_available(url=DATA_ENDPOINT):
   try:
     hostname = urlparse(url).hostname
     port = urlparse(url).port or 80
+    if not socket.gethostbyname(hostname).startswith(LOCAL_IPS):
+      return False
     with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as s:
       s.settimeout(0.5)
       s.connect((hostname, port))
diff --git a/tools/longitudinal_maneuvers/generate_report.py b/tools/longitudinal_maneuvers/generate_report.py
index aed972f606..e5972495b7 100755
--- a/tools/longitudinal_maneuvers/generate_report.py
+++ b/tools/longitudinal_maneuvers/generate_report.py
@@ -5,6 +5,7 @@ import io
 import os
 import math
 import pprint
+import webbrowser
 from collections import defaultdict
 from pathlib import Path
 import matplotlib.pyplot as plt
@@ -143,7 +144,8 @@ def report(platform, route, _description, CP, ID, maneuvers):
   with open(output_fn, "w") as f:
     f.write(''.join(builder))
 
-  print(f"\nReport written to {output_fn}\n")
+  print(f"\nOpening report: {output_fn}\n")
+  webbrowser.open_new_tab(str(output_fn))
 
 
 if __name__ == '__main__':
diff --git a/tools/plotjuggler/layouts/controls_mismatch_debug.xml b/tools/plotjuggler/layouts/controls_mismatch_debug.xml
index 7f9def379c..646e12a281 100644
--- a/tools/plotjuggler/layouts/controls_mismatch_debug.xml
+++ b/tools/plotjuggler/layouts/controls_mismatch_debug.xml
@@ -8,7 +8,7 @@
-      
+      
diff --git a/tools/replay/SConscript b/tools/replay/SConscript
index 4a907849cb..18849407cf 100644
--- a/tools/replay/SConscript
+++ b/tools/replay/SConscript
@@ -1,8 +1,10 @@
-Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', 'cereal')
+Import('env', 'arch', 'common', 'messaging', 'visionipc', 'cereal')
 
-base_frameworks = qt_env['FRAMEWORKS']
-base_libs = [common, messaging, cereal, visionipc,
-             'm', 'ssl', 'crypto', 'pthread', 'qt_util'] + qt_env["LIBS"]
+replay_env = env.Clone()
+replay_env['CCFLAGS'] += ['-Wno-deprecated-declarations']
+
+base_frameworks = []
+base_libs = [common, messaging, cereal, visionipc, 'm', 'ssl', 'crypto', 'pthread']
 
 if arch == "Darwin":
   base_frameworks.append('OpenCL')
@@ -10,11 +12,11 @@ else:
   base_libs.append('OpenCL')
 
 replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc",
-                  "route.cc", "util.cc", "timeline.cc", "api.cc"]
-replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
+                  "route.cc", "util.cc", "seg_mgr.cc", "timeline.cc", "api.cc"]
+replay_lib = replay_env.Library("replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
 Export('replay_lib')
 replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs
-qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)
+replay_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)
 
 if GetOption('extras'):
-  qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, base_libs])
+  replay_env.Program('tests/test_replay', ['tests/test_replay.cc'], LIBS=replay_libs)
diff --git a/tools/replay/consoleui.cc b/tools/replay/consoleui.cc
index b5415ac808..503902622c 100644
--- a/tools/replay/consoleui.cc
+++ b/tools/replay/consoleui.cc
@@ -6,8 +6,6 @@
 #include 
 #include 
 
-#include 
-
 #include "common/ratekeeper.h"
 #include "common/util.h"
 #include "common/version.h"
@@ -57,6 +55,8 @@ void add_str(WINDOW *w, const char *str, Color color = Color::Default, bool bold
   if (color != Color::Default) wattroff(w, COLOR_PAIR(color));
 }
 
+ExitHandler do_exit;
+
 }  // namespace
 
 ConsoleUI::ConsoleUI(Replay *replay) : replay(replay), sm({"carState", "liveParameters"}) {
@@ -95,6 +95,8 @@ ConsoleUI::ConsoleUI(Replay *replay) : replay(replay), sm({"carState", "liveParameters"}) {
 }
 
 ConsoleUI::~ConsoleUI() {
+  installDownloadProgressHandler(nullptr);
+  installMessageHandler(nullptr);
   endwin();
 }
@@ -233,7 +235,7 @@ void ConsoleUI::updateProgressBar() {
 
 void ConsoleUI::updateSummary() {
   const auto &route = replay->route();
-  mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %lu segments", route->name().c_str(), route->segments().size());
+  mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %lu segments", route.name().c_str(), route.segments().size());
   mvwprintw(w[Win::Stats], 1, 0, "Car Fingerprint: %s", replay->carFingerprint().c_str());
   wrefresh(w[Win::Stats]);
 }
@@ -349,7 +351,8 @@ void ConsoleUI::handleKey(char c) {
 
 int ConsoleUI::exec() {
   RateKeeper rk("Replay", 20);
-  while (true) {
+
+  while (!do_exit) {
     int c = getch();
     if (c == 'q' || c == 'Q') {
       break;
@@ -373,7 +376,6 @@ int ConsoleUI::exec() {
       logs.clear();
     }
 
-    qApp->processEvents();
     rk.keepTime();
   }
   return 0;
diff --git a/tools/replay/main.cc b/tools/replay/main.cc
index b880e99e23..31493d1486 100644
--- a/tools/replay/main.cc
+++ b/tools/replay/main.cc
@@ -1,6 +1,5 @@
 #include 
 
-#include 
 #include 
 #include 
 #include 
@@ -126,7 +125,6 @@ int main(int argc, char *argv[]) {
   util::set_file_descriptor_limit(1024);
 #endif
 
-  QCoreApplication app(argc, argv);
   ReplayConfig config;
   if (!parseArgs(argc, argv, config)) {
     return 1;
@@ -138,18 +136,18 @@ int main(int argc, char *argv[]) {
     op_prefix = std::make_unique(config.prefix);
   }
 
-  Replay *replay = new Replay(config.route, config.allow, config.block, nullptr, config.flags, config.data_dir, &app);
+  Replay replay(config.route, config.allow, config.block, nullptr, config.flags, config.data_dir);
   if (config.cache_segments > 0) {
-    replay->setSegmentCacheLimit(config.cache_segments);
+    replay.setSegmentCacheLimit(config.cache_segments);
   }
   if (config.playback_speed > 0) {
-    replay->setSpeed(std::clamp(config.playback_speed, ConsoleUI::speed_array.front(), ConsoleUI::speed_array.back()));
+    replay.setSpeed(std::clamp(config.playback_speed, ConsoleUI::speed_array.front(), ConsoleUI::speed_array.back()));
   }
-  if (!replay->load()) {
+  if (!replay.load()) {
     return 1;
   }
 
-  ConsoleUI console_ui(replay);
-  replay->start(config.start_seconds);
+  ConsoleUI console_ui(&replay);
+  replay.start(config.start_seconds);
   return console_ui.exec();
 }
diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc
index 82e231937c..2bd3614530 100644
--- a/tools/replay/replay.cc
+++ b/tools/replay/replay.cc
@@ -4,7 +4,6 @@
 #include 
 #include "cereal/services.h"
 #include "common/params.h"
-#include "common/timing.h"
 #include "tools/replay/util.h"
 
 static void interrupt_sleep_handler(int signal) {}
@@ -12,145 +11,124 @@ static void interrupt_sleep_handler(int signal) {}
 
 // Helper function to notify events with safety checks
 template 
 void notifyEvent(Callback &callback, Args &&...args) {
-  if (callback) {
-    callback(std::forward(args)...);
-  }
+  if (callback) callback(std::forward(args)...);
 }
 
-Replay::Replay(const std::string &route, std::vector allow, std::vector block, SubMaster *sm_,
-               uint32_t flags, const std::string &data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
-  // Register signal handler for SIGUSR1
+Replay::Replay(const std::string &route, std::vector allow, std::vector block,
+               SubMaster *sm, uint32_t flags, const std::string &data_dir)
+    : sm_(sm), flags_(flags), seg_mgr_(std::make_unique(route, flags, data_dir)) {
   std::signal(SIGUSR1, interrupt_sleep_handler);
 
   if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
     block.insert(block.end(), {"uiDebug", "userFlag"});
"userFlag"}); } + setupServices(allow, block); + setupSegmentManager(!allow.empty() || !block.empty()); +} +void Replay::setupServices(const std::vector &allow, const std::vector &block) { auto event_schema = capnp::Schema::from().asStruct(); - sockets_.resize(event_schema.getUnionFields().size()); - std::vector active_services; + sockets_.resize(event_schema.getUnionFields().size(), nullptr); + std::vector active_services; for (const auto &[name, _] : services) { - bool in_block = std::find(block.begin(), block.end(), name) != block.end(); - bool in_allow = std::find(allow.begin(), allow.end(), name) != allow.end(); - if (!in_block && (allow.empty() || in_allow)) { + bool is_blocked = std::find(block.begin(), block.end(), name) != block.end(); + bool is_allowed = allow.empty() || std::find(allow.begin(), allow.end(), name) != allow.end(); + if (is_allowed && !is_blocked) { uint16_t which = event_schema.getFieldByName(name).getProto().getDiscriminantValue(); sockets_[which] = name.c_str(); - active_services.push_back(name); + active_services.push_back(name.c_str()); } } - - if (!allow.empty()) { - for (int i = 0; i < sockets_.size(); ++i) { - filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]); - } - } - rInfo("active services: %s", join(active_services, ", ").c_str()); - rInfo("loading route %s", route.c_str()); - - if (sm == nullptr) { - std::vector socket_names; - std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(socket_names), - [](const char *name) { return name != nullptr; }); - pm = std::make_unique(socket_names); + if (!sm_) { + pm_ = std::make_unique(active_services); + } +} + +void Replay::setupSegmentManager(bool has_filters) { + seg_mgr_->setCallback([this]() { handleSegmentMerge(); }); + + if (has_filters) { + std::vector filters(sockets_.size(), false); + for (size_t i = 0; i < sockets_.size(); ++i) { + filters[i] = (i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]); + } + seg_mgr_->setFilters(filters); } - route_ = std::make_unique(route, data_dir); } Replay::~Replay() { - stop(); -} - -void Replay::stop() { - exit_ = true; - if (stream_thread_ != nullptr) { + seg_mgr_.reset(); + if (stream_thread_.joinable()) { rInfo("shutdown: in progress..."); - pauseStreamThread(); - stream_cv_.notify_one(); - stream_thread_->quit(); - stream_thread_->wait(); - stream_thread_->deleteLater(); - stream_thread_ = nullptr; + interruptStream([this]() { + exit_ = true; + return false; + }); + stream_thread_.join(); rInfo("shutdown: done"); } - camera_server_.reset(nullptr); - segments_.clear(); + camera_server_.reset(); } bool Replay::load() { - if (!route_->load()) { - rError("failed to load route %s from %s", route_->name().c_str(), - route_->dir().empty() ? 
"server" : route_->dir().c_str()); - return false; - } + rInfo("loading route %s", seg_mgr_->route_.name().c_str()); + if (!seg_mgr_->load()) return false; - for (auto &[n, f] : route_->segments()) { - bool has_log = !f.rlog.empty() || !f.qlog.empty(); - bool has_video = !f.road_cam.empty() || !f.qcamera.empty(); - if (has_log && (has_video || hasFlag(REPLAY_FLAG_NO_VIPC))) { - segments_.insert({n, nullptr}); - } - } - if (segments_.empty()) { - rInfo("no valid segments in route: %s", route_->name().c_str()); - return false; - } - rInfo("load route %s with %zu valid segments", route_->name().c_str(), segments_.size()); - max_seconds_ = (segments_.rbegin()->first + 1) * 60; + min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60; + max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60; return true; } -void Replay::start(int seconds) { - seekTo(route_->identifier().begin_segment * 60 + seconds, false); -} - -void Replay::updateEvents(const std::function &update_events_function) { - pauseStreamThread(); +void Replay::interruptStream(const std::function &update_fn) { + if (stream_thread_.joinable() && stream_thread_id) { + pthread_kill(stream_thread_id, SIGUSR1); // Interrupt sleep in stream thread + } { - std::unique_lock lk(stream_lock_); - events_ready_ = update_events_function(); - paused_ = user_paused_; + interrupt_requested_ = true; + std::unique_lock lock(stream_lock_); + events_ready_ = update_fn(); + interrupt_requested_ = user_paused_; } stream_cv_.notify_one(); } void Replay::seekTo(double seconds, bool relative) { - updateEvents([&]() { - double target_time = relative ? seconds + currentSeconds() : seconds; - target_time = std::max(double(0.0), target_time); - int target_segment = (int)target_time / 60; - if (segments_.count(target_segment) == 0) { - rWarning("Can't seek to %.2f s segment %d is invalid", target_time, target_segment); - return true; - } - if (target_time > max_seconds_) { - rWarning("Can't seek to %.2f s, time is invalid", target_time); - return true; - } + double target_time = relative ? 
+  target_time = std::max(0.0, target_time);
+  int target_segment = target_time / 60;
+  if (!seg_mgr_->hasSegment(target_segment)) {
+    rWarning("Invalid seek to %.2f s (segment %d)", target_time, target_segment);
+    return;
+  }
 
-    rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
+  rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
+  notifyEvent(onSeeking, target_time);
+
+  double seeked_to_sec = -1;
+  interruptStream([&]() {
     current_segment_ = target_segment;
     cur_mono_time_ = route_start_ts_ + target_time * 1e9;
     seeking_to_ = target_time;
+
+    if (event_data_->isSegmentLoaded(target_segment)) {
+      seeked_to_sec = *seeking_to_;
+      seeking_to_.reset();
+    }
     return false;
   });
-  checkSeekProgress();
-  updateSegmentsCache();
+  checkSeekProgress(seeked_to_sec);
+  seg_mgr_->setCurrentSegment(target_segment);
 }
 
-void Replay::checkSeekProgress() {
-  if (seeking_to_) {
-    auto it = segments_.find(int(*seeking_to_ / 60));
-    if (it != segments_.end() && it->second && it->second->isLoaded()) {
-      emit seekedTo(*seeking_to_);
-      seeking_to_ = std::nullopt;
-      // wake up stream thread
-      updateEvents([]() { return true; });
+void Replay::checkSeekProgress(double seeked_to_sec) {
+  if (seeked_to_sec >= 0) {
+    if (onSeekedTo) {
+      onSeekedTo(seeked_to_sec);
     } else {
-      // Emit signal indicating the ongoing seek operation
-      emit seeking(*seeking_to_);
+      interruptStream([]() { return true; });
     }
   }
 }
@@ -163,125 +141,45 @@ void Replay::seekToFlag(FindFlag flag) {
 
 void Replay::pause(bool pause) {
   if (user_paused_ != pause) {
-    pauseStreamThread();
-    {
-      std::unique_lock lk(stream_lock_);
+    interruptStream([=]() {
       rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
-      paused_ = user_paused_ = pause;
-    }
-    stream_cv_.notify_one();
-  }
-}
-
-void Replay::pauseStreamThread() {
-  paused_ = true;
-  // Send SIGUSR1 to interrupt clock_nanosleep
-  if (stream_thread_ && stream_thread_id) {
-    pthread_kill(stream_thread_id, SIGUSR1);
-  }
-}
-
-void Replay::segmentLoadFinished(int seg_num, bool success) {
-  if (!success) {
-    rWarning("failed to load segment %d, removing it from current replay list", seg_num);
-    updateEvents([&]() {
-      segments_.erase(seg_num);
-      return !segments_.empty();
+      user_paused_ = pause;
+      return !pause;
     });
   }
-  QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
 }
 
-void Replay::updateSegmentsCache() {
-  auto cur = segments_.lower_bound(current_segment_.load());
-  if (cur == segments_.end()) return;
+void Replay::handleSegmentMerge() {
+  if (exit_) return;
 
-  // Calculate the range of segments to load
-  auto begin = std::prev(cur, std::min(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
-  auto end = std::next(begin, std::min(segment_cache_limit, std::distance(begin, segments_.end())));
-  begin = std::prev(end, std::min(segment_cache_limit, std::distance(segments_.begin(), end)));
+  double seeked_to_sec = -1;
+  interruptStream([&]() {
+    event_data_ = seg_mgr_->getEventData();
+    notifyEvent(onSegmentsMerged);
 
-  loadSegmentInRange(begin, cur, end);
-  mergeSegments(begin, end);
-
-  // free segments out of current semgnt window.
-  std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); });
-  std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); });
-
-  // start stream thread
-  const auto &cur_segment = cur->second;
-  if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
-    startStream(cur_segment.get());
-  }
-}
-
-void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
-  auto loadNextSegment = [this](auto first, auto last) {
-    auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
-    if (it != last && !it->second) {
-      rDebug("loading segment %d...", it->first);
-      it->second = std::make_unique(it->first, route_->at(it->first), flags_, filters_,
-                                    [this](int seg_num, bool success) {
-                                      segmentLoadFinished(seg_num, success);
-                                    });
-      return true;
+    bool segment_loaded = event_data_->isSegmentLoaded(current_segment_);
+    if (seeking_to_ && segment_loaded) {
+      seeked_to_sec = *seeking_to_;
+      seeking_to_.reset();
+      return false;
     }
-    return false;
-  };
-
-  // Try loading forward segments, then reverse segments
-  if (!loadNextSegment(cur, end)) {
-    loadNextSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
-  }
-}
-
-void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
-  std::set segments_to_merge;
-  size_t new_events_size = 0;
-  for (auto it = begin; it != end; ++it) {
-    if (it->second && it->second->isLoaded()) {
-      segments_to_merge.insert(it->first);
-      new_events_size += it->second->log->events.size();
-    }
-  }
-
-  if (segments_to_merge == merged_segments_) return;
-
-  rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{},
-                                              [](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str());
-
-  std::vector new_events;
-  new_events.reserve(new_events_size);
-
-  // Merge events from segments_to_merge into new_events
-  for (int n : segments_to_merge) {
-    size_t size = new_events.size();
-    const auto &events = segments_.at(n)->log->events;
-    std::copy_if(events.begin(), events.end(), std::back_inserter(new_events),
-                 [this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; });
-    std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end());
-  }
-
-  if (stream_thread_) {
-    emit segmentsMerged();
-  }
-
-  updateEvents([&]() {
-    events_.swap(new_events);
-    merged_segments_ = segments_to_merge;
-    // Wake up the stream thread if the current segment is loaded or invalid.
-    return !seeking_to_ && (isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0));
+    return segment_loaded;
   });
-  checkSeekProgress();
+
+  checkSeekProgress(seeked_to_sec);
+  if (!stream_thread_.joinable() && !event_data_->events.empty()) {
+    startStream();
+  }
 }
 
-void Replay::startStream(const Segment *cur_segment) {
+void Replay::startStream() {
+  const auto &cur_segment = event_data_->segments.begin()->second;
   const auto &events = cur_segment->log->events;
 
   route_start_ts_ = events.front().mono_time;
   cur_mono_time_ += route_start_ts_ - 1;
 
   // get datetime from INIT_DATA, fallback to datetime in the route name
-  route_date_time_ = route()->datetime();
+  route_date_time_ = route().datetime();
   auto it = std::find_if(events.cbegin(), events.cend(),
                          [](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
   if (it != events.cend()) {
@@ -299,6 +197,7 @@ void Replay::startStream(const Segment *cur_segment) {
     capnp::FlatArrayMessageReader reader(it->data);
     auto event = reader.getRoot();
     car_fingerprint_ = event.getCarParams().getCarFingerprint();
+
     capnp::MallocMessageBuilder builder;
     builder.setRoot(event.getCarParams());
     auto words = capnp::messageToFlatArray(builder);
@@ -320,26 +219,18 @@ void Replay::startStream(const Segment *cur_segment) {
     camera_server_ = std::make_unique(camera_size);
   }
 
-  emit segmentsMerged();
+  timeline_.initialize(seg_mgr_->route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
+                       [this](std::shared_ptr log) { notifyEvent(onQLogLoaded, log); });
 
-  timeline_.initialize(*route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
-                       [this](std::shared_ptr log) {
-                         notifyEvent(onQLogLoaded, log);
-                       });
-
   // start stream thread
-  stream_thread_ = new QThread();
-  QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
-  stream_thread_->start();
-
-  emit streamStarted();
+  stream_thread_ = std::thread(&Replay::streamThread, this);
 }
 
 void Replay::publishMessage(const Event *e) {
-  if (event_filter && event_filter(e, filter_opaque)) return;
+  if (event_filter_ && event_filter_(e)) return;
 
-  if (sm == nullptr) {
+  if (!sm_) {
     auto bytes = e->data.asBytes();
-    int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
+    int ret = pm_->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
     if (ret == -1) {
       rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
       sockets_[e->which] = nullptr;
     }
   } else {
     capnp::FlatArrayMessageReader reader(e->data);
     auto event = reader.getRoot();
-    sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
+    sm_->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
   }
 }
 
@@ -363,9 +254,9 @@ void Replay::publishFrame(const Event *e) {
   if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
     return;  // Camera isdisabled
 
-  if (isSegmentMerged(e->eidx_segnum)) {
-    auto &segment = segments_.at(e->eidx_segnum);
-    if (auto &frame = segment->frames[cam]; frame) {
+  auto seg_it = event_data_->segments.find(e->eidx_segnum);
+  if (seg_it != event_data_->segments.end()) {
+    if (auto &frame = seg_it->second->frames[cam]; frame) {
       camera_server_->pushFrame(cam, frame.get(), e);
     }
  }
@@ -377,32 +268,33 @@ void Replay::streamThread() {
   std::unique_lock lk(stream_lock_);
 
   while (true) {
-    stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); });
+    stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
     if (exit_) break;
 
-    Event event(cur_which, cur_mono_time_, {});
-    auto first = std::upper_bound(events_.cbegin(), events_.cend(), event);
-    if (first == events_.cend()) {
+    const auto &events = event_data_->events;
+    auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {}));
+    if (first == events.cend()) {
       rInfo("waiting for events...");
       events_ready_ = false;
       continue;
     }
 
-    auto it = publishEvents(first, events_.cend());
+    auto it = publishEvents(first, events.cend());
 
     // Ensure frames are sent before unlocking to prevent race conditions
     if (camera_server_) {
       camera_server_->waitForSent();
     }
 
-    if (it != events_.cend()) {
+    if (it != events.cend()) {
       cur_which = it->which;
     } else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
-      // Check for loop end and restart if necessary
-      int last_segment = segments_.rbegin()->first;
-      if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
+      int last_segment = seg_mgr_->route_.segments().rbegin()->first;
+      if (event_data_->isSegmentLoaded(last_segment)) {
        rInfo("reaches the end of route, restart from beginning");
-        QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, minSeconds(), false), Qt::QueuedConnection);
+        stream_lock_.unlock();
+        seekTo(minSeconds(), false);
+        stream_lock_.lock();
       }
     }
   }
@@ -414,16 +306,16 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con
   uint64_t loop_start_ts = nanos_since_boot();
   double prev_replay_speed = speed_;
 
-  for (; !paused_ && first != last; ++first) {
+  for (; !interrupt_requested_ && first != last; ++first) {
     const Event &evt = *first;
     int segment = toSeconds(evt.mono_time) / 60;
 
     if (current_segment_ != segment) {
       current_segment_ = segment;
-      QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
+      seg_mgr_->setCurrentSegment(current_segment_);
     }
 
-    // Skip events if socket is not present
+    // Skip events if socket is not present
     if (!sockets_[evt.which]) continue;
 
     cur_mono_time_ = evt.mono_time;
@@ -438,10 +330,10 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con
       loop_start_ts = current_nanos;
       prev_replay_speed = speed_;
     } else if (time_diff > 0) {
-      precise_nano_sleep(time_diff, paused_);
+      precise_nano_sleep(time_diff, interrupt_requested_);
     }
 
-    if (paused_) break;
+    if (interrupt_requested_) break;
 
     if (evt.eidx_segnum == -1) {
       publishMessage(&evt);
diff --git a/tools/replay/replay.h b/tools/replay/replay.h
index 0b4906532e..d549eaefc4 100644
--- a/tools/replay/replay.h
+++ b/tools/replay/replay.h
@@ -1,25 +1,19 @@
 #pragma once
 
 #include 
-#include 
 #include 
+#include 
 #include 
-#include 
 #include 
 #include 
-#include 
-
-#include 
 
 #include "tools/replay/camera.h"
-#include "tools/replay/route.h"
+#include "tools/replay/seg_mgr.h"
 #include "tools/replay/timeline.h"
 
 #define DEMO_ROUTE "a2a0ccea32023010|2023-07-27--13-01-19"
 
-// one segment uses about 100M of memory
-constexpr int MIN_SEGMENTS_CACHE = 5;
-
 enum REPLAY_FLAGS {
   REPLAY_FLAG_NONE = 0x0000,
   REPLAY_FLAG_DCAM = 0x0002,
@@ -32,111 +26,85 @@ enum REPLAY_FLAGS {
   REPLAY_FLAG_ALL_SERVICES = 0x0800,
 };
 
-typedef bool (*replayEventFilter)(const Event *, void *);
-typedef std::map> SegmentMap;
-
-class Replay : public QObject {
-  Q_OBJECT
-
+class Replay {
 public:
   Replay(const std::string &route, std::vector allow, std::vector block, SubMaster *sm = nullptr,
-         uint32_t flags = REPLAY_FLAG_NONE, const std::string &data_dir = "", QObject *parent = 0);
+         uint32_t flags = REPLAY_FLAG_NONE, const std::string &data_dir = "");
   ~Replay();
   bool load();
-  RouteLoadError lastRouteError() const { return route_->lastError(); }
-  void start(int seconds = 0);
-  void stop();
+  RouteLoadError lastRouteError() const { return route().lastError(); }
+  void start(int seconds = 0) { seekTo(min_seconds_ + seconds, false); }
   void pause(bool pause);
   void seekToFlag(FindFlag flag);
   void seekTo(double seconds, bool relative);
   inline bool isPaused() const { return user_paused_; }
-  // the filter is called in streaming thread.try to return quickly from it to avoid blocking streaming.
-  // the filter function must return true if the event should be filtered.
-  // otherwise it must return false.
-  inline void installEventFilter(replayEventFilter filter, void *opaque) {
-    filter_opaque = opaque;
-    event_filter = filter;
-  }
-  inline int segmentCacheLimit() const { return segment_cache_limit; }
-  inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(MIN_SEGMENTS_CACHE, n); }
+  inline int segmentCacheLimit() const { return seg_mgr_->segment_cache_limit_; }
+  inline void setSegmentCacheLimit(int n) { seg_mgr_->segment_cache_limit_ = std::max(MIN_SEGMENTS_CACHE, n); }
   inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; }
   void setLoop(bool loop) { loop ? flags_ &= ~REPLAY_FLAG_NO_LOOP : flags_ |= REPLAY_FLAG_NO_LOOP; }
   bool loop() const { return !(flags_ & REPLAY_FLAG_NO_LOOP); }
-  inline const Route* route() const { return route_.get(); }
+  const Route &route() const { return seg_mgr_->route_; }
   inline double currentSeconds() const { return double(cur_mono_time_ - route_start_ts_) / 1e9; }
   inline std::time_t routeDateTime() const { return route_date_time_; }
   inline uint64_t routeStartNanos() const { return route_start_ts_; }
   inline double toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; }
-  inline double minSeconds() const { return !segments_.empty() ? segments_.begin()->first * 60 : 0; }
segments_.begin()->first * 60 : 0; } + inline double minSeconds() const { return min_seconds_; } inline double maxSeconds() const { return max_seconds_; } inline void setSpeed(float speed) { speed_ = speed; } inline float getSpeed() const { return speed_; } - inline const SegmentMap &segments() const { return segments_; } inline const std::string &carFingerprint() const { return car_fingerprint_; } - inline const std::shared_ptr> getTimeline() const { return timeline_.get(); } + inline const std::shared_ptr> getTimeline() const { return timeline_.getEntries(); } inline const std::optional findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); } + const std::shared_ptr getEventData() const { return event_data_; } + void installEventFilter(std::function filter) { event_filter_ = filter; } + void resumeStream() { interruptStream([]() { return true; }); } // Event callback functions + std::function onSegmentsMerged = nullptr; + std::function onSeeking = nullptr; + std::function onSeekedTo = nullptr; std::function)> onQLogLoaded = nullptr; - -signals: - void streamStarted(); - void segmentsMerged(); - void seeking(double sec); - void seekedTo(double sec); - void minMaxTimeChanged(double min_sec, double max_sec); - -protected: - std::optional find(FindFlag flag); - void pauseStreamThread(); - void startStream(const Segment *cur_segment); +private: + void setupServices(const std::vector &allow, const std::vector &block); + void setupSegmentManager(bool has_filters); + void startStream(); void streamThread(); - void updateSegmentsCache(); - void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end); - void segmentLoadFinished(int seg_num, bool success); - void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); - void updateEvents(const std::function& update_events_function); + void handleSegmentMerge(); + void interruptStream(const std::function& update_fn); std::vector::const_iterator publishEvents(std::vector::const_iterator first, std::vector::const_iterator last); void publishMessage(const Event *e); void publishFrame(const Event *e); - void checkSeekProgress(); - inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; } + void checkSeekProgress(double seeked_to_sec); + std::unique_ptr seg_mgr_; Timeline timeline_; pthread_t stream_thread_id = 0; - QThread *stream_thread_ = nullptr; + std::thread stream_thread_; std::mutex stream_lock_; bool user_paused_ = false; std::condition_variable stream_cv_; - std::atomic current_segment_ = 0; + int current_segment_ = 0; std::optional seeking_to_; - SegmentMap segments_; - // the following variables must be protected with stream_lock_ std::atomic exit_ = false; - std::atomic paused_ = false; + std::atomic interrupt_requested_ = false; bool events_ready_ = false; std::time_t route_date_time_; uint64_t route_start_ts_ = 0; std::atomic cur_mono_time_ = 0; - std::atomic max_seconds_ = 0; - std::vector events_; - std::set merged_segments_; - - // messaging - SubMaster *sm = nullptr; - std::unique_ptr pm; + double min_seconds_ = 0; + double max_seconds_ = 0; + SubMaster *sm_ = nullptr; + std::unique_ptr pm_; std::vector sockets_; - std::vector filters_; - std::unique_ptr route_; std::unique_ptr camera_server_; std::atomic flags_ = REPLAY_FLAG_NONE; std::string car_fingerprint_; std::atomic speed_ = 1.0; - replayEventFilter event_filter = nullptr; - void *filter_opaque = nullptr; - int segment_cache_limit = MIN_SEGMENTS_CACHE; + 
std::function event_filter_ = nullptr; + + std::shared_ptr event_data_ = std::make_shared(); }; diff --git a/tools/replay/route.cc b/tools/replay/route.cc index 9306b9fb07..7731d0daf4 100644 --- a/tools/replay/route.cc +++ b/tools/replay/route.cc @@ -159,7 +159,7 @@ void Route::addFileToSegment(int n, const std::string &file) { Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters, std::function callback) - : seg_num(n), flags(flags), filters_(filters), onLoadFinished_(callback) { + : seg_num(n), flags(flags), filters_(filters), on_load_finished_(callback) { // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog const std::array file_list = { (flags & REPLAY_FLAG_QCAMERA) || files.road_cam.empty() ? files.qcamera : files.road_cam, @@ -178,7 +178,7 @@ Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vec Segment::~Segment() { { std::lock_guard lock(mutex_); - onLoadFinished_ = nullptr; // Prevent callback after destruction + on_load_finished_ = nullptr; // Prevent callback after destruction } abort_ = true; for (auto &thread : threads_) { @@ -204,8 +204,14 @@ void Segment::loadFile(int id, const std::string file) { if (--loading_ == 0) { std::lock_guard lock(mutex_); - if (onLoadFinished_) { - onLoadFinished_(seg_num, !abort_); + load_state_ = !abort_ ? LoadState::Loaded : LoadState::Failed; + if (on_load_finished_) { + on_load_finished_(seg_num, !abort_); } } } + +Segment::LoadState Segment::getState() { + std::scoped_lock lock(mutex_); + return load_state_; +} diff --git a/tools/replay/route.h b/tools/replay/route.h index c2c7af6bc7..1806be5afa 100644 --- a/tools/replay/route.h +++ b/tools/replay/route.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -64,10 +65,12 @@ protected: class Segment { public: + enum class LoadState {Loading, Loaded, Failed}; + Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters, std::function callback); ~Segment(); - inline bool isLoaded() const { return !loading_ && !abort_; } + LoadState getState(); const int seg_num = 0; std::unique_ptr log; @@ -80,7 +83,8 @@ protected: std::atomic loading_ = 0; std::mutex mutex_; std::vector threads_; - std::function onLoadFinished_ = nullptr; + std::function on_load_finished_ = nullptr; uint32_t flags; std::vector filters_; + LoadState load_state_ = LoadState::Loading; }; diff --git a/tools/replay/seg_mgr.cc b/tools/replay/seg_mgr.cc new file mode 100644 index 0000000000..954b25e874 --- /dev/null +++ b/tools/replay/seg_mgr.cc @@ -0,0 +1,135 @@ +#include "tools/replay/seg_mgr.h" + +#include + +SegmentManager::~SegmentManager() { + { + std::unique_lock lock(mutex_); + exit_ = true; + onSegmentMergedCallback_ = nullptr; + } + cv_.notify_one(); + if (thread_.joinable()) thread_.join(); +} + +bool SegmentManager::load() { + if (!route_.load()) { + rError("failed to load route: %s", route_.name().c_str()); + return false; + } + + for (const auto &[n, file] : route_.segments()) { + if (!file.rlog.empty() || !file.qlog.empty()) { + segments_.insert({n, nullptr}); + } + } + + if (segments_.empty()) { + rInfo("no valid segments in route: %s", route_.name().c_str()); + return false; + } + + rInfo("loaded route %s with %zu valid segments", route_.name().c_str(), segments_.size()); + thread_ = std::thread(&SegmentManager::manageSegmentCache, this); + return true; +} + +void SegmentManager::setCurrentSegment(int seg_num) { + { + std::unique_lock lock(mutex_); + cur_seg_num_ = seg_num; + needs_update_ = true; 
+ } + cv_.notify_one(); +} + +void SegmentManager::manageSegmentCache() { + while (true) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this]() { return exit_ || needs_update_; }); + if (exit_) break; + + needs_update_ = false; + auto cur = segments_.lower_bound(cur_seg_num_); + if (cur == segments_.end()) continue; + + // Calculate the range of segments to load + auto begin = std::prev(cur, std::min(segment_cache_limit_ / 2, std::distance(segments_.begin(), cur))); + auto end = std::next(begin, std::min(segment_cache_limit_, std::distance(begin, segments_.end()))); + begin = std::prev(end, std::min(segment_cache_limit_, std::distance(segments_.begin(), end))); + + loadSegmentsInRange(begin, cur, end); + bool merged = mergeSegments(begin, end); + + // Free segments outside the current range + std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); }); + std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); }); + + lock.unlock(); + + if (merged && onSegmentMergedCallback_) { + onSegmentMergedCallback_(); // Notify listener that segments have been merged + } + } +} + +bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { + std::set segments_to_merge; + size_t total_event_count = 0; + for (auto it = begin; it != end; ++it) { + const auto &segment = it->second; + if (segment && segment->getState() == Segment::LoadState::Loaded) { + segments_to_merge.insert(segment->seg_num); + total_event_count += segment->log->events.size(); + } + } + + if (segments_to_merge == merged_segments_) return false; + + auto merged_event_data = std::make_shared(); + auto &merged_events = merged_event_data->events; + merged_events.reserve(total_event_count); + + rDebug("merging segments: %s", join(segments_to_merge, ", ").c_str()); + for (int n : segments_to_merge) { + const auto &events = segments_.at(n)->log->events; + if (events.empty()) continue; + + // Skip INIT_DATA if present + auto events_begin = (events.front().which == cereal::Event::Which::INIT_DATA) ? 
std::next(events.begin()) : events.begin(); + + size_t previous_size = merged_events.size(); + merged_events.insert(merged_events.end(), events_begin, events.end()); + std::inplace_merge(merged_events.begin(), merged_events.begin() + previous_size, merged_events.end()); + + merged_event_data->segments[n] = segments_.at(n); + } + + event_data_ = merged_event_data; + merged_segments_ = segments_to_merge; + + return true; +} + +void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) { + auto tryLoadSegment = [this](auto first, auto last) { + for (auto it = first; it != last; ++it) { + auto &segment_ptr = it->second; + if (!segment_ptr) { + segment_ptr = std::make_shared( + it->first, route_.at(it->first), flags_, filters_, + [this](int seg_num, bool success) { setCurrentSegment(cur_seg_num_); }); + } + + if (segment_ptr->getState() == Segment::LoadState::Loading) { + return true; // Segment is still loading + } + } + return false; // No segments need loading + }; + + // Try forward loading, then reverse if necessary + if (!tryLoadSegment(cur, end)) { + tryLoadSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin)); + } +} diff --git a/tools/replay/seg_mgr.h b/tools/replay/seg_mgr.h new file mode 100644 index 0000000000..efb3d7f0ea --- /dev/null +++ b/tools/replay/seg_mgr.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "tools/replay/route.h" + +constexpr int MIN_SEGMENTS_CACHE = 5; + +using SegmentMap = std::map>; + +class SegmentManager { +public: + struct EventData { + std::vector events; // Events extracted from the segments + SegmentMap segments; // Associated segments that contributed to these events + bool isSegmentLoaded(int n) const { return segments.find(n) != segments.end(); } + }; + + SegmentManager(const std::string &route_name, uint32_t flags, const std::string &data_dir = "") + : flags_(flags), route_(route_name, data_dir) {}; + ~SegmentManager(); + + bool load(); + void setCurrentSegment(int seg_num); + void setCallback(const std::function &callback) { onSegmentMergedCallback_ = callback; } + void setFilters(const std::vector &filters) { filters_ = filters; } + const std::shared_ptr getEventData() const { return event_data_; } + bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); } + + Route route_; + int segment_cache_limit_ = MIN_SEGMENTS_CACHE; + +private: + void manageSegmentCache(); + void loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end); + bool mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); + + std::vector filters_; + uint32_t flags_; + + std::mutex mutex_; + std::condition_variable cv_; + std::thread thread_; + std::atomic cur_seg_num_ = -1; + bool needs_update_ = false; + bool exit_ = false; + + SegmentMap segments_; + std::shared_ptr event_data_; + std::function onSegmentMergedCallback_ = nullptr; + std::set merged_segments_; +}; diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc index 6b366169bb..aed3de59a8 100644 --- a/tools/replay/tests/test_replay.cc +++ b/tools/replay/tests/test_replay.cc @@ -1,27 +1,8 @@ -#include -#include - -#include - +#define CATCH_CONFIG_MAIN #include "catch2/catch.hpp" -#include "common/util.h" #include "tools/replay/replay.h" -#include "tools/replay/util.h" const std::string TEST_RLOG_URL = 
"https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2"; -const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3"; - -const int TEST_REPLAY_SEGMENTS = std::getenv("TEST_REPLAY_SEGMENTS") ? atoi(std::getenv("TEST_REPLAY_SEGMENTS")) : 1; - -bool download_to_file(const std::string &url, const std::string &local_file, int chunk_size = 5 * 1024 * 1024, int retries = 3) { - do { - if (httpDownload(url, local_file, chunk_size)) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } while (--retries >= 0); - return false; -} TEST_CASE("LogReader") { SECTION("corrupt log") { @@ -34,67 +15,3 @@ TEST_CASE("LogReader") { REQUIRE(log.events.size() > 0); } } - -void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) { - std::mutex mutex; - std::condition_variable cv; - Segment segment(n, segment_file, flags, {}, [&](int, bool) { - REQUIRE(segment.isLoaded() == true); - REQUIRE(segment.log != nullptr); - REQUIRE(segment.frames[RoadCam] != nullptr); - if (flags & REPLAY_FLAG_DCAM) { - REQUIRE(segment.frames[DriverCam] != nullptr); - } - if (flags & REPLAY_FLAG_ECAM) { - REQUIRE(segment.frames[WideRoadCam] != nullptr); - } - - // test LogReader & FrameReader - REQUIRE(segment.log->events.size() > 0); - REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end())); - - for (auto cam : ALL_CAMERAS) { - auto &fr = segment.frames[cam]; - if (!fr) continue; - - if (cam == RoadCam || cam == WideRoadCam) { - REQUIRE(fr->getFrameCount() == 1200); - } - auto [nv12_width, nv12_height, nv12_buffer_size] = get_nv12_info(fr->width, fr->height); - VisionBuf buf; - buf.allocate(nv12_buffer_size); - buf.init_yuv(fr->width, fr->height, nv12_width, nv12_width * nv12_height); - // sequence get 100 frames - for (int i = 0; i < 100; ++i) { - REQUIRE(fr->get(i, &buf)); - } - } - cv.notify_one(); - }); - - std::unique_lock lock(mutex); - cv.wait(lock); -} - -std::string download_demo_route() { - static std::string data_dir; - - if (data_dir == "") { - char tmp_path[] = "/tmp/root_XXXXXX"; - data_dir = mkdtemp(tmp_path); - - Route remote_route(DEMO_ROUTE); - assert(remote_route.load()); - - // Create a local route from remote for testing - const std::string route_name = std::string(DEMO_ROUTE).substr(17); - for (int i = 0; i < 2; ++i) { - std::string log_path = util::string_format("%s/%s--%d/", data_dir.c_str(), route_name.c_str(), i); - util::create_directories(log_path, 0755); - REQUIRE(download_to_file(remote_route.at(i).rlog, log_path + "rlog.bz2")); - REQUIRE(download_to_file(remote_route.at(i).qcamera, log_path + "qcamera.ts")); - } - } - - return data_dir; -} diff --git a/tools/replay/tests/test_runner.cc b/tools/replay/tests/test_runner.cc deleted file mode 100644 index b20ac86c64..0000000000 --- a/tools/replay/tests/test_runner.cc +++ /dev/null @@ -1,10 +0,0 @@ -#define CATCH_CONFIG_RUNNER -#include "catch2/catch.hpp" -#include - -int main(int argc, char **argv) { - // unit tests for Qt - QCoreApplication app(argc, argv); - const int res = Catch::Session().run(argc, argv); - return (res < 0xff ? 
res : 0xff); -} diff --git a/tools/replay/timeline.cc b/tools/replay/timeline.cc index a984c20016..d836de972b 100644 --- a/tools/replay/timeline.cc +++ b/tools/replay/timeline.cc @@ -18,7 +18,7 @@ void Timeline::initialize(const Route &route, uint64_t route_start_ts, bool loca } std::optional Timeline::find(double cur_ts, FindFlag flag) const { - for (const auto &entry : *get()) { + for (const auto &entry : *getEntries()) { if (entry.type == TimelineType::Engaged) { if (flag == FindFlag::nextEngagement && entry.start_time > cur_ts) { return entry.start_time; @@ -38,7 +38,7 @@ std::optional Timeline::find(double cur_ts, FindFlag flag) const { } std::optional Timeline::findAlertAtTime(double target_time) const { - for (const auto &entry : *get()) { + for (const auto &entry : *getEntries()) { if (entry.start_time > target_time) break; if (entry.end_time >= target_time && entry.type >= TimelineType::AlertInfo) { return entry; @@ -72,8 +72,9 @@ void Timeline::buildTimeline(const Route &route, uint64_t route_start_ts, bool l } // Sort and finalize the timeline entries - std::sort(staging_entries_.begin(), staging_entries_.end(), [](auto &a, auto &b) { return a.start_time < b.start_time; }); - timeline_entries_ = std::make_shared>(staging_entries_); + auto entries = std::make_shared>(staging_entries_); + std::sort(entries->begin(), entries->end(), [](auto &a, auto &b) { return a.start_time < b.start_time; }); + timeline_entries_ = entries; callback(log); // Notify the callback once the log is processed } diff --git a/tools/replay/timeline.h b/tools/replay/timeline.h index b2535fd8b0..689a80635f 100644 --- a/tools/replay/timeline.h +++ b/tools/replay/timeline.h @@ -27,7 +27,7 @@ public: std::function)> callback); std::optional find(double cur_ts, FindFlag flag) const; std::optional findAlertAtTime(double target_time) const; - const std::shared_ptr> get() const { return timeline_entries_; } + const std::shared_ptr> getEntries() const { return timeline_entries_; } private: void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache, diff --git a/tools/replay/util.cc b/tools/replay/util.cc index fac1e11c47..94cea961ff 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -362,11 +362,11 @@ std::string decompressZST(const std::byte *in, size_t in_size, std::atomic return {}; } -void precise_nano_sleep(int64_t nanoseconds, std::atomic &should_exit) { +void precise_nano_sleep(int64_t nanoseconds, std::atomic &interrupt_requested) { struct timespec req, rem; req.tv_sec = nanoseconds / 1000000000; req.tv_nsec = nanoseconds % 1000000000; - while (!should_exit) { + while (!interrupt_requested) { #ifdef __APPLE__ int ret = nanosleep(&req, &rem); if (ret == 0 || errno != EINTR) diff --git a/tools/replay/util.h b/tools/replay/util.h index 46df2bc191..1f61951d21 100644 --- a/tools/replay/util.h +++ b/tools/replay/util.h @@ -47,7 +47,7 @@ private: }; std::string sha256(const std::string &str); -void precise_nano_sleep(int64_t nanoseconds, std::atomic &should_exit); +void precise_nano_sleep(int64_t nanoseconds, std::atomic &interrupt_requested); std::string decompressBZ2(const std::string &in, std::atomic *abort = nullptr); std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic *abort = nullptr); std::string decompressZST(const std::string &in, std::atomic *abort = nullptr);
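A minimal consumer sketch of the new Qt-free Replay API: with the signal/slot surface (streamStarted, segmentsMerged, seeking, seekedTo) removed, callers wire up plain std::function callbacks and a std::function-based event filter instead. This is illustrative only; it assumes the template arguments elided in the header above are void() for onSegmentsMerged, void(double) for onSeeking/onSeekedTo, bool(const Event *) for the filter, and std::string elements for the allow/block lists.

#include <cstdio>
#include <string>
#include <vector>

#include "tools/replay/replay.h"

int main() {
  // Empty allow/block lists (assumed to be service-name strings) mean "all services".
  Replay replay(DEMO_ROUTE, /*allow=*/{}, /*block=*/{}, /*sm=*/nullptr, REPLAY_FLAG_NONE);

  // Plain callbacks replace the old streamStarted/segmentsMerged/seeking/seekedTo signals.
  replay.onSegmentsMerged = []() { std::puts("segments merged"); };
  replay.onSeeking = [](double sec) { std::printf("seeking to %.1f s\n", sec); };
  replay.onSeekedTo = [](double sec) { std::printf("seeked to %.1f s\n", sec); };

  // The event filter is now a std::function; returning true drops the event.
  replay.installEventFilter([](const Event *) { return false; });

  if (!replay.load()) return 1;
  replay.start();  // per the new inline definition, this is seekTo(minSeconds() + 0, false)
  // ... keep the process alive while the std::thread-based stream publishes ...
  return 0;
}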
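The streaming loop iterates event_data_->events without holding the segment manager's lock; that works because mergeSegments() never mutates a published EventData. It builds a new one, appends each loaded segment's already-sorted events, restores global order with std::inplace_merge, and only then publishes the new shared_ptr (in the real code this happens while manageSegmentCache() still holds the manager's mutex). A standalone sketch of that snapshot-and-swap pattern, with Event/EventData reduced to simplified stand-ins:

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <vector>

struct Event { uint64_t mono_time; };
bool operator<(const Event &a, const Event &b) { return a.mono_time < b.mono_time; }

struct EventData { std::vector<Event> events; };

int main() {
  auto published = std::make_shared<EventData>();    // what getEventData() would hand out
  published->events = {{10}, {30}, {50}};

  // Reader side: take a snapshot once; it stays valid even after a new merge lands.
  std::shared_ptr<const EventData> snapshot = published;

  // Manager side: build a fresh EventData instead of mutating the published one.
  auto next = std::make_shared<EventData>(*published);
  const std::vector<Event> new_segment = {{20}, {40}};   // already sorted within the segment
  size_t previous_size = next->events.size();
  next->events.insert(next->events.end(), new_segment.begin(), new_segment.end());
  // Both halves are sorted, so inplace_merge restores global mono_time order.
  std::inplace_merge(next->events.begin(), next->events.begin() + previous_size, next->events.end());
  published = next;   // publish; the reader's snapshot is untouched

  std::printf("snapshot: %zu events, published: %zu events\n",
              snapshot->events.size(), published->events.size());
  return 0;
}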
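manageSegmentCache() keeps at most segment_cache_limit_ segments resident: up to half the window sits behind the current segment, the rest extends forward, and the whole window shifts back when it runs into the end of the route; segments outside it are reset(). The iterator arithmetic is easy to misread, so here is a self-contained illustration of just that window computation (the map values are dummies standing in for Segment pointers, and the limit and current segment are arbitrary):

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <iterator>
#include <map>

int main() {
  std::map<int, int> segments;                       // stand-in for SegmentMap
  for (int n = 0; n < 10; ++n) segments[n] = n;      // a 10-segment route

  const std::ptrdiff_t limit = 5;                    // segment_cache_limit_
  const int cur_seg_num = 7;                         // cur_seg_num_

  auto cur = segments.lower_bound(cur_seg_num);
  // Up to limit/2 segments behind the current one...
  auto begin = std::prev(cur, std::min(limit / 2, std::distance(segments.begin(), cur)));
  // ...then up to `limit` segments forward from there...
  auto end = std::next(begin, std::min(limit, std::distance(begin, segments.end())));
  // ...and shift the window back if it ran past the end of the route.
  begin = std::prev(end, std::min(limit, std::distance(segments.begin(), end)));

  std::printf("cached window: segments %d..%d\n", begin->first, std::prev(end)->first);
  // -> "cached window: segments 5..9"; everything outside this range would be freed
  return 0;
}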