mirror of
https://github.com/infiniteCable2/panda.git
synced 2026-02-18 17:23:52 +08:00
Merge branch 'upstream/master' into sync-20241122
This commit is contained in:
@@ -31,7 +31,7 @@ def calculate_checksum(data):
|
||||
res ^= b
|
||||
return res
|
||||
|
||||
def pack_can_buffer(arr):
|
||||
def pack_can_buffer(arr, fd=False):
|
||||
snds = [b'']
|
||||
for address, dat, bus in arr:
|
||||
assert len(dat) in LEN_TO_DLC
|
||||
@@ -41,7 +41,7 @@ def pack_can_buffer(arr):
|
||||
data_len_code = LEN_TO_DLC[len(dat)]
|
||||
header = bytearray(CANPACKET_HEAD_SIZE)
|
||||
word_4b = address << 3 | extended << 2
|
||||
header[0] = (data_len_code << 4) | (bus << 1)
|
||||
header[0] = (data_len_code << 4) | (bus << 1) | int(fd)
|
||||
header[1] = word_4b & 0xFF
|
||||
header[2] = (word_4b >> 8) & 0xFF
|
||||
header[3] = (word_4b >> 16) & 0xFF
|
||||
@@ -808,6 +808,9 @@ class Panda:
|
||||
def set_canfd_non_iso(self, bus, non_iso):
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xfc, bus, int(non_iso), b'')
|
||||
|
||||
def set_canfd_auto(self, bus, auto):
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe8, bus, int(auto), b'')
|
||||
|
||||
def set_uart_baud(self, uart, rate):
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, int(rate / 300), b'')
|
||||
|
||||
@@ -829,15 +832,15 @@ class Panda:
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xc0, 0, 0, b'')
|
||||
|
||||
@ensure_can_packet_version
|
||||
def can_send_many(self, arr, timeout=CAN_SEND_TIMEOUT_MS):
|
||||
snds = pack_can_buffer(arr)
|
||||
def can_send_many(self, arr, *, fd=False, timeout=CAN_SEND_TIMEOUT_MS):
|
||||
snds = pack_can_buffer(arr, fd=fd)
|
||||
for tx in snds:
|
||||
while len(tx) > 0:
|
||||
bs = self._handle.bulkWrite(3, tx, timeout=timeout)
|
||||
tx = tx[bs:]
|
||||
|
||||
def can_send(self, addr, dat, bus, timeout=CAN_SEND_TIMEOUT_MS):
|
||||
self.can_send_many([[addr, dat, bus]], timeout=timeout)
|
||||
def can_send(self, addr, dat, bus, *, fd=False, timeout=CAN_SEND_TIMEOUT_MS):
|
||||
self.can_send_many([[addr, dat, bus]], fd=fd, timeout=timeout)
|
||||
|
||||
@ensure_can_packet_version
|
||||
def can_recv(self):
|
||||
|
||||
@@ -100,11 +100,14 @@ class PandaDFU:
|
||||
def st_serial_to_dfu_serial(st: str, mcu_type: McuType = McuType.F4):
|
||||
if st is None or st == "none":
|
||||
return None
|
||||
uid_base = struct.unpack("H" * 6, bytes.fromhex(st))
|
||||
if mcu_type == McuType.H7:
|
||||
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4], uid_base[3])).upper().decode("utf-8")
|
||||
else:
|
||||
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
|
||||
try:
|
||||
uid_base = struct.unpack("H" * 6, bytes.fromhex(st))
|
||||
if mcu_type == McuType.H7:
|
||||
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4], uid_base[3])).upper().decode("utf-8")
|
||||
else:
|
||||
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
|
||||
except struct.error:
|
||||
return None
|
||||
|
||||
def get_mcu_type(self) -> McuType:
|
||||
return self._mcu_type
|
||||
|
||||
@@ -402,7 +402,8 @@ class STBootloaderSPIHandle(BaseSTBootloaderHandle):
|
||||
|
||||
def get_chip_id(self) -> int:
|
||||
r = self._cmd(0x02, read_bytes=3)
|
||||
assert r[0] == 1 # response length - 1
|
||||
if r[0] != 1: # response length - 1
|
||||
raise PandaSpiException("incorrect response length")
|
||||
return ((r[1] << 8) + r[2])
|
||||
|
||||
def go_cmd(self, address: int) -> None:
|
||||
|
||||
Reference in New Issue
Block a user