mirror of https://github.com/commaai/panda.git
72 lines
2.1 KiB
Python
72 lines
2.1 KiB
Python
import time
|
|
import random
|
|
|
|
|
|
def get_random_can_messages(n):
    """Return a list of ``n`` randomly generated ``[addr, dat, bus]`` messages.

    Each message has a bus index in ``range(3)``, an address below ``1 << 29``
    and a payload of 1 to 8 random bytes.
    """

    def _random_message():
        # Draw order (bus, address, length, payload bytes) is kept fixed so a
        # seeded ``random`` produces the same sequence of messages.
        bus_idx = random.randrange(3)
        address = random.randrange(1 << 29)
        payload_len = random.randrange(1, 9)
        payload = bytes(random.getrandbits(8) for _ in range(payload_len))
        return [address, payload, bus_idx]

    return [_random_message() for _ in range(n)]
|
|
|
|
|
|
def time_many_sends(p, bus, p_recv=None, msg_count=100, two_pandas=False, msg_len=8):
    """Send a burst of identical CAN messages and return the measured rate.

    ``msg_count`` copies of one message (random address in ``[0x100, 0x200]``,
    payload of ``msg_len`` bytes of ``0xAA``) are handed to
    ``p.can_send_many`` on ``bus``. ``p_recv.can_recv()`` is then polled until
    the expected number of messages arrived or 5 seconds passed since the send
    started.

    Args:
        p: Sending device; must provide ``can_send_many`` and ``can_recv``.
        bus: Bus number to send on. Echoes of sent messages are expected to be
            reported with bus ``0x80 | bus``.
        p_recv: Receiving device; defaults to ``p``.
        msg_count: Number of messages to send.
        two_pandas: When true, ``p_recv`` is a different device than ``p``:
            it is expected to see only the messages, and the echoes are read
            back from ``p`` separately (up to 10 seconds after the start).
        msg_len: Payload length in bytes.

    Returns:
        Bits transferred per millisecond of elapsed time (i.e. kbit/s),
        counting ``44 + 8 * msg_len`` bits per message.

    Raises:
        ValueError: ``two_pandas`` is set but ``p`` and ``p_recv`` are equal.
        AssertionError: Unexpected messages were received, or the number of
            received messages / echoes does not equal ``msg_count``.
    """
    if p_recv is None:
        p_recv = p
    if p == p_recv and two_pandas:
        raise ValueError("Cannot have two pandas that are the same panda")

    msg_id = random.randint(0x100, 0x200)
    to_send = [(msg_id, b"\xaa" * msg_len, bus)] * msg_count
    echo_bus = 0x80 | bus

    start_time = time.monotonic()
    p.can_send_many(to_send)

    received = []
    echoed = []
    # A single device reports both the message and its echo; with two devices
    # the receiver only sees the messages and the sender only the echoes.
    expected_rx = msg_count if two_pandas else msg_count * 2
    expected_echo = msg_count if two_pandas else 0

    while len(received) < expected_rx and (time.monotonic() - start_time) < 5:
        received.extend(p_recv.can_recv())
    # Only the receive phase is timed; collecting echoes below is not.
    end_time = time.monotonic()

    if two_pandas:
        while len(echoed) < expected_echo and (time.monotonic() - start_time) < 10:
            echoed.extend(p.can_recv())

    sent_echo = [m for m in received + echoed if m[2] == echo_bus and m[0] == msg_id]
    resp = [m for m in received if m[2] == bus and m[0] == msg_id]
    leftovers = [m for m in received if m[0] != msg_id or m[2] not in (bus, echo_bus)]

    assert not leftovers, (
        f"received {len(leftovers)} unexpected message(s) while timing bus {bus}, "
        f"first: {leftovers[0]!r}"
    )
    assert len(resp) == msg_count, (
        f"expected {msg_count} message(s) on bus {bus}, got {len(resp)}"
    )
    assert len(sent_echo) == msg_count, (
        f"expected {msg_count} echo(es) on bus {echo_bus:#x}, got {len(sent_echo)}"
    )

    elapsed_ms = (end_time - start_time) * 1000.0
    # Per-message bit count: fixed overhead fields plus the payload bits.
    # NOTE(review): the constants look like the field widths of a standard
    # (11-bit id) CAN frame without stuff bits - confirm.
    frame_bits = 1 + 11 + 1 + 1 + 1 + 4 + (msg_len * 8) + 15 + 1 + 1 + 1 + 7
    comp_kbps = frame_bits * msg_count / elapsed_ms

    return comp_kbps
|
|
|
|
|
|
def clear_can_buffers(panda, speed: int | None = None):
|
|
if speed is not None:
|
|
for bus in range(3):
|
|
panda.set_can_speed_kbps(bus, speed)
|
|
|
|
# clear tx buffers
|
|
for i in range(4):
|
|
panda.can_clear(i)
|
|
|
|
# clear rx buffers
|
|
panda.can_clear(0xFFFF)
|
|
r = [1]
|
|
st = time.monotonic()
|
|
while len(r) > 0:
|
|
r = panda.can_recv()
|
|
time.sleep(0.05)
|
|
if (time.monotonic() - st) > 10:
|
|
raise Exception("Unable to clear can buffers for panda ", panda.get_serial())
|