diff --git a/python/uds.py b/python/uds.py index 6f6922a0b..f727b7507 100644 --- a/python/uds.py +++ b/python/uds.py @@ -141,6 +141,12 @@ class DYNAMIC_DEFINITION_TYPE(IntEnum): DEFINE_BY_MEMORY_ADDRESS = 2 CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3 +class ISOTP_FRAME_TYPE(IntEnum): + SINGLE = 0 + FIRST = 1 + CONSECUTIVE = 2 + FLOW = 3 + class DynamicSourceDefinition(NamedTuple): data_identifier: int position: int @@ -438,30 +444,29 @@ class IsoTpMessage(): timeout = self.timeout start_time = time.monotonic() - updated = False + rx_in_progress = False try: while True: for msg in self._can_client.recv(): - self._isotp_rx_next(msg) + frame_type = self._isotp_rx_next(msg) start_time = time.monotonic() - updated = True + rx_in_progress = frame_type == ISOTP_FRAME_TYPE.CONSECUTIVE if self.tx_done and self.rx_done: - return self.rx_dat, updated + return self.rx_dat, False # no timeout indicates non-blocking if timeout == 0: - return None, updated + return None, rx_in_progress if time.monotonic() - start_time > timeout: raise MessageTimeoutError("timeout waiting for response") finally: if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - {hex(self._can_client.rx_addr)} 0x{bytes.hex(self.rx_dat)}") - def _isotp_rx_next(self, rx_data: bytes) -> None: + def _isotp_rx_next(self, rx_data: bytes) -> ISOTP_FRAME_TYPE: # ISO 15765-2 specifies an eight byte CAN frame for ISO-TP communication assert len(rx_data) == self.max_len, f"isotp - rx: invalid CAN frame length: {len(rx_data)}" - # single rx_frame - if rx_data[0] >> 4 == 0x0: + if rx_data[0] >> 4 == ISOTP_FRAME_TYPE.SINGLE: self.rx_len = rx_data[0] & 0xFF assert self.rx_len < self.max_len, f"isotp - rx: invalid single frame length: {self.rx_len}" self.rx_dat = rx_data[1:1 + self.rx_len] @@ -469,9 +474,9 @@ class IsoTpMessage(): self.rx_done = True if self.debug: print(f"ISO-TP: RX - single frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}") + return ISOTP_FRAME_TYPE.SINGLE - # first rx_frame - elif rx_data[0] >> 4 == 0x1: + elif rx_data[0] >> 4 == ISOTP_FRAME_TYPE.FIRST: self.rx_len = ((rx_data[0] & 0x0F) << 8) + rx_data[1] assert self.max_len <= self.rx_len, f"isotp - rx: invalid first frame length: {self.rx_len}" self.rx_dat = rx_data[2:] @@ -483,9 +488,9 @@ class IsoTpMessage(): print(f"ISO-TP: TX - flow control continue - {hex(self._can_client.tx_addr)}") # send flow control message self._can_client.send([self.flow_control_msg]) + return ISOTP_FRAME_TYPE.FIRST - # consecutive rx frame - elif rx_data[0] >> 4 == 0x2: + elif rx_data[0] >> 4 == ISOTP_FRAME_TYPE.CONSECUTIVE: assert not self.rx_done, "isotp - rx: consecutive frame with no active frame" self.rx_idx += 1 assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index" @@ -498,9 +503,9 @@ class IsoTpMessage(): self._can_client.send([self.flow_control_msg]) if self.debug: print(f"ISO-TP: RX - consecutive frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}") + return ISOTP_FRAME_TYPE.CONSECUTIVE - # flow control - elif rx_data[0] >> 4 == 0x3: + elif rx_data[0] >> 4 == ISOTP_FRAME_TYPE.FLOW: assert not self.tx_done, "isotp - rx: flow control with no active frame" assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort" assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid" @@ -533,6 +538,7 @@ class IsoTpMessage(): # wait (do nothing until next flow control message) if self.debug: print(f"ISO-TP: TX - flow control wait - {hex(self._can_client.tx_addr)}") + return ISOTP_FRAME_TYPE.FLOW # 4-15 - reserved else: