import sys
import time
import struct
from enum import IntEnum


class COMMAND_CODE(IntEnum):
    """Command identifiers; sent as the first byte of every command packet."""
    CONNECT = 0xFF
    DISCONNECT = 0xFE
    GET_STATUS = 0xFD
    SYNCH = 0xFC
    GET_COMM_MODE_INFO = 0xFB
    GET_ID = 0xFA
    SET_REQUEST = 0xF9
    GET_SEED = 0xF8
    UNLOCK = 0xF7
    SET_MTA = 0xF6
    UPLOAD = 0xF5
    SHORT_UPLOAD = 0xF4
    BUILD_CHECKSUM = 0xF3
    TRANSPORT_LAYER_CMD = 0xF2
    USER_CMD = 0xF1
    DOWNLOAD = 0xF0
    DOWNLOAD_NEXT = 0xEF
    DOWNLOAD_MAX = 0xEE
    SHORT_DOWNLOAD = 0xED
    MODIFY_BITS = 0xEC
    SET_CAL_PAGE = 0xEB
    GET_CAL_PAGE = 0xEA
    GET_PAG_PROCESSOR_INFO = 0xE9
    GET_SEGMENT_INFO = 0xE8
    GET_PAGE_INFO = 0xE7
    SET_SEGMENT_MODE = 0xE6
    GET_SEGMENT_MODE = 0xE5
    COPY_CAL_PAGE = 0xE4
    CLEAR_DAQ_LIST = 0xE3
    SET_DAQ_PTR = 0xE2
    WRITE_DAQ = 0xE1
    SET_DAQ_LIST_MODE = 0xE0
    GET_DAQ_LIST_MODE = 0xDF
    START_STOP_DAQ_LIST = 0xDE
    START_STOP_SYNCH = 0xDD
    GET_DAQ_CLOCK = 0xDC
    READ_DAQ = 0xDB
    GET_DAQ_PROCESSOR_INFO = 0xDA
    GET_DAQ_RESOLUTION_INFO = 0xD9
    GET_DAQ_LIST_INFO = 0xD8
    GET_DAQ_EVENT_INFO = 0xD7
    FREE_DAQ = 0xD6
    ALLOC_DAQ = 0xD5
    ALLOC_ODT = 0xD4
    ALLOC_ODT_ENTRY = 0xD3
    PROGRAM_START = 0xD2
    PROGRAM_CLEAR = 0xD1
    PROGRAM = 0xD0
    PROGRAM_RESET = 0xCF
    GET_PGM_PROCESSOR_INFO = 0xCE
    GET_SECTOR_INFO = 0xCD
    PROGRAM_PREPARE = 0xCC
    PROGRAM_FORMAT = 0xCB
    PROGRAM_NEXT = 0xCA
    PROGRAM_MAX = 0xC9
    PROGRAM_VERIFY = 0xC8


# Human-readable descriptions for the error code carried in the second byte
# of an error response (first byte 0xFE); used to build exception messages.
ERROR_CODES = {
    0x00: "Command processor synchronization",
    0x10: "Command was not executed",
    0x11: "Command rejected because DAQ is running",
    0x12: "Command rejected because PGM is running",
    0x20: "Unknown command or not implemented optional command",
    0x21: "Command syntax invalid",
    0x22: "Command syntax valid but command parameter(s) out of range",
    0x23: "The memory location is write protected",
    0x24: "The memory location is not accessible",
    0x25: "Access denied, Seed & Key is required",
    0x26: "Selected page not available",
    0x27: "Selected page mode not available",
    0x28: "Selected segment not valid",
    0x29: "Sequence error",
    0x2A: "DAQ configuration not valid",
    0x30: "Memory overflow error",
    0x31: "Generic error",
    0x32: "The slave internal program verify routine detects an error",
}


class CONNECT_MODE(IntEnum):
    """Mode byte sent with the CONNECT command."""
    NORMAL = 0x00
    USER_DEFINED = 0x01


