logreader: support reading from corrupt log (#23050)

* catch exception outside loop

* print decompress error

* add test case for corrupt log

* fix decompressbz2 stuck if log is corrupt

* recovered from corrupt data

* add output

* add space

* std::endl

* override load(), load from buffer

* override FrameReader::load to load from the buffer

* replace NULL with nullptr

* fix test case for corrupt log

* Trigger Build

* check bzerror too

Co-authored-by: Willem Melching <willem.melching@gmail.com>
old-commit-hash: dbec761941
This commit is contained in:
Dean Lee 2021-11-29 21:10:24 +08:00 committed by GitHub
parent 3b8daea061
commit 71c282d42a
9 changed files with 85 additions and 34 deletions

View File

@ -23,7 +23,7 @@ std::string get_url(std::string route_name, const std::string &camera, int segme
}
void camera_init(VisionIpcServer *v, CameraState *s, int camera_id, unsigned int fps, cl_device_id device_id, cl_context ctx, VisionStreamType rgb_type, VisionStreamType yuv_type, const std::string &url) {
s->frame = new FrameReader(true);
s->frame = new FrameReader();
if (!s->frame->load(url)) {
printf("failed to load stream from %s", url.c_str());
assert(0);

View File

@ -34,8 +34,7 @@ enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat *
} // namespace
FrameReader::FrameReader(bool local_cache, int chunk_size, int retries) : FileReader(local_cache, chunk_size, retries) {
}
FrameReader::FrameReader() {}
FrameReader::~FrameReader() {
for (AVPacket *pkt : packets) {
@ -52,17 +51,22 @@ FrameReader::~FrameReader() {
}
}
bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic<bool> *abort) {
std::string content = read(url, abort);
if (content.empty()) return false;
bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
FileReader f(local_cache, chunk_size, retries);
std::string data = f.read(url, abort);
if (data.empty()) return false;
return load((std::byte *)data.data(), data.size(), no_cuda, abort);
}
bool FrameReader::load(const std::byte *data, size_t size, bool no_cuda, std::atomic<bool> *abort) {
input_ctx = avformat_alloc_context();
if (!input_ctx) return false;
struct buffer_data bd = {
.data = (uint8_t *)content.data(),
.data = (const uint8_t*)data,
.offset = 0,
.size = content.size(),
.size = size,
};
const int avio_ctx_buffer_size = 64 * 1024;
unsigned char *avio_ctx_buffer = (unsigned char *)av_malloc(avio_ctx_buffer_size);
@ -70,11 +74,11 @@ bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic<bool> *
input_ctx->pb = avio_ctx_;
input_ctx->probesize = 10 * 1024 * 1024; // 10MB
int ret = avformat_open_input(&input_ctx, url.c_str(), NULL, NULL);
int ret = avformat_open_input(&input_ctx, nullptr, nullptr, nullptr);
if (ret != 0) {
char err_str[1024] = {0};
av_strerror(ret, err_str, std::size(err_str));
printf("Error loading video - %s - %s\n", err_str, url.c_str());
printf("Error loading video - %s\n", err_str);
return false;
}
@ -103,7 +107,7 @@ bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic<bool> *
}
}
ret = avcodec_open2(decoder_ctx, decoder, NULL);
ret = avcodec_open2(decoder_ctx, decoder, nullptr);
if (ret < 0) return false;
packets.reserve(60 * 20); // 20fps, one minute

View File

@ -15,11 +15,12 @@ struct AVFrameDeleter {
void operator()(AVFrame* frame) const { av_frame_free(&frame); }
};
class FrameReader : protected FileReader {
class FrameReader {
public:
FrameReader(bool local_cache = false, int chunk_size = -1, int retries = 0);
FrameReader();
~FrameReader();
bool load(const std::string &url, bool no_cuda = false, std::atomic<bool> *abort = nullptr);
bool load(const std::string &url, bool no_cuda = false, std::atomic<bool> *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const std::byte *data, size_t size, bool no_cuda = false, std::atomic<bool> *abort = nullptr);
bool get(int idx, uint8_t *rgb, uint8_t *yuv);
int getRGBSize() const { return width * height * 3; }
int getYUVSize() const { return width * height * 3 / 2; }

View File

@ -1,6 +1,7 @@
#include "selfdrive/ui/replay/logreader.h"
#include <algorithm>
#include <iostream>
#include "selfdrive/ui/replay/util.h"
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
@ -26,7 +27,7 @@ Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(a
// class LogReader
LogReader::LogReader(bool local_cache, int chunk_size, int retries, size_t memory_pool_block_size) : FileReader(local_cache, chunk_size, retries) {
LogReader::LogReader(size_t memory_pool_block_size) {
#ifdef HAS_MEMORY_RESOURCE
const size_t buf_size = sizeof(Event) * memory_pool_block_size;
pool_buffer_ = ::operator new(buf_size);
@ -39,19 +40,32 @@ LogReader::~LogReader() {
for (Event *e : events) {
delete e;
}
#ifdef HAS_MEMORY_RESOURCE
delete mbr_;
::operator delete(pool_buffer_);
#endif
}
bool LogReader::load(const std::string &file, std::atomic<bool> *abort) {
raw_ = decompressBZ2(read(file, abort));
if (raw_.empty()) return false;
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
FileReader f(local_cache, chunk_size, retries);
std::string data = f.read(url, abort);
if (data.empty()) return false;
return load((std::byte*)data.data(), data.size(), abort);
}
bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
raw_ = decompressBZ2(data, size);
if (raw_.empty()) {
std::cout << "failed to decompress log" << std::endl;
return false;
}
try {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0) {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0) {
try {
#ifdef HAS_MEMORY_RESOURCE
Event *evt = new (mbr_) Event(words);
#else
@ -62,20 +76,26 @@ bool LogReader::load(const std::string &file, std::atomic<bool> *abort) {
if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
#ifdef HAS_MEMORY_RESOURCE
Event *frame_evt = new (mbr_) Event(words, true);
#else
Event *frame_evt = new Event(words, true);
#endif
events.push_back(frame_evt);
}
words = kj::arrayPtr(evt->reader.getEnd(), words.end());
events.push_back(evt);
} catch (const kj::Exception &e) {
return false;
}
} catch (const kj::Exception &e) {
std::cout << "failed to parse log : " << e.getDescription().cStr() << std::endl;
if (events.empty()) return false;
std::cout << "read " << events.size() << " events from corrupt log" << std::endl;
}
std::sort(events.begin(), events.end(), Event::lessThan());
return true;
}

View File

@ -46,11 +46,12 @@ public:
bool frame;
};
class LogReader : protected FileReader {
class LogReader {
public:
LogReader(bool local_cache = false, int chunk_size = -1, int retries = 0, size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
~LogReader();
bool load(const std::string &file, std::atomic<bool> *abort = nullptr);
bool load(const std::string &url, std::atomic<bool> *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event*> events;

View File

@ -116,11 +116,11 @@ void Segment::loadFile(int id, const std::string file) {
const bool local_cache = !(flags & REPLAY_FLAG_NO_FILE_CACHE);
bool success = false;
if (id < MAX_CAMERAS) {
frames[id] = std::make_unique<FrameReader>(local_cache, 20 * 1024 * 1024, 3);
success = frames[id]->load(file, flags & REPLAY_FLAG_NO_CUDA, &abort_);
frames[id] = std::make_unique<FrameReader>();
success = frames[id]->load(file, flags & REPLAY_FLAG_NO_CUDA, &abort_, local_cache, 20 * 1024 * 1024, 3);
} else {
log = std::make_unique<LogReader>(local_cache, 0, 3);
success = log->load(file, &abort_);
log = std::make_unique<LogReader>();
success = log->load(file, &abort_, local_cache, 0, 3);
}
if (!success) {

View File

@ -50,6 +50,17 @@ TEST_CASE("FileReader") {
}
}
// Checks that LogReader can load a log whose content has been cut short.
TEST_CASE("LogReader") {
SECTION("corrupt log") {
// Read the reference rlog, then drop its second half to simulate corruption.
FileReader reader(true);
std::string corrupt_content = reader.read(TEST_RLOG_URL);
corrupt_content.resize(corrupt_content.length() / 2);
LogReader log;
// Loading from the in-memory buffer must still succeed and yield some events.
REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size()));
REQUIRE(log.events.size() > 0);
}
}
TEST_CASE("Segment") {
auto flags = GENERATE(REPLAY_FLAG_DCAM | REPLAY_FLAG_ECAM, REPLAY_FLAG_QCAMERA);
Route demo_route(DEMO_ROUTE);

View File

@ -194,19 +194,32 @@ bool httpDownload(const std::string &url, const std::string &file, size_t chunk_
}
std::string decompressBZ2(const std::string &in) {
if (in.empty()) return {};
return decompressBZ2((std::byte *)in.data(), in.size());
}
std::string decompressBZ2(const std::byte *in, size_t in_size) {
if (in_size == 0) return {};
bz_stream strm = {};
int bzerror = BZ2_bzDecompressInit(&strm, 0, 0);
assert(bzerror == BZ_OK);
strm.next_in = (char *)in.data();
strm.avail_in = in.size();
std::string out(in.size() * 5, '\0');
strm.next_in = (char *)in;
strm.avail_in = in_size;
std::string out(in_size * 5, '\0');
do {
strm.next_out = (char *)(&out[strm.total_out_lo32]);
strm.avail_out = out.size() - strm.total_out_lo32;
const char *prev_write_pos = strm.next_out;
bzerror = BZ2_bzDecompress(&strm);
if (bzerror == BZ_OK && prev_write_pos == strm.next_out) {
// content is corrupt
bzerror = BZ_STREAM_END;
std::cout << "decompressBZ2 error : content is corrupt" << std::endl;
break;
}
if (bzerror == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
out.resize(out.size() * 2);
}

View File

@ -6,6 +6,7 @@
std::string sha256(const std::string &str);
void precise_nano_sleep(long sleep_ns);
std::string decompressBZ2(const std::string &in);
std::string decompressBZ2(const std::byte *in, size_t in_size);
void enableHttpLogging(bool enable);
std::string getUrlWithoutQuery(const std::string &url);
size_t getRemoteFileSize(const std::string &url);