ELM327: Stable very long multiline message support with tests.

This commit is contained in:
Jessy Diamond Exum 2017-08-16 23:28:23 -07:00
parent ad79b7f4d2
commit 069f388eed
3 changed files with 52 additions and 18 deletions

View File

@ -544,7 +544,7 @@ int ICACHE_FLASH_ATTR elm_LINFast_process_echo() {
os_printf("\n");
#endif
if(lin_bus_initialized) {
if(lin_bus_initialized || loopcount == 0 && i == 4) {
lin_ringbuff_clear();
return -1;
} else {

View File

@ -36,8 +36,8 @@ def send_compare(s, dat, ret, timeout=4):
print("current recv data:", repr(res))
break;
res += s.recv(1000)
print("final recv data: '%s'" % repr(res))
assert ret == res, "Data does not agree (%s) (%s)"%(repr(ret), repr(res))
#print("final recv data: '%s'" % repr(res))
assert ret == res#, "Data does not agree (%s) (%s)"%(repr(ret), repr(res))
def sync_reset(s):
s.send("ATZ\r")
@ -268,6 +268,32 @@ def test_elm_send_lin_multiline_msg():
sim.join()
s.close()
def test_elm_send_lin_multiline_msg_throughput():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCarSimulator(serial, can=False, silent=True)
sim.start()
try:
sync_reset(s)
send_compare(s, b'ATSP5\r', b"ATSP5\rOK\r\r>") # Set Proto
send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>') # Echo OFF
send_compare(s, b'ATS0\r', b'OK\r\r>') # Spaces OFF
send_compare(s, b'ATH0\r', b'OK\r\r>') # Headers OFF
send_compare(s, b'09fc\r', # headers OFF, Spaces OFF
b"BUS INIT: OK\r" +
b''.join((b'49FC' + hex(num+1)[2:].upper().zfill(2) +
b'AAAA' + hex(num+1)[2:].upper().zfill(4) + b'\r'
for num in range(80))) +
b"\r>",
timeout=10
)
finally:
sim.stop()
sim.join()
s.close()
def test_elm_panda_safety_mode_KWPFast():
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
p_car = Panda(serial) # Configure this so the messages will send
@ -476,12 +502,10 @@ def test_elm_send_can_multiline_msg():
sim.join()
s.close()
# TODO: Expand test to full throughput.
# Max throughput currently causes dropped wifi packets
def test_elm_send_can_multiline_msg_throughput():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
sim.start()
try:
@ -491,12 +515,14 @@ def test_elm_send_can_multiline_msg_throughput():
send_compare(s, b'ATS0\r', b'OK\r\r>') # Spaces OFF
send_compare(s, b'ATH1\r', b'OK\r\r>') # Headers ON
send_compare(s, b'09fd\r', # headers ON, Spaces OFF
("7E8123649FD01AAAAAA\r" +
"".join(
("7E82"+hex((num+1)%0x10)[2:].upper()+"AAAAAA" +
hex(num)[2:].upper().zfill(8) + "\r" for num in range(80))
) + "\r>").encode()
rows = 584
send_compare(s, b'09ff\r', # headers ON, Spaces OFF
("7E8" + "1" + hex((rows*7)+6)[2:].upper().zfill(3) + "49FF01"+"AA0000\r" +
"".join(
("7E82"+hex((num+1)%0x10)[2:].upper()+("AA"*5) +
hex(num+1)[2:].upper().zfill(4) + "\r" for num in range(rows))
) + "\r>").encode(),
timeout=10
)
finally:
sim.stop()

View File

@ -104,14 +104,17 @@ class ELMCarSimulator():
lin_buff = bytearray()
def _lin_send(self, to_addr, msg):
print(" LIN Reply (%x)" % to_addr, binascii.hexlify(msg))
if not self.__silent:
print(" LIN Reply (%x)" % to_addr, binascii.hexlify(msg))
PHYS_ADDR = 0x80
FUNC_ADDR = 0xC0
RECV = 0xF1
SEND = 0x33 # Car OBD Functional Address
headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr)
print(" Sending LIN", binascii.hexlify(headers+msg), hex(sum(bytearray(headers+msg))%0x100))
if not self.__silent:
print(" Sending LIN", binascii.hexlify(headers+msg),
hex(sum(bytearray(headers+msg))%0x100))
self.panda.kline_send(headers + msg)
def __reset_lin_timeout(self):
@ -144,7 +147,7 @@ class ELMCarSimulator():
if len(outmsg) <= 5:
self._lin_send(0x10, obd_header + outmsg)
else:
first_msg_len = min(4, len(outmsg)%4)
first_msg_len = min(4, len(outmsg)%4) or 4
self._lin_send(0x10, obd_header + b'\x01' +
b'\x00'*(4-first_msg_len) +
outmsg[:first_msg_len])
@ -165,9 +168,10 @@ class ELMCarSimulator():
while not self.__stop:
for address, ts, data, src in self.panda.can_recv():
if self.__on and src is 0 and len(data) == 8 and data[0] >= 2:
print("Processing CAN message", src, hex(address), binascii.hexlify(data))
if not self.__silent:
print("Processing CAN message", src, hex(address), binascii.hexlify(data))
self.__can_process_msg(data[1], data[2], address, ts, data, src)
else:
elif not self.__silent:
print("Rejecting CAN message", src, hex(address), binascii.hexlify(data))
def can_mode_11b(self):
@ -274,6 +278,8 @@ class ELMCarSimulator():
elif mode == 0x09: # Mode: Request vehicle information
if pid == 0x02: # Show VIN
return b"1D4GP00R55B123456"
if pid == 0xFC: # test long multi message. Ligned up for LIN responses
return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num+1) for num in range(80)))
if pid == 0xFD: # test long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
return b'\xAA\xAA\xAA' + b''.join(parts)
@ -281,7 +287,9 @@ class ELMCarSimulator():
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584))
return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
if pid == 0xFF:
return b"\xAA"*(0xFFF-3)
return b'\xAA\x00\x00' +\
b"".join(((b'\xAA'*5)+struct.pack(">H", num+1) for num in range(584)))
#return b"\xAA"*100#(0xFFF-3)
if __name__ == "__main__":