mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-03-02 21:53:53 +08:00
67d5208 fix signedness issue in toyota safety fe15d3f bump pandacan 11c2b08 add fault invalid 2c26e45 add sleep 27c7637 forgot the counter 3a6d7db don't hang bfa7d2e canloader works b259e2a can flasher is close to working 83f2edf isotp can support in softloader 7ae7c79 typo e85cc47 forgot the selfs 190b4f6 start work on canflasher 5c655c9 add recover support ae3457f usbflash is reliable f7a0ab0 pedal usbflash works 585d0f9 add way to call isotp be82899 despite it being bad code, move isotp 000715b start work on pedal canloader 626e312 pedal has a bootstub now 3662d1e redundant check 81e6b0d fix bug 083cd12 should have bounty to refactor that ish b65d30c bad asserts b2e6c3f isotp untested support for subaddr 30fd66a Merge pull request #93 from vntarasov/volt 06f5109 Merge pull request #94 from gregjhogan/can-printer-hex c7d098c Merge pull request #95 from gregjhogan/setup-script 22fe250 Merge pull request #99 from gregjhogan/bit-transition-example ba16ba3 Merge pull request #100 from gregjhogan/j2534-troubleshooting-instructions ad08ea4 Merge pull request #90 from gregjhogan/can-forwarding f3b6f5d added j2534 troubleshooting instructions 858d150 added script to find bits that transition from 0 to 1 c6acac8 added checking pedal interceptor message length f7226ff added brake safety checks d0c2634 added gas safety checks d378e4a removed bosch safety forwarding restriction on 29 bit addresses 5c7ef9e added bosch safety hooks and forwarding 90c64b6 add note 23de8d4 Merge pull request #97 from commaai/pedal_improvements 0261641 added missing python packages b92b235 fix bytearray encode issue 2434f1c Tweak Volt's brake pedal stickiness e2f73d2 enable has a whole byte to itself d5a9e1e correct checksum f8ed9fa better names 986a14c don't alias pointers 9b8472e add watchdog support 8f0add9 handle faults 1d917f8 split gas set into 2 values, and have a fault state 1b77026 j2534 isn't alpha anymore fbcc872 Merge pull request #92 from commaai/pedal 8a6f44b pedal is sending 
messages 08f464c python 3 bro is bad bro 9390961 kline checksum algo was broken... 3b7c33b add kline debug support aa622bc init values 631ea9f better refactor eb1fd75 add PEDAL adc sets ccaa310 don't build with usb 8d4d763 debug console works bd09883 comma pedal is building 75a29d5 Merge pull request #84 from gregjhogan/j2534-hds eece37d only the panda has gmlan 9f43abe Merge pull request #89 from vntarasov/volt 5364d43 Merge pull request #88 from vntarasov/smaller-firmware 377a1ec bump version for descriptor fix 4fabdf0 Merge pull request #87 from gregjhogan/usb-multi-packet-control 8580773 fix sending WinUSB Extended Properties Feature Descriptor 6908feb Chevy Volt safety 786a004 Enable optimization to reduce firmware size d70f43b hack to fix thinkpad 95ab1ae fixed flow control message padding bbd04d1 updated installer 62216d0 single standalone DLL for J2534 driver 5c9138d fixed 11 bit address issue f3b0ad2 fix LOOPBACK getting set when DATA_RATE is set b750d36 updated README a9a097f lowered CPU utilization 7c26a70 TIS needs unsupported protocols to return an error 42692b4 TIS doesn't like ChannelID being zero cf126bb SET_CONFIG return error for reserved parameters 2e99dbf fix HDS issues 8203cc8 add is_grey e946a54 add insecure_okay flag 4363b3e check webpage 4f59ded add secure mode note 6b11fb5 add autosecuring to tests b27d185 Merge pull request #86 from commaai/better_pairing 4b53b42 elm wifi isn't an automated test 99f85cb Merge pull request #85 from gregjhogan/usb-wcid 0d38060 auto-install WinUSB device driver c6653ca from python import 38cc0ee add wifi_secure_mode, boots in insecure mode git-subtree-dir: panda git-subtree-split: 67d52089a1300b86800d897f2b271e0a24cf6dd6
670 lines
25 KiB
Python
670 lines
25 KiB
Python
from __future__ import print_function
|
|
import os
|
|
import sys
|
|
import time
|
|
import socket
|
|
import select
|
|
import pytest
|
|
import struct
|
|
|
|
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
|
|
import elm_car_simulator
|
|
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", ".."))
|
|
from panda import Panda
|
|
|
|
def elm_connect(host="192.168.0.10", port=35000):
    """Open a non-blocking TCP connection to the ELM327 service under test.

    Args:
        host: Address to connect to. Defaults to 192.168.0.10.
        port: TCP port to connect to. Defaults to 35000.

    Returns:
        The connected socket, switched to non-blocking mode. The caller is
        responsible for closing it.
    """
    s = socket.create_connection((host, port))
    # Reads are gated by select() elsewhere in this file, so the socket
    # itself must never block.
    s.setblocking(0)
    return s
