replay: add --benchmark mode (#36957)
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
#include <getopt.h>
|
||||
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "common/prefix.h"
|
||||
#include "common/timing.h"
|
||||
#include "tools/replay/consoleui.h"
|
||||
#include "tools/replay/replay.h"
|
||||
#include "tools/replay/util.h"
|
||||
@@ -31,6 +33,7 @@ Options:
|
||||
--no-hw-decoder Disable HW video decoding
|
||||
--no-vipc Do not output video
|
||||
--all Output all messages including bookmarkButton, uiDebug, userBookmark
|
||||
--benchmark Run in benchmark mode (process all events then exit with stats)
|
||||
-h, --help Show this help message
|
||||
)";
|
||||
|
||||
@@ -66,6 +69,7 @@ bool parseArgs(int argc, char *argv[], ReplayConfig &config) {
|
||||
{"no-hw-decoder", no_argument, nullptr, 0},
|
||||
{"no-vipc", no_argument, nullptr, 0},
|
||||
{"all", no_argument, nullptr, 0},
|
||||
{"benchmark", no_argument, nullptr, 0},
|
||||
{"help", no_argument, nullptr, 'h'},
|
||||
{nullptr, 0, nullptr, 0}, // Terminating entry
|
||||
};
|
||||
@@ -79,6 +83,7 @@ bool parseArgs(int argc, char *argv[], ReplayConfig &config) {
|
||||
{"no-hw-decoder", REPLAY_FLAG_NO_HW_DECODER},
|
||||
{"no-vipc", REPLAY_FLAG_NO_VIPC},
|
||||
{"all", REPLAY_FLAG_ALL_SERVICES},
|
||||
{"benchmark", REPLAY_FLAG_BENCHMARK},
|
||||
};
|
||||
|
||||
if (argc == 1) {
|
||||
@@ -149,6 +154,28 @@ int main(int argc, char *argv[]) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (config.flags & REPLAY_FLAG_BENCHMARK) {
|
||||
replay.start(config.start_seconds);
|
||||
replay.waitForFinished();
|
||||
|
||||
const auto &stats = replay.getBenchmarkStats();
|
||||
uint64_t process_start = stats.process_start_ts;
|
||||
|
||||
std::cout << "\n===== REPLAY BENCHMARK RESULTS =====\n";
|
||||
std::cout << "Route: " << replay.route().name() << "\n\n";
|
||||
|
||||
std::cout << "TIMELINE:\n";
|
||||
std::cout << " t=0 ms process start\n";
|
||||
for (const auto &[ts, event] : stats.timeline) {
|
||||
double ms = (ts - process_start) / 1e6;
|
||||
std::cout << " t=" << std::fixed << std::setprecision(0) << ms << " ms"
|
||||
<< std::string(std::max(1, 8 - static_cast<int>(std::to_string(static_cast<int>(ms)).length())), ' ')
|
||||
<< event << "\n";
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
ConsoleUI console_ui(&replay);
|
||||
replay.start(config.start_seconds);
|
||||
return console_ui.exec();
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
#include <capnp/dynamic.h>
|
||||
#include <csignal>
|
||||
#include <iomanip>
|
||||
#include <sstream>
|
||||
#include "cereal/services.h"
|
||||
#include "common/params.h"
|
||||
#include "tools/replay/util.h"
|
||||
@@ -19,6 +21,14 @@ Replay::Replay(const std::string &route, std::vector<std::string> allow, std::ve
|
||||
: sm_(sm), flags_(flags), seg_mgr_(std::make_unique<SegmentManager>(route, flags, data_dir, auto_source)) {
|
||||
std::signal(SIGUSR1, interrupt_sleep_handler);
|
||||
|
||||
if (flags_ & REPLAY_FLAG_BENCHMARK) {
|
||||
benchmark_stats_.process_start_ts = nanos_since_boot();
|
||||
seg_mgr_->setBenchmarkCallback([this](int seg_num, const std::string& event) {
|
||||
benchmark_stats_.timeline.emplace_back(nanos_since_boot(),
|
||||
"segment " + std::to_string(seg_num) + " " + event);
|
||||
});
|
||||
}
|
||||
|
||||
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
|
||||
block.insert(block.end(), {"bookmarkButton", "uiDebug", "userBookmark"});
|
||||
}
|
||||
@@ -78,8 +88,13 @@ Replay::~Replay() {
|
||||
|
||||
bool Replay::load() {
|
||||
rInfo("loading route %s", seg_mgr_->route_.name().c_str());
|
||||
|
||||
if (!seg_mgr_->load()) return false;
|
||||
|
||||
if (hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "route metadata loaded");
|
||||
}
|
||||
|
||||
min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60;
|
||||
max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60;
|
||||
return true;
|
||||
@@ -257,8 +272,13 @@ void Replay::streamThread() {
|
||||
stream_thread_id = pthread_self();
|
||||
std::unique_lock lk(stream_lock_);
|
||||
|
||||
int last_processed_segment = -1;
|
||||
uint64_t segment_start_time = 0;
|
||||
bool streaming_started = false;
|
||||
|
||||
while (true) {
|
||||
stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
|
||||
|
||||
if (exit_) break;
|
||||
|
||||
event_data_ = seg_mgr_->getEventData();
|
||||
@@ -270,14 +290,19 @@ void Replay::streamThread() {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto it = publishEvents(first, events.cend());
|
||||
if (!streaming_started && hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "streaming started");
|
||||
streaming_started = true;
|
||||
}
|
||||
|
||||
auto it = publishEvents(first, events.cend(), last_processed_segment, segment_start_time);
|
||||
|
||||
// Ensure frames are sent before unlocking to prevent race conditions
|
||||
if (camera_server_) {
|
||||
camera_server_->waitForSent();
|
||||
}
|
||||
|
||||
if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
|
||||
if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP) && !hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
int last_segment = seg_mgr_->route_.segments().rbegin()->first;
|
||||
if (event_data_->isSegmentLoaded(last_segment)) {
|
||||
rInfo("reaches the end of route, restart from beginning");
|
||||
@@ -285,12 +310,28 @@ void Replay::streamThread() {
|
||||
seekTo(minSeconds(), false);
|
||||
stream_lock_.lock();
|
||||
}
|
||||
} else if (it == events.cend() && hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
// Exit benchmark mode after first segment completes
|
||||
exit_ = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "benchmark done");
|
||||
|
||||
{
|
||||
std::unique_lock lock(benchmark_lock_);
|
||||
benchmark_done_ = true;
|
||||
}
|
||||
benchmark_cv_.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
|
||||
std::vector<Event>::const_iterator last) {
|
||||
std::vector<Event>::const_iterator last,
|
||||
int &last_processed_segment,
|
||||
uint64_t &segment_start_time) {
|
||||
uint64_t evt_start_ts = cur_mono_time_;
|
||||
uint64_t loop_start_ts = nanos_since_boot();
|
||||
double prev_replay_speed = speed_;
|
||||
@@ -304,6 +345,23 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
|
||||
seg_mgr_->setCurrentSegment(segment);
|
||||
}
|
||||
|
||||
// Track segment completion for benchmark timeline
|
||||
if (hasFlag(REPLAY_FLAG_BENCHMARK) && segment != last_processed_segment) {
|
||||
if (last_processed_segment >= 0 && segment_start_time > 0) {
|
||||
uint64_t processing_time_ns = nanos_since_boot() - segment_start_time;
|
||||
double processing_time_ms = processing_time_ns / 1e6;
|
||||
double realtime_factor = 60.0 / (processing_time_ns / 1e9); // 60s per segment
|
||||
|
||||
std::ostringstream oss;
|
||||
oss << "segment " << last_processed_segment << " done publishing ("
|
||||
<< std::fixed << std::setprecision(0) << processing_time_ms << " ms, "
|
||||
<< std::fixed << std::setprecision(0) << realtime_factor << "x realtime)";
|
||||
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), oss.str());
|
||||
}
|
||||
segment_start_time = nanos_since_boot();
|
||||
last_processed_segment = segment;
|
||||
}
|
||||
|
||||
cur_mono_time_ = evt.mono_time;
|
||||
cur_which_ = evt.which;
|
||||
|
||||
@@ -320,7 +378,8 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
|
||||
evt_start_ts = evt.mono_time;
|
||||
loop_start_ts = current_nanos;
|
||||
prev_replay_speed = speed_;
|
||||
} else if (time_diff > 0) {
|
||||
} else if (time_diff > 0 && !hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
// Skip sleep in benchmark mode for maximum throughput
|
||||
precise_nano_sleep(time_diff, interrupt_requested_);
|
||||
}
|
||||
|
||||
@@ -338,3 +397,12 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
|
||||
|
||||
return first;
|
||||
}
|
||||
|
||||
void Replay::waitForFinished() {
|
||||
if (!hasFlag(REPLAY_FLAG_BENCHMARK)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::unique_lock lock(benchmark_lock_);
|
||||
benchmark_cv_.wait(lock, [this]() { return benchmark_done_; });
|
||||
}
|
||||
|
||||
@@ -24,6 +24,12 @@ enum REPLAY_FLAGS {
|
||||
REPLAY_FLAG_NO_HW_DECODER = 0x0100,
|
||||
REPLAY_FLAG_NO_VIPC = 0x0400,
|
||||
REPLAY_FLAG_ALL_SERVICES = 0x0800,
|
||||
REPLAY_FLAG_BENCHMARK = 0x1000,
|
||||
};
|
||||
|
||||
struct BenchmarkStats {
|
||||
uint64_t process_start_ts = 0;
|
||||
std::vector<std::pair<uint64_t, std::string>> timeline;
|
||||
};
|
||||
|
||||
class Replay {
|
||||
@@ -57,6 +63,8 @@ public:
|
||||
inline const std::optional<Timeline::Entry> findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); }
|
||||
const std::shared_ptr<SegmentManager::EventData> getEventData() const { return seg_mgr_->getEventData(); }
|
||||
void installEventFilter(std::function<bool(const Event *)> filter) { event_filter_ = filter; }
|
||||
void waitForFinished();
|
||||
const BenchmarkStats &getBenchmarkStats() const { return benchmark_stats_; }
|
||||
|
||||
// Event callback functions
|
||||
std::function<void()> onSegmentsMerged = nullptr;
|
||||
@@ -72,7 +80,9 @@ private:
|
||||
void handleSegmentMerge();
|
||||
void interruptStream(const std::function<bool()>& update_fn);
|
||||
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
|
||||
std::vector<Event>::const_iterator last);
|
||||
std::vector<Event>::const_iterator last,
|
||||
int &last_processed_segment,
|
||||
uint64_t &segment_start_time);
|
||||
void publishMessage(const Event *e);
|
||||
void publishFrame(const Event *e);
|
||||
void checkSeekProgress();
|
||||
@@ -107,4 +117,9 @@ private:
|
||||
std::function<bool(const Event *)> event_filter_ = nullptr;
|
||||
|
||||
std::shared_ptr<SegmentManager::EventData> event_data_ = std::make_shared<SegmentManager::EventData>();
|
||||
|
||||
BenchmarkStats benchmark_stats_;
|
||||
std::condition_variable benchmark_cv_;
|
||||
std::mutex benchmark_lock_;
|
||||
bool benchmark_done_ = false;
|
||||
};
|
||||
|
||||
@@ -118,9 +118,15 @@ void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap:
|
||||
for (auto it = first; it != last; ++it) {
|
||||
auto &segment_ptr = it->second;
|
||||
if (!segment_ptr) {
|
||||
if (onBenchmarkEvent_) {
|
||||
onBenchmarkEvent_(it->first, "loading");
|
||||
}
|
||||
segment_ptr = std::make_shared<Segment>(
|
||||
it->first, route_.at(it->first), flags_, filters_,
|
||||
[this](int seg_num, bool success) {
|
||||
if (onBenchmarkEvent_) {
|
||||
onBenchmarkEvent_(seg_num, success ? "loaded" : "load failed");
|
||||
}
|
||||
std::unique_lock lock(mutex_);
|
||||
needs_update_ = true;
|
||||
cv_.notify_one();
|
||||
|
||||
@@ -27,6 +27,7 @@ public:
|
||||
bool load();
|
||||
void setCurrentSegment(int seg_num);
|
||||
void setCallback(const std::function<void()> &callback) { onSegmentMergedCallback_ = callback; }
|
||||
void setBenchmarkCallback(const std::function<void(int, const std::string&)> &callback) { onBenchmarkEvent_ = callback; }
|
||||
void setFilters(const std::vector<bool> &filters) { filters_ = filters; }
|
||||
const std::shared_ptr<EventData> getEventData() const { return std::atomic_load(&event_data_); }
|
||||
bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); }
|
||||
@@ -52,5 +53,6 @@ private:
|
||||
SegmentMap segments_;
|
||||
std::shared_ptr<EventData> event_data_;
|
||||
std::function<void()> onSegmentMergedCallback_ = nullptr;
|
||||
std::function<void(int, const std::string&)> onBenchmarkEvent_ = nullptr;
|
||||
std::set<int> merged_segments_;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user