mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-27 22:23:54 +08:00
cabana: fix wrong total seconds in replay mode (#28015)
* fix total_sec * fix chart max range * fix all_evnets order & use vector to improve performance * dynamic updating max time * get real time from qlog * Update tools/cabana/streams/livestream.cc Co-authored-by: Shane Smiskol <shane@smiskol.com> --------- Co-authored-by: Shane Smiskol <shane@smiskol.com>
This commit is contained in:
@@ -188,7 +188,7 @@ void ChartsWidget::updateState() {
|
||||
if (pos < 0 || pos > 0.8) {
|
||||
display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1);
|
||||
}
|
||||
double max_sec = std::min(std::floor(display_range.first + max_chart_range), can->lastEventSecond());
|
||||
double max_sec = std::min(std::floor(display_range.first + max_chart_range), can->totalSeconds());
|
||||
display_range.first = std::max(0.0, max_sec - max_chart_range);
|
||||
display_range.second = display_range.first + max_chart_range;
|
||||
} else if (cur_sec < (zoomed_range.first - 0.1) || cur_sec >= zoomed_range.second) {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#include "tools/cabana/streams/abstractstream.h"
|
||||
|
||||
#include <QTimer>
|
||||
|
||||
AbstractStream *can = nullptr;
|
||||
@@ -26,7 +27,7 @@ void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
|
||||
}
|
||||
|
||||
void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) {
|
||||
all_msgs[id].compute((const char*)data, size, sec, getSpeed());
|
||||
all_msgs[id].compute((const char *)data, size, sec, getSpeed());
|
||||
if (!new_msgs->contains(id)) {
|
||||
new_msgs->insert(id, {});
|
||||
}
|
||||
@@ -82,18 +83,23 @@ void AbstractStream::updateLastMsgsTo(double sec) {
|
||||
});
|
||||
}
|
||||
|
||||
void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<const CanEvent *>> &msgs,
|
||||
std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
|
||||
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
|
||||
size_t memory_size = 0;
|
||||
size_t events_cnt = 0;
|
||||
for (auto it = first; it != last; ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
for (const auto &c : (*it)->event.getCan()) {
|
||||
memory_size += sizeof(CanEvent) + sizeof(uint8_t) * c.getDat().size();
|
||||
++events_cnt;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (memory_size == 0) return;
|
||||
|
||||
char *ptr = memory_blocks.emplace_back(new char[memory_size]).get();
|
||||
std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events_map;
|
||||
std::vector<const CanEvent *> new_events;
|
||||
new_events.reserve(events_cnt);
|
||||
for (auto it = first; it != last; ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
uint64_t ts = (*it)->mono_time;
|
||||
@@ -106,31 +112,28 @@ void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<const
|
||||
e->size = dat.size();
|
||||
memcpy(e->dat, (uint8_t *)dat.begin(), e->size);
|
||||
|
||||
msgs[{.source = e->src, .address = e->address}].push_back(e);
|
||||
all_events_.push_back(e);
|
||||
new_events_map[{.source = e->src, .address = e->address}].push_back(e);
|
||||
new_events.push_back(e);
|
||||
ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) {
|
||||
if (first == last) return;
|
||||
|
||||
if (append) {
|
||||
parseEvents(events_, first, last);
|
||||
} else {
|
||||
std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events;
|
||||
parseEvents(new_events, first, last);
|
||||
for (auto &[id, new_e] : new_events) {
|
||||
auto &e = events_[id];
|
||||
auto it = std::upper_bound(e.cbegin(), e.cend(), new_e.front(), [](const CanEvent *l, const CanEvent *r) {
|
||||
return l->mono_time < r->mono_time;
|
||||
});
|
||||
e.insert(it, new_e.cbegin(), new_e.cend());
|
||||
}
|
||||
bool append = new_events.front()->mono_time > lastest_event_ts;
|
||||
for (auto &[id, new_e] : new_events_map) {
|
||||
auto &e = events_[id];
|
||||
auto pos = append ? e.end() : std::upper_bound(e.cbegin(), e.cend(), new_e.front(), [](const CanEvent *l, const CanEvent *r) {
|
||||
return l->mono_time < r->mono_time;
|
||||
});
|
||||
e.insert(pos, new_e.cbegin(), new_e.cend());
|
||||
}
|
||||
total_sec = (all_events_.back()->mono_time - all_events_.front()->mono_time) / 1e9;
|
||||
|
||||
auto pos = append ? all_events_.end() : std::upper_bound(all_events_.begin(), all_events_.end(), new_events.front(), [](auto l, auto r) {
|
||||
return l->mono_time < r->mono_time;
|
||||
});
|
||||
all_events_.insert(pos, new_events.cbegin(), new_events.cend());
|
||||
|
||||
lastest_event_ts = all_events_.back()->mono_time;
|
||||
emit eventsMerged();
|
||||
}
|
||||
|
||||
|
||||
@@ -41,13 +41,12 @@ public:
|
||||
AbstractStream(QObject *parent);
|
||||
virtual ~AbstractStream() {};
|
||||
inline bool liveStreaming() const { return route() == nullptr; }
|
||||
inline double lastEventSecond() const { return lastEventMonoTime() / 1e9 - routeStartTime(); }
|
||||
virtual void seekTo(double ts) {}
|
||||
virtual QString routeName() const = 0;
|
||||
virtual QString carFingerprint() const { return ""; }
|
||||
virtual double routeStartTime() const { return 0; }
|
||||
virtual double currentSec() const = 0;
|
||||
double totalSeconds() const { return total_sec; }
|
||||
virtual double totalSeconds() const { return lastEventMonoTime() / 1e9 - routeStartTime(); }
|
||||
const CanData &lastMessage(const MessageId &id);
|
||||
virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; }
|
||||
virtual const Route *route() const { return nullptr; }
|
||||
@@ -55,8 +54,8 @@ public:
|
||||
virtual double getSpeed() { return 1; }
|
||||
virtual bool isPaused() const { return false; }
|
||||
virtual void pause(bool pause) {}
|
||||
const std::deque<const CanEvent *> &allEvents() const { return all_events_; }
|
||||
const std::deque<const CanEvent *> &events(const MessageId &id) const { return events_.at(id); }
|
||||
const std::vector<const CanEvent *> &allEvents() const { return all_events_; }
|
||||
const std::vector<const CanEvent *> &events(const MessageId &id) const { return events_.at(id); }
|
||||
virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; }
|
||||
|
||||
signals:
|
||||
@@ -74,20 +73,19 @@ public:
|
||||
SourceSet sources;
|
||||
|
||||
protected:
|
||||
void mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append);
|
||||
void mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
|
||||
bool postEvents();
|
||||
uint64_t lastEventMonoTime() const { return all_events_.empty() ? 0 : all_events_.back()->mono_time; }
|
||||
uint64_t lastEventMonoTime() const { return lastest_event_ts; }
|
||||
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
|
||||
void updateMessages(QHash<MessageId, CanData> *);
|
||||
void parseEvents(std::unordered_map<MessageId, std::deque<const CanEvent *>> &msgs, std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
|
||||
void updateLastMsgsTo(double sec);
|
||||
|
||||
double total_sec = 0;
|
||||
uint64_t lastest_event_ts = 0;
|
||||
std::atomic<bool> processing = false;
|
||||
std::unique_ptr<QHash<MessageId, CanData>> new_msgs;
|
||||
QHash<MessageId, CanData> all_msgs;
|
||||
std::unordered_map<MessageId, std::deque<const CanEvent *>> events_;
|
||||
std::deque<const CanEvent *> all_events_;
|
||||
std::unordered_map<MessageId, std::vector<const CanEvent *>> events_;
|
||||
std::vector<const CanEvent *> all_events_;
|
||||
std::deque<std::unique_ptr<char[]>> memory_blocks;
|
||||
};
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ void LiveStream::timerEvent(QTimerEvent *event) {
|
||||
{
|
||||
// merge events received from live stream thread.
|
||||
std::lock_guard lk(lock);
|
||||
mergeEvents(receivedEvents.cbegin(), receivedEvents.cend(), true);
|
||||
mergeEvents(receivedEvents.cbegin(), receivedEvents.cend());
|
||||
receivedEvents.clear();
|
||||
receivedMessages.clear();
|
||||
}
|
||||
@@ -67,7 +67,6 @@ void LiveStream::timerEvent(QTimerEvent *event) {
|
||||
|
||||
void LiveStream::updateEvents() {
|
||||
static double prev_speed = 1.0;
|
||||
static uint64_t prev_newest_event_ts = all_events_.back()->mono_time;
|
||||
|
||||
if (first_update_ts == 0) {
|
||||
first_update_ts = nanos_since_boot();
|
||||
@@ -82,16 +81,13 @@ void LiveStream::updateEvents() {
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t last_event_ts = all_events_.back()->mono_time;
|
||||
bool at_the_end = current_event_ts == prev_newest_event_ts;
|
||||
if (!at_the_end) {
|
||||
last_event_ts = first_event_ts + (nanos_since_boot() - first_update_ts) * speed_;
|
||||
}
|
||||
|
||||
uint64_t last_ts = post_last_event && speed_ == 1.0
|
||||
? all_events_.back()->mono_time
|
||||
: first_event_ts + (nanos_since_boot() - first_update_ts) * speed_;
|
||||
auto first = std::upper_bound(all_events_.cbegin(), all_events_.cend(), current_event_ts, [](uint64_t ts, auto e) {
|
||||
return ts < e->mono_time;
|
||||
});
|
||||
auto last = std::upper_bound(first, all_events_.cend(), last_event_ts, [](uint64_t ts, auto e) {
|
||||
auto last = std::upper_bound(first, all_events_.cend(), last_ts, [](uint64_t ts, auto e) {
|
||||
return ts < e->mono_time;
|
||||
});
|
||||
|
||||
@@ -102,14 +98,13 @@ void LiveStream::updateEvents() {
|
||||
current_event_ts = e->mono_time;
|
||||
}
|
||||
postEvents();
|
||||
prev_newest_event_ts = all_events_.back()->mono_time;
|
||||
}
|
||||
|
||||
void LiveStream::seekTo(double sec) {
|
||||
sec = std::max(0.0, sec);
|
||||
first_update_ts = nanos_since_boot();
|
||||
first_event_ts = std::min<uint64_t>(sec * 1e9 + begin_event_ts, lastEventMonoTime());
|
||||
current_event_ts = first_event_ts;
|
||||
current_event_ts = first_event_ts = std::min<uint64_t>(sec * 1e9 + begin_event_ts, lastEventMonoTime());
|
||||
post_last_event = (first_event_ts == lastEventMonoTime());
|
||||
emit seekedTo((current_event_ts - begin_event_ts) / 1e9);
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ private:
|
||||
uint64_t current_event_ts = 0;
|
||||
uint64_t first_event_ts = 0;
|
||||
uint64_t first_update_ts = 0;
|
||||
bool post_last_event = true;
|
||||
double speed_ = 1;
|
||||
bool paused_ = false;
|
||||
};
|
||||
|
||||
@@ -25,10 +25,9 @@ static bool event_filter(const Event *e, void *opaque) {
|
||||
void ReplayStream::mergeSegments() {
|
||||
for (auto &[n, seg] : replay->segments()) {
|
||||
if (seg && seg->isLoaded() && !processed_segments.count(n)) {
|
||||
const auto &events = seg->log->events;
|
||||
bool append = processed_segments.empty() || *processed_segments.rbegin() < n;
|
||||
processed_segments.insert(n);
|
||||
mergeEvents(events.cbegin(), events.cend(), append);
|
||||
const auto &events = seg->log->events;
|
||||
mergeEvents(events.cbegin(), events.cend());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ public:
|
||||
void seekTo(double ts) override { replay->seekTo(std::max(double(0), ts), false); };
|
||||
inline QString routeName() const override { return replay->route()->name(); }
|
||||
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
|
||||
double totalSeconds() const override { return replay->totalSeconds(); }
|
||||
inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; }
|
||||
inline double routeStartTime() const override { return replay->routeStartTime() / (double)1e9; }
|
||||
inline double currentSec() const override { return replay->currentSeconds(); }
|
||||
|
||||
@@ -113,21 +113,25 @@ QWidget *VideoWidget::createCameraWidget() {
|
||||
l->addLayout(slider_layout);
|
||||
QObject::connect(slider, &QSlider::sliderReleased, [this]() { can->seekTo(slider->value() / 1000.0); });
|
||||
QObject::connect(slider, &QSlider::valueChanged, [=](int value) { time_label->setText(utils::formatSeconds(value / 1000)); });
|
||||
QObject::connect(slider, &Slider::updateMaximumTime, this, &VideoWidget::setMaximumTime);
|
||||
QObject::connect(cam_widget, &CameraWidget::clicked, []() { can->pause(!can->isPaused()); });
|
||||
QObject::connect(can, &AbstractStream::updated, this, &VideoWidget::updateState);
|
||||
QObject::connect(can, &AbstractStream::eventsMerged, [this]() {
|
||||
end_time_label->setText(utils::formatSeconds(can->totalSeconds()));
|
||||
slider->setRange(0, can->totalSeconds() * 1000);
|
||||
});
|
||||
QObject::connect(can, &AbstractStream::streamStarted, [this]() { setMaximumTime(can->totalSeconds()); });
|
||||
return w;
|
||||
}
|
||||
|
||||
void VideoWidget::setMaximumTime(double sec) {
|
||||
maximum_time = sec;
|
||||
end_time_label->setText(utils::formatSeconds(sec));
|
||||
slider->setRange(0, sec * 1000);
|
||||
}
|
||||
|
||||
void VideoWidget::rangeChanged(double min, double max, bool is_zoomed) {
|
||||
if (can->liveStreaming()) return;
|
||||
|
||||
if (!is_zoomed) {
|
||||
min = 0;
|
||||
max = can->totalSeconds();
|
||||
max = maximum_time;
|
||||
}
|
||||
end_time_label->setText(utils::formatSeconds(max));
|
||||
slider->setRange(min * 1000, max * 1000);
|
||||
@@ -180,10 +184,15 @@ void Slider::streamStarted() {
|
||||
|
||||
void Slider::loadThumbnails() {
|
||||
const auto &segments = can->route()->segments();
|
||||
double max_time = 0;
|
||||
for (auto it = segments.rbegin(); it != segments.rend() && !abort_load_thumbnail; ++it) {
|
||||
LogReader log;
|
||||
std::string qlog = it->second.qlog.toStdString();
|
||||
if (!qlog.empty() && log.load(qlog, &abort_load_thumbnail, {cereal::Event::Which::THUMBNAIL, cereal::Event::Which::CONTROLS_STATE}, true, 0, 3)) {
|
||||
if (max_time == 0 && !log.events.empty()) {
|
||||
max_time = (*(log.events.rbegin()))->mono_time / 1e9 - can->routeStartTime();
|
||||
emit updateMaximumTime(max_time);
|
||||
}
|
||||
for (auto ev = log.events.cbegin(); ev != log.events.cend() && !abort_load_thumbnail; ++ev) {
|
||||
if ((*ev)->which == cereal::Event::Which::THUMBNAIL) {
|
||||
auto thumb = (*ev)->event.getThumbnail();
|
||||
|
||||
@@ -37,6 +37,9 @@ public:
|
||||
Slider(QWidget *parent);
|
||||
~Slider();
|
||||
|
||||
signals:
|
||||
void updateMaximumTime(double);
|
||||
|
||||
private:
|
||||
void mousePressEvent(QMouseEvent *e) override;
|
||||
void mouseMoveEvent(QMouseEvent *e) override;
|
||||
@@ -46,6 +49,7 @@ private:
|
||||
void streamStarted();
|
||||
void loadThumbnails();
|
||||
|
||||
double max_sec = 0;
|
||||
int slider_x = -1;
|
||||
std::vector<std::tuple<int, int, TimelineType>> timeline;
|
||||
std::mutex thumbnail_lock;
|
||||
@@ -64,6 +68,7 @@ class VideoWidget : public QFrame {
|
||||
public:
|
||||
VideoWidget(QWidget *parnet = nullptr);
|
||||
void rangeChanged(double min, double max, bool is_zommed);
|
||||
void setMaximumTime(double sec);
|
||||
|
||||
protected:
|
||||
void updateState();
|
||||
@@ -71,6 +76,7 @@ protected:
|
||||
QWidget *createCameraWidget();
|
||||
|
||||
CameraWidget *cam_widget;
|
||||
double maximum_time = 0;
|
||||
QLabel *end_time_label;
|
||||
QLabel *time_label;
|
||||
QHBoxLayout *slider_layout;
|
||||
|
||||
Reference in New Issue
Block a user