class GET_ID_REQUEST_TYPE(IntEnum):
    """Identification type byte sent with the GET_ID command."""
    ASCII = 0x00
    ASAM_MC2_FILE = 0x01
    ASAM_MC2_PATH = 0x02
    ASAM_MC2_URL = 0x03
    ASAM_MC2_UPLOAD = 0x04
    # 128-255 user defined


class CommandTimeoutError(Exception):
    """Raised when no response arrives within the timeout."""


class CommandCounterError(Exception):
    """Counter error; not raised anywhere in this module."""


class CommandResponseError(Exception):
    """Raised when the peer answers with an error packet (first byte 0xFE).

    Attributes:
        message: formatted description (error code, text, trailing bytes).
        return_code: the raw error code byte from the response.
    """

    def __init__(self, message, return_code):
        # Pass the message up so ``args``/``repr`` carry it as well.
        super().__init__(message)
        self.message = message
        self.return_code = return_code

    def __str__(self):
        return self.message


class XcpClient:
    """Minimal XCP master that talks over CAN through a panda-like device.

    The ``panda`` object must provide ``can_clear(bus)``,
    ``can_send(addr, data, bus)`` and ``can_recv()``; the latter is expected
    to return an iterable of ``(address, data, bus)`` tuples (or a falsy
    value when nothing was received).
    """

    def __init__(self, panda, tx_addr: int, rx_addr: int, bus: int = 0,
                 timeout: float = 0.1, debug=False, pad=True):
        """Store the connection parameters; no CAN traffic is generated.

        Args:
            panda: CAN interface object (see class docstring).
            tx_addr: CAN address commands are sent to.
            rx_addr: CAN address responses are accepted from.
            bus: CAN bus index used for both sending and receiving.
            timeout: seconds to wait for each response.
            debug: print every transmitted/received frame when true.
            pad: zero-pad outgoing frames to 8 bytes when true.
        """
        self.tx_addr = tx_addr
        self.rx_addr = rx_addr
        self.can_bus = bus
        self.timeout = timeout
        self.debug = debug
        self._panda = panda
        self._byte_order = ">"
        self._max_cto = 8
        self._max_dto = 8
        # Updated by connect(); defaulting to False lets upload()/download()
        # be called before connect() without raising AttributeError.
        self._slave_block_mode = False
        self.pad = pad
        # Matching frames received but not yet consumed. A single can_recv()
        # call may return several responses (e.g. a block-mode upload).
        self._rx_pending = []

    def _send_cto(self, cmd: int, dat: bytes = b"") -> None:
        """Flush the receive path and transmit one command packet.

        Args:
            cmd: command code placed in the first byte.
            dat: command parameters appended after the command code.
        """
        tx_data = bytes([cmd]) + dat

        # Some ECUs don't respond if the packets are not padded to 8 bytes
        if self.pad:
            tx_data = tx_data.ljust(8, b"\x00")

        if self.debug:
            print("CAN-CLEAR: TX")
        self._panda.can_clear(self.can_bus)

        if self.debug:
            print("CAN-CLEAR: RX")
        self._panda.can_clear(0xFFFF)
        # Anything still queued belongs to an earlier command.
        self._rx_pending.clear()

        if self.debug:
            print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(tx_data)}")
        self._panda.can_send(self.tx_addr, tx_data, self.can_bus)

    def _recv_dto(self, timeout: float) -> bytes:
        """Return the payload of the next response from ``rx_addr``.

        Frames matching ``can_bus``/``rx_addr`` are queued in arrival order,
        so several responses delivered in one ``can_recv()`` batch are handed
        out one per call instead of being dropped.

        Args:
            timeout: seconds to keep polling when no frame is queued.

        Returns:
            The frame contents after the first (packet id) byte.

        Raises:
            CommandResponseError: the frame's first byte is 0xFE.
            CommandTimeoutError: no matching frame arrived in time.
        """
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        start_time = time.monotonic()
        while True:
            if self._rx_pending:
                return self._parse_dto(self._rx_pending.pop(0))

            if time.monotonic() - start_time >= timeout:
                break

            msgs = self._panda.can_recv() or []
            if len(msgs) >= 256:
                print("CAN RX buffer overflow!!!", file=sys.stderr)

            for rx_addr, rx_data, rx_bus in msgs:
                if rx_bus != self.can_bus or rx_addr != self.rx_addr:
                    continue
                rx_data = bytes(rx_data)  # convert bytearray to bytes
                if self.debug:
                    print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
                if not rx_data:
                    # A zero-length frame carries no packet id; ignore it.
                    continue
                self._rx_pending.append(rx_data)

            if not self._rx_pending:
                time.sleep(0.001)

        raise CommandTimeoutError("timeout waiting for response")

    @staticmethod
    def _parse_dto(rx_data: bytes) -> bytes:
        """Strip the packet id from a frame, raising on an error packet."""
        pid = rx_data[0]
        if pid == 0xFE:
            err = rx_data[1]
            err_desc = ERROR_CODES.get(err, "unknown error")
            dat = rx_data[2:]
            raise CommandResponseError(f"{hex(err)} - {err_desc} {dat}", err)
        return bytes(rx_data[1:])

    # commands
    def connect(self, connect_mode: CONNECT_MODE = CONNECT_MODE.NORMAL) -> dict:
        """Send CONNECT and decode the peer's capabilities.

        Updates the byte order, block-mode flag and max CTO/DTO sizes used by
        the other commands.

        Returns:
            A dict describing the supported resources, byte order, address
            granularity, block mode, packet sizes and protocol/transport
            versions, as decoded from the 7 response bytes.
        """
        self._send_cto(COMMAND_CODE.CONNECT, bytes([connect_mode]))
        resp = self._recv_dto(self.timeout)
        assert len(resp) == 7, f"incorrect data length: {len(resp)}"

        resource, comm_mode = resp[0], resp[1]
        self._byte_order = ">" if comm_mode & 0x01 else "<"
        self._slave_block_mode = comm_mode & 0x40 != 0
        self._max_cto = resp[2]
        self._max_dto = struct.unpack(f"{self._byte_order}H", resp[3:5])[0]

        return {
            "cal_support": resource & 0x01 != 0,
            "daq_support": resource & 0x04 != 0,
            "stim_support": resource & 0x08 != 0,
            "pgm_support": resource & 0x10 != 0,
            "byte_order": self._byte_order,
            "address_granularity": 2 ** ((comm_mode & 0x06) >> 1),
            "slave_block_mode": self._slave_block_mode,
            "optional": comm_mode & 0x80 != 0,
            "max_cto": self._max_cto,
            "max_dto": self._max_dto,
            "protocol_version": resp[5],
            "transport_version": resp[6],
        }

    def disconnect(self) -> None:
        """Send DISCONNECT and expect a response with no payload bytes."""
        self._send_cto(COMMAND_CODE.DISCONNECT)
        resp = self._recv_dto(self.timeout)
        assert len(resp) == 0, f"incorrect data length: {len(resp)}"

    def get_id(self, req_id_type: GET_ID_REQUEST_TYPE = GET_ID_REQUEST_TYPE.ASCII) -> dict:
        """Send GET_ID and return the mode, length and (optional) identifier.

        Raises:
            ValueError: ``req_id_type`` does not fit in one byte.
        """
        if req_id_type > 255:
            raise ValueError("request id type must be less than 256")
        self._send_cto(COMMAND_CODE.GET_ID, bytes([req_id_type]))
        resp = self._recv_dto(self.timeout)
        return {
            # mode = 0 means MTA was set
            # mode = 1 means data is at end (only CAN-FD has space for this)
            "mode": resp[0],
            "length": struct.unpack(f"{self._byte_order}I", resp[3:7])[0],
            "identifier": resp[7:] if self._max_cto > 8 else None,
        }

    def get_seed(self, mode: int = 0) -> bytes:
        """Send GET_SEED and return the seed bytes from the response.

        Args:
            mode: value placed in the second parameter byte of the command
                (the first parameter byte is always 0).

        Raises:
            ValueError: ``mode`` does not fit in one byte.
        """
        if mode > 255:
            raise ValueError("mode must be less than 256")
        self._send_cto(COMMAND_CODE.GET_SEED, bytes([0, mode]))

        # TODO: add support for longer seeds spread over multiple blocks
        ret = self._recv_dto(self.timeout)
        length = ret[0]
        return ret[1:length + 1]

    def unlock(self, key: bytes) -> bytes:
        """Send UNLOCK with ``key`` and return the response payload."""
        # TODO: add support for longer keys spread over multiple blocks
        self._send_cto(COMMAND_CODE.UNLOCK, bytes([len(key)]) + key)
        return self._recv_dto(self.timeout)

    def set_mta(self, addr: int, addr_ext: int = 0) -> bytes:
        """Send SET_MTA for ``addr``/``addr_ext`` and return the response payload.

        Raises:
            ValueError: ``addr_ext`` does not fit in one byte.
        """
        if addr_ext > 255:
            raise ValueError("address extension must be less than 256")
        # NOTE(review): an older TODO here said the address extension looked
        # missing; it is sent below as the third parameter byte, after two
        # zero bytes and before the 32-bit address. Confirm against the spec.
        self._send_cto(COMMAND_CODE.SET_MTA,
                       bytes([0x00, 0x00, addr_ext]) + struct.pack(f"{self._byte_order}I", addr))
        return self._recv_dto(self.timeout)

    def upload(self, size: int) -> bytes:
        """Send UPLOAD and collect ``size`` bytes from one or more responses.

        Raises:
            ValueError: ``size`` exceeds 255, or exceeds a single response
                while block mode is not available.
        """
        if size > 255:
            raise ValueError("size must be less than 256")
        if not self._slave_block_mode and size > self._max_dto - 1:
            raise ValueError("block mode not supported")

        self._send_cto(COMMAND_CODE.UPLOAD, bytes([size]))
        chunks = []
        received = 0
        while received < size:
            # Take only what is still missing; trailing bytes are undefined.
            chunk = self._recv_dto(self.timeout)[:size - received]
            chunks.append(chunk)
            received += len(chunk)
        return b"".join(chunks)

    def short_upload(self, size: int, addr_ext: int, addr: int) -> bytes:
        """Send SHORT_UPLOAD and return the first ``size`` response bytes.

        Raises:
            ValueError: ``size`` exceeds 6 or ``addr_ext`` exceeds 255.
        """
        if size > 6:
            raise ValueError("size must be less than 7")
        if addr_ext > 255:
            raise ValueError("address extension must be less than 256")
        self._send_cto(COMMAND_CODE.SHORT_UPLOAD,
                       bytes([size, 0x00, addr_ext]) + struct.pack(f"{self._byte_order}I", addr))
        return self._recv_dto(self.timeout)[:size]  # trim off bytes with undefined values

    def download(self, data: bytes) -> bytes:
        """Send DOWNLOAD with ``data`` and return up to ``len(data)`` response bytes.

        Raises:
            ValueError: ``data`` is longer than 255 bytes, or does not fit in
                one packet while block mode is not available.
        """
        size = len(data)
        if size > 255:
            raise ValueError("size must be less than 256")
        # NOTE(review): the limit is derived from max_dto and the slave block
        # mode flag although this is an outgoing command; verify whether
        # max_cto / master block mode was intended.
        if not self._slave_block_mode and size > self._max_dto - 2:
            raise ValueError("block mode not supported")

        self._send_cto(COMMAND_CODE.DOWNLOAD, bytes([size]) + data)
        return self._recv_dto(self.timeout)[:size]