diff --git a/examples/query_fw_versions.py b/examples/query_fw_versions.py index 6795f62c..aaf54f41 100755 --- a/examples/query_fw_versions.py +++ b/examples/query_fw_versions.py @@ -6,7 +6,7 @@ from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseErr if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--rxoffset', default="0x8") + parser.add_argument('--rxoffset', default="") parser.add_argument('--nonstandard', action='store_true') parser.add_argument('--debug', action='store_true') parser.add_argument('--addr') @@ -31,8 +31,8 @@ if __name__ == "__main__": uds_data_ids[uds_id] = "IDENTIFICATION_OPTION_SYSTEM_SUPPLIER_SPECIFIC" panda = Panda() - panda.set_safety_mode(Panda.SAFETY_ELM327) panda.set_power_save(0) + panda.set_safety_mode(Panda.SAFETY_ELM327) print("querying addresses ...") with tqdm(addrs) as t: for addr in t: @@ -40,8 +40,11 @@ if __name__ == "__main__": if addr == 0x7df or addr == 0x18db33f1: continue t.set_description(hex(addr)) + panda.send_heartbeat() - uds_client = UdsClient(panda, addr, addr + int(args.rxoffset, base=16), bus=1 if panda.has_obd() else 0, timeout=0.2, debug=args.debug) + bus = 1 if panda.has_obd() else 0 + rx_addr = addr + int(args.rxoffset, base=16) if args.rxoffset else None + uds_client = UdsClient(panda, addr, rx_addr, bus, timeout=0.2, debug=args.debug) # Check for anything alive at this address, and switch to the highest # available diagnostic session without security access try: diff --git a/python/uds.py b/python/uds.py index 5106e6de..48fcf532 100644 --- a/python/uds.py +++ b/python/uds.py @@ -4,6 +4,7 @@ import struct from collections import deque from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast from enum import IntEnum +from functools import partial class SERVICE_TYPE(IntEnum): DIAGNOSTIC_SESSION_CONTROL = 0x10 @@ -376,39 +377,43 @@ class IsoTpMessage(): self.rx_done = False if self.debug: - print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}") + print(f"ISO-TP: REQUEST - {hex(self._can_client.tx_addr)} 0x{bytes.hex(self.tx_dat)}") self._tx_first_frame() def _tx_first_frame(self) -> None: if self.tx_len < self.max_len: # single frame (send all bytes) if self.debug: - print("ISO-TP: TX - single frame") + print(f"ISO-TP: TX - single frame - {hex(self._can_client.tx_addr)}") msg = (bytes([self.tx_len]) + self.tx_dat).ljust(self.max_len, b"\x00") self.tx_done = True else: # first frame (send first 6 bytes) if self.debug: - print("ISO-TP: TX - first frame") + print(f"ISO-TP: TX - first frame - {hex(self._can_client.tx_addr)}") msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00") self._can_client.send([msg]) - def recv(self) -> Optional[bytes]: - start_time = time.time() + def recv(self, timeout=None) -> Optional[bytes]: + if timeout is None: + timeout = self.timeout + + start_time = time.monotonic() try: while True: for msg in self._can_client.recv(): self._isotp_rx_next(msg) + start_time = time.monotonic() if self.tx_done and self.rx_done: return self.rx_dat # no timeout indicates non-blocking - if self.timeout == 0: + if timeout == 0: return None - if time.time() - start_time > self.timeout: + if time.monotonic() - start_time > timeout: raise MessageTimeoutError("timeout waiting for response") finally: if self.debug and self.rx_dat: - print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}") + print(f"ISO-TP: RESPONSE - {hex(self._can_client.rx_addr)} 0x{bytes.hex(self.rx_dat)}") def _isotp_rx_next(self, rx_data: bytes) -> None: # single rx_frame @@ -418,7 +423,7 @@ class IsoTpMessage(): self.rx_idx = 0 self.rx_done = True if self.debug: - print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}") + print(f"ISO-TP: RX - single frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}") return # first rx_frame @@ -428,9 +433,9 @@ class IsoTpMessage(): self.rx_idx = 0 self.rx_done = False if self.debug: - print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}") + print(f"ISO-TP: RX - first frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}") if self.debug: - print("ISO-TP: TX - flow control continue") + print(f"ISO-TP: TX - flow control continue - {hex(self._can_client.tx_addr)}") # send flow control message (send all bytes) msg = b"\x30\x00\x00".ljust(self.max_len, b"\x00") self._can_client.send([msg]) @@ -446,7 +451,7 @@ class IsoTpMessage(): if self.rx_len == len(self.rx_dat): self.rx_done = True if self.debug: - print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}") + print(f"ISO-TP: RX - consecutive frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}") return # flow control @@ -456,7 +461,7 @@ class IsoTpMessage(): assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid" if rx_data[0] == 0x30: if self.debug: - print("ISO-TP: RX - flow control continue") + print(f"ISO-TP: RX - flow control continue - {hex(self._can_client.tx_addr)}") delay_ts = rx_data[2] & 0x7F # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. @@ -478,11 +483,11 @@ class IsoTpMessage(): if end >= self.tx_len: self.tx_done = True if self.debug: - print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}") + print(f"ISO-TP: TX - consecutive frame - {hex(self._can_client.tx_addr)} idx={self.tx_idx} done={self.tx_done}") elif rx_data[0] == 0x31: # wait (do nothing until next flow control message) if self.debug: - print("ISO-TP: TX - flow control wait") + print(f"ISO-TP: TX - flow control wait - {hex(self._can_client.tx_addr)}") FUNCTIONAL_ADDRS = [0x7DF, 0x18DB33F1] @@ -503,13 +508,16 @@ def get_rx_addr_for_tx_addr(tx_addr, rx_offset=0x8): class UdsClient(): - def __init__(self, panda, tx_addr: int, rx_addr: int = None, bus: int = 0, timeout: float = 1, debug: bool = False): + def __init__(self, panda, tx_addr: int, rx_addr: int = None, bus: int = 0, timeout: float = 1, debug: bool = False, + tx_timeout: float = 1, response_pending_timeout: float = 10): self.bus = bus self.tx_addr = tx_addr self.rx_addr = rx_addr if rx_addr is not None else get_rx_addr_for_tx_addr(tx_addr) self.timeout = timeout self.debug = debug - self._can_client = CanClient(panda.can_send, panda.can_recv, self.tx_addr, self.rx_addr, self.bus, debug=self.debug) + can_send_with_timeout = partial(panda.can_send, timeout=int(tx_timeout*1000)) + self._can_client = CanClient(can_send_with_timeout, panda.can_recv, self.tx_addr, self.rx_addr, self.bus, debug=self.debug) + self.response_pending_timeout = response_pending_timeout # generic uds request def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int = None, data: bytes = None) -> bytes: @@ -522,12 +530,15 @@ class UdsClient(): # send request, wait for response isotp_msg = IsoTpMessage(self._can_client, self.timeout, self.debug) isotp_msg.send(req) + response_pending = False while True: - resp = isotp_msg.recv() + timeout = self.response_pending_timeout if response_pending else self.timeout + resp = isotp_msg.recv(timeout) if resp is None: continue + response_pending = False resp_sid = resp[0] if len(resp) > 0 else None # negative response @@ -544,6 +555,7 @@ class UdsClient(): error_desc = resp[3:].hex() # wait for another message if response pending if error_code == 0x78: + response_pending = True if self.debug: print("UDS-RX: response pending") continue @@ -654,7 +666,7 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: - raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) + raise ValueError('invalid response data identifier: {} expected: {}'.format(hex(resp_id), hex(data_identifier_type))) return resp[2:] def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1):