From b58fddb83eab27504e0ba351d567a91e0d62e7e6 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sun, 28 Dec 2025 12:20:45 -0800 Subject: [PATCH] replay: add --benchmark mode (#36957) --- tools/replay/main.cc | 27 +++++++++++++++ tools/replay/replay.cc | 76 ++++++++++++++++++++++++++++++++++++++--- tools/replay/replay.h | 17 ++++++++- tools/replay/seg_mgr.cc | 6 ++++ tools/replay/seg_mgr.h | 2 ++ 5 files changed, 123 insertions(+), 5 deletions(-) diff --git a/tools/replay/main.cc b/tools/replay/main.cc index 460901219..30f85b170 100644 --- a/tools/replay/main.cc +++ b/tools/replay/main.cc @@ -1,11 +1,13 @@ #include +#include #include #include #include #include #include "common/prefix.h" +#include "common/timing.h" #include "tools/replay/consoleui.h" #include "tools/replay/replay.h" #include "tools/replay/util.h" @@ -31,6 +33,7 @@ Options: --no-hw-decoder Disable HW video decoding --no-vipc Do not output video --all Output all messages including bookmarkButton, uiDebug, userBookmark + --benchmark Run in benchmark mode (process all events then exit with stats) -h, --help Show this help message )"; @@ -66,6 +69,7 @@ bool parseArgs(int argc, char *argv[], ReplayConfig &config) { {"no-hw-decoder", no_argument, nullptr, 0}, {"no-vipc", no_argument, nullptr, 0}, {"all", no_argument, nullptr, 0}, + {"benchmark", no_argument, nullptr, 0}, {"help", no_argument, nullptr, 'h'}, {nullptr, 0, nullptr, 0}, // Terminating entry }; @@ -79,6 +83,7 @@ bool parseArgs(int argc, char *argv[], ReplayConfig &config) { {"no-hw-decoder", REPLAY_FLAG_NO_HW_DECODER}, {"no-vipc", REPLAY_FLAG_NO_VIPC}, {"all", REPLAY_FLAG_ALL_SERVICES}, + {"benchmark", REPLAY_FLAG_BENCHMARK}, }; if (argc == 1) { @@ -149,6 +154,28 @@ int main(int argc, char *argv[]) { return 1; } + if (config.flags & REPLAY_FLAG_BENCHMARK) { + replay.start(config.start_seconds); + replay.waitForFinished(); + + const auto &stats = replay.getBenchmarkStats(); + uint64_t process_start = stats.process_start_ts; + + std::cout << "\n===== REPLAY BENCHMARK RESULTS =====\n"; + std::cout << "Route: " << replay.route().name() << "\n\n"; + + std::cout << "TIMELINE:\n"; + std::cout << " t=0 ms process start\n"; + for (const auto &[ts, event] : stats.timeline) { + double ms = (ts - process_start) / 1e6; + std::cout << " t=" << std::fixed << std::setprecision(0) << ms << " ms" + << std::string(std::max(1, 8 - static_cast(std::to_string(static_cast(ms)).length())), ' ') + << event << "\n"; + } + + return 0; + } + ConsoleUI console_ui(&replay); replay.start(config.start_seconds); return console_ui.exec(); diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index cc105dd10..d8d59e41a 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -2,6 +2,8 @@ #include #include +#include +#include #include "cereal/services.h" #include "common/params.h" #include "tools/replay/util.h" @@ -19,6 +21,14 @@ Replay::Replay(const std::string &route, std::vector allow, std::ve : sm_(sm), flags_(flags), seg_mgr_(std::make_unique(route, flags, data_dir, auto_source)) { std::signal(SIGUSR1, interrupt_sleep_handler); + if (flags_ & REPLAY_FLAG_BENCHMARK) { + benchmark_stats_.process_start_ts = nanos_since_boot(); + seg_mgr_->setBenchmarkCallback([this](int seg_num, const std::string& event) { + benchmark_stats_.timeline.emplace_back(nanos_since_boot(), + "segment " + std::to_string(seg_num) + " " + event); + }); + } + if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) { block.insert(block.end(), {"bookmarkButton", "uiDebug", "userBookmark"}); } @@ -78,8 +88,13 @@ Replay::~Replay() { bool Replay::load() { rInfo("loading route %s", seg_mgr_->route_.name().c_str()); + if (!seg_mgr_->load()) return false; + if (hasFlag(REPLAY_FLAG_BENCHMARK)) { + benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "route metadata loaded"); + } + min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60; max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60; return true; @@ -257,8 +272,13 @@ void Replay::streamThread() { stream_thread_id = pthread_self(); std::unique_lock lk(stream_lock_); + int last_processed_segment = -1; + uint64_t segment_start_time = 0; + bool streaming_started = false; + while (true) { stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); }); + if (exit_) break; event_data_ = seg_mgr_->getEventData(); @@ -270,14 +290,19 @@ void Replay::streamThread() { continue; } - auto it = publishEvents(first, events.cend()); + if (!streaming_started && hasFlag(REPLAY_FLAG_BENCHMARK)) { + benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "streaming started"); + streaming_started = true; + } + + auto it = publishEvents(first, events.cend(), last_processed_segment, segment_start_time); // Ensure frames are sent before unlocking to prevent race conditions if (camera_server_) { camera_server_->waitForSent(); } - if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) { + if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP) && !hasFlag(REPLAY_FLAG_BENCHMARK)) { int last_segment = seg_mgr_->route_.segments().rbegin()->first; if (event_data_->isSegmentLoaded(last_segment)) { rInfo("reaches the end of route, restart from beginning"); @@ -285,12 +310,28 @@ void Replay::streamThread() { seekTo(minSeconds(), false); stream_lock_.lock(); } + } else if (it == events.cend() && hasFlag(REPLAY_FLAG_BENCHMARK)) { + // Exit benchmark mode after first segment completes + exit_ = true; + break; } } + + if (hasFlag(REPLAY_FLAG_BENCHMARK)) { + benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "benchmark done"); + + { + std::unique_lock lock(benchmark_lock_); + benchmark_done_ = true; + } + benchmark_cv_.notify_one(); + } } std::vector::const_iterator Replay::publishEvents(std::vector::const_iterator first, - std::vector::const_iterator last) { + std::vector::const_iterator last, + int &last_processed_segment, + uint64_t &segment_start_time) { uint64_t evt_start_ts = cur_mono_time_; uint64_t loop_start_ts = nanos_since_boot(); double prev_replay_speed = speed_; @@ -304,6 +345,23 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con seg_mgr_->setCurrentSegment(segment); } + // Track segment completion for benchmark timeline + if (hasFlag(REPLAY_FLAG_BENCHMARK) && segment != last_processed_segment) { + if (last_processed_segment >= 0 && segment_start_time > 0) { + uint64_t processing_time_ns = nanos_since_boot() - segment_start_time; + double processing_time_ms = processing_time_ns / 1e6; + double realtime_factor = 60.0 / (processing_time_ns / 1e9); // 60s per segment + + std::ostringstream oss; + oss << "segment " << last_processed_segment << " done publishing (" + << std::fixed << std::setprecision(0) << processing_time_ms << " ms, " + << std::fixed << std::setprecision(0) << realtime_factor << "x realtime)"; + benchmark_stats_.timeline.emplace_back(nanos_since_boot(), oss.str()); + } + segment_start_time = nanos_since_boot(); + last_processed_segment = segment; + } + cur_mono_time_ = evt.mono_time; cur_which_ = evt.which; @@ -320,7 +378,8 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con evt_start_ts = evt.mono_time; loop_start_ts = current_nanos; prev_replay_speed = speed_; - } else if (time_diff > 0) { + } else if (time_diff > 0 && !hasFlag(REPLAY_FLAG_BENCHMARK)) { + // Skip sleep in benchmark mode for maximum throughput precise_nano_sleep(time_diff, interrupt_requested_); } @@ -338,3 +397,12 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con return first; } + +void Replay::waitForFinished() { + if (!hasFlag(REPLAY_FLAG_BENCHMARK)) { + return; + } + + std::unique_lock lock(benchmark_lock_); + benchmark_cv_.wait(lock, [this]() { return benchmark_done_; }); +} diff --git a/tools/replay/replay.h b/tools/replay/replay.h index 5e868d242..58c1b71b8 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -24,6 +24,12 @@ enum REPLAY_FLAGS { REPLAY_FLAG_NO_HW_DECODER = 0x0100, REPLAY_FLAG_NO_VIPC = 0x0400, REPLAY_FLAG_ALL_SERVICES = 0x0800, + REPLAY_FLAG_BENCHMARK = 0x1000, +}; + +struct BenchmarkStats { + uint64_t process_start_ts = 0; + std::vector> timeline; }; class Replay { @@ -57,6 +63,8 @@ public: inline const std::optional findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); } const std::shared_ptr getEventData() const { return seg_mgr_->getEventData(); } void installEventFilter(std::function filter) { event_filter_ = filter; } + void waitForFinished(); + const BenchmarkStats &getBenchmarkStats() const { return benchmark_stats_; } // Event callback functions std::function onSegmentsMerged = nullptr; @@ -72,7 +80,9 @@ private: void handleSegmentMerge(); void interruptStream(const std::function& update_fn); std::vector::const_iterator publishEvents(std::vector::const_iterator first, - std::vector::const_iterator last); + std::vector::const_iterator last, + int &last_processed_segment, + uint64_t &segment_start_time); void publishMessage(const Event *e); void publishFrame(const Event *e); void checkSeekProgress(); @@ -107,4 +117,9 @@ private: std::function event_filter_ = nullptr; std::shared_ptr event_data_ = std::make_shared(); + + BenchmarkStats benchmark_stats_; + std::condition_variable benchmark_cv_; + std::mutex benchmark_lock_; + bool benchmark_done_ = false; }; diff --git a/tools/replay/seg_mgr.cc b/tools/replay/seg_mgr.cc index f4e865d47..0778cacbc 100644 --- a/tools/replay/seg_mgr.cc +++ b/tools/replay/seg_mgr.cc @@ -118,9 +118,15 @@ void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap: for (auto it = first; it != last; ++it) { auto &segment_ptr = it->second; if (!segment_ptr) { + if (onBenchmarkEvent_) { + onBenchmarkEvent_(it->first, "loading"); + } segment_ptr = std::make_shared( it->first, route_.at(it->first), flags_, filters_, [this](int seg_num, bool success) { + if (onBenchmarkEvent_) { + onBenchmarkEvent_(seg_num, success ? "loaded" : "load failed"); + } std::unique_lock lock(mutex_); needs_update_ = true; cv_.notify_one(); diff --git a/tools/replay/seg_mgr.h b/tools/replay/seg_mgr.h index 640169749..54e156fb6 100644 --- a/tools/replay/seg_mgr.h +++ b/tools/replay/seg_mgr.h @@ -27,6 +27,7 @@ public: bool load(); void setCurrentSegment(int seg_num); void setCallback(const std::function &callback) { onSegmentMergedCallback_ = callback; } + void setBenchmarkCallback(const std::function &callback) { onBenchmarkEvent_ = callback; } void setFilters(const std::vector &filters) { filters_ = filters; } const std::shared_ptr getEventData() const { return std::atomic_load(&event_data_); } bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); } @@ -52,5 +53,6 @@ private: SegmentMap segments_; std::shared_ptr event_data_; std::function onSegmentMergedCallback_ = nullptr; + std::function onBenchmarkEvent_ = nullptr; std::set merged_segments_; };