Move isotp.py, ccp.py and xcp.py to opendbc (#2166)

* Move ccp.py and xcp.py to opendbc

* move isotp

* more cleanup
This commit is contained in:
Willem Melching
2025-02-26 19:41:16 +01:00
committed by GitHub
parent 0924df1e8e
commit a744fa7780
7 changed files with 2 additions and 837 deletions

View File

@@ -1,9 +1,8 @@
from .python.constants import McuType, BASEDIR, FW_PATH, USBPACKET_MAX_SIZE # noqa: F401
from .python.spi import PandaSpiException, PandaProtocolMismatch, STBootloaderSPIHandle # noqa: F401
from .python.serial import PandaSerial # noqa: F401
from .python.canhandle import CanHandle # noqa: F401
from .python.utils import logger # noqa: F401
from .python import (Panda, PandaDFU, isotp, # noqa: F401
from .python import (Panda, PandaDFU, # noqa: F401
pack_can_buffer, unpack_can_buffer, calculate_checksum,
DLC_TO_LEN, LEN_TO_DLC, CANPACKET_HEAD_SIZE)

View File

@@ -4,7 +4,7 @@ import struct
from opendbc.car.structs import CarParams
from panda import Panda
from hexdump import hexdump
from panda.python.isotp import isotp_send, isotp_recv
from opendbc.car.isotp import isotp_send, isotp_recv
# 0x7e0 = Toyota
# 0x18DB33F1 for Honda?

View File

@@ -14,7 +14,6 @@ from opendbc.car.structs import CarParams
from .base import BaseHandle
from .constants import FW_PATH, McuType
from .dfu import PandaDFU
from .isotp import isotp_send, isotp_recv
from .spi import PandaSpiHandle, PandaSpiException, PandaProtocolMismatch
from .usb import PandaUsbHandle
from .utils import logger
@@ -795,14 +794,6 @@ class Panda:
"""
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf1, bus, 0, b'')
# ******************* isotp *******************
def isotp_send(self, addr, dat, bus, recvaddr=None, subaddr=None):
return isotp_send(self, dat, addr, bus, recvaddr, subaddr)
def isotp_recv(self, addr, bus=0, sendaddr=None, subaddr=None):
return isotp_recv(self, addr, bus, sendaddr, subaddr)
# ******************* serial *******************
def serial_read(self, port_number):

View File

@@ -1,53 +0,0 @@
import struct
import signal
from .base import BaseHandle
class CanHandle(BaseHandle):
def __init__(self, p, bus):
self.p = p
self.bus = bus
def transact(self, dat):
def _handle_timeout(signum, frame):
# will happen on reset or can error
raise TimeoutError
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(1)
try:
self.p.isotp_send(1, dat, self.bus, recvaddr=2)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(1)
try:
ret = self.p.isotp_recv(2, self.bus, sendaddr=1)
finally:
signal.alarm(0)
return ret
def close(self):
pass
def controlWrite(self, request_type, request, value, index, data, timeout=0, expect_disconnect=False):
# ignore data in reply, panda doesn't use it
return self.controlRead(request_type, request, value, index, 0, timeout)
def controlRead(self, request_type, request, value, index, length, timeout=0):
dat = struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length)
return self.transact(dat)
def bulkWrite(self, endpoint, data, timeout=0):
if len(data) > 0x10:
raise ValueError("Data must not be longer than 0x10")
dat = struct.pack("HH", endpoint, len(data)) + data
return self.transact(dat)
def bulkRead(self, endpoint, length, timeout=0):
dat = struct.pack("HH", endpoint, 0)
return self.transact(dat)

View File

@@ -1,374 +0,0 @@
import sys
import time
import struct
from enum import IntEnum, Enum
from dataclasses import dataclass
from typing import Optional
@dataclass
class ExchangeStationIdsReturn:
id_length: int
data_type: int
available: int
protected: int
@dataclass
class GetDaqListSizeReturn:
list_size: int
first_pid: int
@dataclass
class GetSessionStatusReturn:
status: int
info: Optional[int]
@dataclass
class DiagnosticServiceReturn:
length: int
type: int
@dataclass
class ActionServiceReturn:
length: int
type: int
class COMMAND_CODE(IntEnum):
CONNECT = 0x01
SET_MTA = 0x02
DNLOAD = 0x03
UPLOAD = 0x04
TEST = 0x05
START_STOP = 0x06
DISCONNECT = 0x07
START_STOP_ALL = 0x08
GET_ACTIVE_CAL_PAGE = 0x09
SET_S_STATUS = 0x0C
GET_S_STATUS = 0x0D
BUILD_CHKSUM = 0x0E
SHORT_UP = 0x0F
CLEAR_MEMORY = 0x10
SELECT_CAL_PAGE = 0x11
GET_SEED = 0x12
UNLOCK = 0x13
GET_DAQ_SIZE = 0x14
SET_DAQ_PTR = 0x15
WRITE_DAQ = 0x16
EXCHANGE_ID = 0x17
PROGRAM = 0x18
MOVE = 0x19
GET_CCP_VERSION = 0x1B
DIAG_SERVICE = 0x20
ACTION_SERVICE = 0x21
PROGRAM_6 = 0x22
DNLOAD_6 = 0x23
COMMAND_RETURN_CODES = {
0x00: "acknowledge / no error",
0x01: "DAQ processor overload",
0x10: "command processor busy",
0x11: "DAQ processor busy",
0x12: "internal timeout",
0x18: "key request",
0x19: "session status request",
0x20: "cold start request",
0x21: "cal. data init. request",
0x22: "DAQ list init. request",
0x23: "code update request",
0x30: "unknown command",
0x31: "command syntax",
0x32: "parameter(s) out of range",
0x33: "access denied",
0x34: "overload",
0x35: "access locked",
0x36: "resource/function not available",
}
class BYTE_ORDER(Enum):
LITTLE_ENDIAN = '<'
BIG_ENDIAN = '>'
class CommandTimeoutError(Exception):
pass
class CommandCounterError(Exception):
pass
class CommandResponseError(Exception):
def __init__(self, message, return_code):
super().__init__()
self.message = message
self.return_code = return_code
def __str__(self):
return self.message
class CcpClient():
def __init__(self, panda, tx_addr: int, rx_addr: int, bus: int=0, byte_order: BYTE_ORDER=BYTE_ORDER.BIG_ENDIAN, debug=False):
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.can_bus = bus
self.byte_order = byte_order
self.debug = debug
self._panda = panda
self._command_counter = -1
def _send_cro(self, cmd: int, dat: bytes = b"") -> None:
self._command_counter = (self._command_counter + 1) & 0xFF
tx_data = (bytes([cmd, self._command_counter]) + dat).ljust(8, b"\x00")
if self.debug:
print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(tx_data)}")
assert len(tx_data) == 8, "data is not 8 bytes"
self._panda.can_clear(self.can_bus)
self._panda.can_clear(0xFFFF)
self._panda.can_send(self.tx_addr, tx_data, self.can_bus)
def _recv_dto(self, timeout: float) -> bytes:
start_time = time.time()
while time.time() - start_time < timeout:
msgs = self._panda.can_recv() or []
if len(msgs) >= 256:
print("CAN RX buffer overflow!!!", file=sys.stderr)
for rx_addr, rx_data_bytearray, rx_bus in msgs:
if rx_bus == self.can_bus and rx_addr == self.rx_addr:
rx_data = bytes(rx_data_bytearray)
if self.debug:
print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
assert len(rx_data) == 8, f"message length not 8: {len(rx_data)}"
pid = rx_data[0]
if pid == 0xFF or pid == 0xFE:
err = rx_data[1]
err_desc = COMMAND_RETURN_CODES.get(err, "unknown error")
ctr = rx_data[2]
dat = rx_data[3:]
if pid == 0xFF and self._command_counter != ctr:
raise CommandCounterError(f"counter invalid: {ctr} != {self._command_counter}")
if err >= 0x10 and err <= 0x12:
if self.debug:
print(f"CCP-WAIT: {hex(err)} - {err_desc}")
start_time = time.time()
continue
if err >= 0x30:
raise CommandResponseError(f"{hex(err)} - {err_desc}", err)
else:
dat = rx_data[1:]
return dat
time.sleep(0.001)
raise CommandTimeoutError("timeout waiting for response")
# commands
def connect(self, station_addr: int) -> None:
if station_addr > 65535:
raise ValueError("station address must be less than 65536")
# NOTE: station address is always little endian
self._send_cro(COMMAND_CODE.CONNECT, struct.pack("<H", station_addr))
self._recv_dto(0.025)
def exchange_station_ids(self, device_id_info: bytes = b"") -> ExchangeStationIdsReturn:
self._send_cro(COMMAND_CODE.EXCHANGE_ID, device_id_info)
resp = self._recv_dto(0.025)
return ExchangeStationIdsReturn(id_length=resp[0], data_type=resp[1], available=resp[2], protected=resp[3])
def get_seed(self, resource_mask: int) -> bytes:
if resource_mask > 255:
raise ValueError("resource mask must be less than 256")
self._send_cro(COMMAND_CODE.GET_SEED, bytes([resource_mask]))
resp = self._recv_dto(0.025)
# protected = resp[0] == 0
seed = resp[1:]
return seed
def unlock(self, key: bytes) -> int:
if len(key) > 6:
raise ValueError("max key size is 6 bytes")
self._send_cro(COMMAND_CODE.UNLOCK, key)
resp = self._recv_dto(0.025)
status = resp[0]
return status
def set_memory_transfer_address(self, mta_num: int, addr_ext: int, addr: int) -> None:
if mta_num > 255:
raise ValueError("MTA number must be less than 256")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cro(COMMAND_CODE.SET_MTA, bytes([mta_num, addr_ext]) + struct.pack(f"{self.byte_order.value}I", addr))
self._recv_dto(0.025)
def download(self, data: bytes) -> int:
if len(data) > 5:
raise ValueError("max data size is 5 bytes")
self._send_cro(COMMAND_CODE.DNLOAD, bytes([len(data)]) + data)
resp = self._recv_dto(0.025)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def download_6_bytes(self, data: bytes) -> int:
if len(data) != 6:
raise ValueError("data size must be 6 bytes")
self._send_cro(COMMAND_CODE.DNLOAD_6, data)
resp = self._recv_dto(0.025)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def upload(self, size: int) -> bytes:
if size > 5:
raise ValueError("size must be less than 6")
self._send_cro(COMMAND_CODE.UPLOAD, bytes([size]))
return self._recv_dto(0.025)[:size]
def short_upload(self, size: int, addr_ext: int, addr: int) -> bytes:
if size > 5:
raise ValueError("size must be less than 6")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cro(COMMAND_CODE.SHORT_UP, bytes([size, addr_ext]) + struct.pack(f"{self.byte_order.value}I", addr))
return self._recv_dto(0.025)[:size]
def select_calibration_page(self) -> None:
self._send_cro(COMMAND_CODE.SELECT_CAL_PAGE)
self._recv_dto(0.025)
def get_daq_list_size(self, list_num: int, can_id: int = 0) -> GetDaqListSizeReturn:
if list_num > 255:
raise ValueError("list number must be less than 256")
self._send_cro(COMMAND_CODE.GET_DAQ_SIZE, bytes([list_num, 0]) + struct.pack(f"{self.byte_order.value}I", can_id))
resp = self._recv_dto(0.025)
return GetDaqListSizeReturn(list_size=resp[0], first_pid=resp[1])
def set_daq_list_pointer(self, list_num: int, odt_num: int, element_num: int) -> None:
if list_num > 255:
raise ValueError("list number must be less than 256")
if odt_num > 255:
raise ValueError("ODT number must be less than 256")
if element_num > 255:
raise ValueError("element number must be less than 256")
self._send_cro(COMMAND_CODE.SET_DAQ_PTR, bytes([list_num, odt_num, element_num]))
self._recv_dto(0.025)
def write_daq_list_entry(self, size: int, addr_ext: int, addr: int) -> None:
if size > 255:
raise ValueError("size must be less than 256")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cro(COMMAND_CODE.WRITE_DAQ, bytes([size, addr_ext]) + struct.pack(f"{self.byte_order.value}I", addr))
self._recv_dto(0.025)
def start_stop_transmission(self, mode: int, list_num: int, odt_num: int, channel_num: int, rate_prescaler: int = 0) -> None:
if mode > 255:
raise ValueError("mode must be less than 256")
if list_num > 255:
raise ValueError("list number must be less than 256")
if odt_num > 255:
raise ValueError("ODT number must be less than 256")
if channel_num > 255:
raise ValueError("channel number must be less than 256")
if rate_prescaler > 65535:
raise ValueError("rate prescaler must be less than 65536")
self._send_cro(COMMAND_CODE.START_STOP, bytes([mode, list_num, odt_num, channel_num]) + struct.pack(f"{self.byte_order.value}H", rate_prescaler))
self._recv_dto(0.025)
def disconnect(self, station_addr: int, temporary: bool = False) -> None:
if station_addr > 65535:
raise ValueError("station address must be less than 65536")
# NOTE: station address is always little endian
self._send_cro(COMMAND_CODE.DISCONNECT, bytes([int(not temporary), 0x00]) + struct.pack("<H", station_addr))
self._recv_dto(0.025)
def set_session_status(self, status: int) -> None:
if status > 255:
raise ValueError("status must be less than 256")
self._send_cro(COMMAND_CODE.SET_S_STATUS, bytes([status]))
self._recv_dto(0.025)
def get_session_status(self) -> GetSessionStatusReturn:
self._send_cro(COMMAND_CODE.GET_S_STATUS)
resp = self._recv_dto(0.025)
info = resp[2] if resp[1] else None
return GetSessionStatusReturn(status=resp[0], info=info)
def build_checksum(self, size: int) -> bytes:
self._send_cro(COMMAND_CODE.BUILD_CHKSUM, struct.pack(f"{self.byte_order.value}I", size))
resp = self._recv_dto(30.0)
chksum_size = resp[0]
assert chksum_size <= 4, "checksum more than 4 bytes"
chksum = resp[1:1+chksum_size]
return chksum
def clear_memory(self, size: int) -> None:
self._send_cro(COMMAND_CODE.CLEAR_MEMORY, struct.pack(f"{self.byte_order.value}I", size))
self._recv_dto(30.0)
def program(self, size: int, data: bytes) -> int:
if size > 5:
raise ValueError("size must be less than 6")
if len(data) > 5:
raise ValueError("max data size is 5 bytes")
self._send_cro(COMMAND_CODE.PROGRAM, bytes([size]) + data)
resp = self._recv_dto(0.1)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def program_6_bytes(self, data: bytes) -> int:
if len(data) != 6:
raise ValueError("data size must be 6 bytes")
self._send_cro(COMMAND_CODE.PROGRAM_6, data)
resp = self._recv_dto(0.1)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr # type: ignore
def move_memory_block(self, size: int) -> None:
self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size))
self._recv_dto(0.025)
def diagnostic_service(self, service_num: int, data: bytes = b"") -> DiagnosticServiceReturn:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.DIAG_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return DiagnosticServiceReturn(length=resp[0], type=resp[1])
def action_service(self, service_num: int, data: bytes = b"") -> ActionServiceReturn:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.ACTION_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return ActionServiceReturn(length=resp[0], type=resp[1])
def test_availability(self, station_addr: int) -> None:
if station_addr > 65535:
raise ValueError("station address must be less than 65536")
# NOTE: station address is always little endian
self._send_cro(COMMAND_CODE.TEST, struct.pack("<H", station_addr))
self._recv_dto(0.025)
def start_stop_synchronised_transmission(self, mode: int) -> None:
if mode > 255:
raise ValueError("mode must be less than 256")
self._send_cro(COMMAND_CODE.START_STOP_ALL, bytes([mode]))
self._recv_dto(0.025)
def get_active_calibration_page(self):
self._send_cro(COMMAND_CODE.GET_ACTIVE_CAL_PAGE)
resp = self._recv_dto(0.025)
# cal_addr_ext = resp[0]
cal_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return cal_addr
def get_version(self, desired_version: float = 2.1) -> float:
major, minor = map(int, str(desired_version).split("."))
self._send_cro(COMMAND_CODE.GET_CCP_VERSION, bytes([major, minor]))
resp = self._recv_dto(0.025)
return float(f"{resp[0]}.{resp[1]}")

View File

@@ -1,140 +0,0 @@
import binascii
import time
DEBUG = False
def msg(x):
if DEBUG:
print("S:", binascii.hexlify(x))
assert len(x) <= 7
ret = bytes([len(x)]) + x
return ret.ljust(8, b"\x00")
kmsgs = []
def recv(panda, cnt, addr, nbus):
global kmsgs
ret = []
while len(ret) < cnt:
kmsgs += panda.can_recv()
nmsgs = []
for ids, dat, bus in kmsgs:
if ids == addr and bus == nbus and len(ret) < cnt:
ret.append(dat)
else:
# leave around
nmsgs.append((ids, dat, bus))
kmsgs = nmsgs[-256:]
return ret
def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr):
msg = recv(panda, 1, addr, bus)[0]
# TODO: handle other subaddr also communicating
assert msg[0] == subaddr
if msg[1] & 0xf0 == 0x10:
# first
tlen = ((msg[1] & 0xf) << 8) | msg[2]
dat = msg[3:]
# 0 block size?
CONTINUE = bytes([subaddr]) + b"\x30" + b"\x00" * 6
panda.can_send(sendaddr, CONTINUE, bus)
idx = 1
for mm in recv(panda, (tlen - len(dat) + 5) // 6, addr, bus):
assert mm[0] == subaddr
assert mm[1] == (0x20 | (idx & 0xF))
dat += mm[2:]
idx += 1
elif msg[1] & 0xf0 == 0x00:
# single
tlen = msg[1] & 0xf
dat = msg[2:]
else:
print(binascii.hexlify(msg))
raise AssertionError
return dat[0:tlen]
# **** import below this line ****
def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None, rate=None):
if recvaddr is None:
recvaddr = addr + 8
if len(x) <= 7 and subaddr is None:
panda.can_send(addr, msg(x), bus)
elif len(x) <= 6 and subaddr is not None:
panda.can_send(addr, bytes([subaddr]) + msg(x)[0:7], bus)
else:
if subaddr:
ss = bytes([subaddr, 0x10 + (len(x) >> 8), len(x) & 0xFF]) + x[0:5]
x = x[5:]
else:
ss = bytes([0x10 + (len(x) >> 8), len(x) & 0xFF]) + x[0:6]
x = x[6:]
idx = 1
sends = []
while len(x) > 0:
if subaddr:
sends.append((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00"))
x = x[6:]
else:
sends.append((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00"))
x = x[7:]
idx += 1
# actually send
panda.can_send(addr, ss, bus)
rr = recv(panda, 1, recvaddr, bus)[0]
if rr.find(b"\x30\x01") != -1:
for s in sends[:-1]:
panda.can_send(addr, s, 0)
rr = recv(panda, 1, recvaddr, bus)[0]
panda.can_send(addr, sends[-1], 0)
else:
if rate is None:
panda.can_send_many([(addr, s, bus) for s in sends])
else:
for dat in sends:
panda.can_send(addr, dat, bus)
time.sleep(rate)
def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None):
if sendaddr is None:
sendaddr = addr - 8
if subaddr is not None:
dat = isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr)
else:
msg = recv(panda, 1, addr, bus)[0]
if msg[0] & 0xf0 == 0x10:
# first
tlen = ((msg[0] & 0xf) << 8) | msg[1]
dat = msg[2:]
# 0 block size?
CONTINUE = b"\x30" + b"\x00" * 7
panda.can_send(sendaddr, CONTINUE, bus)
idx = 1
for mm in recv(panda, (tlen - len(dat) + 6) // 7, addr, bus):
assert mm[0] == (0x20 | (idx & 0xF))
dat += mm[1:]
idx += 1
elif msg[0] & 0xf0 == 0x00:
# single
tlen = msg[0] & 0xf
dat = msg[1:]
else:
raise AssertionError
dat = dat[0:tlen]
if DEBUG:
print("R:", binascii.hexlify(dat))
return dat

View File

@@ -1,258 +0,0 @@
import sys
import time
import struct
from enum import IntEnum
class COMMAND_CODE(IntEnum):
CONNECT = 0xFF
DISCONNECT = 0xFE
GET_STATUS = 0xFD
SYNCH = 0xFC
GET_COMM_MODE_INFO = 0xFB
GET_ID = 0xFA
SET_REQUEST = 0xF9
GET_SEED = 0xF8
UNLOCK = 0xF7
SET_MTA = 0xF6
UPLOAD = 0xF5
SHORT_UPLOAD = 0xF4
BUILD_CHECKSUM = 0xF3
TRANSPORT_LAYER_CMD = 0xF2
USER_CMD = 0xF1
DOWNLOAD = 0xF0
DOWNLOAD_NEXT = 0xEF
DOWNLOAD_MAX = 0xEE
SHORT_DOWNLOAD = 0xED
MODIFY_BITS = 0xEC
SET_CAL_PAGE = 0xEB
GET_CAL_PAGE = 0xEA
GET_PAG_PROCESSOR_INFO = 0xE9
GET_SEGMENT_INFO = 0xE8
GET_PAGE_INFO = 0xE7
SET_SEGMENT_MODE = 0xE6
GET_SEGMENT_MODE = 0xE5
COPY_CAL_PAGE = 0xE4
CLEAR_DAQ_LIST = 0xE3
SET_DAQ_PTR = 0xE2
WRITE_DAQ = 0xE1
SET_DAQ_LIST_MODE = 0xE0
GET_DAQ_LIST_MODE = 0xDF
START_STOP_DAQ_LIST = 0xDE
START_STOP_SYNCH = 0xDD
GET_DAQ_CLOCK = 0xDC
READ_DAQ = 0xDB
GET_DAQ_PROCESSOR_INFO = 0xDA
GET_DAQ_RESOLUTION_INFO = 0xD9
GET_DAQ_LIST_INFO = 0xD8
GET_DAQ_EVENT_INFO = 0xD7
FREE_DAQ = 0xD6
ALLOC_DAQ = 0xD5
ALLOC_ODT = 0xD4
ALLOC_ODT_ENTRY = 0xD3
PROGRAM_START = 0xD2
PROGRAM_CLEAR = 0xD1
PROGRAM = 0xD0
PROGRAM_RESET = 0xCF
GET_PGM_PROCESSOR_INFO = 0xCE
GET_SECTOR_INFO = 0xCD
PROGRAM_PREPARE = 0xCC
PROGRAM_FORMAT = 0xCB
PROGRAM_NEXT = 0xCA
PROGRAM_MAX = 0xC9
PROGRAM_VERIFY = 0xC8
ERROR_CODES = {
0x00: "Command processor synchronization",
0x10: "Command was not executed",
0x11: "Command rejected because DAQ is running",
0x12: "Command rejected because PGM is running",
0x20: "Unknown command or not implemented optional command",
0x21: "Command syntax invalid",
0x22: "Command syntax valid but command parameter(s) out of range",
0x23: "The memory location is write protected",
0x24: "The memory location is not accessible",
0x25: "Access denied, Seed & Key is required",
0x26: "Selected page not available",
0x27: "Selected page mode not available",
0x28: "Selected segment not valid",
0x29: "Sequence error",
0x2A: "DAQ configuration not valid",
0x30: "Memory overflow error",
0x31: "Generic error",
0x32: "The slave internal program verify routine detects an error",
}
class CONNECT_MODE(IntEnum):
NORMAL = 0x00,
USER_DEFINED = 0x01,
class GET_ID_REQUEST_TYPE(IntEnum):
ASCII = 0x00,
ASAM_MC2_FILE = 0x01,
ASAM_MC2_PATH = 0x02,
ASAM_MC2_URL = 0x03,
ASAM_MC2_UPLOAD = 0x04,
# 128-255 user defined
class CommandTimeoutError(Exception):
pass
class CommandCounterError(Exception):
pass
class CommandResponseError(Exception):
def __init__(self, message, return_code):
super().__init__()
self.message = message
self.return_code = return_code
def __str__(self):
return self.message
class XcpClient():
def __init__(self, panda, tx_addr: int, rx_addr: int, bus: int=0, timeout: float=0.1, debug=False, pad=True):
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.can_bus = bus
self.timeout = timeout
self.debug = debug
self._panda = panda
self._byte_order = ">"
self._max_cto = 8
self._max_dto = 8
self.pad = pad
def _send_cto(self, cmd: int, dat: bytes = b"") -> None:
tx_data = (bytes([cmd]) + dat)
# Some ECUs don't respond if the packets are not padded to 8 bytes
if self.pad:
tx_data = tx_data.ljust(8, b"\x00")
if self.debug:
print("CAN-CLEAR: TX")
self._panda.can_clear(self.can_bus)
if self.debug:
print("CAN-CLEAR: RX")
self._panda.can_clear(0xFFFF)
if self.debug:
print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(tx_data)}")
self._panda.can_send(self.tx_addr, tx_data, self.can_bus)
def _recv_dto(self, timeout: float) -> bytes:
start_time = time.time()
while time.time() - start_time < timeout:
msgs = self._panda.can_recv() or []
if len(msgs) >= 256:
print("CAN RX buffer overflow!!!", file=sys.stderr)
for rx_addr, rx_data, rx_bus in msgs:
if rx_bus == self.can_bus and rx_addr == self.rx_addr:
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug:
print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
pid = rx_data[0]
if pid == 0xFE:
err = rx_data[1]
err_desc = ERROR_CODES.get(err, "unknown error")
dat = rx_data[2:]
raise CommandResponseError(f"{hex(err)} - {err_desc} {dat}", err)
return bytes(rx_data[1:])
time.sleep(0.001)
raise CommandTimeoutError("timeout waiting for response")
# commands
def connect(self, connect_mode: CONNECT_MODE=CONNECT_MODE.NORMAL) -> dict:
self._send_cto(COMMAND_CODE.CONNECT, bytes([connect_mode]))
resp = self._recv_dto(self.timeout)
assert len(resp) == 7, f"incorrect data length: {len(resp)}"
self._byte_order = ">" if resp[1] & 0x01 else "<"
self._slave_block_mode = resp[1] & 0x40 != 0
self._max_cto = resp[2]
self._max_dto = struct.unpack(f"{self._byte_order}H", resp[3:5])[0]
return {
"cal_support": resp[0] & 0x01 != 0,
"daq_support": resp[0] & 0x04 != 0,
"stim_support": resp[0] & 0x08 != 0,
"pgm_support": resp[0] & 0x10 != 0,
"byte_order": self._byte_order,
"address_granularity": 2**((resp[1] & 0x06) >> 1),
"slave_block_mode": self._slave_block_mode,
"optional": resp[1] & 0x80 != 0,
"max_cto": self._max_cto,
"max_dto": self._max_dto,
"protocol_version": resp[5],
"transport_version": resp[6],
}
def disconnect(self) -> None:
self._send_cto(COMMAND_CODE.DISCONNECT)
resp = self._recv_dto(self.timeout)
assert len(resp) == 0, f"incorrect data length: {len(resp)}"
def get_id(self, req_id_type: GET_ID_REQUEST_TYPE = GET_ID_REQUEST_TYPE.ASCII) -> dict:
if req_id_type > 255:
raise ValueError("request id type must be less than 255")
self._send_cto(COMMAND_CODE.GET_ID, bytes([req_id_type]))
resp = self._recv_dto(self.timeout)
return {
# mode = 0 means MTA was set
# mode = 1 means data is at end (only CAN-FD has space for this)
"mode": resp[0],
"length": struct.unpack(f"{self._byte_order}I", resp[3:7])[0],
"identifier": resp[7:] if self._max_cto > 8 else None
}
def get_seed(self, mode: int = 0) -> bytes:
if mode > 255:
raise ValueError("mode must be less than 255")
self._send_cto(COMMAND_CODE.GET_SEED, bytes([0, mode]))
# TODO: add support for longer seeds spread over multiple blocks
ret = self._recv_dto(self.timeout)
length = ret[0]
return ret[1:length+1]
def unlock(self, key: bytes) -> bytes:
# TODO: add support for longer keys spread over multiple blocks
self._send_cto(COMMAND_CODE.UNLOCK, bytes([len(key)]) + key)
return self._recv_dto(self.timeout)
def set_mta(self, addr: int, addr_ext: int = 0) -> bytes:
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
# TODO: this looks broken (missing addr extension)
self._send_cto(COMMAND_CODE.SET_MTA, bytes([0x00, 0x00, addr_ext]) + struct.pack(f"{self._byte_order}I", addr))
return self._recv_dto(self.timeout)
def upload(self, size: int) -> bytes:
if size > 255:
raise ValueError("size must be less than 256")
if not self._slave_block_mode and size > self._max_dto - 1:
raise ValueError("block mode not supported")
self._send_cto(COMMAND_CODE.UPLOAD, bytes([size]))
resp = b""
while len(resp) < size:
resp += self._recv_dto(self.timeout)[:size - len(resp) + 1]
return resp[:size] # trim off bytes with undefined values
def short_upload(self, size: int, addr_ext: int, addr: int) -> bytes:
if size > 6:
raise ValueError("size must be less than 7")
if addr_ext > 255:
raise ValueError("address extension must be less than 256")
self._send_cto(COMMAND_CODE.SHORT_UPLOAD, bytes([size, 0x00, addr_ext]) + struct.pack(f"{self._byte_order}I", addr))
return self._recv_dto(self.timeout)[:size] # trim off bytes with undefined values
def download(self, data: bytes) -> bytes:
size = len(data)
if size > 255:
raise ValueError("size must be less than 256")
if not self._slave_block_mode and size > self._max_dto - 2:
raise ValueError("block mode not supported")
self._send_cto(COMMAND_CODE.DOWNLOAD, bytes([size]) + data)
return self._recv_dto(self.timeout)[:size]