mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-18 21:14:01 +08:00
replay: bug fixes and improvements (#32193)
This commit is contained in:
@@ -33,13 +33,12 @@ void ReplayStream::mergeSegments() {
|
||||
|
||||
std::vector<const CanEvent *> new_events;
|
||||
new_events.reserve(seg->log->events.size());
|
||||
for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
const uint64_t ts = (*it)->mono_time;
|
||||
capnp::FlatArrayMessageReader reader((*it)->data);
|
||||
for (const Event &e : seg->log->events) {
|
||||
if (e.which == cereal::Event::Which::CAN) {
|
||||
capnp::FlatArrayMessageReader reader(e.data);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
for (const auto &c : event.getCan()) {
|
||||
new_events.push_back(newEvent(ts, c));
|
||||
new_events.push_back(newEvent(e.mono_time, c));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -257,13 +257,13 @@ void Slider::setTimeRange(double min, double max) {
|
||||
void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
|
||||
const auto &segments = qobject_cast<ReplayStream *>(can)->route()->segments();
|
||||
if (segments.size() > 0 && segnum == segments.rbegin()->first && !qlog->events.empty()) {
|
||||
emit updateMaximumTime(qlog->events.back()->mono_time / 1e9 - can->routeStartTime());
|
||||
emit updateMaximumTime(qlog->events.back().mono_time / 1e9 - can->routeStartTime());
|
||||
}
|
||||
|
||||
std::mutex mutex;
|
||||
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) {
|
||||
if (e->which == cereal::Event::Which::THUMBNAIL) {
|
||||
capnp::FlatArrayMessageReader reader(e->data);
|
||||
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event &e) {
|
||||
if (e.which == cereal::Event::Which::THUMBNAIL) {
|
||||
capnp::FlatArrayMessageReader reader(e.data);
|
||||
auto thumb = reader.getRoot<cereal::Event>().getThumbnail();
|
||||
auto data = thumb.getThumbnail();
|
||||
if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) {
|
||||
@@ -271,13 +271,13 @@ void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
|
||||
std::lock_guard lk(mutex);
|
||||
thumbnails[thumb.getTimestampEof()] = scaled;
|
||||
}
|
||||
} else if (e->which == cereal::Event::Which::CONTROLS_STATE) {
|
||||
capnp::FlatArrayMessageReader reader(e->data);
|
||||
} else if (e.which == cereal::Event::Which::CONTROLS_STATE) {
|
||||
capnp::FlatArrayMessageReader reader(e.data);
|
||||
auto cs = reader.getRoot<cereal::Event>().getControlsState();
|
||||
if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 &&
|
||||
cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) {
|
||||
std::lock_guard lk(mutex);
|
||||
alerts.emplace(e->mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
|
||||
alerts.emplace(e.mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -172,7 +172,7 @@ void ConsoleUI::updateStatus() {
|
||||
if (status != Status::Paused) {
|
||||
auto events = replay->events();
|
||||
uint64_t current_mono_time = replay->routeStartTime() + replay->currentSeconds() * 1e9;
|
||||
bool playing = !events->empty() && events->back()->mono_time > current_mono_time;
|
||||
bool playing = !events->empty() && events->back().mono_time > current_mono_time;
|
||||
status = playing ? Status::Playing : Status::Waiting;
|
||||
}
|
||||
auto [status_str, status_color] = status_text[status];
|
||||
@@ -368,7 +368,6 @@ void ConsoleUI::handleKey(char c) {
|
||||
} else if (c == ' ') {
|
||||
pauseReplay(!replay->isPaused());
|
||||
} else if (c == 'q' || c == 'Q') {
|
||||
replay->stop();
|
||||
qApp->exit();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,10 @@ std::string FileReader::read(const std::string &file, std::atomic<bool> *abort)
|
||||
|
||||
std::string FileReader::download(const std::string &url, std::atomic<bool> *abort) {
|
||||
for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) {
|
||||
if (i > 0) rWarning("download failed, retrying %d", i);
|
||||
if (i > 0) {
|
||||
rWarning("download failed, retrying %d", i);
|
||||
util::sleep_for(3000);
|
||||
}
|
||||
|
||||
std::string result = httpGet(url, chunk_size_, abort);
|
||||
if (!result.empty()) {
|
||||
|
||||
@@ -4,16 +4,6 @@
|
||||
#include "tools/replay/filereader.h"
|
||||
#include "tools/replay/util.h"
|
||||
|
||||
LogReader::LogReader(size_t memory_pool_block_size) {
|
||||
events.reserve(memory_pool_block_size);
|
||||
}
|
||||
|
||||
LogReader::~LogReader() {
|
||||
for (Event *e : events) {
|
||||
delete e;
|
||||
}
|
||||
}
|
||||
|
||||
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
|
||||
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
|
||||
if (raw_.empty()) return false;
|
||||
@@ -22,17 +12,13 @@ bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool loca
|
||||
raw_ = decompressBZ2(raw_, abort);
|
||||
if (raw_.empty()) return false;
|
||||
}
|
||||
return parse(abort);
|
||||
return load(raw_.data(), raw_.size(), abort);
|
||||
}
|
||||
|
||||
bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
|
||||
raw_.assign((const char *)data, size);
|
||||
return parse(abort);
|
||||
}
|
||||
|
||||
bool LogReader::parse(std::atomic<bool> *abort) {
|
||||
bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
|
||||
try {
|
||||
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
|
||||
events.reserve(65000);
|
||||
kj::ArrayPtr<const capnp::word> words((const capnp::word *)data, size / sizeof(capnp::word));
|
||||
while (words.size() > 0 && !(abort && *abort)) {
|
||||
capnp::FlatArrayMessageReader reader(words);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
@@ -40,16 +26,16 @@ bool LogReader::parse(std::atomic<bool> *abort) {
|
||||
uint64_t mono_time = event.getLogMonoTime();
|
||||
auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());
|
||||
|
||||
Event *evt = events.emplace_back(newEvent(which, mono_time, event_data));
|
||||
const Event &evt = events.emplace_back(which, mono_time, event_data);
|
||||
// Add encodeIdx packet again as a frame packet for the video stream
|
||||
if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
|
||||
evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
|
||||
evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
|
||||
if (evt.which == cereal::Event::ROAD_ENCODE_IDX ||
|
||||
evt.which == cereal::Event::DRIVER_ENCODE_IDX ||
|
||||
evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
|
||||
auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
|
||||
if (uint64_t sof = idx.getTimestampSof()) {
|
||||
mono_time = sof;
|
||||
}
|
||||
events.emplace_back(newEvent(which, mono_time, event_data, idx.getSegmentNum()));
|
||||
events.emplace_back(which, mono_time, event_data, idx.getSegmentNum());
|
||||
}
|
||||
|
||||
words = kj::arrayPtr(reader.getEnd(), words.end());
|
||||
@@ -59,16 +45,9 @@ bool LogReader::parse(std::atomic<bool> *abort) {
|
||||
}
|
||||
|
||||
if (!events.empty() && !(abort && *abort)) {
|
||||
std::sort(events.begin(), events.end(), Event::lessThan());
|
||||
events.shrink_to_fit();
|
||||
std::sort(events.begin(), events.end());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
Event *LogReader::newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum) {
|
||||
#ifdef HAS_MEMORY_RESOURCE
|
||||
return new (&mbr_) Event(which, mono_time, words, eidx_segnum);
|
||||
#else
|
||||
return new Event(which, mono_time, words, eidx_segnum);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -1,10 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#if __has_include(<memory_resource>)
|
||||
#define HAS_MEMORY_RESOURCE 1
|
||||
#include <memory_resource>
|
||||
#endif
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
@@ -13,27 +8,15 @@
|
||||
|
||||
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
|
||||
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
|
||||
const int DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE = 65000;
|
||||
|
||||
class Event {
|
||||
public:
|
||||
Event(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &data, int eidx_segnum = -1)
|
||||
: which(which), mono_time(mono_time), data(data), eidx_segnum(eidx_segnum) {}
|
||||
|
||||
struct lessThan {
|
||||
inline bool operator()(const Event *l, const Event *r) {
|
||||
return l->mono_time < r->mono_time || (l->mono_time == r->mono_time && l->which < r->which);
|
||||
}
|
||||
};
|
||||
|
||||
#if HAS_MEMORY_RESOURCE
|
||||
void *operator new(size_t size, std::pmr::monotonic_buffer_resource *mbr) {
|
||||
return mbr->allocate(size);
|
||||
bool operator<(const Event &other) const {
|
||||
return mono_time < other.mono_time || (mono_time == other.mono_time && which < other.which);
|
||||
}
|
||||
void operator delete(void *ptr) {
|
||||
// No-op. memory used by EventMemoryPool increases monotonically until the logReader is destroyed.
|
||||
}
|
||||
#endif
|
||||
|
||||
uint64_t mono_time;
|
||||
cereal::Event::Which which;
|
||||
@@ -43,18 +26,11 @@ public:
|
||||
|
||||
class LogReader {
|
||||
public:
|
||||
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
|
||||
~LogReader();
|
||||
bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
|
||||
bool local_cache = false, int chunk_size = -1, int retries = 0);
|
||||
bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr);
|
||||
std::vector<Event*> events;
|
||||
bool load(const char *data, size_t size, std::atomic<bool> *abort = nullptr);
|
||||
std::vector<Event> events;
|
||||
|
||||
private:
|
||||
Event *newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum = -1);
|
||||
bool parse(std::atomic<bool> *abort);
|
||||
std::string raw_;
|
||||
#ifdef HAS_MEMORY_RESOURCE
|
||||
std::pmr::monotonic_buffer_resource mbr_{DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE * sizeof(Event)};
|
||||
#endif
|
||||
};
|
||||
|
||||
@@ -2,15 +2,20 @@
|
||||
|
||||
#include <QDebug>
|
||||
#include <QtConcurrent>
|
||||
|
||||
#include <capnp/dynamic.h>
|
||||
#include <csignal>
|
||||
#include "cereal/services.h"
|
||||
#include "common/params.h"
|
||||
#include "common/timing.h"
|
||||
#include "tools/replay/util.h"
|
||||
|
||||
static void interrupt_sleep_handler(int signal) {}
|
||||
|
||||
Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_,
|
||||
uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
|
||||
// Register signal handler for SIGUSR1
|
||||
std::signal(SIGUSR1, interrupt_sleep_handler);
|
||||
|
||||
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
|
||||
block << "uiDebug" << "userFlag";
|
||||
}
|
||||
@@ -33,28 +38,21 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
|
||||
pm = std::make_unique<PubMaster>(s);
|
||||
}
|
||||
route_ = std::make_unique<Route>(route, data_dir);
|
||||
events_ = std::make_unique<std::vector<Event *>>();
|
||||
new_events_ = std::make_unique<std::vector<Event *>>();
|
||||
}
|
||||
|
||||
Replay::~Replay() {
|
||||
stop();
|
||||
}
|
||||
|
||||
void Replay::stop() {
|
||||
if (!stream_thread_ && segments_.empty()) return;
|
||||
|
||||
rInfo("shutdown: in progress...");
|
||||
if (stream_thread_ != nullptr) {
|
||||
exit_ = updating_events_ = true;
|
||||
exit_ =true;
|
||||
paused_ = true;
|
||||
stream_cv_.notify_one();
|
||||
stream_thread_->quit();
|
||||
stream_thread_->wait();
|
||||
stream_thread_ = nullptr;
|
||||
delete stream_thread_;
|
||||
}
|
||||
camera_server_.reset(nullptr);
|
||||
timeline_future.waitForFinished();
|
||||
segments_.clear();
|
||||
rInfo("shutdown: done");
|
||||
}
|
||||
|
||||
@@ -84,13 +82,12 @@ void Replay::start(int seconds) {
|
||||
seekTo(route_->identifier().begin_segment * 60 + seconds, false);
|
||||
}
|
||||
|
||||
void Replay::updateEvents(const std::function<bool()> &lambda) {
|
||||
// set updating_events to true to force stream thread release the lock and wait for events_updated.
|
||||
updating_events_ = true;
|
||||
void Replay::updateEvents(const std::function<bool()> &update_events_function) {
|
||||
pauseStreamThread();
|
||||
{
|
||||
std::unique_lock lk(stream_lock_);
|
||||
events_updated_ = lambda();
|
||||
updating_events_ = false;
|
||||
events_ready_ = update_events_function();
|
||||
paused_ = user_paused_;
|
||||
}
|
||||
stream_cv_.notify_one();
|
||||
}
|
||||
@@ -117,7 +114,7 @@ void Replay::seekTo(double seconds, bool relative) {
|
||||
}
|
||||
return segment_merged;
|
||||
});
|
||||
queueSegment();
|
||||
updateSegmentsCache();
|
||||
}
|
||||
|
||||
void Replay::seekToFlag(FindFlag flag) {
|
||||
@@ -146,34 +143,34 @@ void Replay::buildTimeline() {
|
||||
std::shared_ptr<LogReader> log(new LogReader());
|
||||
if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;
|
||||
|
||||
for (const Event *e : log->events) {
|
||||
if (e->which == cereal::Event::Which::CONTROLS_STATE) {
|
||||
capnp::FlatArrayMessageReader reader(e->data);
|
||||
for (const Event &e : log->events) {
|
||||
if (e.which == cereal::Event::Which::CONTROLS_STATE) {
|
||||
capnp::FlatArrayMessageReader reader(e.data);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
auto cs = event.getControlsState();
|
||||
|
||||
if (engaged != cs.getEnabled()) {
|
||||
if (engaged) {
|
||||
std::lock_guard lk(timeline_lock);
|
||||
timeline.push_back({toSeconds(engaged_begin), toSeconds(e->mono_time), TimelineType::Engaged});
|
||||
timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged});
|
||||
}
|
||||
engaged_begin = e->mono_time;
|
||||
engaged_begin = e.mono_time;
|
||||
engaged = cs.getEnabled();
|
||||
}
|
||||
|
||||
if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) {
|
||||
if (!alert_type.empty() && alert_size != cereal::ControlsState::AlertSize::NONE) {
|
||||
std::lock_guard lk(timeline_lock);
|
||||
timeline.push_back({toSeconds(alert_begin), toSeconds(e->mono_time), timeline_types[(int)alert_status]});
|
||||
timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]});
|
||||
}
|
||||
alert_begin = e->mono_time;
|
||||
alert_begin = e.mono_time;
|
||||
alert_type = cs.getAlertType().cStr();
|
||||
alert_size = cs.getAlertSize();
|
||||
alert_status = cs.getAlertStatus();
|
||||
}
|
||||
} else if (e->which == cereal::Event::Which::USER_FLAG) {
|
||||
} else if (e.which == cereal::Event::Which::USER_FLAG) {
|
||||
std::lock_guard lk(timeline_lock);
|
||||
timeline.push_back({toSeconds(e->mono_time), toSeconds(e->mono_time), TimelineType::UserFlag});
|
||||
timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag});
|
||||
}
|
||||
}
|
||||
std::sort(timeline.begin(), timeline.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
|
||||
@@ -203,16 +200,22 @@ std::optional<uint64_t> Replay::find(FindFlag flag) {
|
||||
}
|
||||
|
||||
void Replay::pause(bool pause) {
|
||||
updateEvents([=]() {
|
||||
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
|
||||
paused_ = pause;
|
||||
return true;
|
||||
});
|
||||
if (user_paused_ != pause) {
|
||||
pauseStreamThread();
|
||||
{
|
||||
std::unique_lock lk(stream_lock_);
|
||||
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
|
||||
paused_ = user_paused_ = pause;
|
||||
}
|
||||
stream_cv_.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void Replay::setCurrentSegment(int n) {
|
||||
if (current_segment_.exchange(n) != n) {
|
||||
QMetaObject::invokeMethod(this, &Replay::queueSegment, Qt::QueuedConnection);
|
||||
void Replay::pauseStreamThread() {
|
||||
paused_ = true;
|
||||
// Send SIGUSR1 to interrupt clock_nanosleep
|
||||
if (stream_thread_ && stream_thread_id) {
|
||||
pthread_kill(stream_thread_id, SIGUSR1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,27 +225,22 @@ void Replay::segmentLoadFinished(bool success) {
|
||||
rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num);
|
||||
updateEvents([&]() {
|
||||
segments_.erase(seg->seg_num);
|
||||
return true;
|
||||
return !segments_.empty();
|
||||
});
|
||||
}
|
||||
queueSegment();
|
||||
updateSegmentsCache();
|
||||
}
|
||||
|
||||
void Replay::queueSegment() {
|
||||
void Replay::updateSegmentsCache() {
|
||||
auto cur = segments_.lower_bound(current_segment_.load());
|
||||
if (cur == segments_.end()) return;
|
||||
|
||||
// Calculate the range of segments to load
|
||||
auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
|
||||
auto end = std::next(begin, std::min<int>(segment_cache_limit, std::distance(begin, segments_.end())));
|
||||
begin = std::prev(end, std::min<int>(segment_cache_limit, std::distance(segments_.begin(), end)));
|
||||
// load one segment at a time
|
||||
auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); });
|
||||
if (it != end && !it->second) {
|
||||
rDebug("loading segment %d...", it->first);
|
||||
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
|
||||
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
|
||||
}
|
||||
|
||||
loadSegmentInRange(begin, cur, end);
|
||||
mergeSegments(begin, end);
|
||||
|
||||
// free segments out of current semgnt window.
|
||||
@@ -257,69 +255,81 @@ void Replay::queueSegment() {
|
||||
}
|
||||
}
|
||||
|
||||
void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
|
||||
auto loadNext = [this](auto begin, auto end) {
|
||||
auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
|
||||
if (it != end && !it->second) {
|
||||
rDebug("loading segment %d...", it->first);
|
||||
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
|
||||
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
// Load forward segments, then try reverse
|
||||
if (!loadNext(cur, end)) {
|
||||
loadNext(std::make_reverse_iterator(cur), segments_.rend());
|
||||
}
|
||||
}
|
||||
|
||||
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
|
||||
std::vector<int> segments_need_merge;
|
||||
std::set<int> segments_to_merge;
|
||||
size_t new_events_size = 0;
|
||||
for (auto it = begin; it != end; ++it) {
|
||||
if (it->second && it->second->isLoaded()) {
|
||||
segments_need_merge.push_back(it->first);
|
||||
segments_to_merge.insert(it->first);
|
||||
new_events_size += it->second->log->events.size();
|
||||
}
|
||||
}
|
||||
|
||||
if (segments_need_merge != segments_merged_) {
|
||||
std::string s;
|
||||
for (int i = 0; i < segments_need_merge.size(); ++i) {
|
||||
s += std::to_string(segments_need_merge[i]);
|
||||
if (i != segments_need_merge.size() - 1) s += ", ";
|
||||
}
|
||||
rDebug("merge segments %s", s.c_str());
|
||||
new_events_->clear();
|
||||
new_events_->reserve(new_events_size);
|
||||
for (int n : segments_need_merge) {
|
||||
size_t size = new_events_->size();
|
||||
const auto &events = segments_[n]->log->events;
|
||||
std::copy_if(events.begin(), events.end(), std::back_inserter(*new_events_),
|
||||
[this](auto e) { return e->which < sockets_.size() && sockets_[e->which] != nullptr; });
|
||||
std::inplace_merge(new_events_->begin(), new_events_->begin() + size, new_events_->end(), Event::lessThan());
|
||||
}
|
||||
if (segments_to_merge == merged_segments_) return;
|
||||
|
||||
if (stream_thread_) {
|
||||
emit segmentsMerged();
|
||||
rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{},
|
||||
[](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str());
|
||||
|
||||
// Check if seeking is in progress
|
||||
if (seeking_to_seconds_ >= 0) {
|
||||
int target_segment = int(seeking_to_seconds_ / 60);
|
||||
auto segment_found = std::find(segments_need_merge.begin(), segments_need_merge.end(), target_segment);
|
||||
std::vector<Event> new_events;
|
||||
new_events.reserve(new_events_size);
|
||||
|
||||
// If the target segment is found, emit seekedTo signal and reset seeking_to_seconds_
|
||||
if (segment_found != segments_need_merge.end()) {
|
||||
emit seekedTo(seeking_to_seconds_);
|
||||
seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek
|
||||
}
|
||||
}
|
||||
}
|
||||
updateEvents([&]() {
|
||||
events_.swap(new_events_);
|
||||
segments_merged_ = segments_need_merge;
|
||||
// Do not wake up the stream thread if the current segment has not been merged.
|
||||
return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0);
|
||||
});
|
||||
// Merge events from segments_to_merge into new_events
|
||||
for (int n : segments_to_merge) {
|
||||
size_t size = new_events.size();
|
||||
const auto &events = segments_.at(n)->log->events;
|
||||
std::copy_if(events.begin(), events.end(), std::back_inserter(new_events),
|
||||
[this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; });
|
||||
std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end());
|
||||
}
|
||||
|
||||
if (stream_thread_) {
|
||||
emit segmentsMerged();
|
||||
|
||||
// Check if seeking is in progress
|
||||
int target_segment = int(seeking_to_seconds_ / 60);
|
||||
if (seeking_to_seconds_ >= 0 && segments_to_merge.count(target_segment) > 0) {
|
||||
emit seekedTo(seeking_to_seconds_);
|
||||
seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek
|
||||
}
|
||||
}
|
||||
|
||||
updateEvents([&]() {
|
||||
events_.swap(new_events);
|
||||
merged_segments_ = segments_to_merge;
|
||||
// Wake up the stream thread if the current segment is loaded or invalid.
|
||||
return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0);
|
||||
});
|
||||
}
|
||||
|
||||
void Replay::startStream(const Segment *cur_segment) {
|
||||
const auto &events = cur_segment->log->events;
|
||||
|
||||
route_start_ts_ = events.front()->mono_time;
|
||||
route_start_ts_ = events.front().mono_time;
|
||||
cur_mono_time_ += route_start_ts_ - 1;
|
||||
|
||||
// get datetime from INIT_DATA, fallback to datetime in the route name
|
||||
route_date_time_ = route()->datetime();
|
||||
auto it = std::find_if(events.cbegin(), events.cend(),
|
||||
[](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
|
||||
[](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
|
||||
if (it != events.cend()) {
|
||||
capnp::FlatArrayMessageReader reader((*it)->data);
|
||||
capnp::FlatArrayMessageReader reader(it->data);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
uint64_t wall_time = event.getInitData().getWallTimeNanos();
|
||||
if (wall_time > 0) {
|
||||
@@ -328,9 +338,9 @@ void Replay::startStream(const Segment *cur_segment) {
|
||||
}
|
||||
|
||||
// write CarParams
|
||||
it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; });
|
||||
it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; });
|
||||
if (it != events.end()) {
|
||||
capnp::FlatArrayMessageReader reader((*it)->data);
|
||||
capnp::FlatArrayMessageReader reader(it->data);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
car_fingerprint_ = event.getCarParams().getCarFingerprint();
|
||||
capnp::MallocMessageBuilder builder;
|
||||
@@ -357,8 +367,7 @@ void Replay::startStream(const Segment *cur_segment) {
|
||||
emit segmentsMerged();
|
||||
// start stream thread
|
||||
stream_thread_ = new QThread();
|
||||
QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); });
|
||||
QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater);
|
||||
QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
|
||||
stream_thread_->start();
|
||||
|
||||
timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
|
||||
@@ -382,83 +391,54 @@ void Replay::publishMessage(const Event *e) {
|
||||
}
|
||||
|
||||
void Replay::publishFrame(const Event *e) {
|
||||
static const std::map<cereal::Event::Which, CameraType> cam_types{
|
||||
{cereal::Event::ROAD_ENCODE_IDX, RoadCam},
|
||||
{cereal::Event::DRIVER_ENCODE_IDX, DriverCam},
|
||||
{cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam},
|
||||
};
|
||||
if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !hasFlag(REPLAY_FLAG_DCAM)) ||
|
||||
(e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) {
|
||||
return;
|
||||
CameraType cam;
|
||||
switch (e->which) {
|
||||
case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break;
|
||||
case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break;
|
||||
case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break;
|
||||
default: return; // Invalid event type
|
||||
}
|
||||
|
||||
if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
|
||||
return; // Camera isdisabled
|
||||
|
||||
if (isSegmentMerged(e->eidx_segnum)) {
|
||||
auto &segment = segments_.at(e->eidx_segnum);
|
||||
auto cam = cam_types.at(e->which);
|
||||
if (auto &frame = segment->frames[cam]; frame) {
|
||||
camera_server_->pushFrame(cam, frame.get(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Replay::stream() {
|
||||
void Replay::streamThread() {
|
||||
stream_thread_id = pthread_self();
|
||||
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
|
||||
double prev_replay_speed = speed_;
|
||||
std::unique_lock lk(stream_lock_);
|
||||
|
||||
while (true) {
|
||||
stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); });
|
||||
events_updated_ = false;
|
||||
stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); });
|
||||
if (exit_) break;
|
||||
|
||||
Event cur_event{cur_which, cur_mono_time_, {}};
|
||||
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
|
||||
if (eit == events_->end()) {
|
||||
Event event(cur_which, cur_mono_time_, {});
|
||||
auto first = std::upper_bound(events_.cbegin(), events_.cend(), event);
|
||||
if (first == events_.cend()) {
|
||||
rInfo("waiting for events...");
|
||||
events_ready_ = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
uint64_t evt_start_ts = cur_mono_time_;
|
||||
uint64_t loop_start_ts = nanos_since_boot();
|
||||
auto it = publishEvents(first, events_.cend());
|
||||
|
||||
for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) {
|
||||
const Event *evt = (*eit);
|
||||
cur_which = evt->which;
|
||||
cur_mono_time_ = evt->mono_time;
|
||||
setCurrentSegment(toSeconds(cur_mono_time_) / 60);
|
||||
|
||||
if (sockets_[cur_which] != nullptr) {
|
||||
// keep time
|
||||
long etime = (cur_mono_time_ - evt_start_ts) / speed_;
|
||||
long rtime = nanos_since_boot() - loop_start_ts;
|
||||
long behind_ns = etime - rtime;
|
||||
// if behind_ns is greater than 1 second, it means that an invalid segment is skipped by seeking/replaying
|
||||
if (behind_ns >= 1 * 1e9 || speed_ != prev_replay_speed) {
|
||||
// reset event start times
|
||||
evt_start_ts = cur_mono_time_;
|
||||
loop_start_ts = nanos_since_boot();
|
||||
prev_replay_speed = speed_;
|
||||
} else if (behind_ns > 0) {
|
||||
precise_nano_sleep(behind_ns);
|
||||
}
|
||||
|
||||
if (evt->eidx_segnum == -1) {
|
||||
publishMessage(evt);
|
||||
} else if (camera_server_) {
|
||||
if (speed_ > 1.0) {
|
||||
camera_server_->waitForSent();
|
||||
}
|
||||
publishFrame(evt);
|
||||
}
|
||||
}
|
||||
}
|
||||
// wait for frame to be sent before unlock.(frameReader may be deleted after unlock)
|
||||
// Ensure frames are sent before unlocking to prevent race conditions
|
||||
if (camera_server_) {
|
||||
camera_server_->waitForSent();
|
||||
}
|
||||
|
||||
if (eit == events_->end() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
|
||||
int last_segment = segments_.empty() ? 0 : segments_.rbegin()->first;
|
||||
if (it != events_.cend()) {
|
||||
cur_which = it->which;
|
||||
} else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
|
||||
// Check for loop end and restart if necessary
|
||||
int last_segment = segments_.rbegin()->first;
|
||||
if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
|
||||
rInfo("reaches the end of route, restart from beginning");
|
||||
QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, 0, false), Qt::QueuedConnection);
|
||||
@@ -466,3 +446,48 @@ void Replay::stream() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
|
||||
std::vector<Event>::const_iterator last) {
|
||||
uint64_t evt_start_ts = cur_mono_time_;
|
||||
uint64_t loop_start_ts = nanos_since_boot();
|
||||
double prev_replay_speed = speed_;
|
||||
|
||||
for (; !paused_ && first != last; ++first) {
|
||||
const Event &evt = *first;
|
||||
int segment = toSeconds(evt.mono_time) / 60;
|
||||
|
||||
if (current_segment_ != segment) {
|
||||
current_segment_ = segment;
|
||||
QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
|
||||
}
|
||||
|
||||
// Skip events if socket is not present
|
||||
if (!sockets_[evt.which]) continue;
|
||||
|
||||
int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (nanos_since_boot() - loop_start_ts);
|
||||
// if time_diff is greater than 1 second, it means that an invalid segment is skipped
|
||||
if (time_diff >= 1e9 || speed_ != prev_replay_speed) {
|
||||
// reset event start times
|
||||
evt_start_ts = evt.mono_time;
|
||||
loop_start_ts = nanos_since_boot();
|
||||
prev_replay_speed = speed_;
|
||||
} else if (time_diff > 0) {
|
||||
precise_nano_sleep(time_diff);
|
||||
}
|
||||
|
||||
if (paused_) break;
|
||||
|
||||
cur_mono_time_ = evt.mono_time;
|
||||
if (evt.eidx_segnum == -1) {
|
||||
publishMessage(&evt);
|
||||
} else if (camera_server_) {
|
||||
if (speed_ > 1.0) {
|
||||
camera_server_->waitForSent();
|
||||
}
|
||||
publishFrame(&evt);
|
||||
}
|
||||
}
|
||||
|
||||
return first;
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
@@ -53,11 +54,10 @@ public:
|
||||
~Replay();
|
||||
bool load();
|
||||
void start(int seconds = 0);
|
||||
void stop();
|
||||
void pause(bool pause);
|
||||
void seekToFlag(FindFlag flag);
|
||||
void seekTo(double seconds, bool relative);
|
||||
inline bool isPaused() const { return paused_; }
|
||||
inline bool isPaused() const { return user_paused_; }
|
||||
// the filter is called in streaming thread.try to return quickly from it to avoid blocking streaming.
|
||||
// the filter function must return true if the event should be filtered.
|
||||
// otherwise it must return false.
|
||||
@@ -79,7 +79,7 @@ public:
|
||||
inline int totalSeconds() const { return (!segments_.empty()) ? (segments_.rbegin()->first + 1) * 60 : 0; }
|
||||
inline void setSpeed(float speed) { speed_ = speed; }
|
||||
inline float getSpeed() const { return speed_; }
|
||||
inline const std::vector<Event *> *events() const { return events_.get(); }
|
||||
inline const std::vector<Event> *events() const { return &events_; }
|
||||
inline const std::map<int, std::unique_ptr<Segment>> &segments() const { return segments_; }
|
||||
inline const std::string &carFingerprint() const { return car_fingerprint_; }
|
||||
inline const std::vector<std::tuple<double, double, TimelineType>> getTimeline() {
|
||||
@@ -99,36 +99,37 @@ protected slots:
|
||||
protected:
|
||||
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
|
||||
std::optional<uint64_t> find(FindFlag flag);
|
||||
void pauseStreamThread();
|
||||
void startStream(const Segment *cur_segment);
|
||||
void stream();
|
||||
void setCurrentSegment(int n);
|
||||
void queueSegment();
|
||||
void streamThread();
|
||||
void updateSegmentsCache();
|
||||
void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end);
|
||||
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
|
||||
void updateEvents(const std::function<bool()>& lambda);
|
||||
void updateEvents(const std::function<bool()>& update_events_function);
|
||||
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
|
||||
std::vector<Event>::const_iterator last);
|
||||
void publishMessage(const Event *e);
|
||||
void publishFrame(const Event *e);
|
||||
void buildTimeline();
|
||||
inline bool isSegmentMerged(int n) {
|
||||
return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end();
|
||||
}
|
||||
inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; }
|
||||
|
||||
pthread_t stream_thread_id = 0;
|
||||
QThread *stream_thread_ = nullptr;
|
||||
std::mutex stream_lock_;
|
||||
bool user_paused_ = false;
|
||||
std::condition_variable stream_cv_;
|
||||
std::atomic<bool> updating_events_ = false;
|
||||
std::atomic<int> current_segment_ = 0;
|
||||
double seeking_to_seconds_ = -1;
|
||||
SegmentMap segments_;
|
||||
// the following variables must be protected with stream_lock_
|
||||
std::atomic<bool> exit_ = false;
|
||||
bool paused_ = false;
|
||||
bool events_updated_ = false;
|
||||
std::atomic<bool> paused_ = false;
|
||||
bool events_ready_ = false;
|
||||
QDateTime route_date_time_;
|
||||
uint64_t route_start_ts_ = 0;
|
||||
std::atomic<uint64_t> cur_mono_time_ = 0;
|
||||
std::unique_ptr<std::vector<Event *>> events_;
|
||||
std::unique_ptr<std::vector<Event *>> new_events_;
|
||||
std::vector<int> segments_merged_;
|
||||
std::vector<Event> events_;
|
||||
std::set<int> merged_segments_;
|
||||
|
||||
// messaging
|
||||
SubMaster *sm = nullptr;
|
||||
|
||||
@@ -77,7 +77,7 @@ bool Route::loadFromServer(int retries) {
|
||||
return false;
|
||||
}
|
||||
rWarning("Retrying %d/%d", i, retries);
|
||||
util::sleep_for(500);
|
||||
util::sleep_for(3000);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include <QDebug>
|
||||
#include <QEventLoop>
|
||||
|
||||
#include "catch2/catch.hpp"
|
||||
@@ -67,7 +66,7 @@ TEST_CASE("LogReader") {
|
||||
corrupt_content.resize(corrupt_content.length() / 2);
|
||||
corrupt_content = decompressBZ2(corrupt_content);
|
||||
LogReader log;
|
||||
REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size()));
|
||||
REQUIRE(log.load(corrupt_content.data(), corrupt_content.size()));
|
||||
REQUIRE(log.events.size() > 0);
|
||||
}
|
||||
}
|
||||
@@ -88,7 +87,7 @@ void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) {
|
||||
|
||||
// test LogReader & FrameReader
|
||||
REQUIRE(segment.log->events.size() > 0);
|
||||
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan()));
|
||||
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end()));
|
||||
|
||||
for (auto cam : ALL_CAMERAS) {
|
||||
auto &fr = segment.frames[cam];
|
||||
@@ -158,63 +157,20 @@ TEST_CASE("Remote route") {
|
||||
}
|
||||
}
|
||||
|
||||
// helper class for unit tests
|
||||
class TestReplay : public Replay {
|
||||
public:
|
||||
TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {}
|
||||
void test_seek();
|
||||
void testSeekTo(int seek_to);
|
||||
};
|
||||
|
||||
void TestReplay::testSeekTo(int seek_to) {
|
||||
seekTo(seek_to, false);
|
||||
|
||||
while (true) {
|
||||
std::unique_lock lk(stream_lock_);
|
||||
stream_cv_.wait(lk, [=]() { return events_updated_ == true; });
|
||||
events_updated_ = false;
|
||||
if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) {
|
||||
// wake up by the previous merging, skip it.
|
||||
continue;
|
||||
}
|
||||
|
||||
Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_, {});
|
||||
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
|
||||
if (eit == events_->end()) {
|
||||
qDebug() << "waiting for events...";
|
||||
continue;
|
||||
}
|
||||
|
||||
REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan()));
|
||||
const int seek_to_segment = seek_to / 60;
|
||||
const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9;
|
||||
current_segment_ = event_seconds / 60;
|
||||
INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]");
|
||||
REQUIRE(event_seconds >= seek_to);
|
||||
if (event_seconds > seek_to) {
|
||||
auto it = segments_.lower_bound(seek_to_segment);
|
||||
REQUIRE(it->first == current_segment_);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void TestReplay::test_seek() {
|
||||
// create a dummy stream thread
|
||||
stream_thread_ = new QThread(this);
|
||||
TEST_CASE("seek_to") {
|
||||
QEventLoop loop;
|
||||
std::thread thread = std::thread([&]() {
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
testSeekTo(util::random_int(0, 2 * 60));
|
||||
}
|
||||
int seek_to = util::random_int(0, 2 * 59);
|
||||
Replay replay(DEMO_ROUTE, {}, {}, nullptr, REPLAY_FLAG_NO_VIPC);
|
||||
|
||||
QObject::connect(&replay, &Replay::seekedTo, [&](double sec) {
|
||||
INFO("seek to " << seek_to << "s seeked to" << sec);
|
||||
REQUIRE(sec >= seek_to);
|
||||
loop.quit();
|
||||
});
|
||||
loop.exec();
|
||||
thread.join();
|
||||
}
|
||||
|
||||
TEST_CASE("Replay") {
|
||||
TestReplay replay(DEMO_ROUTE);
|
||||
REQUIRE(replay.load());
|
||||
replay.test_seek();
|
||||
replay.start();
|
||||
replay.seekTo(seek_to, false);
|
||||
|
||||
loop.exec();
|
||||
}
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
#include <curl/curl.h>
|
||||
#include <openssl/sha.h>
|
||||
|
||||
#include <cstdarg>
|
||||
#include <cstring>
|
||||
#include <cassert>
|
||||
#include <cmath>
|
||||
#include <cstdarg>
|
||||
#include <cstring>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
@@ -158,7 +158,10 @@ size_t getRemoteFileSize(const std::string &url, std::atomic<bool> *abort) {
|
||||
int still_running = 1;
|
||||
while (still_running > 0 && !(abort && *abort)) {
|
||||
CURLMcode mc = curl_multi_perform(cm, &still_running);
|
||||
if (!mc) curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
|
||||
if (mc != CURLM_OK) break;
|
||||
if (still_running > 0) {
|
||||
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
double content_length = -1;
|
||||
@@ -208,10 +211,20 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont
|
||||
}
|
||||
|
||||
int still_running = 1;
|
||||
size_t prev_written = 0;
|
||||
while (still_running > 0 && !(abort && *abort)) {
|
||||
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
|
||||
curl_multi_perform(cm, &still_running);
|
||||
download_stats.update(url, written);
|
||||
CURLMcode mc = curl_multi_perform(cm, &still_running);
|
||||
if (mc != CURLM_OK) {
|
||||
break;
|
||||
}
|
||||
if (still_running > 0) {
|
||||
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
|
||||
}
|
||||
|
||||
if (((written - prev_written) / (double)content_length) >= 0.01) {
|
||||
download_stats.update(url, written);
|
||||
prev_written = written;
|
||||
}
|
||||
}
|
||||
|
||||
CURLMsg *msg;
|
||||
@@ -304,9 +317,11 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
|
||||
return {};
|
||||
}
|
||||
|
||||
void precise_nano_sleep(long sleep_ns) {
|
||||
struct timespec req = {.tv_sec = 0, .tv_nsec = sleep_ns};
|
||||
struct timespec rem = {};
|
||||
void precise_nano_sleep(int64_t nanoseconds) {
|
||||
struct timespec req, rem;
|
||||
|
||||
req.tv_sec = nanoseconds / 1e9;
|
||||
req.tv_nsec = nanoseconds % (int64_t)1e9;
|
||||
while (clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem) && errno == EINTR) {
|
||||
// Retry sleep if interrupted by a signal
|
||||
req = rem;
|
||||
|
||||
@@ -21,7 +21,7 @@ void logMessage(ReplyMsgType type, const char* fmt, ...);
|
||||
#define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__)
|
||||
|
||||
std::string sha256(const std::string &str);
|
||||
void precise_nano_sleep(long sleep_ns);
|
||||
void precise_nano_sleep(int64_t nanoseconds);
|
||||
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
|
||||
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
|
||||
std::string getUrlWithoutQuery(const std::string &url);
|
||||
|
||||
Reference in New Issue
Block a user