Files
onepilot/tools/replay/replay.cc
github-actions[bot] 7fa972be6a sunnypilot v2026.02.09-4080
version: sunnypilot v2025.003.000 (dev)
date: 2026-02-09T02:04:38
master commit: 254f55ac15a40343d7255f2f098de3442e0c4a6f
2026-02-09 02:04:38 +00:00

409 lines
14 KiB
C++

#include "tools/replay/replay.h"
#include <capnp/dynamic.h>
#include <csignal>
#include <iomanip>
#include <sstream>
#include "cereal/services.h"
#include "common/params.h"
#include "tools/replay/util.h"
static void interrupt_sleep_handler(int signal) {}
// Helper function to notify events with safety checks
template <typename Callback, typename... Args>
void notifyEvent(Callback &callback, Args &&...args) {
if (callback) callback(std::forward<Args>(args)...);
}
Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block,
SubMaster *sm, uint32_t flags, const std::string &data_dir, bool auto_source)
: sm_(sm), flags_(flags), seg_mgr_(std::make_unique<SegmentManager>(route, flags, data_dir, auto_source)) {
std::signal(SIGUSR1, interrupt_sleep_handler);
if (flags_ & REPLAY_FLAG_BENCHMARK) {
benchmark_stats_.process_start_ts = nanos_since_boot();
seg_mgr_->setBenchmarkCallback([this](int seg_num, const std::string& event) {
benchmark_stats_.timeline.emplace_back(nanos_since_boot(),
"segment " + std::to_string(seg_num) + " " + event);
});
}
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
block.insert(block.end(), {"bookmarkButton", "uiDebug", "userBookmark"});
}
setupServices(allow, block);
setupSegmentManager(!allow.empty() || !block.empty());
}
void Replay::setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block) {
auto event_schema = capnp::Schema::from<cereal::Event>().asStruct();
sockets_.resize(event_schema.getUnionFields().size(), nullptr);
std::vector<const char *> active_services;
active_services.reserve(services.size());
for (const auto &[name, _] : services) {
bool is_blocked = std::find(block.begin(), block.end(), name) != block.end();
bool is_allowed = allow.empty() || std::find(allow.begin(), allow.end(), name) != allow.end();
if (is_allowed && !is_blocked) {
uint16_t which = event_schema.getFieldByName(name).getProto().getDiscriminantValue();
sockets_[which] = name.c_str();
active_services.push_back(name.c_str());
}
}
std::string services_str = join(active_services, ", ");
rInfo("active services: %s", services_str.c_str());
if (!sm_) {
pm_ = std::make_unique<PubMaster>(active_services);
}
}
void Replay::setupSegmentManager(bool has_filters) {
seg_mgr_->setCallback([this]() { handleSegmentMerge(); });
if (has_filters) {
std::vector<bool> filters(sockets_.size(), false);
for (size_t i = 0; i < sockets_.size(); ++i) {
filters[i] = (i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
}
seg_mgr_->setFilters(filters);
}
}
Replay::~Replay() {
if (stream_thread_.joinable()) {
rInfo("shutdown: in progress...");
interruptStream([this]() {
exit_ = true;
return false;
});
stream_thread_.join();
rInfo("shutdown: done");
}
camera_server_.reset();
seg_mgr_.reset();
}
bool Replay::load() {
rInfo("loading route %s", seg_mgr_->route_.name().c_str());
if (!seg_mgr_->load()) return false;
if (hasFlag(REPLAY_FLAG_BENCHMARK)) {
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "route metadata loaded");
}
min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60;
max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60;
return true;
}
void Replay::interruptStream(const std::function<bool()> &update_fn) {
if (stream_thread_.joinable() && stream_thread_id) {
pthread_kill(stream_thread_id, SIGUSR1); // Interrupt sleep in stream thread
}
{
interrupt_requested_ = true;
std::unique_lock lock(stream_lock_);
events_ready_ = update_fn();
interrupt_requested_ = user_paused_;
}
stream_cv_.notify_one();
}
void Replay::seekTo(double seconds, bool relative) {
double target_time = relative ? seconds + currentSeconds() : seconds;
target_time = std::max(0.0, target_time);
int target_segment = target_time / 60;
if (!seg_mgr_->hasSegment(target_segment)) {
rWarning("Invalid seek to %.2f s (segment %d)", target_time, target_segment);
return;
}
rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
notifyEvent(onSeeking, target_time);
interruptStream([&]() {
current_segment_.store(target_segment);
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
cur_which_ = cereal::Event::Which::INIT_DATA;
seeking_to_.store(target_time, std::memory_order_relaxed);
return false;
});
seg_mgr_->setCurrentSegment(target_segment);
checkSeekProgress();
}
void Replay::checkSeekProgress() {
if (!seg_mgr_->getEventData()->isSegmentLoaded(current_segment_.load())) return;
double seek_to = seeking_to_.exchange(-1.0, std::memory_order_acquire);
if (seek_to >= 0 && onSeekedTo) {
onSeekedTo(seek_to);
}
// Resume the interrupted stream
interruptStream([]() { return true; });
}
void Replay::seekToFlag(FindFlag flag) {
if (auto next = timeline_.find(currentSeconds(), flag)) {
seekTo(*next - 2, false); // seek to 2 seconds before next
}
}
void Replay::pause(bool pause) {
if (user_paused_ != pause) {
interruptStream([=]() {
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
user_paused_ = pause;
return !pause;
});
}
}
void Replay::handleSegmentMerge() {
if (exit_) return;
auto event_data = seg_mgr_->getEventData();
if (!stream_thread_.joinable() && !event_data->segments.empty()) {
startStream(event_data->segments.begin()->second);
}
notifyEvent(onSegmentsMerged);
// Interrupt the stream to handle segment merge
interruptStream([]() { return false; });
checkSeekProgress();
}
void Replay::startStream(const std::shared_ptr<Segment> segment) {
const auto &events = segment->log->events;
route_start_ts_ = events.front().mono_time;
cur_mono_time_ += route_start_ts_ - 1;
// get datetime from INIT_DATA, fallback to datetime in the route name
route_date_time_ = route().datetime();
auto it = std::find_if(events.cbegin(), events.cend(),
[](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
if (it != events.cend()) {
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
uint64_t wall_time = event.getInitData().getWallTimeNanos();
if (wall_time > 0) {
route_date_time_ = wall_time / 1e6;
}
}
// write CarParams
it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; });
if (it != events.end()) {
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
car_fingerprint_ = event.getCarParams().getCarFingerprint();
capnp::MallocMessageBuilder builder;
builder.setRoot(event.getCarParams());
auto words = capnp::messageToFlatArray(builder);
auto bytes = words.asBytes();
Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
Params().put("CarParamsPersistent", (const char *)bytes.begin(), bytes.size());
} else {
rWarning("failed to read CarParams from current segment");
}
// start camera server
if (!hasFlag(REPLAY_FLAG_NO_VIPC)) {
std::pair<int, int> camera_size[MAX_CAMERAS] = {};
for (auto type : ALL_CAMERAS) {
if (auto &fr = segment->frames[type]) {
camera_size[type] = {fr->width, fr->height};
}
}
camera_server_ = std::make_unique<CameraServer>(camera_size);
}
timeline_.initialize(seg_mgr_->route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
[this](std::shared_ptr<LogReader> log) { notifyEvent(onQLogLoaded, log); });
stream_thread_ = std::thread(&Replay::streamThread, this);
}
void Replay::publishMessage(const Event *e) {
if (event_filter_ && event_filter_(e)) return;
if (!sm_) {
auto bytes = e->data.asBytes();
int ret = pm_->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
sockets_[e->which] = nullptr;
}
} else {
capnp::FlatArrayMessageReader reader(e->data);
auto event = reader.getRoot<cereal::Event>();
sm_->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
}
}
void Replay::publishFrame(const Event *e) {
CameraType cam;
switch (e->which) {
case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break;
case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break;
case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break;
default: return; // Invalid event type
}
if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
return; // Camera isdisabled
auto seg_it = event_data_->segments.find(e->eidx_segnum);
if (seg_it != event_data_->segments.end()) {
if (auto &frame = seg_it->second->frames[cam]; frame) {
camera_server_->pushFrame(cam, frame.get(), e);
}
}
}
void Replay::streamThread() {
stream_thread_id = pthread_self();
std::unique_lock lk(stream_lock_);
int last_processed_segment = -1;
uint64_t segment_start_time = 0;
bool streaming_started = false;
while (true) {
stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
if (exit_) break;
event_data_ = seg_mgr_->getEventData();
const auto &events = event_data_->events;
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which_, cur_mono_time_, {}));
if (first == events.cend()) {
rInfo("waiting for events...");
events_ready_ = false;
continue;
}
if (!streaming_started && hasFlag(REPLAY_FLAG_BENCHMARK)) {
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "streaming started");
streaming_started = true;
}
auto it = publishEvents(first, events.cend(), last_processed_segment, segment_start_time);
// Ensure frames are sent before unlocking to prevent race conditions
if (camera_server_) {
camera_server_->waitForSent();
}
if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP) && !hasFlag(REPLAY_FLAG_BENCHMARK)) {
int last_segment = seg_mgr_->route_.segments().rbegin()->first;
if (event_data_->isSegmentLoaded(last_segment)) {
rInfo("reaches the end of route, restart from beginning");
stream_lock_.unlock();
seekTo(minSeconds(), false);
stream_lock_.lock();
}
} else if (it == events.cend() && hasFlag(REPLAY_FLAG_BENCHMARK)) {
// Exit benchmark mode after first segment completes
exit_ = true;
break;
}
}
if (hasFlag(REPLAY_FLAG_BENCHMARK)) {
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), "benchmark done");
{
std::unique_lock lock(benchmark_lock_);
benchmark_done_ = true;
}
benchmark_cv_.notify_one();
}
}
std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last,
int &last_processed_segment,
uint64_t &segment_start_time) {
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
double prev_replay_speed = speed_;
for (; !interrupt_requested_ && first != last; ++first) {
const Event &evt = *first;
int segment = toSeconds(evt.mono_time) / 60;
if (current_segment_.load(std::memory_order_relaxed) != segment) {
current_segment_.store(segment, std::memory_order_relaxed);
seg_mgr_->setCurrentSegment(segment);
}
// Track segment completion for benchmark timeline
if (hasFlag(REPLAY_FLAG_BENCHMARK) && segment != last_processed_segment) {
if (last_processed_segment >= 0 && segment_start_time > 0) {
uint64_t processing_time_ns = nanos_since_boot() - segment_start_time;
double processing_time_ms = processing_time_ns / 1e6;
double realtime_factor = 60.0 / (processing_time_ns / 1e9); // 60s per segment
std::ostringstream oss;
oss << "segment " << last_processed_segment << " done publishing ("
<< std::fixed << std::setprecision(0) << processing_time_ms << " ms, "
<< std::fixed << std::setprecision(0) << realtime_factor << "x realtime)";
benchmark_stats_.timeline.emplace_back(nanos_since_boot(), oss.str());
}
segment_start_time = nanos_since_boot();
last_processed_segment = segment;
}
cur_mono_time_ = evt.mono_time;
cur_which_ = evt.which;
// Skip events if socket is not present
if (!sockets_[evt.which]) continue;
const uint64_t current_nanos = nanos_since_boot();
const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);
// Reset timestamps for potential synchronization issues:
// - A negative time_diff may indicate slow execution or system wake-up,
// - A time_diff exceeding 1 second suggests a skipped segment.
if ((time_diff < -1e9 || time_diff >= 1e9) || speed_ != prev_replay_speed) {
evt_start_ts = evt.mono_time;
loop_start_ts = current_nanos;
prev_replay_speed = speed_;
} else if (time_diff > 0 && !hasFlag(REPLAY_FLAG_BENCHMARK)) {
// Skip sleep in benchmark mode for maximum throughput
precise_nano_sleep(time_diff, interrupt_requested_);
}
if (interrupt_requested_) break;
if (evt.eidx_segnum == -1) {
publishMessage(&evt);
} else if (camera_server_) {
if (speed_ > 1.0) {
camera_server_->waitForSent();
}
publishFrame(&evt);
}
}
return first;
}
void Replay::waitForFinished() {
if (!hasFlag(REPLAY_FLAG_BENCHMARK)) {
return;
}
std::unique_lock lock(benchmark_lock_);
benchmark_cv_.wait(lock, [this]() { return benchmark_done_; });
}