Get tests running again (#621)

* Get tests running again

* rn

* I don't understand what any of this means

* More updates

* Try fixing test_fake

* test fake passes

* All python tests pass
This commit is contained in:
Harald Schäfer 2024-06-07 13:32:22 -07:00 committed by GitHub
parent 7c00db60e5
commit 2ac05eae17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 817 additions and 3 deletions

27
.github/workflows/repo.yml vendored Normal file
View File

@ -0,0 +1,27 @@
name: repo
on:
schedule:
- cron: "0 15 1 * *"
workflow_dispatch:
jobs:
pre-commit-autoupdate:
name: pre-commit autoupdate
runs-on: ubuntu-latest
container:
steps:
- uses: actions/checkout@v3
- name: pre-commit autoupdate
run: |
git config --global --add safe.directory '*'
pre-commit autoupdate
- name: Create Pull Request
uses: peter-evans/create-pull-request@5b4a9f6a9e2af26e5f02351490b90d01eb8ec1e5
with:
token: ${{ secrets.ACTIONS_CREATE_PR_PAT }}
commit-message: Update pre-commit hook versions
title: 'pre-commit: autoupdate hooks'
branch: pre-commit-updates
base: master
delete-branch: true

61
.github/workflows/tests.yml vendored Normal file
View File

@ -0,0 +1,61 @@
name: tests
on: [push, pull_request]
env:
DOCKER_REGISTRY: ghcr.io/commaai
RUN: docker run -e PYTHONWARNINGS=error --shm-size 1G --name msgq msgq /bin/sh -c
RUN_NAMED: docker run -e PYTHONWARNINGS=error --shm-size 1G --rm msgq /bin/sh -c
CI_RUN: docker run -e GITHUB_ACTION -e GITHUB_REF -e GITHUB_HEAD_REF -e GITHUB_SHA -e GITHUB_REPOSITORY -e GITHUB_RUN_ID --rm msgqci /bin/bash -c
BUILD: docker buildx build --pull --load --cache-to type=inline --cache-from $DOCKER_REGISTRY/msgq:latest -t msgq -f Dockerfile .
PYTHONWARNINGS: error
jobs:
build:
name: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build docker image
run: eval "$BUILD"
- name: Push to dockerhub
if: github.ref == 'refs/heads/master' && github.event_name != 'pull_request' && github.repository == 'commaai/msgq'
run: |
docker login ghcr.io -u ${{ github.actor }} -p ${{ secrets.GITHUB_TOKEN }}
docker tag msgq $DOCKER_REGISTRY/msgq:latest
docker push $DOCKER_REGISTRY/msgq:latest
unit_tests:
name: unit tests
runs-on: ubuntu-latest
strategy:
matrix:
flags: ['', '--asan', '--ubsan']
backend: ['MSGQ', 'ZMQ']
steps:
- uses: actions/checkout@v3
- name: Build docker image
run: eval "$BUILD"
- name: C++ tests
run: |
$RUN "export ${{ matrix.backend }}=1 && \
scons ${{ matrix.flags }} -j$(nproc) && \
messaging/test_runner && \
visionipc/test_runner"
- name: python tests
run: $RUN_NAMED "${{ matrix.backend }}=1 coverage run -m unittest discover ."
- name: Upload coverage
run: |
docker commit msgq msgqci
$CI_RUN "cd /project/msgq && bash <(curl -s https://codecov.io/bash) -v -F unit_tests_${{ matrix.backend }}"
static_analysis:
name: static analysis
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build docker image
run: eval "$BUILD"
- name: Static analysis
# TODO: a package pre-commit installs has a warning, remove the unset once that's fixed
run: $RUN "git init && git add -A && unset PYTHONWARNINGS && pre-commit run --all"

54
Dockerfile Normal file
View File

@ -0,0 +1,54 @@
FROM ubuntu:24.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
autoconf \
build-essential \
ca-certificates \
capnproto \
clang \
cppcheck \
curl \
git \
libbz2-dev \
libcapnp-dev \
libclang-rt-dev \
libffi-dev \
liblzma-dev \
libncurses5-dev \
libncursesw5-dev \
libreadline-dev \
libsqlite3-dev \
libssl-dev \
libtool \
libzmq3-dev \
llvm \
make \
cmake \
ocl-icd-opencl-dev \
opencl-headers \
python3-dev \
python3-pip \
tk-dev \
wget \
xz-utils \
zlib1g-dev \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install --break-system-packages --no-cache-dir pyyaml Cython scons pycapnp pre-commit ruff parameterized coverage numpy
WORKDIR /project/
RUN cd /tmp/ && \
git clone -b v2.x --depth 1 https://github.com/catchorg/Catch2.git && \
cd Catch2 && \
mv single_include/catch2/ /project/ && \
cd .. \
rm -rf Catch2
WORKDIR /project/msgq
ENV PYTHONPATH=/project
COPY . .
RUN rm -rf .git && \
scons -c && scons -j$(nproc)

View File

@ -19,9 +19,6 @@ messaging_objects = env.SharedObject([
messaging = env.Library('messaging', messaging_objects)
messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common])
if GetOption('extras'):
env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging, common])
# Build Vision IPC
vipc_files = ['ipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
@ -46,6 +43,7 @@ envCython.Program(f'{visionipc_dir.abspath}/visionipc_pyx.so', f'{visionipc_dir.
LIBS=vipc_libs, FRAMEWORKS=vipc_frameworks)
if GetOption('extras'):
env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging, common])
env.Program('visionipc/test_runner',
['visionipc/test_runner.cc', 'visionipc/visionipc_tests.cc'],
LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks)

