uds: handle function addrs and fw version query example

This commit is contained in:
Greg Hogan 2019-11-16 18:54:15 -08:00
parent 6626a54241
commit 8138fc14c8
3 changed files with 97 additions and 68 deletions

View File

@ -1,60 +0,0 @@
#!/usr/bin/env python3
from panda import Panda
from panda.python.uds import UdsClient, NegativeResponseError, DATA_IDENTIFIER_TYPE
if __name__ == "__main__":
address = 0x18da30f1 # Honda EPS
panda = Panda()
panda.set_safety_mode(Panda.SAFETY_ELM327)
uds_client = UdsClient(panda, address, bus=1 if panda.has_obd() else 0, debug=False)
print("tester present ...")
uds_client.tester_present()
try:
print("")
print("read data by id: boot software id ...")
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION)
print(data.decode('utf-8'))
except NegativeResponseError as e:
print(e)
try:
print("")
print("read data by id: application software id ...")
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
print(data.decode('utf-8'))
except NegativeResponseError as e:
print(e)
try:
print("")
print("read data by id: application data id ...")
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION)
print(data.decode('utf-8'))
except NegativeResponseError as e:
print(e)
try:
print("")
print("read data by id: boot software fingerprint ...")
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT)
print(data.decode('utf-8'))
except NegativeResponseError as e:
print(e)
try:
print("")
print("read data by id: application software fingerprint ...")
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT)
print(data.decode('utf-8'))
except NegativeResponseError as e:
print(e)
try:
print("")
print("read data by id: application data fingerprint ...")
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT)
print(data.decode('utf-8'))
except NegativeResponseError as e:
print(e)

71
examples/query_fw_versions.py Executable file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env python3
from tqdm import tqdm
from panda import Panda
from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, DATA_IDENTIFIER_TYPE
if __name__ == "__main__":
addrs = [0x700 + i for i in range(256)]
addrs += [0x18da0000 + (i<<8) + 0xf1 for i in range(256)]
results = {}
panda = Panda()
panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
print("querying addresses ...")
for addr in tqdm(addrs):
# skip functional broadcast addrs
if addr == 0x7df or addr == 0x18db33f1:
continue
uds_client = UdsClient(panda, addr, bus=1 if panda.has_obd() else 0, timeout=0.1, debug=False)
try:
uds_client.tester_present()
except NegativeResponseError:
pass
except MessageTimeoutError:
continue
resp = {}
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data
except NegativeResponseError:
pass
try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data
except NegativeResponseError:
pass
if resp.keys():
results[addr] = resp
print("results:")
for addr, resp in results.items():
for id, dat in resp.items():
print(hex(addr), hex(id), dat.decode())

View File

@ -3,6 +3,7 @@ import time
import struct
from typing import Callable, NamedTuple, Tuple, List
from enum import IntEnum
class SERVICE_TYPE(IntEnum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
@ -276,6 +277,23 @@ class CanClient():
self.bus = bus
self.debug = debug
def _recv_filter(self, bus, addr):
# handle functionl addresses (switch to first addr to respond)
if self.tx_addr == 0x7DF:
is_response = addr >= 0x7E8 and addr <= 0x7EF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
self.tx_addr = addr-8
self.rx_addr = addr
return is_response
if self.tx_addr == 0x18DB33F1:
is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
self.tx_addr = 0x18DA00F1 + (addr<<8 & 0xFF00)
self.rx_addr = addr
return bus == self.bus and addr == self.rx_addr
def recv(self, drain=False) -> List[bytes]:
msg_array = []
while True:
@ -284,7 +302,7 @@ class CanClient():
if self.debug: print("CAN-RX: drain - {}".format(len(msgs)))
else:
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if rx_bus == self.bus and rx_addr == self.rx_addr and len(rx_data) > 0:
if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0:
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
msg_array.append(rx_data)
@ -295,7 +313,7 @@ class CanClient():
def send(self, msgs: List[bytes], delay: float=0) -> None:
first = True
for msg in msgs:
if not first and delay:
if delay and not first:
if self.debug: print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
@ -317,6 +335,11 @@ class IsoTpMessage():
self.tx_idx = 0
self.tx_done = False
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}")
self._tx_first_frame()
@ -333,11 +356,6 @@ class IsoTpMessage():
self._can_client.send([msg])
def recv(self) -> bytes:
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False
start_time = time.time()
try:
while True:
@ -351,7 +369,7 @@ class IsoTpMessage():
if time.time() - start_time > self.timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")
if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")
def _isotp_rx_next(self, rx_data: bytes) -> None:
# single rx_frame