|
|
|
|
def read_or_fail(s, timeout=4):
    """Wait for socket *s* to become readable and return one chunk of data.

    Args:
        s: Socket to read from.
        timeout: Seconds to wait for data before failing. Defaults to 4.

    Returns:
        Up to 1000 bytes received from the socket.

    Raises:
        AssertionError: If nothing became readable within *timeout* seconds.
    """
    readable, _, _ = select.select([s], [], [], timeout)
    assert readable, "Socket did not receive data within the timeout duration."
    return s.recv(1000)
|
|
|
|
def sendrecv(s, dat):
    """Write *dat* to socket *s* and return the first chunk read back.

    The read is delegated to ``read_or_fail``, so a missing reply surfaces
    as an assertion failure.
    """
    s.send(dat)
    reply = read_or_fail(s)
    return reply
|
|
|
|
def send_compare(s, dat, ret, timeout=4):
    """Send *dat* on socket *s* and assert that the reply equals *ret*.

    Data is accumulated until it equals *ret*, stops being a prefix of
    *ret*, the socket stays silent for *timeout* seconds, or the peer
    closes the connection.

    Args:
        s: Connected socket.
        dat: Bytes to send.
        ret: Exact bytes expected in response.
        timeout: Seconds to wait for each additional chunk of data.

    Raises:
        AssertionError: If the accumulated reply differs from *ret*.
    """
    s.send(dat)
    res = b''
    # Keep reading only while what has arrived is a strict prefix of the
    # expected reply; any divergence is caught by the final assert.
    while ret.startswith(res) and ret != res:
        print("Waiting")
        ready = select.select([s], [], [], timeout)
        if not ready[0]:
            print("current recv data:", repr(res))
            break
        chunk = s.recv(1000)
        if not chunk:
            # An empty read means the peer closed the connection. The socket
            # would stay "readable" forever, so stop instead of spinning.
            print("connection closed; current recv data:", repr(res))
            break
        res += chunk
    assert ret == res, "Data does not agree (%s) (%s)" % (repr(ret), repr(res))
|
|
|
|
def sync_reset(s):
    """Send ATZ on socket *s* and block until the reset banner has arrived.

    Data is read via ``read_or_fail`` until the accumulated reply ends with
    the ``ELM327 v1.5`` banner and prompt, so the next command starts from a
    known state.

    Raises:
        AssertionError: Propagated from ``read_or_fail`` if the device stops
            sending before the banner is complete.
    """
    # Bytes literals throughout: the receive buffer is bytes, and mixing in
    # text strings fails on Python 3 (on Python 2 bytes is str, so this is
    # unchanged there).
    s.send(b"ATZ\r")
    res = b''
    while not res.endswith(b"ELM327 v1.5\r\r>"):
        res += read_or_fail(s)
    print("Reset response is '%s'" % repr(res))
|
|
|
|
def test_reset():
    """Check that the device answers ATZ with its reset banner."""
    # Use the shared connection helper rather than duplicating its
    # address and non-blocking setup here.
    s = elm_connect()

    try:
        sync_reset(s)
    finally:
        s.close()