89
SConstruct Normal file
View File

@ -0,0 +1,89 @@
import os
import platform
import subprocess
import sysconfig
import numpy as np
arch = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip()
if platform.system() == "Darwin":
arch = "Darwin"
common = ''
cpppath = [
f"#/../",
'/usr/lib/include',
'/opt/homebrew/include',
sysconfig.get_paths()['include'],
]
libpath = [
'/opt/homebrew/lib',
]
AddOption('--minimal',
action='store_false',
dest='extras',
default=True,
help='the minimum build. no tests, tools, etc.')
AddOption('--asan',
action='store_true',
help='turn on ASAN')
AddOption('--ubsan',
action='store_true',
help='turn on UBSan')
ccflags = []
ldflags = []
if GetOption('ubsan'):
flags = [
"-fsanitize=undefined",
"-fno-sanitize-recover=undefined",
]
ccflags += flags
ldflags += flags
elif GetOption('asan'):
ccflags += ["-fsanitize=address", "-fno-omit-frame-pointer"]
ldflags += ["-fsanitize=address"]
env = Environment(
ENV=os.environ,
CC='clang',
CXX='clang++',
CCFLAGS=[
"-g",
"-fPIC",
"-O2",
"-Wunused",
"-Werror",
"-Wshadow",
"-Wno-vla-cxx-extension",
] + ccflags,
LDFLAGS=ldflags,
LINKFLAGS=ldflags,
CFLAGS="-std=gnu11",
CXXFLAGS="-std=c++1z",
CPPPATH=cpppath,
LIBPATH=libpath,
CYTHONCFILESUFFIX=".cpp",
tools=["default", "cython"]
)
Export('env', 'arch', 'common')
envCython = env.Clone(LIBS=[])
envCython["CPPPATH"] += [np.get_include()]
envCython["CCFLAGS"] += ["-Wno-#warnings", "-Wno-shadow", "-Wno-deprecated-declarations"]
envCython["CCFLAGS"].remove('-Werror')
if arch == "Darwin":
envCython["LINKFLAGS"] = ["-bundle", "-undefined", "dynamic_lookup"]
else:
envCython["LINKFLAGS"] = ["-pthread", "-shared"]
Export('envCython')
SConscript(['SConscript'])

8
codecov.yml Normal file
View File

@ -0,0 +1,8 @@
comment: false
coverage:
status:
project:
default:
informational: true
patch: off

View File

@ -0,0 +1,61 @@
# must be built with scons
from msgq.messaging.messaging_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
from msgq.messaging.messaging_pyx import MultiplePublishersError, MessagingError
from typing import Optional, List
assert MultiplePublishersError
assert MessagingError
assert toggle_fake_events
assert set_fake_prefix
assert get_fake_prefix
assert delete_fake_prefix
assert wait_for_one_event
NO_TRAVERSAL_LIMIT = 2**64-1
context = Context()
def fake_event_handle(endpoint: str, identifier: Optional[str] = None, override: bool = True, enable: bool = False) -> SocketEventHandle:
identifier = identifier or get_fake_prefix()
handle = SocketEventHandle(endpoint, identifier, override)
if override:
handle.enabled = enable
return handle
def pub_sock(endpoint: str) -> PubSocket:
sock = PubSocket()
sock.connect(context, endpoint)
return sock
def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1",
conflate: bool = False, timeout: Optional[int] = None) -> SubSocket:
sock = SubSocket()
sock.connect(context, endpoint, addr.encode('utf8'), conflate)
if timeout is not None:
sock.setTimeout(timeout)
if poller is not None:
poller.registerSocket(sock)
return sock
def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]:
"""Receive all message currently available on the queue"""
ret: List[bytes] = []
while 1:
if wait_for_one and len(ret) == 0:
dat = sock.receive()
else:
dat = sock.receive(non_blocking=True)
if dat is None:
break
ret.append(dat)
return ret

