mirror of https://github.com/commaai/cereal.git
142 lines
4.0 KiB
Python
Executable File
142 lines
4.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import random
|
|
import time
|
|
from typing import Sized, cast
|
|
import unittest
|
|
|
|
import cereal.messaging as messaging
|
|
from cereal.messaging.tests.test_messaging import events, random_sock, random_socks, \
|
|
random_bytes, random_carstate, assert_carstate, \
|
|
zmq_sleep
|
|
|
|
|
|
class TestSubMaster(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# ZMQ pub socket takes too long to die
|
|
# sleep to prevent multiple publishers error between tests
|
|
zmq_sleep(3)
|
|
|
|
def test_init(self):
|
|
sm = messaging.SubMaster(events)
|
|
for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive,
|
|
sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]:
|
|
self.assertEqual(len(cast(Sized, p)), len(events))
|
|
|
|
def test_init_state(self):
|
|
socks = random_socks()
|
|
sm = messaging.SubMaster(socks)
|
|
self.assertEqual(sm.frame, -1)
|
|
self.assertFalse(any(sm.updated.values()))
|
|
self.assertFalse(any(sm.alive.values()))
|
|
self.assertTrue(all(t == 0. for t in sm.rcv_time.values()))
|
|
self.assertTrue(all(f == 0 for f in sm.rcv_frame.values()))
|
|
self.assertTrue(all(t == 0 for t in sm.logMonoTime.values()))
|
|
|
|
for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive,
|
|
sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]:
|
|
self.assertEqual(len(cast(Sized, p)), len(socks))
|
|
|
|
def test_getitem(self):
|
|
sock = "carState"
|
|
pub_sock = messaging.pub_sock(sock)
|
|
sm = messaging.SubMaster([sock,])
|
|
zmq_sleep()
|
|
|
|
msg = random_carstate()
|
|
pub_sock.send(msg.to_bytes())
|
|
sm.update(1000)
|
|
assert_carstate(msg.carState, sm[sock])
|
|
|
|
# TODO: break this test up to individually test SubMaster.update and SubMaster.update_msgs
|
|
def test_update(self):
|
|
sock = "carState"
|
|
pub_sock = messaging.pub_sock(sock)
|
|
sm = messaging.SubMaster([sock,])
|
|
zmq_sleep()
|
|
|
|
for i in range(10):
|
|
msg = messaging.new_message(sock)
|
|
pub_sock.send(msg.to_bytes())
|
|
sm.update(1000)
|
|
self.assertEqual(sm.frame, i)
|
|
self.assertTrue(all(sm.updated.values()))
|
|
|
|
def test_update_timeout(self):
|
|
sock = random_sock()
|
|
sm = messaging.SubMaster([sock,])
|
|
for _ in range(5):
|
|
timeout = random.randrange(1000, 5000)
|
|
start_time = time.monotonic()
|
|
sm.update(timeout)
|
|
t = time.monotonic() - start_time
|
|
self.assertGreaterEqual(t, timeout/1000.)
|
|
self.assertLess(t, 5)
|
|
self.assertFalse(any(sm.updated.values()))
|
|
|
|
def test_alive(self):
|
|
pass
|
|
|
|
def test_ignore_alive(self):
|
|
pass
|
|
|
|
def test_valid(self):
|
|
pass
|
|
|
|
# SubMaster should always conflate
|
|
def test_conflate(self):
|
|
sock = "carState"
|
|
pub_sock = messaging.pub_sock(sock)
|
|
sm = messaging.SubMaster([sock,])
|
|
|
|
n = 10
|
|
for i in range(n+1):
|
|
msg = messaging.new_message(sock)
|
|
msg.carState.vEgo = i
|
|
pub_sock.send(msg.to_bytes())
|
|
time.sleep(0.01)
|
|
sm.update(1000)
|
|
self.assertEqual(sm[sock].vEgo, n)
|
|
|
|
|
|
class TestPubMaster(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# ZMQ pub socket takes too long to die
|
|
# sleep to prevent multiple publishers error between tests
|
|
zmq_sleep(3)
|
|
|
|
def test_init(self):
|
|
messaging.PubMaster(events)
|
|
|
|
def test_send(self):
|
|
socks = random_socks()
|
|
pm = messaging.PubMaster(socks)
|
|
sub_socks = {s: messaging.sub_sock(s, conflate=True, timeout=1000) for s in socks}
|
|
zmq_sleep()
|
|
|
|
# PubMaster accepts either a capnp msg builder or bytes
|
|
for capnp in [True, False]:
|
|
for i in range(100):
|
|
sock = socks[i % len(socks)]
|
|
|
|
if capnp:
|
|
try:
|
|
msg = messaging.new_message(sock)
|
|
except Exception:
|
|
msg = messaging.new_message(sock, random.randrange(50))
|
|
else:
|
|
msg = random_bytes()
|
|
|
|
pm.send(sock, msg)
|
|
recvd = sub_socks[sock].receive()
|
|
|
|
if capnp:
|
|
msg.clear_write_flag()
|
|
msg = msg.to_bytes()
|
|
self.assertEqual(msg, recvd, i)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|