|
|
|
|
def test_elm_cli():
    """Exercise the AT command line: echo, linefeeds, repeat, case, spacing and errors."""
    # (command, exact expected reply) pairs, sent in order on one connection.
    exchanges = (
        (b'ATI\r', b'ATI\rELM327 v1.5\r\r>'),

        # Echo off. The command that disables echo is itself still echoed;
        # this looks like a quirk, but it is how the reference device behaved.
        (b'ATE0\r', b'ATE0\rOK\r\r>'),
        (b'ATE0\r', b'OK\r\r>'),  # only the second ATE0 comes back without echo
        (b'ATI\r', b'ELM327 v1.5\r\r>'),

        # Linefeeds on, then off again.
        (b'ATL1\r', b'OK\r\n\r\n>'),
        (b'ATI\r', b'ELM327 v1.5\r\n\r\n>'),
        (b'ATL0\r', b'OK\r\r>'),
        (b'ATI\r', b'ELM327 v1.5\r\r>'),

        # A bare carriage return repeats the previous command (no echo).
        (b'ATI\r', b'ELM327 v1.5\r\r>'),
        (b'\r', b'ELM327 v1.5\r\r>'),

        # Mixed case is accepted.
        (b'aTi\r', b'ELM327 v1.5\r\r>'),

        # Embedded white space is accepted.
        (b' a T i\r', b'ELM327 v1.5\r\r>'),

        # Invalid AT command.
        (b'ATCATHAT\r', b'?\r\r>'),

        # Invalid OBD commands: too long, then non-hex characters.
        (b'01 00 00 00 00 00 00 00\r', b'?\r\r>'),
        (b'01 GZ\r', b'?\r\r>'),
    )

    s = elm_connect()
    try:
        sync_reset(s)
        for command, expected in exchanges:
            send_compare(s, command, expected)
    finally:
        s.close()
|
|
|
|
def test_elm_setget_protocol():
    """Set the protocol with ATSP and read it back with ATDP / ATDPN."""
    can_11_500 = b"ISO 15765-4 (CAN 11/500)\r\r>"
    can_29_500 = b"ISO 15765-4 (CAN 29/500)\r\r>"

    # (command, exact expected reply) pairs, sent in order.
    exchanges = (
        (b'ATE0\r', b'ATE0\rOK\r\r>'),  # Echo OFF

        # Automatic protocol selection.
        (b'ATSP0\r', b"OK\r\r>"),
        (b'ATDP\r', b"AUTO\r\r>"),
        (b'ATDPN\r', b"A0\r\r>"),

        # Explicit protocol.
        (b'ATSP6\r', b"OK\r\r>"),
        (b'ATDP\r', can_11_500),
        (b'ATDPN\r', b"6\r\r>"),

        # Automatic selection with a default protocol.
        (b'ATSPA6\r', b"OK\r\r>"),
        (b'ATDP\r', b"AUTO, " + can_11_500),
        (b'ATDPN\r', b"A6\r\r>"),

        (b'ATSP7\r', b"OK\r\r>"),
        (b'ATDP\r', can_29_500),
        (b'ATDPN\r', b"7\r\r>"),

        # An invalid protocol is rejected and the previous one stays selected.
        (b'ATSPD\r', b"?\r\r>"),
        (b'ATDP\r', can_29_500),
        (b'ATDPN\r', b"7\r\r>"),
    )

    s = elm_connect()
    try:
        sync_reset(s)
        for command, expected in exchanges:
            send_compare(s, command, expected)
    finally:
        s.close()
|
|
|
|
def test_elm_protocol_failure():
    """Check the error reply of each protocol when no simulator is answering."""
    # Protocol-select command paired with the reply expected for a 0100
    # request on that protocol.
    failures = (
        (b'ATSP1\r', b"NO DATA\r\r>"),
        (b'ATSP2\r', b"NO DATA\r\r>"),
        (b'ATSP3\r', b"BUS INIT: ...ERROR\r\r>"),
        (b'ATSP4\r', b"BUS INIT: ...ERROR\r\r>"),
        (b'ATSP5\r', b"BUS INIT: ERROR\r\r>"),
    )
    # Protocols 6 through C are not exercised; their NO DATA checks were
    # left disabled.

    s = elm_connect()
    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF

        # Automatic mode is given a longer timeout for its search.
        send_compare(s, b'ATSP0\r', b"OK\r\r>")
        send_compare(s, b'0100\r', b"SEARCHING...\rUNABLE TO CONNECT\r\r>", timeout=10)

        for select_cmd, error_reply in failures:
            send_compare(s, select_cmd, b"OK\r\r>")
            send_compare(s, b'0100\r', error_reply)
    finally:
        s.close()
