logmessaged: handle big cloudlogs (#28553)

* logmessaged: handle big cloudlogs

* revert

* switch
old-commit-hash: a1b6697629
This commit is contained in:
Adeeb Shihadeh 2023-06-14 22:48:51 -07:00 committed by GitHub
parent 8b0492c8fe
commit 4aa12c2864
4 changed files with 84 additions and 14 deletions

View File

@ -209,6 +209,7 @@ jobs:
$UNIT_TEST system/loggerd && \
$UNIT_TEST selfdrive/car && \
$UNIT_TEST selfdrive/locationd && \
$UNIT_TEST system/tests && \
$UNIT_TEST system/ubloxd && \
selfdrive/locationd/test/_test_locationd_lib.py && \
./system/ubloxd/tests/test_glonass_runner && \

View File

@ -12,7 +12,7 @@ def main() -> NoReturn:
log_handler.setFormatter(SwagLogFileFormatter(None))
log_level = 20 # logging.INFO
ctx = zmq.Context().instance()
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.PULL)
sock.bind("ipc:///tmp/logmessage")
@ -20,23 +20,32 @@ def main() -> NoReturn:
log_message_sock = messaging.pub_sock('logMessage')
error_log_message_sock = messaging.pub_sock('errorLogMessage')
while True:
dat = b''.join(sock.recv_multipart())
level = dat[0]
record = dat[1:].decode("utf-8")
if level >= log_level:
log_handler.emit(record)
try:
while True:
dat = b''.join(sock.recv_multipart())
level = dat[0]
record = dat[1:].decode("utf-8")
if level >= log_level:
log_handler.emit(record)
# then we publish them
msg = messaging.new_message()
msg.logMessage = record
log_message_sock.send(msg.to_bytes())
if len(record) > 2*1024*1024:
print("WARNING: log too big to publish", len(record))
print(print(record[:100]))
continue
if level >= 40: # logging.ERROR
# then we publish them
msg = messaging.new_message()
msg.errorLogMessage = record
error_log_message_sock.send(msg.to_bytes())
msg.logMessage = record
log_message_sock.send(msg.to_bytes())
if level >= 40: # logging.ERROR
msg = messaging.new_message()
msg.errorLogMessage = record
error_log_message_sock.send(msg.to_bytes())
finally:
sock.close()
ctx.term()
log_handler.close()
if __name__ == "__main__":
main()

0
system/tests/__init__.py Normal file
View File

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python3
import glob
import os
import shutil
import time
import unittest
import cereal.messaging as messaging
from selfdrive.manager.process_config import managed_processes
from system.swaglog import cloudlog, SWAGLOG_DIR
class TestLogmessaged(unittest.TestCase):
def setUp(self):
if os.path.exists(SWAGLOG_DIR):
shutil.rmtree(SWAGLOG_DIR)
managed_processes['logmessaged'].start()
self.sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False)
self.error_sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False)
# ensure sockets are connected
time.sleep(0.2)
messaging.drain_sock(self.sock)
messaging.drain_sock(self.error_sock)
def tearDown(self):
del self.sock
del self.error_sock
managed_processes['logmessaged'].stop(block=True)
def _get_log_files(self):
return list(glob.glob(os.path.join(SWAGLOG_DIR, "swaglog.*")))
def test_simple_log(self):
msgs = [f"abc {i}" for i in range(10)]
for m in msgs:
cloudlog.error(m)
time.sleep(3)
m = messaging.drain_sock(self.sock)
assert len(m) == len(msgs)
assert len(self._get_log_files()) >= 1
def test_big_log(self):
n = 10
msg = "a"*3*1024*1024
for _ in range(n):
cloudlog.info(msg)
time.sleep(3)
msgs = messaging.drain_sock(self.sock)
assert len(msgs) == 0
logsize = sum([os.path.getsize(f) for f in self._get_log_files()])
assert (n*len(msg)) < logsize < (n*(len(msg)+1024))
if __name__ == "__main__":
unittest.main()