Files
panda-meb/tests/throughput_test.py
2017-05-16 17:01:29 -07:00

60 lines
1.4 KiB
Python
Executable File

#!/usr/bin/env python
import os
import struct
import time
from panda.lib.panda import Panda, PandaWifiStreaming
from tqdm import tqdm
# test throughput between USB and wifi
# Serial of the panda that transmits the test messages.
OUT_SERIAL = "108018800f51363038363036"
# NOTE: for a run without WiFi, the receiver was previously opened as
# Panda("02001b000f51363038363036") instead of Panda("WIFI").

MSG_ADDR = 0xaa            # first argument passed to can_send
MSG_BUS = 0                # third argument passed to can_send (presumably the bus index)
MSG_PREFIX = b"\xaa" * 4   # fixed filler placed in front of the counter
TOTAL_MSGS = 10000         # upper bound of the counter range
BATCH_SIZE = 1             # counters handled between two receive polls
MAX_LISTED = 100           # only list missing counters when fewer than this


def build_payload(counter):
    """Return the test payload for `counter`.

    The payload is four 0xaa bytes followed by `counter` packed with the
    native struct format "I" (unsigned int).
    """
    return MSG_PREFIX + struct.pack("I", counter)


def decode_counters(msgs):
    """Return the counter carried by every message in `msgs`.

    Each message is a 4-tuple whose third element is the payload; the
    counter is whatever follows the first four payload bytes, unpacked with
    struct format "I". Messages with a payload of any other length raise
    struct.error, exactly as the inline comprehension used to.
    """
    return [struct.unpack("I", dat[4:])[0] for _, _, dat, _ in msgs]


def main():
    """Send counter-stamped messages from one panda and report the counters
    the other panda never received."""
    print(Panda.list())

    p_out = Panda(OUT_SERIAL)
    print(p_out.get_serial())

    # Open the WiFi panda through the regular interface first to print its
    # serial, then replace it with the streaming receiver used for the test.
    p_in = Panda("WIFI")
    print(p_in.get_serial())
    p_in = PandaWifiStreaming()

    p_out.set_controls_allowed(True)

    seen_out, seen_in = set(), set()

    # drain anything already queued so it is not counted
    p_out.can_recv()
    p_in.can_recv()

    for base in tqdm(range(0, TOTAL_MSGS, BATCH_SIZE)):
        for offset in range(BATCH_SIZE):
            payload = build_payload(base + offset)
            # Only batches with an even base are transmitted (behavior kept
            # from the original script).
            if base % 2 == 0:
                p_out.can_send(MSG_ADDR, payload, MSG_BUS)

        dat_out, dat_in = p_out.can_recv(), p_in.can_recv()
        n_in = len(dat_in)
        if n_in != 0:
            print(n_in)

        seen_in.update(decode_counters(dat_in))
        seen_out.update(decode_counters(dat_out))

    # swag: wait a second, then poll the receiver one last time
    print("waiting for packets")
    time.sleep(1.0)
    seen_in.update(decode_counters(p_in.can_recv()))

    # Counters read back from the sender that never showed up on the receiver.
    missing = seen_out - seen_in
    if missing:
        print("MISSING %d" % len(missing))
        if len(missing) < MAX_LISTED:
            print([hex(x) for x in sorted(missing)])


if __name__ == "__main__":
    main()