|
|
|
|
def test_elm_protocol_autodetect_ISO14230_KWP_FAST():
    """With only the simulator's non-CAN side enabled, auto mode must settle on protocol 5."""
    # (command, expected reply, per-read timeout in seconds)
    exchanges = (
        (b'ATE0\r', b'ATE0\rOK\r\r>', 4),  # Echo OFF
        (b'ATH0\r', b'OK\r\r>', 4),  # Headers OFF (the reply below carries no header)
        (b'ATS0\r', b"OK\r\r>", 4),  # Spaces OFF

        (b'ATSP0\r', b"OK\r\r>", 4),
        (b'010D\r', b"SEARCHING...\r410D53\r\r>", 10),
        (b'ATDPN\r', b"A5\r\r>", 4),
    )

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, can=False)
    sim.start()

    try:
        sync_reset(s)
        for command, expected, wait in exchanges:
            send_compare(s, command, expected, timeout=wait)
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_basic_send_lin():
    """Send basic OBD requests on protocol 5 in every spaces/headers combination."""
    # (command, exact expected reply) pairs, sent in order.
    exchanges = (
        (b'ATSP5\r', b"ATSP5\rOK\r\r>"),  # Set Proto

        (b'ATE0\r', b'ATE0\rOK\r\r>'),  # Echo OFF
        (b'0100\r', b"BUS INIT: OK\r41 00 FF FF FF FE \r\r>"),
        (b'010D\r', b"41 0D 53 \r\r>"),

        (b'ATS0\r', b'OK\r\r>'),  # Spaces Off
        (b'0100\r', b"4100FFFFFFFE\r\r>"),
        (b'010D\r', b"410D53\r\r>"),

        (b'ATH1\r', b'OK\r\r>'),  # Spaces Off, Headers On
        (b'0100\r', b"86F1104100FFFFFFFEC3\r\r>"),
        (b'010D\r', b"83F110410D5325\r\r>"),

        (b'ATS1\r', b'OK\r\r>'),  # Spaces On, Headers On
        (b'0100\r', b"86 F1 10 41 00 FF FF FF FE C3 \r\r>"),
        (b'010D\r', b"83 F1 10 41 0D 53 25 \r\r>"),

        (b'1F00\r', b"NO DATA\r\r>"),  # Unhandled msg, no response.

        # Repeat the last checks to see that they still work after NO DATA.
        (b'0100\r', b"86 F1 10 41 00 FF FF FF FE C3 \r\r>"),
        (b'010D\r', b"83 F1 10 41 0D 53 25 \r\r>"),
    )

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, can=False)
    sim.start()

    try:
        sync_reset(s)
        for command, expected in exchanges:
            send_compare(s, command, expected)
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_send_lin_multiline_msg():
    """Request 0902 on protocol 5 and check the multi-line reply in every spaces/headers combination.

    Every piece of each expected reply is a bytes literal: adjacent bytes and
    text literals cannot be concatenated on Python 3 (a SyntaxError that
    stops the whole module from importing).
    """
    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
    sim = elm_car_simulator.ELMCarSimulator(serial, can=False)
    sim.start()

    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATSP5\r', b"OK\r\r>")  # Set Proto

        send_compare(s, b'0902\r',  # Headers OFF, Spaces ON
                     b"BUS INIT: OK\r"
                     b"49 02 01 00 00 00 31 \r"
                     b"49 02 02 44 34 47 50 \r"
                     b"49 02 03 30 30 52 35 \r"
                     b"49 02 04 35 42 31 32 \r"
                     b"49 02 05 33 34 35 36 \r\r>")

        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF
        send_compare(s, b'0902\r',  # Headers OFF, Spaces OFF
                     b"49020100000031\r"
                     b"49020244344750\r"
                     b"49020330305235\r"
                     b"49020435423132\r"
                     b"49020533343536\r\r>")

        send_compare(s, b'ATH1\r', b'OK\r\r>')  # Headers ON
        send_compare(s, b'0902\r',  # Headers ON, Spaces OFF
                     b"87F1104902010000003105\r"
                     b"87F11049020244344750E4\r"
                     b"87F11049020330305235BD\r"
                     b"87F11049020435423132B1\r"
                     b"87F11049020533343536AA\r\r>")

        send_compare(s, b'ATS1\r', b'OK\r\r>')  # Spaces ON
        send_compare(s, b'0902\r',  # Headers ON, Spaces ON
                     b"87 F1 10 49 02 01 00 00 00 31 05 \r"
                     b"87 F1 10 49 02 02 44 34 47 50 E4 \r"
                     b"87 F1 10 49 02 03 30 30 52 35 BD \r"
                     b"87 F1 10 49 02 04 35 42 31 32 B1 \r"
                     b"87 F1 10 49 02 05 33 34 35 36 AA \r\r>")
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_send_lin_multiline_msg_throughput():
    """Request 09fc on protocol 5 and expect an 80-line reply within the longer timeout.

    Each expected line is ``49FC`` + two-digit hex line number + ``AAAA`` +
    four-digit hex line number, for lines 1 through 80.
    """
    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
    sim = elm_car_simulator.ELMCarSimulator(serial, can=False, silent=True)
    sim.start()

    try:
        sync_reset(s)
        send_compare(s, b'ATSP5\r', b"ATSP5\rOK\r\r>")  # Set Proto
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF
        send_compare(s, b'ATH0\r', b'OK\r\r>')  # Headers OFF

        # Format the lines as text and encode once. Adding hex() output (text)
        # to bytes literals raises TypeError on Python 3.
        data_lines = "".join(
            "49FC%02XAAAA%04X\r" % (line_no, line_no)
            for line_no in range(1, 81)
        ).encode()

        send_compare(s, b'09fc\r',  # Headers OFF, Spaces OFF
                     b"BUS INIT: OK\r" + data_lines + b"\r>",
                     timeout=10
                     )
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_panda_safety_mode_KWPFast():
    """Check which K-line messages a panda in safety mode 0xE327 will transmit.

    A second panda (selected by the CANSIMSERIAL environment variable, if
    set) is put in all-output mode and used to observe what reaches the bus.
    Malformed requests must not get through; a well-formed OBD functional
    request must.
    """
    serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
    p_car = Panda(serial)  # Configure this so the messages will send
    p_car.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
    p_car.kline_drain()

    p_elm = Panda("WIFI")
    # NOTE(review): 0xE327 is presumably the ELM327 safety mode id; confirm
    # against the panda firmware.
    p_elm.set_safety_mode(0xE327)

    def get_checksum(dat):
        """Return the one-byte additive checksum (sum modulo 256) of *dat*."""
        # bytearray() yields integers for both a Python 2 str and a Python 3
        # bytes object, so no interpreter-version check is needed.
        return struct.pack("B", sum(bytearray(dat)) % 0x100)

    def timed_recv_check(p, bus, goodmsg):
        """Read from *bus* on panda *p* for up to 0.5 s; True if exactly *goodmsg* arrives."""
        t = time.time()
        msg = bytearray()

        while time.time() - t < 0.5 and len(msg) != len(goodmsg):
            # Ask only for the bytes still missing.
            msg += p._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, len(goodmsg) - len(msg))
            if msg == goodmsg:
                return True
            time.sleep(0.01)
        return False

    def kline_send(p, x, bus=2):
        """Write *x* to *bus* through panda *p*; True if the same bytes are read back."""
        p.kline_drain(bus=bus)
        # The bulk payload is the bus number as one byte followed by the data.
        p._handle.bulkWrite(2, struct.pack("B", bus) + x)
        return timed_recv_check(p, bus, x)

    def did_send(priority, toaddr, fromaddr, dat, bus=2, checkbyte=None):
        """Build a K-line frame, send it via p_elm, and report whether p_car saw it.

        The frame is a format byte (``priority | len(dat)``), target address,
        source address, the data, and a checksum byte (computed unless
        *checkbyte* is given).
        """
        msgout = struct.pack("BBB", priority | len(dat), toaddr, fromaddr) + dat
        msgout += get_checksum(msgout) if checkbyte is None else checkbyte
        print("Sending", hex(priority), hex(toaddr), hex(fromaddr), repr(msgout))

        if not kline_send(p_elm, msgout, bus=bus):
            return False
        return timed_recv_check(p_car, bus, msgout)

    assert not did_send(0xC0, 0x33, 0xF1, b'\x01\x0F', bus=3)  # wrong bus
    assert not did_send(0xC0, 0x33, 0xF1, b'')  # wrong length
    assert not did_send(0xB0, 0x33, 0xF1, b'\x01\x0E')  # bad priority
    assert not did_send(0xC0, 0x00, 0xF1, b'\x01\x0D')  # bad addr
    assert not did_send(0xC0, 0x33, 0x00, b'\x01\x0C')  # bad addr

    assert did_send(0xC0, 0x33, 0xF1, b'\x01\x0B')  # good! (obd func req)
