diff --git a/messaging/__init__.py b/messaging/__init__.py index 1628dd5..0440104 100644 --- a/messaging/__init__.py +++ b/messaging/__init__.py @@ -13,6 +13,7 @@ from cereal.services import service_list assert MultiplePublishersError assert MessagingError +NO_TRAVERSAL_LIMIT = 2**64-1 AVG_FREQ_HISTORY = 100 SIMULATION = "SIMULATION" in os.environ @@ -26,6 +27,9 @@ except ImportError: context = Context() +def log_from_bytes(dat: bytes) -> capnp.lib.capnp._DynamicStructReader: + return log.Event.from_bytes(dat, traversal_limit_in_words=NO_TRAVERSAL_LIMIT) + def new_message(service: Optional[str] = None, size: Optional[int] = None) -> capnp.lib.capnp._DynamicStructBuilder: dat = log.Event.new_message() dat.logMonoTime = int(sec_since_boot() * 1e9) @@ -83,7 +87,7 @@ def drain_sock(sock: SubSocket, wait_for_one: bool = False) -> List[capnp.lib.ca if dat is None: # Timeout hit break - dat = log.Event.from_bytes(dat) + dat = log_from_bytes(dat) ret.append(dat) return ret @@ -106,20 +110,20 @@ def recv_sock(sock: SubSocket, wait: bool = False) -> Union[None, capnp.lib.capn dat = rcv if dat is not None: - dat = log.Event.from_bytes(dat) + dat = log_from_bytes(dat) return dat def recv_one(sock: SubSocket) -> Union[None, capnp.lib.capnp._DynamicStructReader]: dat = sock.receive() if dat is not None: - dat = log.Event.from_bytes(dat) + dat = log_from_bytes(dat) return dat def recv_one_or_none(sock: SubSocket) -> Union[None, capnp.lib.capnp._DynamicStructReader]: dat = sock.receive(non_blocking=True) if dat is not None: - dat = log.Event.from_bytes(dat) + dat = log_from_bytes(dat) return dat def recv_one_retry(sock: SubSocket) -> capnp.lib.capnp._DynamicStructReader: @@ -127,7 +131,7 @@ def recv_one_retry(sock: SubSocket) -> capnp.lib.capnp._DynamicStructReader: while True: dat = sock.receive() if dat is not None: - return log.Event.from_bytes(dat) + return log_from_bytes(dat) class SubMaster(): def __init__(self, services: List[str], poll: Optional[List[str]] = None, diff --git a/messaging/messaging.h b/messaging/messaging.h index 8c0825b..4a184ff 100644 --- a/messaging/messaging.h +++ b/messaging/messaging.h @@ -69,7 +69,7 @@ public: SubMaster(const std::vector &service_list, const char *address = nullptr, const std::vector &ignore_alive = {}); void update(int timeout = 1000); - void update_msgs(uint64_t current_time, std::vector> messages); + void update_msgs(uint64_t current_time, const std::vector> &messages); inline bool allAlive(const std::vector &service_list = {}) { return all_(service_list, false, true); } inline bool allValid(const std::vector &service_list = {}) { return all_(service_list, true, false); } inline bool allAliveAndValid(const std::vector &service_list = {}) { return all_(service_list, true, true); } diff --git a/messaging/socketmaster.cc b/messaging/socketmaster.cc index c88a033..56698dc 100644 --- a/messaging/socketmaster.cc +++ b/messaging/socketmaster.cc @@ -92,7 +92,9 @@ void SubMaster::update(int timeout) { SubMessage *m = messages_.at(s); m->msg_reader->~FlatArrayMessageReader(); - m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(m->aligned_buf.align(msg)); + capnp::ReaderOptions options; + options.traversalLimitInWords = kj::maxValue; // Don't limit + m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(m->aligned_buf.align(msg), options); delete msg; messages.push_back({m->name, m->msg_reader->getRoot()}); } @@ -100,7 +102,7 @@ void SubMaster::update(int timeout) { update_msgs(current_time, messages); } -void SubMaster::update_msgs(uint64_t current_time, std::vector> messages){ +void SubMaster::update_msgs(uint64_t current_time, const std::vector> &messages){ if (++frame == UINT64_MAX) frame = 1; for(auto &kv : messages) {