View File

View File

@ -0,0 +1,193 @@
import os
import unittest
import multiprocessing
import platform
from parameterized import parameterized_class
from typing import Optional
import msgq.messaging as messaging
WAIT_TIMEOUT = 5
@unittest.skipIf(platform.system() == "Darwin", "Events not supported on macOS")
class TestEvents(unittest.TestCase):
def test_mutation(self):
handle = messaging.fake_event_handle("carState")
event = handle.recv_called_event
self.assertFalse(event.peek())
event.set()
self.assertTrue(event.peek())
event.clear()
self.assertFalse(event.peek())
del event
def test_wait(self):
handle = messaging.fake_event_handle("carState")
event = handle.recv_called_event
event.set()
try:
event.wait(WAIT_TIMEOUT)
self.assertTrue(event.peek())
except RuntimeError:
self.fail("event.wait() timed out")
def test_wait_multiprocess(self):
handle = messaging.fake_event_handle("carState")
event = handle.recv_called_event
def set_event_run():
event.set()
try:
p = multiprocessing.Process(target=set_event_run)
p.start()
event.wait(WAIT_TIMEOUT)
self.assertTrue(event.peek())
except RuntimeError:
self.fail("event.wait() timed out")
p.kill()
def test_wait_zero_timeout(self):
handle = messaging.fake_event_handle("carState")
event = handle.recv_called_event
try:
event.wait(0)
self.fail("event.wait() did not time out")
except RuntimeError:
self.assertFalse(event.peek())
@unittest.skipIf(platform.system() == "Darwin", "FakeSockets not supported on macOS")
@unittest.skipIf("ZMQ" in os.environ, "FakeSockets not supported on ZMQ")
@parameterized_class([{"prefix": None}, {"prefix": "test"}])
class TestFakeSockets(unittest.TestCase):
prefix: Optional[str] = None
def setUp(self):
messaging.toggle_fake_events(True)
if self.prefix is not None:
messaging.set_fake_prefix(self.prefix)
else:
messaging.delete_fake_prefix()
def tearDown(self):
messaging.toggle_fake_events(False)
messaging.delete_fake_prefix()
def test_event_handle_init(self):
handle = messaging.fake_event_handle("controlsState", override=True)
self.assertFalse(handle.enabled)
self.assertGreaterEqual(handle.recv_called_event.fd, 0)
self.assertGreaterEqual(handle.recv_ready_event.fd, 0)
def test_non_managed_socket_state(self):
# non managed socket should have zero state
_ = messaging.pub_sock("ubloxGnss")
handle = messaging.fake_event_handle("ubloxGnss", override=False)
self.assertFalse(handle.enabled)
self.assertEqual(handle.recv_called_event.fd, 0)
self.assertEqual(handle.recv_ready_event.fd, 0)
def test_managed_socket_state(self):
# managed socket should not change anything about the state
handle = messaging.fake_event_handle("ubloxGnss")
handle.enabled = True
expected_enabled = handle.enabled
expected_recv_called_fd = handle.recv_called_event.fd
expected_recv_ready_fd = handle.recv_ready_event.fd
_ = messaging.pub_sock("ubloxGnss")
self.assertEqual(handle.enabled, expected_enabled)
self.assertEqual(handle.recv_called_event.fd, expected_recv_called_fd)
self.assertEqual(handle.recv_ready_event.fd, expected_recv_ready_fd)
def test_sockets_enable_disable(self):
carState_handle = messaging.fake_event_handle("ubloxGnss", enable=True)
recv_called = carState_handle.recv_called_event
recv_ready = carState_handle.recv_ready_event
pub_sock = messaging.pub_sock("ubloxGnss")
sub_sock = messaging.sub_sock("ubloxGnss")
try:
carState_handle.enabled = True
recv_ready.set()
pub_sock.send(b"test")
_ = sub_sock.receive()
self.assertTrue(recv_called.peek())
recv_called.clear()
carState_handle.enabled = False
recv_ready.set()
pub_sock.send(b"test")
_ = sub_sock.receive()
self.assertFalse(recv_called.peek())
except RuntimeError:
self.fail("event.wait() timed out")
def test_synced_pub_sub(self):
def daemon_repub_process_run():
pub_sock = messaging.pub_sock("ubloxGnss")
sub_sock = messaging.sub_sock("carState")
frame = -1
while True:
frame += 1
msg = sub_sock.receive(non_blocking=True)
if msg is None:
print("none received")
continue
bts = frame.to_bytes(8, 'little')
pub_sock.send(bts)
carState_handle = messaging.fake_event_handle("carState", enable=True)
recv_called = carState_handle.recv_called_event
recv_ready = carState_handle.recv_ready_event
p = multiprocessing.Process(target=daemon_repub_process_run)
p.start()
pub_sock = messaging.pub_sock("carState")
sub_sock = messaging.sub_sock("ubloxGnss")
try:
for i in range(10):
recv_called.wait(WAIT_TIMEOUT)
recv_called.clear()
if i == 0:
sub_sock.receive(non_blocking=True)
bts = i.to_bytes(8, 'little')
pub_sock.send(bts)
recv_ready.set()
recv_called.wait(WAIT_TIMEOUT)
msg = sub_sock.receive(non_blocking=True)
self.assertIsNotNone(msg)
self.assertEqual(len(msg), 8)
frame = int.from_bytes(msg, 'little')
self.assertEqual(frame, i)
except RuntimeError:
self.fail("event.wait() timed out")
finally:
p.kill()
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
import os
import random
import threading
import time
import string
import unittest
import msgq.messaging as messaging
def random_sock():
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
def random_bytes(length=1000):
return bytes([random.randrange(0xFF) for _ in range(length)])
def zmq_sleep(t=1):
if "ZMQ" in os.environ:
time.sleep(t)
def zmq_expected_failure(func):
if "ZMQ" in os.environ:
return unittest.expectedFailure(func)
else:
return func
def delayed_send(delay, sock, dat):
def send_func():
sock.send(dat)
threading.Timer(delay, send_func).start()
class TestPubSubSockets(unittest.TestCase):
def setUp(self):
# ZMQ pub socket takes too long to die
# sleep to prevent multiple publishers error between tests
zmq_sleep()
def test_pub_sub(self):
sock = random_sock()
pub_sock = messaging.pub_sock(sock)
sub_sock = messaging.sub_sock(sock, conflate=False, timeout=None)
zmq_sleep(3)
for _ in range(1000):
msg = random_bytes()
pub_sock.send(msg)
recvd = sub_sock.receive()
self.assertEqual(msg, recvd)
def test_conflate(self):
sock = random_sock()
pub_sock = messaging.pub_sock(sock)
for conflate in [True, False]:
for _ in range(10):
num_msgs = random.randint(3, 10)
sub_sock = messaging.sub_sock(sock, conflate=conflate, timeout=None)
zmq_sleep()
sent_msgs = []
for __ in range(num_msgs):
msg = random_bytes()
pub_sock.send(msg)
sent_msgs.append(msg)
time.sleep(0.1)
recvd_msgs = messaging.drain_sock_raw(sub_sock)
if conflate:
self.assertEqual(len(recvd_msgs), 1)
else:
# TODO: compare actual data
self.assertEqual(len(recvd_msgs), len(sent_msgs))
def test_receive_timeout(self):
sock = random_sock()
for _ in range(10):
timeout = random.randrange(200)
sub_sock = messaging.sub_sock(sock, timeout=timeout)
zmq_sleep()
start_time = time.monotonic()
recvd = sub_sock.receive()
self.assertLess(time.monotonic() - start_time, 0.2)
assert recvd is None
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,142 @@
import unittest
import time
import msgq.messaging as messaging
import concurrent.futures
SERVICE_NAME = 'myService'
def poller():
context = messaging.Context()
p = messaging.Poller()
sub = messaging.SubSocket()
sub.connect(context, SERVICE_NAME)
p.registerSocket(sub)
socks = p.poll(10000)
r = [s.receive(non_blocking=True) for s in socks]
return r
class TestPoller(unittest.TestCase):
def test_poll_once(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, SERVICE_NAME)
with concurrent.futures.ThreadPoolExecutor() as e:
poll = e.submit(poller)
time.sleep(0.1) # Slow joiner syndrome
# Send message
pub.send(b"a")
# Wait for poll result
result = poll.result()
del pub
context.term()
self.assertEqual(result, [b"a"])
def test_poll_and_create_many_subscribers(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, SERVICE_NAME)
with concurrent.futures.ThreadPoolExecutor() as e:
poll = e.submit(poller)
time.sleep(0.1) # Slow joiner syndrome
c = messaging.Context()
for _ in range(10):
messaging.SubSocket().connect(c, SERVICE_NAME)
time.sleep(0.1)
# Send message
pub.send(b"a")
# Wait for poll result
result = poll.result()
del pub
context.term()
self.assertEqual(result, [b"a"])
def test_multiple_publishers_exception(self):
context = messaging.Context()
with self.assertRaises(messaging.MultiplePublishersError):
pub1 = messaging.PubSocket()
pub1.connect(context, SERVICE_NAME)
pub2 = messaging.PubSocket()
pub2.connect(context, SERVICE_NAME)
pub1.send(b"a")
del pub1
del pub2
context.term()
def test_multiple_messages(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, SERVICE_NAME)
sub = messaging.SubSocket()
sub.connect(context, SERVICE_NAME)
time.sleep(0.1) # Slow joiner
for i in range(1, 100):
pub.send(b'a'*i)
msg_seen = False
i = 1
while True:
r = sub.receive(non_blocking=True)
if r is not None:
self.assertEqual(b'a'*i, r)
msg_seen = True
i += 1
if r is None and msg_seen: # ZMQ sometimes receives nothing on the first receive
break
del pub
del sub
context.term()
def test_conflate(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, SERVICE_NAME)
sub = messaging.SubSocket()
sub.connect(context, SERVICE_NAME, conflate=True)
time.sleep(0.1) # Slow joiner
pub.send(b'a')
pub.send(b'b')
self.assertEqual(b'b', sub.receive())
del pub
del sub
context.term()
if __name__ == "__main__":
unittest.main()