|
|
|
|
def test_elm_lin_keepalive():
    """The simulator's LIN session stays active until ATPC is sent, then lapses.

    ``sim.lin_active`` must still be set 6 seconds after the last request,
    and must be cleared 6 seconds after ATPC.
    """
    setup = (
        (b'ATSP5\r', b"ATSP5\rOK\r\r>"),  # Set Proto
        (b'ATE0\r', b'ATE0\rOK\r\r>'),  # Echo OFF
        (b'ATS0\r', b'OK\r\r>'),  # Spaces OFF
        (b'ATH0\r', b'OK\r\r>'),  # Headers OFF
    )
    idle_seconds = 6

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, can=False, silent=True)
    sim.start()

    try:
        sync_reset(s)
        for command, expected in setup:
            send_compare(s, command, expected)

        # The first request initialises the bus; the session must then
        # survive an idle period.
        send_compare(s, b'0100\r', b"BUS INIT: OK\r4100FFFFFFFE\r\r>")
        assert sim.lin_active
        time.sleep(idle_seconds)
        assert sim.lin_active

        # Stop the keepalive: the session is still up right away, but gone
        # after the same idle period.
        send_compare(s, b'ATPC\r', b"OK\r\r>")
        assert sim.lin_active
        time.sleep(idle_seconds)
        assert not sim.lin_active
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
#////////////
|
|
def test_elm_protocol_autodetect_ISO15765():
    """Auto mode must detect 11-bit and 29-bit CAN before and after the simulator baud is changed to 250.

    The expected ATDPN replies are A6 and A7 before the baud change, A8 and
    A9 after it.
    """
    reply_11b = b"SEARCHING...\r7E803410D53\r\r>"
    reply_29b = b"SEARCHING...\r18DAF11003410D53\r\r>"

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
    sim.start()

    def expect_detected(search_reply, protocol_reply):
        # Re-arm auto mode, trigger a search, then read back what was chosen.
        send_compare(s, b'ATSP0\r', b"OK\r\r>")
        send_compare(s, b'010D\r', search_reply, timeout=10)
        send_compare(s, b'ATDPN\r', protocol_reply)

    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATH1\r', b'OK\r\r>')  # Headers ON
        send_compare(s, b'ATS0\r', b"OK\r\r>")  # Spaces OFF

        sim.can_mode_11b()
        expect_detected(reply_11b, b"A6\r\r>")

        sim.can_mode_29b()
        expect_detected(reply_29b, b"A7\r\r>")

        sim.change_can_baud(250)

        sim.can_mode_11b()
        expect_detected(reply_11b, b"A8\r\r>")

        sim.can_mode_29b()
        expect_detected(reply_29b, b"A9\r\r>")
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_basic_send_can():
    """Send basic OBD requests on protocol 6 in every spaces/headers combination."""
    # (command, exact expected reply) pairs, sent in order.
    exchanges = (
        (b'ATSP6\r', b"ATSP6\rOK\r\r>"),  # Set Proto

        (b'ATE0\r', b'ATE0\rOK\r\r>'),  # Echo OFF
        (b'0100\r', b"41 00 FF FF FF FE \r\r>"),
        (b'010D\r', b"41 0D 53 \r\r>"),

        (b'ATS0\r', b'OK\r\r>'),  # Spaces Off
        (b'0100\r', b"4100FFFFFFFE\r\r>"),
        (b'010D\r', b"410D53\r\r>"),

        (b'ATH1\r', b'OK\r\r>'),  # Spaces Off, Headers On
        (b'0100\r', b"7E8064100FFFFFFFE\r\r>"),
        (b'010D\r', b"7E803410D53\r\r>"),

        (b'ATS1\r', b'OK\r\r>'),  # Spaces On, Headers On
        (b'0100\r', b"7E8 06 41 00 FF FF FF FE \r\r>"),
        (b'010D\r', b"7E8 03 41 0D 53 \r\r>"),

        (b'1F00\r', b"NO DATA\r\r>"),  # Unhandled msg, no response.

        # Repeat the last checks to see that they still work after NO DATA.
        (b'0100\r', b"7E8 06 41 00 FF FF FF FE \r\r>"),
        (b'010D\r', b"7E8 03 41 0D 53 \r\r>"),
    )

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
    sim.start()

    try:
        sync_reset(s)
        for command, expected in exchanges:
            send_compare(s, command, expected)
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_send_can_multimsg():
    """A second reply from another address (0x7E9) must be reported after the 0x7E8 reply.

    The expected reply is built only from bytes literals: adjacent bytes and
    text literals cannot be concatenated on Python 3.
    """
    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
    sim.start()

    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATS1\r', b'OK\r\r>')  # Spaces ON
        send_compare(s, b'ATH1\r', b'OK\r\r>')  # Headers ON

        send_compare(s, b'ATSP6\r', b"OK\r\r>")  # Set Proto ISO 15765-4 (CAN 11/500)
        sim.can_add_extra_noise(b'\x03\x41\x0D\xFA', addr=0x7E9)  # Inject message into the stream
        send_compare(s, b'010D\r',
                     b"7E8 03 41 0D 53 \r"
                     b"7E9 03 41 0D FA \r\r>")  # The injected message shows up as a second line.
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_can_check_mode_pid():
    """Check that a reply carrying the wrong PID is filtered out.

    The ability to correctly filter out messages with the wrong PID is not
    implemented correctly in the reference device.
    """
    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
    sim.start()

    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF
        send_compare(s, b'ATH0\r', b'OK\r\r>')  # Headers OFF

        send_compare(s, b'ATSP6\r', b"OK\r\r>")  # Set Proto ISO 15765-4 (CAN 11/500)
        sim.can_add_extra_noise(b'\x03\x41\x0E\xFA')  # Inject message into the stream
        send_compare(s, b'010D\r', b"410D53\r\r>")  # Check it was ignored.
        send_compare(s, b'0100\r', b"4100FFFFFFFE\r\r>")  # Check it was ignored again.
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_send_can_multiline_msg():
    """Request 0902 on protocol 6 and check the multi-line reply in every spaces/headers combination.

    Every piece of each expected reply is a bytes literal: adjacent bytes and
    text literals cannot be concatenated on Python 3 (a SyntaxError that
    stops the whole module from importing).
    """
    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
    sim.start()

    try:
        sync_reset(s)
        send_compare(s, b'ATSP6\r', b"ATSP6\rOK\r\r>")  # Set Proto
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF

        send_compare(s, b'0902\r',  # Headers OFF, Spaces ON
                     b"014 \r"
                     b"0: 49 02 01 31 44 34 \r"
                     b"1: 47 50 30 30 52 35 35 \r"
                     b"2: 42 31 32 33 34 35 36 \r\r>")

        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF
        send_compare(s, b'0902\r',  # Headers OFF, Spaces OFF
                     b"014\r"
                     b"0:490201314434\r"
                     b"1:47503030523535\r"
                     b"2:42313233343536\r\r>")

        send_compare(s, b'ATH1\r', b'OK\r\r>')  # Headers ON
        send_compare(s, b'0902\r',  # Headers ON, Spaces OFF
                     b"7E81014490201314434\r"
                     b"7E82147503030523535\r"
                     b"7E82242313233343536\r\r>")

        send_compare(s, b'ATS1\r', b'OK\r\r>')  # Spaces ON
        send_compare(s, b'0902\r',  # Headers ON, Spaces ON
                     b"7E8 10 14 49 02 01 31 44 34 \r"
                     b"7E8 21 47 50 30 30 52 35 35 \r"
                     b"7E8 22 42 31 32 33 34 35 36 \r\r>")
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_send_can_multiline_msg_throughput():
    """Request 09ff on protocol 6 and expect a 584-row reply (plus opening line) within the longer timeout."""
    rows = 584

    # Opening line: header, "1", three-digit hex of rows*7+6, then the
    # start of the payload.
    opening_line = "7E81%03X49FF01AA0000\r" % (rows * 7 + 6)
    # Following lines: header, "2", a one-digit hex counter that wraps at
    # 0x10, five AA bytes, and the four-digit hex row number.
    data_lines = [
        "7E82%X%s%04X\r" % (row % 0x10, "AA" * 5, row)
        for row in range(1, rows + 1)
    ]
    expected = (opening_line + "".join(data_lines) + "\r>").encode()

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
    sim.start()

    try:
        sync_reset(s)
        send_compare(s, b'ATSP6\r', b"ATSP6\rOK\r\r>")  # Set Proto
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF
        send_compare(s, b'ATH1\r', b'OK\r\r>')  # Headers ON

        send_compare(s, b'09ff\r', expected, timeout=10)  # Headers ON, Spaces OFF
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_interrupted_obd_cmd_resets_state():
    """Interrupt a long 09fd reply with another command and check the device recovers.

    Once the reply has started, ATI is sent; the output must contain the
    start of the 09fd reply and then STOPPED. With the simulator disabled, a
    fresh 09fd must then yield NO DATA.
    """
    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
    sim.start()

    def wait_readable():
        # Fail if the socket stays silent for 4 seconds.
        readable, _, _ = select.select([s], [], [], 4)
        assert readable, "Socket did not receive data within the timeout duration."

    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF

        # Start the long reply, then interrupt it as soon as output appears.
        s.send(b"09fd\r")
        wait_readable()
        s.send(b"ATI\r")

        first_chunk = s.recv(10000)
        assert b"236\r0:49FD01AAAAAA\r" in first_chunk

        # Will likely have to be improved to scan for STOPPED if the FW gets
        # more responsive.
        wait_readable()
        second_chunk = s.recv(10000)
        assert b"STOPPED" in second_chunk

        sim.set_enable(False)
        send_compare(s, b'09fd\r', b"NO DATA\r\r>")
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_can_baud():
    """A request succeeds only on the protocol matching the simulator's baud; the other reports CAN ERROR."""
    answered = b"7E8064100FFFFFFFE\r\r>"
    bus_error = b"CAN ERROR\r\r>"

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
    sim.start()

    def probe(select_cmd, expected_reply):
        # Select a protocol, then issue the same 0100 request on it.
        send_compare(s, select_cmd, b"OK\r\r>")
        send_compare(s, b'0100\r', expected_reply)

    try:
        sync_reset(s)
        send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>')  # Echo OFF
        send_compare(s, b'ATS0\r', b'OK\r\r>')  # Spaces OFF
        send_compare(s, b'ATH1\r', b'OK\r\r>')  # Headers ON

        # Before the simulator's baud is changed.
        probe(b'ATSP6\r', answered)  # ISO 15765-4 (CAN 11/500)
        probe(b'ATSP8\r', bus_error)  # ISO 15765-4 (CAN 11/250)

        sim.change_can_baud(250)

        # After the change the outcomes swap.
        probe(b'ATSP6\r', bus_error)  # ISO 15765-4 (CAN 11/500)
        probe(b'ATSP8\r', answered)  # ISO 15765-4 (CAN 11/250)
    finally:
        sim.stop()
        sim.join()
        s.close()
