mirror of https://github.com/commaai/openpilot.git
51 lines
1.2 KiB
Python
51 lines
1.2 KiB
Python
"""
|
|
Utilities for generating mock messages for testing.
|
|
example in common/tests/test_mock.py
|
|
"""
|
|
|
|
|
|
import functools
|
|
import threading
|
|
from cereal.messaging import PubMaster
|
|
from cereal.services import SERVICE_LIST
|
|
from openpilot.common.mock.generators import generate_liveLocationKalman
|
|
from openpilot.common.realtime import Ratekeeper
|
|
|
|
|
|
MOCK_GENERATOR = {
|
|
"liveLocationKalman": generate_liveLocationKalman
|
|
}
|
|
|
|
|
|
def generate_messages_loop(services: list[str], done: threading.Event):
|
|
pm = PubMaster(services)
|
|
rk = Ratekeeper(100)
|
|
i = 0
|
|
while not done.is_set():
|
|
for s in services:
|
|
should_send = i % (100/SERVICE_LIST[s].frequency) == 0
|
|
if should_send:
|
|
message = MOCK_GENERATOR[s]()
|
|
pm.send(s, message)
|
|
i += 1
|
|
rk.keep_time()
|
|
|
|
|
|
def mock_messages(services: list[str] | str):
|
|
if isinstance(services, str):
|
|
services = [services]
|
|
|
|
def decorator(func):
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
done = threading.Event()
|
|
t = threading.Thread(target=generate_messages_loop, args=(services, done))
|
|
t.start()
|
|
try:
|
|
return func(*args, **kwargs)
|
|
finally:
|
|
done.set()
|
|
t.join()
|
|
return wrapper
|
|
return decorator
|