21
pyproject.toml Normal file
View File

@ -0,0 +1,21 @@
# https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml
[tool.ruff]
lint.select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A"]
lint.ignore = ["W292", "E741", "E402", "C408", "ISC003"]
lint.flake8-implicit-str-concat.allow-multiline=false
line-length = 160
target-version="py311"
[mypy.tool]
# third-party packages
ignore_missing_imports=true
# helpful warnings
warn_redundant_casts=true
warn_unreachable=true
warn_unused_ignores=true
# restrict dynamic typing
warn_return_any=true
check_untyped_defs=true

View File

@ -0,0 +1,72 @@
import re
import SCons
from SCons.Action import Action
from SCons.Scanner import Scanner
pyx_from_import_re = re.compile(r'^from\s+(\S+)\s+cimport', re.M)
pyx_import_re = re.compile(r'^cimport\s+(\S+)', re.M)
cdef_import_re = re.compile(r'^cdef extern from\s+.(\S+).:', re.M)
def pyx_scan(node, env, path, arg=None):
contents = node.get_text_contents()
# from <module> cimport ...
matches = pyx_from_import_re.findall(contents)
# cimport <module>
matches += pyx_import_re.findall(contents)
# Modules can be either .pxd or .pyx files
files = [m.replace('.', '/') + '.pxd' for m in matches]
files += [m.replace('.', '/') + '.pyx' for m in matches]
# cdef extern from <file>
files += cdef_import_re.findall(contents)
# Handle relative imports
cur_dir = str(node.get_dir())
files = [cur_dir + f if f.startswith('/') else f for f in files]
# Filter out non-existing files (probably system imports)
files = [f for f in files if env.File(f).exists()]
return env.File(files)
pyxscanner = Scanner(function=pyx_scan, skeys=['.pyx', '.pxd'], recursive=True)
cythonAction = Action("$CYTHONCOM")
def create_builder(env):
try:
cython = env['BUILDERS']['Cython']
except KeyError:
cython = SCons.Builder.Builder(
action=cythonAction,
emitter={},
suffix=cython_suffix_emitter,
single_source=1
)
env.Append(SCANNERS=pyxscanner)
env['BUILDERS']['Cython'] = cython
return cython
def cython_suffix_emitter(env, source):
return "$CYTHONCFILESUFFIX"
def generate(env):
env["CYTHON"] = "cythonize"
env["CYTHONCOM"] = "$CYTHON $CYTHONFLAGS $SOURCE"
env["CYTHONCFILESUFFIX"] = ".cpp"
c_file, _ = SCons.Tool.createCFileBuilders(env)
c_file.suffix['.pyx'] = cython_suffix_emitter
c_file.add_action('.pyx', cythonAction)
c_file.suffix['.py'] = cython_suffix_emitter
c_file.add_action('.py', cythonAction)
create_builder(env)
def exists(env):
return True