dragonpilot 2023-03-27T06:11:06 for EON/C2

version: dragonpilot v0.9.2 beta for EON/C2
date: 2023-03-27T06:11:06
dp-dev(priv2) master commit: 0a08aa2b73a505e11e4c98ac6c5989464b7d339f
This commit is contained in:
Dragonpilot Team
2023-03-25 08:58:31 +00:00
committed by Comma Device
parent f770882b7f
commit ea800c8f74
452 changed files with 22242 additions and 10694 deletions

View File

@@ -5,18 +5,20 @@ import time
import usb1
import struct
import hashlib
import binascii
import datetime
import traceback
import warnings
import logging
from functools import wraps
from typing import Optional
from itertools import accumulate
from .config import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, SECTOR_SIZES_FX, SECTOR_SIZES_H7
from .dfu import PandaDFU, MCU_TYPE_F2, MCU_TYPE_F4, MCU_TYPE_H7
from .base import BaseHandle
from .constants import McuType
from .dfu import PandaDFU
from .isotp import isotp_send, isotp_recv
from .spi import SpiHandle
from .spi import PandaSpiHandle, PandaSpiException
from .usb import PandaUsbHandle
__version__ = '0.0.10'
@@ -24,55 +26,48 @@ __version__ = '0.0.10'
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logging.basicConfig(level=LOGLEVEL, format='%(message)s')
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
DEBUG = os.getenv("PANDADEBUG") is not None
CAN_TRANSACTION_MAGIC = struct.pack("<I", 0x43414E2F)
USBPACKET_MAX_SIZE = 0x40
CANPACKET_HEAD_SIZE = 0x5
CANPACKET_HEAD_SIZE = 0x6
DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]
LEN_TO_DLC = {length: dlc for (dlc, length) in enumerate(DLC_TO_LEN)}
def calculate_checksum(data):
res = 0
for b in data:
res ^= b
return res
def pack_can_buffer(arr):
snds = [CAN_TRANSACTION_MAGIC]
snds = [b'']
for address, _, dat, bus in arr:
assert len(dat) in LEN_TO_DLC
#logging.debug(" W 0x%x: 0x%s", address, dat.hex())
extended = 1 if address >= 0x800 else 0
data_len_code = LEN_TO_DLC[len(dat)]
header = bytearray(5)
header = bytearray(CANPACKET_HEAD_SIZE)
word_4b = address << 3 | extended << 2
header[0] = (data_len_code << 4) | (bus << 1)
header[1] = word_4b & 0xFF
header[2] = (word_4b >> 8) & 0xFF
header[3] = (word_4b >> 16) & 0xFF
header[4] = (word_4b >> 24) & 0xFF
header[5] = calculate_checksum(header[:5] + dat)
snds[-1] += header + dat
if len(snds[-1]) > 256: # Limit chunks to 256 bytes
snds.append(CAN_TRANSACTION_MAGIC)
snds.append(b'')
return snds
def unpack_can_buffer(dat):
ret = []
if len(dat) < len(CAN_TRANSACTION_MAGIC):
return ret
if dat[:len(CAN_TRANSACTION_MAGIC)] != CAN_TRANSACTION_MAGIC:
logging.error("CAN: recv didn't start with magic")
return ret
dat = dat[len(CAN_TRANSACTION_MAGIC):]
while len(dat) >= CANPACKET_HEAD_SIZE:
data_len = DLC_TO_LEN[(dat[0]>>4)]
header = dat[:CANPACKET_HEAD_SIZE]
dat = dat[CANPACKET_HEAD_SIZE:]
bus = (header[0] >> 1) & 0x7
address = (header[4] << 24 | header[3] << 16 | header[2] << 8 | header[1]) >> 3
@@ -84,17 +79,18 @@ def unpack_can_buffer(dat):
# rejected
bus += 192
data = dat[:data_len]
dat = dat[data_len:]
# we need more from the next transfer
if data_len > len(dat) - CANPACKET_HEAD_SIZE:
break
#logging.debug(" R 0x%x: 0x%s", address, data.hex())
assert calculate_checksum(dat[:(CANPACKET_HEAD_SIZE+data_len)]) == 0, "CAN packet checksum incorrect"
data = dat[CANPACKET_HEAD_SIZE:(CANPACKET_HEAD_SIZE+data_len)]
dat = dat[(CANPACKET_HEAD_SIZE+data_len):]
ret.append((address, 0, data, bus))
if len(dat) > 0:
logging.error("CAN: malformed packet. leftover data")
return ret
return (ret, dat)
def ensure_health_packet_version(fn):
@wraps(fn)
@@ -162,8 +158,6 @@ class Panda:
SAFETY_FAW = 26
SAFETY_BODY = 27
SAFETY_HYUNDAI_CANFD = 28
SAFETY_VOLVO_C1 = 29
SAFETY_VOLVO_EUCD = 30
SERIAL_DEBUG = 0
SERIAL_ESP = 1
@@ -188,11 +182,14 @@ class Panda:
HW_TYPE_RED_PANDA_V2 = b'\x08'
HW_TYPE_TRES = b'\x09'
CAN_PACKET_VERSION = 3
CAN_PACKET_VERSION = 4
HEALTH_PACKET_VERSION = 11
CAN_HEALTH_PACKET_VERSION = 3
HEALTH_STRUCT = struct.Struct("<IIIIIIIIIBBBBBBHBBBHfBBB")
CAN_HEALTH_STRUCT = struct.Struct("<BIBBBBBBBBIIIIIIHHBBB")
CAN_HEALTH_PACKET_VERSION = 4
# dp - 2 extra "B" at the end:
# "usb_power_mode": a[23],
# "torque_interceptor_detected": a[24],
HEALTH_STRUCT = struct.Struct("<IIIIIIIIIBBBBBBHBBBHfBBBB")
CAN_HEALTH_STRUCT = struct.Struct("<BIBBBBBBBBIIIIIIIHHBBB")
F2_DEVICES = (HW_TYPE_PEDAL, )
F4_DEVICES = (HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS)
@@ -201,9 +198,6 @@ class Panda:
INTERNAL_DEVICES = (HW_TYPE_UNO, HW_TYPE_DOS)
HAS_OBD = (HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES)
CLOCK_SOURCE_MODE_DISABLED = 0
CLOCK_SOURCE_MODE_FREE_RUNNING = 1
# first byte is for EPS scaling factor
FLAG_TOYOTA_ALT_BRAKE = (1 << 8)
FLAG_TOYOTA_STOCK_LONGITUDINAL = (2 << 8)
@@ -234,47 +228,96 @@ class Panda:
FLAG_GM_HW_CAM = 1
FLAG_GM_HW_CAM_LONG = 2
def __init__(self, serial: Optional[str] = None, claim: bool = True, spi: bool = False, disable_checks: bool = True):
self._serial = serial
def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True):
self._connect_serial = serial
self._disable_checks = disable_checks
self._handle = None
self._bcd_device = None
self._handle: BaseHandle
self._handle_open = False
self.can_rx_overflow_buffer = b''
# connect and set mcu type
self._spi = spi
self.connect(claim)
# reset comms
self.can_reset_communications()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def close(self):
self._handle.close()
self._handle = None
if self._handle_open:
self._handle.close()
self._handle_open = False
def connect(self, claim=True, wait=False):
if self._handle is not None:
self.close()
self._handle = None
self.close()
if self._spi:
self._handle = SpiHandle()
# try USB first, then SPI
self._handle, serial, self.bootstub, bcd = self.usb_connect(self._connect_serial, claim=claim, wait=wait)
if self._handle is None:
self._handle, serial, self.bootstub, bcd = self.spi_connect(self._connect_serial)
# TODO implement
self._serial = "SPIDEV"
self.bootstub = False
if self._handle is None:
raise Exception("failed to connect to panda")
else:
self.usb_connect(claim=claim, wait=wait)
# Some fallback logic to determine panda and MCU type for old bootstubs,
# since we now support multiple MCUs and need to know which fw to flash.
# Three cases to consider:
# A) oldest bootstubs don't have any way to distinguish
# MCU or panda type
# B) slightly newer (~2 weeks after first C3's built) bootstubs
# have the panda type set in the USB bcdDevice
# C) latest bootstubs also implement the endpoint for panda type
self._bcd_hw_type = None
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
missing_hw_type_endpoint = self.bootstub and ret.startswith(b'\xff\x00\xc1\x3e\xde\xad\xd0\x0d')
if missing_hw_type_endpoint and bcd is not None:
self._bcd_hw_type = bcd
assert self._handle is not None
# For case A, we assume F4 MCU type, since all H7 pandas should be case B at worst
self._assume_f4_mcu = (self._bcd_hw_type is None) and missing_hw_type_endpoint
self._serial = serial
self._connect_serial = serial
self._handle_open = True
self._mcu_type = self.get_mcu_type()
self.health_version, self.can_version, self.can_health_version = self.get_packets_versions()
print("connected")
logging.debug("connected")
# disable openpilot's heartbeat checks
if self._disable_checks:
self.set_heartbeat_disabled()
self.set_power_save(0)
def usb_connect(self, claim=True, wait=False):
@staticmethod
def spi_connect(serial):
# get UID to confirm slave is present and up
handle = None
spi_serial = None
bootstub = None
try:
handle = PandaSpiHandle()
dat = handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12, timeout=100)
spi_serial = binascii.hexlify(dat).decode()
bootstub = Panda.flasher_present(handle)
except PandaSpiException:
pass
# no connection or wrong panda
if None in (spi_serial, bootstub) or (serial is not None and (spi_serial != serial)):
handle = None
spi_serial = None
bootstub = False
return handle, spi_serial, bootstub, None
@staticmethod
def usb_connect(serial, claim=True, wait=False):
handle, usb_serial, bootstub, bcd = None, None, None, None
context = usb1.USBContext()
while 1:
try:
@@ -284,30 +327,69 @@ class Panda:
this_serial = device.getSerialNumber()
except Exception:
continue
if self._serial is None or this_serial == self._serial:
self._serial = this_serial
print("opening device", self._serial, hex(device.getProductID()))
self.bootstub = device.getProductID() == 0xddee
self._handle = device.open()
if serial is None or this_serial == serial:
logging.debug("opening device %s %s", this_serial, hex(device.getProductID()))
usb_serial = this_serial
bootstub = device.getProductID() == 0xddee
handle = device.open()
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
self._handle.setAutoDetachKernelDriver(True)
handle.setAutoDetachKernelDriver(True)
if claim:
self._handle.claimInterface(0)
# self._handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
handle.claimInterface(0)
# handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
# bcdDevice wasn't always set to the hw type, ignore if it's the old constant
bcd = device.getbcdDevice()
if bcd is not None and bcd != 0x2300:
self._bcd_device = bytearray([bcd >> 8, ])
this_bcd = device.getbcdDevice()
if this_bcd is not None and this_bcd != 0x2300:
bcd = bytearray([this_bcd >> 8, ])
break
except Exception as e:
print("exception", e)
traceback.print_exc()
if not wait or self._handle is not None:
except Exception:
logging.exception("USB connect error")
if not wait or handle is not None:
break
context = usb1.USBContext() # New context needed so new devices show up
usb_handle = None
if handle is not None:
usb_handle = PandaUsbHandle(handle)
return usb_handle, usb_serial, bootstub, bcd
@staticmethod
def list():
ret = Panda.usb_list()
ret += Panda.spi_list()
return list(set(ret))
@staticmethod
def usb_list():
context = usb1.USBContext()
ret = []
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in (0xddcc, 0xddee):
try:
serial = device.getSerialNumber()
if len(serial) == 24:
ret.append(serial)
else:
warnings.warn(f"found device with panda descriptors but invalid serial: {serial}", RuntimeWarning)
except Exception:
continue
except Exception:
pass
return ret
@staticmethod
def spi_list():
_, serial, _, _ = Panda.spi_connect(None)
if serial is not None:
return [serial, ]
return []
def reset(self, enter_bootstub=False, enter_bootloader=False, reconnect=True):
try:
if enter_bootloader:
@@ -323,7 +405,7 @@ class Panda:
self.reconnect()
def reconnect(self):
if self._handle is not None:
if self._handle_open:
self.close()
time.sleep(1.0)
@@ -335,7 +417,7 @@ class Panda:
success = True
break
except Exception:
print("reconnecting is taking %d seconds..." % (i + 1))
logging.debug("reconnecting is taking %d seconds...", i + 1)
try:
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type))
dfu.recover()
@@ -345,39 +427,41 @@ class Panda:
if not success:
raise Exception("reconnect failed")
@staticmethod
def flasher_present(handle: BaseHandle) -> bool:
fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
return fr[4:8] == b"\xde\xad\xd0\x0d"
@staticmethod
def flash_static(handle, code, mcu_type):
assert mcu_type is not None, "must set valid mcu_type to flash"
# confirm flasher is present
fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
assert fr[4:8] == b"\xde\xad\xd0\x0d"
assert Panda.flasher_present(handle)
# determine sectors to erase
apps_sectors_cumsum = accumulate(SECTOR_SIZES_H7[1:] if mcu_type == MCU_TYPE_H7 else SECTOR_SIZES_FX[1:])
apps_sectors_cumsum = accumulate(mcu_type.config.sector_sizes[1:])
last_sector = next((i + 1 for i, v in enumerate(apps_sectors_cumsum) if v > len(code)), -1)
assert last_sector >= 1, "Binary too small? No sector to erase."
assert last_sector < 7, "Binary too large! Risk of overwriting provisioning chunk."
# unlock flash
print("flash: unlocking")
logging.warning("flash: unlocking")
handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')
# erase sectors
print(f"flash: erasing sectors 1 - {last_sector}")
logging.warning(f"flash: erasing sectors 1 - {last_sector}")
for i in range(1, last_sector + 1):
handle.controlWrite(Panda.REQUEST_IN, 0xb2, i, 0, b'')
# flash over EP2
STEP = 0x10
print("flash: flashing")
logging.warning("flash: flashing")
for i in range(0, len(code), STEP):
handle.bulkWrite(2, code[i:i + STEP])
# reset
print("flash: resetting")
logging.warning("flash: resetting")
try:
handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
except Exception:
@@ -385,9 +469,9 @@ class Panda:
def flash(self, fn=None, code=None, reconnect=True):
if not fn:
fn = DEFAULT_H7_FW_FN if self._mcu_type == MCU_TYPE_H7 else DEFAULT_FW_FN
fn = self._mcu_type.config.app_path
assert os.path.isfile(fn)
print("flash: main version is " + self.get_version())
logging.debug("flash: main version is %s", self.get_version())
if not self.bootstub:
self.reset(enter_bootstub=True)
assert(self.bootstub)
@@ -397,7 +481,7 @@ class Panda:
code = f.read()
# get version
print("flash: bootstub version is " + self.get_version())
logging.debug("flash: bootstub version is %s", self.get_version())
# do flash
Panda.flash_static(self._handle, code, mcu_type=self._mcu_type)
@@ -428,30 +512,16 @@ class Panda:
def wait_for_dfu(dfu_serial: str, timeout: Optional[int] = None) -> bool:
t_start = time.monotonic()
while dfu_serial not in PandaDFU.list():
print("waiting for DFU...")
logging.debug("waiting for DFU...")
time.sleep(0.1)
if timeout is not None and (time.monotonic() - t_start) > timeout:
return False
return True
@staticmethod
def list():
context = usb1.USBContext()
ret = []
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in (0xddcc, 0xddee):
try:
serial = device.getSerialNumber()
if len(serial) == 24:
ret.append(serial)
else:
warnings.warn(f"found device with panda descriptors but invalid serial: {serial}", RuntimeWarning)
except Exception:
continue
except Exception:
pass
return ret
def up_to_date(self) -> bool:
current = self.get_signature()
expected = Panda.get_signature_from_firmware(self.get_mcu_type().config.app_path)
return (current == expected)
def call_control_api(self, msg):
self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')
@@ -487,6 +557,7 @@ class Panda:
"fan_power": a[21],
"safety_rx_checks_invalid": a[22],
"usb_power_mode": a[23],
"torque_interceptor_detected": a[24],
}
@ensure_can_health_packet_version
@@ -520,11 +591,12 @@ class Panda:
"total_tx_cnt": a[13],
"total_rx_cnt": a[14],
"total_fwd_cnt": a[15],
"can_speed": a[16],
"can_data_speed": a[17],
"canfd_enabled": a[18],
"brs_enabled": a[19],
"canfd_non_iso": a[20],
"total_tx_checksum_error_cnt": a[16],
"can_speed": a[17],
"can_data_speed": a[18],
"canfd_enabled": a[19],
"brs_enabled": a[20],
"canfd_non_iso": a[21],
}
# ******************* control *******************
@@ -532,8 +604,8 @@ class Panda:
def enter_bootloader(self):
try:
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd1, 0, 0, b'')
except Exception as e:
print(e)
except Exception:
logging.exception("exception while entering bootloader")
def get_version(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40).decode('utf8')
@@ -544,7 +616,7 @@ class Panda:
f.seek(-128, 2) # Seek from end of file
return f.read(128)
def get_signature(self):
def get_signature(self) -> bytes:
part_1 = self._handle.controlRead(Panda.REQUEST_IN, 0xd3, 0, 0, 0x40)
part_2 = self._handle.controlRead(Panda.REQUEST_IN, 0xd4, 0, 0, 0x40)
return bytes(part_1 + part_2)
@@ -552,10 +624,9 @@ class Panda:
def get_type(self):
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
# bootstub doesn't implement this call, so fallback to bcdDevice
invalid_type = self.bootstub and (ret is None or len(ret) != 1)
if invalid_type and self._bcd_device is not None:
ret = self._bcd_device
# old bootstubs don't implement this endpoint, see comment in Panda.device
if self._bcd_hw_type is not None and (ret is None or len(ret) != 1):
ret = self._bcd_hw_type
return ret
@@ -568,15 +639,20 @@ class Panda:
else:
return (0, 0, 0)
def get_mcu_type(self):
def get_mcu_type(self) -> McuType:
hw_type = self.get_type()
if hw_type in Panda.F2_DEVICES:
return MCU_TYPE_F2
return McuType.F2
elif hw_type in Panda.F4_DEVICES:
return MCU_TYPE_F4
return McuType.F4
elif hw_type in Panda.H7_DEVICES:
return MCU_TYPE_H7
return None
return McuType.H7
else:
# have to assume F4, see comment in Panda.connect
if self._assume_f4_mcu:
return McuType.F4
raise ValueError(f"unknown HW type: {hw_type}")
def has_obd(self):
return self.get_type() in Panda.HAS_OBD
@@ -585,14 +661,28 @@ class Panda:
return self.get_type() in Panda.INTERNAL_DEVICES
def get_serial(self):
"""
Returns the comma-issued dongle ID from our provisioning
"""
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4]
assert(hashsig == calc_hash)
return [dat[0:0x10].decode("utf8"), dat[0x10:0x10 + 10].decode("utf8")]
def get_usb_serial(self):
"""
Returns the serial number reported from the USB descriptor;
matches the MCU UID
"""
return self._serial
def get_uid(self):
"""
Returns the UID from the MCU
"""
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12)
return binascii.hexlify(dat).decode()
def get_secret(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)
@@ -659,6 +749,9 @@ class Panda:
# Timeout is in ms. If set to 0, the timeout is infinite.
CAN_SEND_TIMEOUT_MS = 10
def can_reset_communications(self):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xc0, 0, 0, b'')
@ensure_can_packet_version
def can_send_many(self, arr, timeout=CAN_SEND_TIMEOUT_MS):
snds = pack_can_buffer(arr)
@@ -670,10 +763,10 @@ class Panda:
tx = tx[bs:]
if len(tx) == 0:
break
print("CAN: PARTIAL SEND MANY, RETRYING")
logging.error("CAN: PARTIAL SEND MANY, RETRYING")
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
print("CAN: BAD SEND MANY, RETRYING")
logging.error("CAN: BAD SEND MANY, RETRYING")
def can_send(self, addr, dat, bus, timeout=CAN_SEND_TIMEOUT_MS):
self.can_send_many([[addr, None, dat, bus]], timeout=timeout)
@@ -686,9 +779,10 @@ class Panda:
dat = self._handle.bulkRead(1, 16384) # Max receive batch size + 2 extra reserve frames
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
print("CAN: BAD RECV, RETRYING")
logging.error("CAN: BAD RECV, RETRYING")
time.sleep(0.1)
return unpack_can_buffer(dat)
msgs, self.can_rx_overflow_buffer = unpack_can_buffer(self.can_rx_overflow_buffer + dat)
return msgs
def can_clear(self, bus):
"""Clears all messages from the specified internal CAN ringbuffer as
@@ -722,8 +816,10 @@ class Panda:
def serial_write(self, port_number, ln):
ret = 0
if type(ln) == str:
ln = bytes(ln, 'utf-8')
for i in range(0, len(ln), 0x20):
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + bytes(ln[i:i + 0x20], 'utf-8'))
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i + 0x20])
return ret
def serial_clear(self, port_number):
@@ -741,19 +837,15 @@ class Panda:
# pulse low for wakeup
def kline_wakeup(self, k=True, l=True):
assert k or l, "must specify k-line, l-line, or both"
if DEBUG:
print("kline wakeup...")
logging.debug("kline wakeup...")
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf0, 2 if k and l else int(l), 0, b'')
if DEBUG:
print("kline wakeup done")
logging.debug("kline wakeup done")
def kline_5baud(self, addr, k=True, l=True):
assert k or l, "must specify k-line, l-line, or both"
if DEBUG:
print("kline 5 baud...")
logging.debug("kline 5 baud...")
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf4, 2 if k and l else int(l), addr, b'')
if DEBUG:
print("kline 5 baud done")
logging.debug("kline 5 baud done")
def kline_drain(self, bus=2):
# drain buffer
@@ -762,8 +854,7 @@ class Panda:
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xe0, bus, 0, 0x40)
if len(ret) == 0:
break
elif DEBUG:
print(f"kline drain: 0x{ret.hex()}")
logging.debug(f"kline drain: 0x{ret.hex()}")
bret += ret
return bytes(bret)
@@ -771,8 +862,8 @@ class Panda:
echo = bytearray()
while len(echo) != cnt:
ret = self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt - len(echo))
if DEBUG and len(ret) > 0:
print(f"kline recv: 0x{ret.hex()}")
if len(ret) > 0:
logging.debug(f"kline recv: 0x{ret.hex()}")
echo += ret
return bytes(echo)
@@ -782,14 +873,13 @@ class Panda:
x += bytes([sum(x) % 0x100])
for i in range(0, len(x), 0xf):
ts = x[i:i + 0xf]
if DEBUG:
print(f"kline send: 0x{ts.hex()}")
logging.debug(f"kline send: 0x{ts.hex()}")
self._handle.bulkWrite(2, bytes([bus]) + ts)
echo = self.kline_ll_recv(len(ts), bus=bus)
if echo != ts:
print(f"**** ECHO ERROR {i} ****")
print(f"0x{echo.hex()}")
print(f"0x{ts.hex()}")
logging.error(f"**** ECHO ERROR {i} ****")
logging.error(f"0x{echo.hex()}")
logging.error(f"0x{ts.hex()}")
assert echo == ts
def kline_recv(self, bus=2, header_len=4):
@@ -822,6 +912,11 @@ class Panda:
a = struct.unpack("HBBBBBB", dat)
return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6])
# ****************** Timer *****************
def get_microsecond_timer(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa8, 0, 0, 4)
return struct.unpack("I", dat)[0]
# ******************* IR *******************
def set_ir_power(self, percentage):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb0, int(percentage), 0, b'')
@@ -839,10 +934,10 @@ class Panda:
def set_phone_power(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb3, int(enabled), 0, b'')
# ************** Clock Source **************
def set_clock_source_mode(self, mode):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf5, int(mode), 0, b'')
# ****************** Siren *****************
def set_siren(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf6, int(enabled), 0, b'')
# ****************** Debug *****************
def set_green_led(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')

67
panda/python/base.py Normal file
View File

@@ -0,0 +1,67 @@
from abc import ABC, abstractmethod
from typing import List
from .constants import McuType
TIMEOUT = int(15 * 1e3) # default timeout, in milliseconds
class BaseHandle(ABC):
"""
A handle to talk to a panda.
Borrows heavily from the libusb1 handle API.
"""
@abstractmethod
def close(self) -> None:
...
@abstractmethod
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = TIMEOUT) -> int:
...
@abstractmethod
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = TIMEOUT) -> bytes:
...
@abstractmethod
def bulkWrite(self, endpoint: int, data: List[int], timeout: int = TIMEOUT) -> int:
...
@abstractmethod
def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
...
class BaseSTBootloaderHandle(ABC):
"""
A handle to talk to a panda while it's in the STM32 bootloader.
"""
@abstractmethod
def get_mcu_type(self) -> McuType:
...
@abstractmethod
def close(self) -> None:
...
@abstractmethod
def clear_status(self) -> None:
...
@abstractmethod
def program(self, address: int, dat: bytes) -> None:
...
@abstractmethod
def erase_app(self) -> None:
...
@abstractmethod
def erase_bootstub(self) -> None:
...
@abstractmethod
def jump(self, address: int) -> None:
...

View File

@@ -100,9 +100,9 @@ class CcpClient():
msgs = self._panda.can_recv() or []
if len(msgs) >= 256:
print("CAN RX buffer overflow!!!", file=sys.stderr)
for rx_addr, _, rx_data, rx_bus in msgs:
for rx_addr, _, rx_data_bytearray, rx_bus in msgs:
if rx_bus == self.can_bus and rx_addr == self.rx_addr:
rx_data = bytes(rx_data) # convert bytearray to bytes
rx_data = bytes(rx_data_bytearray)
if self.debug:
print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
assert len(rx_data) == 8, f"message length not 8: {len(rx_data)}"
@@ -183,7 +183,7 @@ class CcpClient():
resp = self._recv_dto(0.025)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr
return mta_addr # type: ignore
def download_6_bytes(self, data: bytes) -> int:
if len(data) != 6:
@@ -192,7 +192,7 @@ class CcpClient():
resp = self._recv_dto(0.025)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr
return mta_addr # type: ignore
def upload(self, size: int) -> bytes:
if size > 5:
@@ -296,7 +296,7 @@ class CcpClient():
resp = self._recv_dto(0.1)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr
return mta_addr # type: ignore
def program_6_bytes(self, data: bytes) -> int:
if len(data) != 6:
@@ -305,7 +305,7 @@ class CcpClient():
resp = self._recv_dto(0.1)
# mta_addr_ext = resp[0]
mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0]
return mta_addr
return mta_addr # type: ignore
def move_memory_block(self, size: int) -> None:
self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size))

57
panda/python/constants.py Normal file
View File

@@ -0,0 +1,57 @@
import os
import enum
from typing import List, NamedTuple
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
class McuConfig(NamedTuple):
mcu: str
mcu_idcode: int
uid_address: int
block_size: int
sector_sizes: List[int]
serial_number_address: int
app_address: int
app_path: str
bootstub_address: int
bootstub_path: str
Fx = (
0x1FFF7A10,
0x800,
[0x4000 for _ in range(4)] + [0x10000] + [0x20000 for _ in range(11)],
0x1FFF79C0,
0x8004000,
os.path.join(BASEDIR, "board", "obj", "panda.bin.signed"),
0x8000000,
os.path.join(BASEDIR, "board", "obj", "bootstub.panda.bin"),
)
F2Config = McuConfig("STM32F2", 0x411, *Fx)
F4Config = McuConfig("STM32F4", 0x463, *Fx)
H7Config = McuConfig(
"STM32H7",
0x483,
0x1FF1E800,
0x400,
# there is an 8th sector, but we use that for the provisioning chunk, so don't program over that!
[0x20000 for _ in range(7)],
0x080FFFC0,
0x8020000,
os.path.join(BASEDIR, "board", "obj", "panda_h7.bin.signed"),
0x8000000,
os.path.join(BASEDIR, "board", "obj", "bootstub.panda_h7.bin"),
)
@enum.unique
class McuType(enum.Enum):
F2 = F2Config
F4 = F4Config
H7 = H7Config
@property
def config(self):
return self.value
MCU_TYPE_BY_IDCODE = {m.config.mcu_idcode: m for m in McuType}

View File

@@ -1,22 +1,31 @@
import usb1
import struct
import binascii
from .config import BOOTSTUB_ADDRESS, APP_ADDRESS_H7, APP_ADDRESS_FX, BLOCK_SIZE_H7, BLOCK_SIZE_FX, DEFAULT_H7_BOOTSTUB_FN, DEFAULT_BOOTSTUB_FN
from typing import List, Optional
from .base import BaseSTBootloaderHandle
from .spi import STBootloaderSPIHandle, PandaSpiException
from .usb import STBootloaderUSBHandle
from .constants import McuType
MCU_TYPE_F2 = 0
MCU_TYPE_F4 = 1
MCU_TYPE_H7 = 2
class PandaDFU:
def __init__(self, dfu_serial: Optional[str]):
# try USB, then SPI
handle: Optional[BaseSTBootloaderHandle]
handle = PandaDFU.usb_connect(dfu_serial)
if handle is None:
handle = PandaDFU.spi_connect(dfu_serial)
# *** DFU mode ***
DFU_DNLOAD = 1
DFU_UPLOAD = 2
DFU_GETSTATUS = 3
DFU_CLRSTATUS = 4
DFU_ABORT = 6
if handle is None:
raise Exception(f"failed to open DFU device {dfu_serial}")
class PandaDFU(object):
def __init__(self, dfu_serial):
self._handle: BaseSTBootloaderHandle = handle
self._mcu_type: McuType = self._handle.get_mcu_type()
@staticmethod
def usb_connect(dfu_serial: Optional[str]) -> Optional[STBootloaderUSBHandle]:
handle = None
context = usb1.USBContext()
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0x0483 and device.getProductID() == 0xdf11:
@@ -24,14 +33,37 @@ class PandaDFU(object):
this_dfu_serial = device.open().getASCIIStringDescriptor(3)
except Exception:
continue
if this_dfu_serial == dfu_serial or dfu_serial is None:
self._mcu_type = self.get_mcu_type(device)
self._handle = device.open()
return
raise Exception("failed to open " + dfu_serial if dfu_serial is not None else "DFU device")
handle = STBootloaderUSBHandle(device, device.open())
break
return handle
@staticmethod
def list():
def spi_connect(dfu_serial: Optional[str]) -> Optional[STBootloaderSPIHandle]:
handle = None
this_dfu_serial = None
try:
handle = STBootloaderSPIHandle()
this_dfu_serial = PandaDFU.st_serial_to_dfu_serial(handle.get_uid(), handle.get_mcu_type())
except PandaSpiException:
handle = None
if dfu_serial is not None and dfu_serial != this_dfu_serial:
handle = None
return handle
@staticmethod
def list() -> List[str]:
ret = PandaDFU.usb_list()
ret += PandaDFU.spi_list()
return list(set(ret))
@staticmethod
def usb_list() -> List[str]:
context = usb1.USBContext()
dfu_serials = []
try:
@@ -46,80 +78,41 @@ class PandaDFU(object):
return dfu_serials
@staticmethod
def st_serial_to_dfu_serial(st, mcu_type=MCU_TYPE_F4):
def spi_list() -> List[str]:
try:
h = PandaDFU.spi_connect(None)
if h is not None:
dfu_serial = PandaDFU.st_serial_to_dfu_serial(h.get_uid(), h.get_mcu_type())
return [dfu_serial, ]
except PandaSpiException:
pass
return []
@staticmethod
def st_serial_to_dfu_serial(st: str, mcu_type: McuType = McuType.F4):
if st is None or st == "none":
return None
uid_base = struct.unpack("H" * 6, bytes.fromhex(st))
if mcu_type == MCU_TYPE_H7:
if mcu_type == McuType.H7:
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4], uid_base[3])).upper().decode("utf-8")
else:
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
# TODO: Find a way to detect F4 vs F2
def get_mcu_type(self, dev):
return MCU_TYPE_H7 if dev.getbcdDevice() == 512 else MCU_TYPE_F4
def get_mcu_type(self) -> McuType:
return self._mcu_type
def status(self):
while 1:
dat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)
if dat[1] == 0:
break
def clear_status(self):
# Clear status
stat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)
if stat[4] == 0xa:
self._handle.controlRead(0x21, DFU_CLRSTATUS, 0, 0, 0)
elif stat[4] == 0x9:
self._handle.controlWrite(0x21, DFU_ABORT, 0, 0, b"")
self.status()
stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
def erase(self, address):
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x41" + struct.pack("I", address))
self.status()
def program(self, address, dat, block_size=None):
if block_size is None:
block_size = len(dat)
# Set Address Pointer
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
self.status()
# Program
dat += b"\xFF" * ((block_size - len(dat)) % block_size)
for i in range(0, len(dat) // block_size):
ldat = dat[i * block_size:(i + 1) * block_size]
print("programming %d with length %d" % (i, len(ldat)))
self._handle.controlWrite(0x21, DFU_DNLOAD, 2 + i, 0, ldat)
self.status()
def reset(self):
self._handle.jump(self._mcu_type.config.bootstub_address)
def program_bootstub(self, code_bootstub):
self.clear_status()
self.erase(BOOTSTUB_ADDRESS)
if self._mcu_type == MCU_TYPE_H7:
self.erase(APP_ADDRESS_H7)
self.program(BOOTSTUB_ADDRESS, code_bootstub, BLOCK_SIZE_H7)
else:
self.erase(APP_ADDRESS_FX)
self.program(BOOTSTUB_ADDRESS, code_bootstub, BLOCK_SIZE_FX)
self._handle.clear_status()
self._handle.erase_bootstub()
self._handle.erase_app()
self._handle.program(self._mcu_type.config.bootstub_address, code_bootstub)
self.reset()
def recover(self):
fn = DEFAULT_H7_BOOTSTUB_FN if self._mcu_type == MCU_TYPE_H7 else DEFAULT_BOOTSTUB_FN
with open(fn, "rb") as f:
with open(self._mcu_type.config.bootstub_path, "rb") as f:
code = f.read()
self.program_bootstub(code)
def reset(self):
# **** Reset ****
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", BOOTSTUB_ADDRESS))
self.status()
try:
self._handle.controlWrite(0x21, DFU_DNLOAD, 2, 0, b"")
_ = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
except Exception:
pass

View File

@@ -1,9 +1,22 @@
import binascii
import os
import fcntl
import math
import time
import struct
import spidev
import logging
import threading
from contextlib import contextmanager
from functools import reduce
from typing import List
from typing import List, Optional
from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT
from .constants import McuType, MCU_TYPE_BY_IDCODE
try:
import spidev
except ImportError:
spidev = None
# Constants
SYNC = 0x5A
@@ -12,17 +25,69 @@ DACK = 0x85
NACK = 0x1F
CHECKSUM_START = 0xAB
MAX_RETRY_COUNT = 5
MIN_ACK_TIMEOUT_MS = 100
MAX_XFER_RETRY_COUNT = 5
USB_MAX_SIZE = 0x40
# This mimics the handle given by libusb1 for easy interoperability
class SpiHandle:
def __init__(self):
self.spi = spidev.SpiDev() # pylint: disable=c-extension-no-member
self.spi.open(0, 0)
DEV_PATH = "/dev/spidev0.0"
self.spi.max_speed_hz = 30000000
class PandaSpiException(Exception):
pass
class PandaSpiUnavailable(PandaSpiException):
pass
class PandaSpiNackResponse(PandaSpiException):
pass
class PandaSpiMissingAck(PandaSpiException):
pass
class PandaSpiBadChecksum(PandaSpiException):
pass
class PandaSpiTransferFailed(PandaSpiException):
pass
SPI_LOCK = threading.Lock()
class SpiDevice:
"""
Provides locked, thread-safe access to a panda's SPI interface.
"""
def __init__(self, speed=30000000):
if not os.path.exists(DEV_PATH):
raise PandaSpiUnavailable(f"SPI device not found: {DEV_PATH}")
if spidev is None:
raise PandaSpiUnavailable("spidev is not installed")
self._spidev = spidev.SpiDev() # pylint: disable=c-extension-no-member
self._spidev.open(0, 0)
self._spidev.max_speed_hz = speed
@contextmanager
def acquire(self):
try:
SPI_LOCK.acquire()
fcntl.flock(self._spidev, fcntl.LOCK_EX)
yield self._spidev
finally:
fcntl.flock(self._spidev, fcntl.LOCK_UN)
SPI_LOCK.release()
def close(self):
self._spidev.close()
class PandaSpiHandle(BaseHandle):
"""
A class that mimics a libusb1 handle for panda SPI communications.
"""
def __init__(self):
self.dev = SpiDevice()
# helpers
def _calc_checksum(self, data: List[int]) -> int:
@@ -31,75 +96,218 @@ class SpiHandle:
cksum ^= b
return cksum
def _transfer(self, endpoint: int, data, max_rx_len: int = 1000) -> bytes:
def _wait_for_ack(self, spi, ack_val: int, timeout: int) -> None:
timeout_s = max(MIN_ACK_TIMEOUT_MS, timeout) * 1e-3
start = time.monotonic()
while (timeout == 0) or ((time.monotonic() - start) < timeout_s):
dat = spi.xfer2(b"\x12")[0]
if dat == NACK:
raise PandaSpiNackResponse
elif dat == ack_val:
return
raise PandaSpiMissingAck
def _transfer(self, spi, endpoint: int, data, timeout: int, max_rx_len: int = 1000) -> bytes:
logging.debug("starting transfer: endpoint=%d, max_rx_len=%d", endpoint, max_rx_len)
logging.debug("==============================================")
for n in range(MAX_RETRY_COUNT):
exc = PandaSpiException()
for n in range(MAX_XFER_RETRY_COUNT):
logging.debug("\ntry #%d", n+1)
try:
logging.debug("- send header")
packet = struct.pack("<BBHH", SYNC, endpoint, len(data), max_rx_len)
packet += bytes([reduce(lambda x, y: x^y, packet) ^ CHECKSUM_START])
self.spi.xfer2(packet)
spi.xfer2(packet)
logging.debug("- waiting for ACK")
# TODO: add timeout?
dat = b"\x00"
while dat[0] not in [HACK, NACK]:
dat = self.spi.xfer2(b"\x12")
if dat[0] == NACK:
raise Exception("Got NACK response for header")
logging.debug("- waiting for header ACK")
self._wait_for_ack(spi, HACK, timeout)
# send data
logging.debug("- sending data")
packet = bytes([*data, self._calc_checksum(data)])
self.spi.xfer2(packet)
spi.xfer2(packet)
logging.debug("- waiting for ACK")
dat = b"\x00"
while dat[0] not in [DACK, NACK]:
dat = self.spi.xfer2(b"\xab")
if dat[0] == NACK:
raise Exception("Got NACK response for data")
logging.debug("- waiting for data ACK")
self._wait_for_ack(spi, DACK, timeout)
# get response length, then response
response_len_bytes = bytes(self.spi.xfer2(b"\x00" * 2))
response_len_bytes = bytes(spi.xfer2(b"\x00" * 2))
response_len = struct.unpack("<H", response_len_bytes)[0]
if response_len > max_rx_len:
raise PandaSpiException("response length greater than max")
logging.debug("- receiving response")
dat = bytes(self.spi.xfer2(b"\x00" * (response_len + 1)))
dat = bytes(spi.xfer2(b"\x00" * (response_len + 1)))
if self._calc_checksum([DACK, *response_len_bytes, *dat]) != 0:
raise Exception("SPI got bad checksum")
raise PandaSpiBadChecksum
return dat[:-1]
except Exception:
logging.exception("SPI transfer failed, %d retries left", n)
raise Exception(f"SPI transaction failed {MAX_RETRY_COUNT} times")
except PandaSpiException as e:
exc = e
logging.debug("SPI transfer failed, %d retries left", n, exc_info=True)
raise exc
# libusb1 functions
def close(self):
self.spi.close()
self.dev.close()
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = 0):
return self._transfer(0, struct.pack("<BHHH", request, value, index, 0))
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = TIMEOUT):
with self.dev.acquire() as spi:
return self._transfer(spi, 0, struct.pack("<BHHH", request, value, index, 0), timeout)
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = 0):
return self._transfer(0, struct.pack("<BHHH", request, value, index, length))
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = TIMEOUT):
with self.dev.acquire() as spi:
return self._transfer(spi, 0, struct.pack("<BHHH", request, value, index, length), timeout)
# TODO: implement these properly
def bulkWrite(self, endpoint: int, data: List[int], timeout: int = 0) -> int:
for x in range(math.ceil(len(data) / USB_MAX_SIZE)):
self._transfer(endpoint, data[USB_MAX_SIZE*x:USB_MAX_SIZE*(x+1)])
return len(data)
def bulkWrite(self, endpoint: int, data: List[int], timeout: int = TIMEOUT) -> int:
with self.dev.acquire() as spi:
for x in range(math.ceil(len(data) / USB_MAX_SIZE)):
self._transfer(spi, endpoint, data[USB_MAX_SIZE*x:USB_MAX_SIZE*(x+1)], timeout)
return len(data)
def bulkRead(self, endpoint: int, length: int, timeout: int = 0) -> bytes:
def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
ret: List[int] = []
for _ in range(math.ceil(length / USB_MAX_SIZE)):
d = self._transfer(endpoint, [], max_rx_len=USB_MAX_SIZE)
ret += d
if len(d) < USB_MAX_SIZE:
break
with self.dev.acquire() as spi:
for _ in range(math.ceil(length / USB_MAX_SIZE)):
d = self._transfer(spi, endpoint, [], timeout, max_rx_len=USB_MAX_SIZE)
ret += d
if len(d) < USB_MAX_SIZE:
break
return bytes(ret)
class STBootloaderSPIHandle(BaseSTBootloaderHandle):
"""
Implementation of the STM32 SPI bootloader protocol described in:
https://www.st.com/resource/en/application_note/an4286-spi-protocol-used-in-the-stm32-bootloader-stmicroelectronics.pdf
"""
SYNC = 0x5A
ACK = 0x79
NACK = 0x1F
def __init__(self):
self.dev = SpiDevice(speed=1000000)
# say hello
try:
with self.dev.acquire() as spi:
spi.xfer([self.SYNC, ])
try:
self._get_ack(spi)
except PandaSpiNackResponse:
# NACK ok here, will only ACK the first time
pass
self._mcu_type = MCU_TYPE_BY_IDCODE[self.get_chip_id()]
except PandaSpiException:
raise PandaSpiException("failed to connect to panda") # pylint: disable=W0707
def _get_ack(self, spi, timeout=1.0):
data = 0x00
start_time = time.monotonic()
while data not in (self.ACK, self.NACK) and (time.monotonic() - start_time < timeout):
data = spi.xfer([0x00, ])[0]
time.sleep(0.001)
spi.xfer([self.ACK, ])
if data == self.NACK:
raise PandaSpiNackResponse
elif data != self.ACK:
raise PandaSpiMissingAck
def _cmd(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes:
ret = b""
with self.dev.acquire() as spi:
# sync + command
spi.xfer([self.SYNC, ])
spi.xfer([cmd, cmd ^ 0xFF])
self._get_ack(spi)
# "predata" - for commands that send the first data without a checksum
if predata is not None:
spi.xfer(predata)
self._get_ack(spi)
# send data
if data is not None:
for d in data:
if predata is not None:
spi.xfer(d + self._checksum(predata + d))
else:
spi.xfer(d + self._checksum(d))
self._get_ack(spi, timeout=20)
# receive
if read_bytes > 0:
ret = spi.xfer([0x00, ]*(read_bytes + 1))[1:]
if data is None or len(data) == 0:
self._get_ack(spi)
return bytes(ret)
def _checksum(self, data: bytes) -> bytes:
if len(data) == 1:
ret = data[0] ^ 0xFF
else:
ret = reduce(lambda a, b: a ^ b, data)
return bytes([ret, ])
# *** Bootloader commands ***
def read(self, address: int, length: int):
data = [struct.pack('>I', address), struct.pack('B', length - 1)]
return self._cmd(0x11, data=data, read_bytes=length)
def get_chip_id(self) -> int:
r = self._cmd(0x02, read_bytes=3)
assert r[0] == 1 # response length - 1
return ((r[1] << 8) + r[2])
def go_cmd(self, address: int) -> None:
self._cmd(0x21, data=[struct.pack('>I', address), ])
# *** helpers ***
def get_uid(self):
dat = self.read(McuType.H7.config.uid_address, 12)
return binascii.hexlify(dat).decode()
def erase_sector(self, sector: int):
p = struct.pack('>H', 0) # number of sectors to erase
d = struct.pack('>H', sector)
self._cmd(0x44, data=[d, ], predata=p)
# *** PandaDFU API ***
def erase_app(self):
self.erase_sector(1)
def erase_bootstub(self):
self.erase_sector(0)
def get_mcu_type(self):
return self._mcu_type
def clear_status(self):
pass
def close(self):
self.dev.close()
def program(self, address, dat):
bs = 256 # max block size for writing to flash over SPI
dat += b"\xFF" * ((bs - len(dat)) % bs)
for i in range(0, len(dat) // bs):
block = dat[i * bs:(i + 1) * bs]
self._cmd(0x31, data=[
struct.pack('>I', address + i*bs),
bytes([len(block) - 1]) + block,
])
def jump(self, address):
self.go_cmd(self._mcu_type.config.bootstub_address)

95
panda/python/usb.py Normal file
View File

@@ -0,0 +1,95 @@
import struct
from typing import List
from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT
from .constants import McuType
class PandaUsbHandle(BaseHandle):
def __init__(self, libusb_handle):
self._libusb_handle = libusb_handle
def close(self):
self._libusb_handle.close()
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = TIMEOUT):
return self._libusb_handle.controlWrite(request_type, request, value, index, data, timeout)
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = TIMEOUT):
return self._libusb_handle.controlRead(request_type, request, value, index, length, timeout)
def bulkWrite(self, endpoint: int, data: List[int], timeout: int = TIMEOUT) -> int:
return self._libusb_handle.bulkWrite(endpoint, data, timeout) # type: ignore
def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
return self._libusb_handle.bulkRead(endpoint, length, timeout) # type: ignore
class STBootloaderUSBHandle(BaseSTBootloaderHandle):
DFU_DNLOAD = 1
DFU_UPLOAD = 2
DFU_GETSTATUS = 3
DFU_CLRSTATUS = 4
DFU_ABORT = 6
def __init__(self, libusb_device, libusb_handle):
self._libusb_handle = libusb_handle
# TODO: Find a way to detect F4 vs F2
# TODO: also check F4 BCD, don't assume in else
self._mcu_type = McuType.H7 if libusb_device.getbcdDevice() == 512 else McuType.F4
def _status(self) -> None:
while 1:
dat = self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
if dat[1] == 0:
break
def _erase_page_address(self, address: int) -> None:
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, b"\x41" + struct.pack("I", address))
self._status()
def get_mcu_type(self):
return self._mcu_type
def erase_app(self):
self._erase_page_address(self._mcu_type.config.app_address)
def erase_bootstub(self):
self._erase_page_address(self._mcu_type.config.bootstub_address)
def clear_status(self):
# Clear status
stat = self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
if stat[4] == 0xa:
self._libusb_handle.controlRead(0x21, self.DFU_CLRSTATUS, 0, 0, 0)
elif stat[4] == 0x9:
self._libusb_handle.controlWrite(0x21, self.DFU_ABORT, 0, 0, b"")
self._status()
stat = str(self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6))
def close(self):
self._libusb_handle.close()
def program(self, address, dat):
# Set Address Pointer
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
self._status()
# Program
bs = self._mcu_type.config.block_size
dat += b"\xFF" * ((bs - len(dat)) % bs)
for i in range(0, len(dat) // bs):
ldat = dat[i * bs:(i + 1) * bs]
print("programming %d with length %d" % (i, len(ldat)))
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2 + i, 0, ldat)
self._status()
def jump(self, address):
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
self._status()
try:
self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2, 0, b"")
_ = str(self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6))
except Exception:
pass