mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-19 12:23:53 +08:00
replay: improve segment downloading (#22504)
* improve segment downloading dd * continue * log retrying * check aborting_ in loop * std::endl * log download information * cleanup * continue * dd * move download to separate function * simplify logging * continue * guard ts * cleanup * typo
This commit is contained in:
@@ -160,9 +160,12 @@ void Replay::queueSegment() {
|
||||
}
|
||||
|
||||
// start stream thread
|
||||
if (stream_thread_ == nullptr && cur != segments_.end() && cur->second->isLoaded()) {
|
||||
bool current_segment_loaded = (cur != segments_.end() && cur->second->isLoaded());
|
||||
if (stream_thread_ == nullptr && current_segment_loaded) {
|
||||
startStream(cur->second.get());
|
||||
}
|
||||
|
||||
enableHttpLogging(!current_segment_loaded);
|
||||
}
|
||||
|
||||
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
|
||||
|
||||
@@ -126,8 +126,7 @@ void Segment::loadFile(int id, const std::string file) {
|
||||
bool file_ready = util::file_exists(local_file);
|
||||
|
||||
if (!file_ready && is_remote) {
|
||||
// TODO: retry on failure
|
||||
file_ready = httpMultiPartDownload(file, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_);
|
||||
file_ready = downloadFile(id, file, local_file);
|
||||
}
|
||||
|
||||
if (!aborting_ && file_ready) {
|
||||
@@ -150,6 +149,22 @@ void Segment::loadFile(int id, const std::string file) {
|
||||
}
|
||||
}
|
||||
|
||||
bool Segment::downloadFile(int id, const std::string &url, const std::string local_file) {
|
||||
bool ret = false;
|
||||
int retries = 0;
|
||||
while (!aborting_) {
|
||||
ret = httpMultiPartDownload(url, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_);
|
||||
if (ret || aborting_) break;
|
||||
|
||||
if (++retries > max_retries_) {
|
||||
qInfo() << "download failed after retries" << max_retries_;
|
||||
break;
|
||||
}
|
||||
qInfo() << "download failed, retrying" << retries;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string Segment::cacheFilePath(const std::string &file) {
|
||||
QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery);
|
||||
QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex();
|
||||
|
||||
@@ -53,9 +53,11 @@ signals:
|
||||
|
||||
protected:
|
||||
void loadFile(int id, const std::string file);
|
||||
bool downloadFile(int id, const std::string &url, const std::string local_file);
|
||||
std::string cacheFilePath(const std::string &file);
|
||||
|
||||
std::atomic<bool> success_ = true, aborting_ = false;
|
||||
std::atomic<int> loading_ = 0;
|
||||
std::list<QThread*> loading_threads_;
|
||||
std::vector<QThread*> loading_threads_;
|
||||
const int max_retries_ = 3;
|
||||
};
|
||||
|
||||
@@ -13,20 +13,19 @@ std::string sha_256(const QString &dat) {
|
||||
|
||||
TEST_CASE("httpMultiPartDownload") {
|
||||
char filename[] = "/tmp/XXXXXX";
|
||||
int fd = mkstemp(filename);
|
||||
close(fd);
|
||||
close(mkstemp(filename));
|
||||
|
||||
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
|
||||
SECTION("http 200") {
|
||||
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
|
||||
SECTION("5 connections") {
|
||||
REQUIRE(httpMultiPartDownload(stream_url, filename, 5));
|
||||
std::string content = util::read_file(filename);
|
||||
REQUIRE(content.size() == 37495242);
|
||||
std::string checksum = sha_256(QString::fromStdString(content));
|
||||
REQUIRE(checksum == "d8ff81560ce7ed6f16d5fb5a6d6dd13aba06c8080c62cfe768327914318744c4");
|
||||
}
|
||||
SECTION("http 404") {
|
||||
REQUIRE(httpMultiPartDownload(util::string_format("%s_abc", stream_url), filename, 5) == false);
|
||||
SECTION("1 connection") {
|
||||
REQUIRE(httpMultiPartDownload(stream_url, filename, 1));
|
||||
}
|
||||
std::string content = util::read_file(filename);
|
||||
REQUIRE(content.size() == 9112651);
|
||||
std::string checksum = sha_256(QString::fromStdString(content));
|
||||
REQUIRE(checksum == "e44edfbb545abdddfd17020ced2b18b6ec36506152267f32b6a8e3341f8126d6");
|
||||
}
|
||||
|
||||
int random_int(int min, int max) {
|
||||
|
||||
@@ -2,6 +2,10 @@
|
||||
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
|
||||
#include <bzlib.h>
|
||||
#include <curl/curl.h>
|
||||
|
||||
@@ -16,21 +20,26 @@ struct CURLGlobalInitializer {
|
||||
// Book-keeping for one part of a multi-part download.
struct MultiPartWriter {
  int64_t offset;   // position in `fp` where the next received chunk is stored
  int64_t end;      // not read by write_cb; presumably the last byte of this part's range (confirm at the call site)
  int64_t written;  // number of bytes write_cb has stored for this part so far
  FILE *fp;         // destination file
};

// libcurl write callback (CURLOPT_WRITEFUNCTION): stores the received chunk at
// the writer's current offset and advances its counters.
//
// `userp` must point to a MultiPartWriter. Returns the number of bytes actually
// stored. If seeking or writing fails, that number is smaller than
// size * count, which libcurl treats as a write error and aborts the transfer
// instead of silently producing a corrupt file.
static size_t write_cb(char *data, size_t size, size_t count, void *userp) {
  MultiPartWriter *w = static_cast<MultiPartWriter *>(userp);
  const size_t bytes = size * count;
  if (bytes == 0) return 0;

  // Could not position the file: report nothing consumed so curl stops.
  if (fseek(w->fp, w->offset, SEEK_SET) != 0) return 0;

  // Write byte-wise so a short write yields an exact byte count.
  const size_t stored = fwrite(data, 1, bytes, w->fp);
  w->offset += static_cast<int64_t>(stored);
  w->written += static_cast<int64_t>(stored);
  return stored;
}
|
||||
|
||||
// Write callback that throws the payload away: it reports every chunk as fully
// consumed without storing anything.
static size_t dumy_write_cb(char * /*data*/, size_t size, size_t count, void * /*userp*/) {
  const size_t consumed = size * count;
  return consumed;
}
|
||||
|
||||
int64_t getDownloadContentLength(const std::string &url) {
|
||||
int64_t getRemoteFileSize(const std::string &url) {
|
||||
CURL *curl = curl_easy_init();
|
||||
if (!curl) return -1;
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb);
|
||||
curl_easy_setopt(curl, CURLOPT_HEADER, 1);
|
||||
@@ -39,25 +48,47 @@ int64_t getDownloadContentLength(const std::string &url) {
|
||||
double content_length = -1;
|
||||
if (res == CURLE_OK) {
|
||||
res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length);
|
||||
} else {
|
||||
std::cout << "Download failed: error code: " << res << std::endl;
|
||||
}
|
||||
curl_easy_cleanup(curl);
|
||||
return res == CURLE_OK ? (int64_t)content_length : -1;
|
||||
}
|
||||
|
||||
std::string formattedDataSize(size_t size) {
|
||||
if (size < 1024) {
|
||||
return std::to_string(size) + " B";
|
||||
} else if (size < 1024 * 1024) {
|
||||
return util::string_format("%.2f KB", (float)size / 1024);
|
||||
} else {
|
||||
return util::string_format("%.2f MB", (float)size / (1024 * 1024));
|
||||
}
|
||||
}
|
||||
|
||||
// File-local switch for download progress output; off by default.
static std::atomic<bool> enable_http_logging{false};

// Sets the download progress switch to `enable`.
void enableHttpLogging(bool enable) {
  enable_http_logging.store(enable);
}
|
||||
|
||||
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort) {
|
||||
static CURLGlobalInitializer curl_initializer;
|
||||
static std::mutex lock;
|
||||
static uint64_t total_written = 0, prev_total_written = 0;
|
||||
static double last_print_ts = 0;
|
||||
|
||||
int64_t content_length = getDownloadContentLength(url);
|
||||
if (content_length == -1) return false;
|
||||
int64_t content_length = getRemoteFileSize(url);
|
||||
if (content_length <= 0) return false;
|
||||
|
||||
// create a tmp sparse file
|
||||
std::string tmp_file = target_file + ".tmp";
|
||||
const std::string tmp_file = target_file + ".tmp";
|
||||
FILE *fp = fopen(tmp_file.c_str(), "wb");
|
||||
assert(fp);
|
||||
fseek(fp, content_length - 1, SEEK_SET);
|
||||
fwrite("\0", 1, 1, fp);
|
||||
|
||||
CURLM *cm = curl_multi_init();
|
||||
|
||||
std::map<CURL *, MultiPartWriter> writers;
|
||||
const int part_size = content_length / parts;
|
||||
for (int i = 0; i < parts; ++i) {
|
||||
@@ -74,44 +105,63 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
|
||||
curl_easy_setopt(eh, CURLOPT_HTTPGET, 1);
|
||||
curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1);
|
||||
curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1);
|
||||
|
||||
curl_multi_add_handle(cm, eh);
|
||||
}
|
||||
|
||||
int running = 1, success_cnt = 0;
|
||||
while (!(abort && abort->load())) {
|
||||
CURLMcode ret = curl_multi_perform(cm, &running);
|
||||
int still_running = 1;
|
||||
size_t prev_written = 0;
|
||||
while (still_running > 0 && !(abort && *abort)) {
|
||||
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
|
||||
curl_multi_perform(cm, &still_running);
|
||||
|
||||
if (!running) {
|
||||
CURLMsg *msg;
|
||||
int msgs_left = -1;
|
||||
while ((msg = curl_multi_info_read(cm, &msgs_left))) {
|
||||
if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) {
|
||||
int http_status_code = 0;
|
||||
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code);
|
||||
success_cnt += (http_status_code == 206);
|
||||
}
|
||||
size_t written = std::accumulate(writers.begin(), writers.end(), 0, [=](int v, auto &w) { return v + w.second.written; });
|
||||
int cur_written = written - prev_written;
|
||||
prev_written = written;
|
||||
|
||||
std::lock_guard lk(lock);
|
||||
double ts = millis_since_boot();
|
||||
total_written += cur_written;
|
||||
if ((ts - last_print_ts) > 2 * 1000) {
|
||||
if (enable_http_logging && last_print_ts > 0) {
|
||||
size_t average = (total_written - prev_total_written) / ((ts - last_print_ts) / 1000.);
|
||||
std::cout << "downloading segments at " << formattedDataSize(average) << "/S" << std::endl;
|
||||
}
|
||||
prev_total_written = total_written;
|
||||
last_print_ts = ts;
|
||||
}
|
||||
}
|
||||
|
||||
CURLMsg *msg;
|
||||
int msgs_left = -1;
|
||||
int complete = 0;
|
||||
while ((msg = curl_multi_info_read(cm, &msgs_left)) && !(abort && *abort)) {
|
||||
if (msg->msg == CURLMSG_DONE) {
|
||||
if (msg->data.result == CURLE_OK) {
|
||||
long res_status = 0;
|
||||
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &res_status);
|
||||
if (res_status == 206) {
|
||||
complete++;
|
||||
} else {
|
||||
std::cout << "Download failed: http error code: " << res_status << std::endl;
|
||||
}
|
||||
} else {
|
||||
std::cout << "Download failed: connection failure: " << msg->data.result << std::endl;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret == CURLM_OK) {
|
||||
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
|
||||
}
|
||||
};
|
||||
|
||||
fclose(fp);
|
||||
bool success = success_cnt == parts;
|
||||
if (success) {
|
||||
success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
|
||||
}
|
||||
|
||||
// cleanup curl
|
||||
for (auto &[e, w] : writers) {
|
||||
curl_multi_remove_handle(cm, e);
|
||||
curl_easy_cleanup(e);
|
||||
}
|
||||
|
||||
curl_multi_cleanup(cm);
|
||||
return success;
|
||||
fclose(fp);
|
||||
|
||||
bool ret = complete == parts;
|
||||
ret = ret && ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool readBZ2File(const std::string_view file, std::ostream &stream) {
|
||||
@@ -147,7 +197,7 @@ void precise_nano_sleep(long sleep_ns) {
|
||||
}
|
||||
// spin wait
|
||||
if (sleep_ns > 0) {
|
||||
while ((nanos_since_boot() - start_sleep) <= sleep_ns) {
|
||||
while ((nanos_since_boot() - start_sleep) <= sleep_ns) {
|
||||
usleep(0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
#include <atomic>
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
void precise_nano_sleep(long sleep_ns);
|
||||
bool readBZ2File(const std::string_view file, std::ostream &stream);
|
||||
void enableHttpLogging(bool enable);
|
||||
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort = nullptr);
|
||||
|
||||
Reference in New Issue
Block a user