|
|
|
|
def test_elm_panda_safety_mode_ISO15765():
    """Check which CAN messages a panda in safety mode 0xE327 will transmit.

    A message counts as sent when it is read back from the same panda
    within 0.5 s with the 0x80 flag set on its bus number.
    """
    full_request = b'\x02\x01\x00\x00\x00\x00\x00\x00'
    short_request = b'\x02\x01\x00'

    # (address, data, bus) triples the safety mode must block.
    blocked_29bit = (
        (0x18DB33F1, full_request, 1),   # wrong canid
        (0x18DB33F1, short_request, 0),  # wrong length
        (0x10000000, full_request, 0),   # bad addr
        (0x18DAF133, full_request, 0),   # bad addr (phy addr)
        (0x18DAF000, full_request, 0),   # bad addr
        (0x18DAF1F3, full_request, 0),   # bad addr! (phys rsp to elm)
    )
    allowed_29bit = (
        (0x18DB33F1, full_request, 0),   # good! (obd func req)
        (0x18DA10F1, full_request, 0),   # good! (phys response)
    )
    blocked_11bit = (
        (0x7DF, full_request, 1),        # wrong canid
        (0x7DF, short_request, 0),       # wrong length
        (0xAA, full_request, 0),         # bad addr
        (0x7DA, full_request, 0),        # bad addr (phy addr)
        (0x7E8, full_request, 0),        # bad addr (sending 'response')
    )
    allowed_11bit = (
        (0x7DF, full_request, 0),        # good! (obd func req)
        (0x7E1, full_request, 0),        # good! (phys response)
    )

    s = elm_connect()
    serial = os.getenv("CANSIMSERIAL") or None
    p_car = Panda(serial)  # Configure this so the messages will send
    p_car.set_can_speed_kbps(0, 500)
    p_car.set_safety_mode(Panda.SAFETY_ALLOUTPUT)

    p_elm = Panda("WIFI")
    p_elm.set_safety_mode(0xE327)

    def did_send(p, addr, dat, bus):
        """Send one CAN message on *p*; True if it is read back within 0.5 s."""
        p.can_send(addr, dat, bus)
        echo_bus = 0x80 | bus
        started = time.time()
        while time.time() - started < 0.5:
            if any(rx_bus == echo_bus and rx_addr == addr and rx_dat == dat
                   for rx_addr, _, rx_dat, rx_bus in p.can_recv()):
                return True
            time.sleep(0.01)
        return False

    try:
        sync_reset(s)  # Reset elm (which requests the ELM327 safety mode)

        # 29 bit
        for addr, dat, bus in blocked_29bit:
            assert not did_send(p_elm, addr, dat, bus)
        for addr, dat, bus in allowed_29bit:
            assert did_send(p_elm, addr, dat, bus)

        # 11 bit
        for addr, dat, bus in blocked_11bit:
            assert not did_send(p_elm, addr, dat, bus)
        for addr, dat, bus in allowed_11bit:
            assert did_send(p_elm, addr, dat, bus)
    finally:
        s.close()
|