openpilot0/common/mock/__init__.py

51 lines
1.2 KiB
Python

"""
Utilities for generating mock messages for testing.
example in common/tests/test_mock.py
"""
import functools
import threading
from cereal.messaging import PubMaster
from cereal.services import SERVICE_LIST
from openpilot.common.mock.generators import generate_livePose
from openpilot.common.realtime import Ratekeeper
MOCK_GENERATOR = {
"livePose": generate_livePose
}
def generate_messages_loop(services: list[str], done: threading.Event):
pm = PubMaster(services)
rk = Ratekeeper(100)
i = 0
while not done.is_set():
for s in services:
should_send = i % (100/SERVICE_LIST[s].frequency) == 0
if should_send:
message = MOCK_GENERATOR[s]()
pm.send(s, message)
i += 1
rk.keep_time()
def mock_messages(services: list[str] | str):
if isinstance(services, str):
services = [services]
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
done = threading.Event()
t = threading.Thread(target=generate_messages_loop, args=(services, done))
t.start()
try:
return func(*args, **kwargs)
finally:
done.set()
t.join()
return wrapper
return decorator