Files
sunnypilot/common/mock/__init__.py
Justin Newberry 05204fbde3 Create message mocking tools (#31249)
* add mocking tools

* fix map renderer

* use for power draw

* fix those

* whitespace

* rename to services

* fix the rate

* remove
old-commit-hash: 086c509fde
2024-01-31 18:47:49 -08:00

52 lines
1.2 KiB
Python

"""
Utilities for generating mock messages for testing.
example in common/tests/test_mock.py
"""
import functools
import threading
from typing import List, Union
from cereal.messaging import PubMaster
from cereal.services import SERVICE_LIST
from openpilot.common.mock.generators import generate_liveLocationKalman
from openpilot.common.realtime import Ratekeeper
MOCK_GENERATOR = {
"liveLocationKalman": generate_liveLocationKalman
}
def generate_messages_loop(services: List[str], done: threading.Event):
pm = PubMaster(services)
rk = Ratekeeper(100)
i = 0
while not done.is_set():
for s in services:
should_send = i % (100/SERVICE_LIST[s].frequency) == 0
if should_send:
message = MOCK_GENERATOR[s]()
pm.send(s, message)
i += 1
rk.keep_time()
def mock_messages(services: Union[List[str], str]):
if isinstance(services, str):
services = [services]
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
done = threading.Event()
t = threading.Thread(target=generate_messages_loop, args=(services, done))
t.start()
try:
return func(*args, **kwargs)
finally:
done.set()
t.join()
return wrapper
return decorator