use pyupgrade to update syntax (#1889)

This commit is contained in:
Cameron Clough
2024-02-24 21:56:28 +00:00
committed by GitHub
parent 96a3099398
commit 0c7d5f11d7
29 changed files with 122 additions and 130 deletions

View File

@@ -2,7 +2,6 @@
import os import os
import struct import struct
from functools import wraps from functools import wraps
from typing import Optional
from panda import Panda, PandaDFU from panda import Panda, PandaDFU
from panda.python.constants import McuType from panda.python.constants import McuType
@@ -57,7 +56,7 @@ class PandaJungle(Panda):
fn = os.path.join(FW_PATH, self._mcu_type.config.app_fn.replace("panda", "panda_jungle")) fn = os.path.join(FW_PATH, self._mcu_type.config.app_fn.replace("panda", "panda_jungle"))
super().flash(fn=fn, code=code, reconnect=reconnect) super().flash(fn=fn, code=code, reconnect=reconnect)
def recover(self, timeout: Optional[int] = 60, reset: bool = True) -> bool: def recover(self, timeout: int | None = 60, reset: bool = True) -> bool:
dfu_serial = self.get_dfu_serial() dfu_serial = self.get_dfu_serial()
if reset: if reset:

View File

@@ -4,6 +4,6 @@ from panda import PandaJungle
if __name__ == "__main__": if __name__ == "__main__":
for p in PandaJungle.list(): for p in PandaJungle.list():
pp = PandaJungle(p) pp = PandaJungle(p)
print("%s: %s" % (pp.get_serial()[0], pp.get_version())) print(f"{pp.get_serial()[0]}: {pp.get_version()}")

View File

@@ -66,7 +66,7 @@ class Info():
message_id = message_id[2:] # remove leading '0x' message_id = message_id[2:] # remove leading '0x'
else: else:
message_id = hex(int(message_id))[2:] # old message IDs are in decimal message_id = hex(int(message_id))[2:] # old message IDs are in decimal
message_id = '%s:%s' % (bus, message_id) message_id = f'{bus}:{message_id}'
data = row[CSV_KEYS[dtype]["data"]] data = row[CSV_KEYS[dtype]["data"]]
if data.startswith('0x'): if data.startswith('0x'):

View File

@@ -52,7 +52,7 @@ class Info():
def load(self, filename): def load(self, filename):
"""Given a CSV file, adds information about message IDs and their values.""" """Given a CSV file, adds information about message IDs and their values."""
with open(filename, 'r') as inp: with open(filename) as inp:
reader = csv.reader(inp) reader = csv.reader(inp)
header = next(reader, None) header = next(reader, None)
if header[0] == 'time': if header[0] == 'time':
@@ -64,7 +64,7 @@ class Info():
for row in reader: for row in reader:
bus = row[2] bus = row[2]
message_id = hex(int(row[1]))[2:] message_id = hex(int(row[1]))[2:]
message_id = '%s:%s' % (bus, message_id) message_id = f'{bus}:{message_id}'
data = row[3] data = row[3]
self.store(message_id, data) self.store(message_id, data)
@@ -75,7 +75,7 @@ class Info():
message_id = row[1][2:] # remove leading '0x' message_id = row[1][2:] # remove leading '0x'
else: else:
message_id = hex(int(row[1]))[2:] # old message IDs are in decimal message_id = hex(int(row[1]))[2:] # old message IDs are in decimal
message_id = '%s:%s' % (bus, message_id) message_id = f'{bus}:{message_id}'
if row[1].startswith('0x'): if row[1].startswith('0x'):
data = row[2][2:] # remove leading '0x' data = row[2][2:] # remove leading '0x'
else: else:

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
from typing import List, Optional
from tqdm import tqdm from tqdm import tqdm
from panda import Panda from panda import Panda
from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, InvalidSubAddressError, \ from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, InvalidSubAddressError, \
@@ -25,7 +24,7 @@ if __name__ == "__main__":
addrs += [0x18da0000 + (i << 8) + 0xf1 for i in range(256)] addrs += [0x18da0000 + (i << 8) + 0xf1 for i in range(256)]
results = {} results = {}
sub_addrs: List[Optional[int]] = [None] sub_addrs: list[int | None] = [None]
if args.sub_addr: if args.sub_addr:
if args.sub_addr == "scan": if args.sub_addr == "scan":
sub_addrs = list(range(0xff + 1)) sub_addrs = list(range(0xff + 1))

View File

@@ -1,9 +1,11 @@
# https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml # https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml
[tool.ruff] [tool.ruff]
select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A"]
ignore = ["W292", "E741", "E402", "C408", "ISC003"]
line-length = 160 line-length = 160
target-version="py311" target-version="py311"
[tool.ruff.lint]
select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A"]
ignore = ["W292", "E741", "E402", "C408", "ISC003"]
flake8-implicit-str-concat.allow-multiline=false flake8-implicit-str-concat.allow-multiline=false
[tool.pytest.ini_options] [tool.pytest.ini_options]

View File

@@ -9,7 +9,6 @@ import binascii
import datetime import datetime
import logging import logging
from functools import wraps, partial from functools import wraps, partial
from typing import Optional
from itertools import accumulate from itertools import accumulate
from .base import BaseHandle from .base import BaseHandle
@@ -234,7 +233,7 @@ class Panda:
FLAG_FORD_LONG_CONTROL = 1 FLAG_FORD_LONG_CONTROL = 1
FLAG_FORD_CANFD = 2 FLAG_FORD_CANFD = 2
def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True, can_speed_kbps: int = 500): def __init__(self, serial: str | None = None, claim: bool = True, disable_checks: bool = True, can_speed_kbps: int = 500):
self._connect_serial = serial self._connect_serial = serial
self._disable_checks = disable_checks self._disable_checks = disable_checks
@@ -530,7 +529,7 @@ class Panda:
if reconnect: if reconnect:
self.reconnect() self.reconnect()
def recover(self, timeout: Optional[int] = 60, reset: bool = True) -> bool: def recover(self, timeout: int | None = 60, reset: bool = True) -> bool:
dfu_serial = self.get_dfu_serial() dfu_serial = self.get_dfu_serial()
if reset: if reset:
@@ -549,7 +548,7 @@ class Panda:
return True return True
@staticmethod @staticmethod
def wait_for_dfu(dfu_serial: Optional[str], timeout: Optional[int] = None) -> bool: def wait_for_dfu(dfu_serial: str | None, timeout: int | None = None) -> bool:
t_start = time.monotonic() t_start = time.monotonic()
dfu_list = PandaDFU.list() dfu_list = PandaDFU.list()
while (dfu_serial is None and len(dfu_list) == 0) or (dfu_serial is not None and dfu_serial not in dfu_list): while (dfu_serial is None and len(dfu_list) == 0) or (dfu_serial is not None and dfu_serial not in dfu_list):
@@ -561,7 +560,7 @@ class Panda:
return True return True
@staticmethod @staticmethod
def wait_for_panda(serial: Optional[str], timeout: int) -> bool: def wait_for_panda(serial: str | None, timeout: int) -> bool:
t_start = time.monotonic() t_start = time.monotonic()
serials = Panda.list() serials = Panda.list()
while (serial is None and len(serials) == 0) or (serial is not None and serial not in serials): while (serial is None and len(serials) == 0) or (serial is not None and serial not in serials):

View File

@@ -1,6 +1,6 @@
import os import os
import enum import enum
from typing import List, NamedTuple from typing import NamedTuple
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../") BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
FW_PATH = os.path.join(BASEDIR, "board/obj/") FW_PATH = os.path.join(BASEDIR, "board/obj/")
@@ -10,7 +10,7 @@ USBPACKET_MAX_SIZE = 0x40
class McuConfig(NamedTuple): class McuConfig(NamedTuple):
mcu: str mcu: str
mcu_idcode: int mcu_idcode: int
sector_sizes: List[int] sector_sizes: list[int]
sector_count: int # total sector count, used for MCU identification in DFU mode sector_count: int # total sector count, used for MCU identification in DFU mode
uid_address: int uid_address: int
block_size: int block_size: int

View File

@@ -2,7 +2,6 @@ import os
import usb1 import usb1
import struct import struct
import binascii import binascii
from typing import List, Optional
from .base import BaseSTBootloaderHandle from .base import BaseSTBootloaderHandle
from .spi import STBootloaderSPIHandle, PandaSpiException from .spi import STBootloaderSPIHandle, PandaSpiException
@@ -11,9 +10,9 @@ from .constants import FW_PATH, McuType
class PandaDFU: class PandaDFU:
def __init__(self, dfu_serial: Optional[str]): def __init__(self, dfu_serial: str | None):
# try USB, then SPI # try USB, then SPI
handle: Optional[BaseSTBootloaderHandle] handle: BaseSTBootloaderHandle | None
self._context, handle = PandaDFU.usb_connect(dfu_serial) self._context, handle = PandaDFU.usb_connect(dfu_serial)
if handle is None: if handle is None:
self._context, handle = PandaDFU.spi_connect(dfu_serial) self._context, handle = PandaDFU.spi_connect(dfu_serial)
@@ -38,7 +37,7 @@ class PandaDFU:
self._context.close() self._context.close()
@staticmethod @staticmethod
def usb_connect(dfu_serial: Optional[str]): def usb_connect(dfu_serial: str | None):
handle = None handle = None
context = usb1.USBContext() context = usb1.USBContext()
context.open() context.open()
@@ -56,7 +55,7 @@ class PandaDFU:
return context, handle return context, handle
@staticmethod @staticmethod
def spi_connect(dfu_serial: Optional[str]): def spi_connect(dfu_serial: str | None):
handle = None handle = None
this_dfu_serial = None this_dfu_serial = None
@@ -72,13 +71,7 @@ class PandaDFU:
return None, handle return None, handle
@staticmethod @staticmethod
def list() -> List[str]: def usb_list() -> list[str]:
ret = PandaDFU.usb_list()
ret += PandaDFU.spi_list()
return list(set(ret))
@staticmethod
def usb_list() -> List[str]:
dfu_serials = [] dfu_serials = []
try: try:
with usb1.USBContext() as context: with usb1.USBContext() as context:
@@ -93,7 +86,7 @@ class PandaDFU:
return dfu_serials return dfu_serials
@staticmethod @staticmethod
def spi_list() -> List[str]: def spi_list() -> list[str]:
try: try:
_, h = PandaDFU.spi_connect(None) _, h = PandaDFU.spi_connect(None)
if h is not None: if h is not None:
@@ -134,3 +127,9 @@ class PandaDFU:
code = f.read() code = f.read()
self.program_bootstub(code) self.program_bootstub(code)
self.reset() self.reset()
@staticmethod
def list() -> list[str]:
ret = PandaDFU.usb_list()
ret += PandaDFU.spi_list()
return list(set(ret))

View File

@@ -79,10 +79,10 @@ def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None, rate=None):
sends = [] sends = []
while len(x) > 0: while len(x) > 0:
if subaddr: if subaddr:
sends.append(((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00"))) sends.append((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00"))
x = x[6:] x = x[6:]
else: else:
sends.append(((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00"))) sends.append((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00"))
x = x[7:] x = x[7:]
idx += 1 idx += 1

View File

@@ -1,5 +1,5 @@
# mimic a python serial port # mimic a python serial port
class PandaSerial(object): class PandaSerial:
def __init__(self, panda, port, baud): def __init__(self, panda, port, baud):
self.panda = panda self.panda = panda
self.port = port self.port = port

View File

@@ -9,7 +9,7 @@ import logging
import threading import threading
from contextlib import contextmanager from contextlib import contextmanager
from functools import reduce from functools import reduce
from typing import Callable, List, Optional from collections.abc import Callable
from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT
from .constants import McuType, MCU_TYPE_BY_IDCODE, USBPACKET_MAX_SIZE from .constants import McuType, MCU_TYPE_BY_IDCODE, USBPACKET_MAX_SIZE
@@ -341,7 +341,7 @@ class STBootloaderSPIHandle(BaseSTBootloaderHandle):
elif data != self.ACK: elif data != self.ACK:
raise PandaSpiMissingAck raise PandaSpiMissingAck
def _cmd_no_retry(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes: def _cmd_no_retry(self, cmd: int, data: list[bytes] | None = None, read_bytes: int = 0, predata=None) -> bytes:
ret = b"" ret = b""
with self.dev.acquire() as spi: with self.dev.acquire() as spi:
# sync + command # sync + command
@@ -371,7 +371,7 @@ class STBootloaderSPIHandle(BaseSTBootloaderHandle):
return bytes(ret) return bytes(ret)
def _cmd(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes: def _cmd(self, cmd: int, data: list[bytes] | None = None, read_bytes: int = 0, predata=None) -> bytes:
exc = PandaSpiException() exc = PandaSpiException()
for n in range(MAX_XFER_RETRY_COUNT): for n in range(MAX_XFER_RETRY_COUNT):
try: try:

View File

@@ -1,7 +1,8 @@
import time import time
import struct import struct
from collections import deque from collections import deque
from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast from typing import NamedTuple, Deque, cast
from collections.abc import Callable, Generator
from enum import IntEnum from enum import IntEnum
from functools import partial from functools import partial
@@ -300,8 +301,8 @@ def get_dtc_status_names(status):
return result return result
class CanClient(): class CanClient():
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], list[tuple[int, int, bytes, int]]],
tx_addr: int, rx_addr: int, bus: int, sub_addr: Optional[int] = None, debug: bool = False): tx_addr: int, rx_addr: int, bus: int, sub_addr: int | None = None, debug: bool = False):
self.tx = can_send self.tx = can_send
self.rx = can_recv self.rx = can_recv
self.tx_addr = tx_addr self.tx_addr = tx_addr
@@ -335,7 +336,7 @@ class CanClient():
msgs = self.rx() msgs = self.rx()
if drain: if drain:
if self.debug: if self.debug:
print("CAN-RX: drain - {}".format(len(msgs))) print(f"CAN-RX: drain - {len(msgs)}")
self.rx_buff.clear() self.rx_buff.clear()
else: else:
for rx_addr, _, rx_data, rx_bus in msgs or []: for rx_addr, _, rx_data, rx_bus in msgs or []:
@@ -366,7 +367,7 @@ class CanClient():
except IndexError: except IndexError:
pass # empty pass # empty
def send(self, msgs: List[bytes], delay: float = 0) -> None: def send(self, msgs: list[bytes], delay: float = 0) -> None:
for i, msg in enumerate(msgs): for i, msg in enumerate(msgs):
if delay and i != 0: if delay and i != 0:
if self.debug: if self.debug:
@@ -443,7 +444,7 @@ class IsoTpMessage():
if not setup_only: if not setup_only:
self._can_client.send([msg]) self._can_client.send([msg])
def recv(self, timeout=None) -> Tuple[Optional[bytes], bool]: def recv(self, timeout=None) -> tuple[bytes | None, bool]:
if timeout is None: if timeout is None:
timeout = self.timeout timeout = self.timeout
@@ -566,11 +567,11 @@ def get_rx_addr_for_tx_addr(tx_addr, rx_offset=0x8):
# standard 29 bit response addr (flip last two bytes) # standard 29 bit response addr (flip last two bytes)
return (tx_addr & 0xFFFF0000) + (tx_addr << 8 & 0xFF00) + (tx_addr >> 8 & 0xFF) return (tx_addr & 0xFFFF0000) + (tx_addr << 8 & 0xFF00) + (tx_addr >> 8 & 0xFF)
raise ValueError("invalid tx_addr: {}".format(tx_addr)) raise ValueError(f"invalid tx_addr: {tx_addr}")
class UdsClient(): class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: Optional[int] = None, bus: int = 0, sub_addr: Optional[int] = None, timeout: float = 1, def __init__(self, panda, tx_addr: int, rx_addr: int | None = None, bus: int = 0, sub_addr: int | None = None, timeout: float = 1,
debug: bool = False, tx_timeout: float = 1, response_pending_timeout: float = 10): debug: bool = False, tx_timeout: float = 1, response_pending_timeout: float = 10):
self.bus = bus self.bus = bus
self.tx_addr = tx_addr self.tx_addr = tx_addr
@@ -583,7 +584,7 @@ class UdsClient():
self.response_pending_timeout = response_pending_timeout self.response_pending_timeout = response_pending_timeout
# generic uds request # generic uds request
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: Optional[int] = None, data: Optional[bytes] = None) -> bytes: def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int | None = None, data: bytes | None = None) -> bytes:
req = bytes([service_type]) req = bytes([service_type])
if subfunction is not None: if subfunction is not None:
req += bytes([subfunction]) req += bytes([subfunction])
@@ -623,12 +624,12 @@ class UdsClient():
if self.debug: if self.debug:
print("UDS-RX: response pending") print("UDS-RX: response pending")
continue continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code) raise NegativeResponseError(f'{service_desc} - {error_desc}', service_id, error_code)
# positive response # positive response
if service_type + 0x40 != resp_sid: if service_type + 0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) raise InvalidServiceIdError(f'invalid response service id: {resp_sid_hex}')
if subfunction is not None: if subfunction is not None:
resp_sfn = resp[1] if len(resp) > 1 else None resp_sfn = resp[1] if len(resp) > 1 else None
@@ -671,7 +672,7 @@ class UdsClient():
def tester_present(self, ): def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00) self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: Optional[bytes] = None): def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes | None = None):
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET) timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET)
@@ -714,8 +715,8 @@ class UdsClient():
"data": resp[2:], # TODO: parse the reset of response "data": resp[2:], # TODO: parse the reset of response
} }
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: Optional[BAUD_RATE_TYPE] = None): def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE | None = None):
data: Optional[bytes] data: bytes | None
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE # baud_rate_type = BAUD_RATE_TYPE
@@ -733,21 +734,21 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type: if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {} expected: {}'.format(hex(resp_id), hex(data_identifier_type))) raise ValueError(f'invalid response data identifier: {hex(resp_id)} expected: {hex(data_identifier_type)}')
return resp[2:] return resp[2:]
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1): def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4: if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
if memory_size_bytes < 1 or memory_size_bytes > 4: if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
data = bytes([memory_size_bytes << 4 | memory_address_bytes]) data = bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8): if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address)) raise ValueError(f'invalid memory_address: {memory_address}')
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8): if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size)) raise ValueError(f'invalid memory_size: {memory_size}')
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
@@ -758,7 +759,7 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type: if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) raise ValueError(f'invalid response data identifier: {hex(resp_id)}')
return resp[2:] # TODO: parse the response return resp[2:] # TODO: parse the response
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int): def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
@@ -767,11 +768,11 @@ class UdsClient():
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int,
source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1): source_definitions: list[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4: if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
if memory_size_bytes < 1 or memory_size_bytes > 4: if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
data = struct.pack('!H', dynamic_data_identifier) data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
@@ -781,15 +782,15 @@ class UdsClient():
data += bytes([memory_size_bytes << 4 | memory_address_bytes]) data += bytes([memory_size_bytes << 4 | memory_address_bytes])
for s in source_definitions: for s in source_definitions:
if s.memory_address >= 1 << (memory_address_bytes * 8): if s.memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(s.memory_address)) raise ValueError(f'invalid memory_address: {s.memory_address}')
data += struct.pack('!I', s.memory_address)[4 - memory_address_bytes:] data += struct.pack('!I', s.memory_address)[4 - memory_address_bytes:]
if s.memory_size >= 1 << (memory_size_bytes * 8): if s.memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(s.memory_size)) raise ValueError(f'invalid memory_size: {s.memory_size}')
data += struct.pack('!I', s.memory_size)[4 - memory_size_bytes:] data += struct.pack('!I', s.memory_size)[4 - memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER: elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass pass
else: else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type))) raise ValueError(f'invalid dynamic identifier type: {hex(dynamic_definition_type)}')
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data) self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes): def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
@@ -797,20 +798,20 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type: if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) raise ValueError(f'invalid response data identifier: {hex(resp_id)}')
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int = 4, memory_size_bytes: int = 1): def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int = 4, memory_size_bytes: int = 1):
if memory_address_bytes < 1 or memory_address_bytes > 4: if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
if memory_size_bytes < 1 or memory_size_bytes > 4: if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
data = bytes([memory_size_bytes << 4 | memory_address_bytes]) data = bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8): if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address)) raise ValueError(f'invalid memory_address: {memory_address}')
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8): if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size)) raise ValueError(f'invalid memory_size: {memory_size}')
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
data += data_record data += data_record
@@ -864,7 +865,7 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type: if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) raise ValueError(f'invalid response data identifier: {hex(resp_id)}')
return resp[2:] return resp[2:]
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes = b''): def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes = b''):
@@ -872,23 +873,23 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type: if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) raise ValueError(f'invalid response routine identifier: {hex(resp_id)}')
return resp[2:] return resp[2:]
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00): def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00):
data = bytes([data_format]) data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4: if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
if memory_size_bytes < 1 or memory_size_bytes > 4: if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
data += bytes([memory_size_bytes << 4 | memory_address_bytes]) data += bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8): if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address)) raise ValueError(f'invalid memory_address: {memory_address}')
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8): if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size)) raise ValueError(f'invalid memory_size: {memory_size}')
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
@@ -896,7 +897,7 @@ class UdsClient():
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0] max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0]
else: else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) raise ValueError(f'invalid max_num_bytes_len: {max_num_bytes_len}')
return max_num_bytes # max number of bytes per transfer data request return max_num_bytes # max number of bytes per transfer data request
@@ -904,16 +905,16 @@ class UdsClient():
data = bytes([data_format]) data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4: if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}')
if memory_size_bytes < 1 or memory_size_bytes > 4: if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}')
data += bytes([memory_size_bytes << 4 | memory_address_bytes]) data += bytes([memory_size_bytes << 4 | memory_address_bytes])
if memory_address >= 1 << (memory_address_bytes * 8): if memory_address >= 1 << (memory_address_bytes * 8):
raise ValueError('invalid memory_address: {}'.format(memory_address)) raise ValueError(f'invalid memory_address: {memory_address}')
data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] data += struct.pack('!I', memory_address)[4 - memory_address_bytes:]
if memory_size >= 1 << (memory_size_bytes * 8): if memory_size >= 1 << (memory_size_bytes * 8):
raise ValueError('invalid memory_size: {}'.format(memory_size)) raise ValueError(f'invalid memory_size: {memory_size}')
data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] data += struct.pack('!I', memory_size)[4 - memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
@@ -921,7 +922,7 @@ class UdsClient():
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0] max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0]
else: else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) raise ValueError(f'invalid max_num_bytes_len: {max_num_bytes_len}')
return max_num_bytes # max number of bytes per transfer data request return max_num_bytes # max number of bytes per transfer data request
@@ -930,7 +931,7 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count: if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id)) raise ValueError(f'invalid block_sequence_count: {resp_id}')
return resp[1:] return resp[1:]
def request_transfer_exit(self): def request_transfer_exit(self):

View File

@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
""" """
Panda CAN Controller Dongle Panda CAN Controller Dongle
~~~~~ ~~~~~

View File

@@ -2,7 +2,7 @@
import os import os
import time import time
import threading import threading
from typing import Any, List from typing import Any
from panda import Panda from panda import Panda
@@ -38,7 +38,7 @@ if __name__ == "__main__":
threading.Thread(target=flood_tx, args=(sender,)).start() threading.Thread(target=flood_tx, args=(sender,)).start()
# Receive as much as we can in a few second time period # Receive as much as we can in a few second time period
rx: List[Any] = [] rx: list[Any] = []
old_len = 0 old_len = 0
start_time = time.time() start_time = time.time()
while time.time() - start_time < 3 or len(rx) > old_len: while time.time() - start_time < 3 or len(rx) > old_len:

View File

@@ -190,7 +190,7 @@ class ELMCarSimulator():
if pid == 0x02: # Show VIN if pid == 0x02: # Show VIN
return b"1D4GP00R55B123456" return b"1D4GP00R55B123456"
if pid == 0xFC: # test long multi message. Ligned up for LIN responses if pid == 0xFC: # test long multi message. Ligned up for LIN responses
return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num + 1) for num in range(80))) return b''.join(struct.pack(">BBH", 0xAA, 0xAA, num + 1) for num in range(80))
if pid == 0xFD: # test long multi message if pid == 0xFD: # test long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80)) parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
return b'\xAA\xAA\xAA' + b''.join(parts) return b'\xAA\xAA\xAA' + b''.join(parts)
@@ -199,7 +199,7 @@ class ELMCarSimulator():
return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA' return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
if pid == 0xFF: if pid == 0xFF:
return b'\xAA\x00\x00' + \ return b'\xAA\x00\x00' + \
b"".join(((b'\xAA' * 5) + struct.pack(">H", num + 1) for num in range(584))) b"".join((b'\xAA' * 5) + struct.pack(">H", num + 1) for num in range(584))
#return b"\xAA"*100#(0xFFF-3) #return b"\xAA"*100#(0xFFF-3)

View File

@@ -6,7 +6,7 @@ import select
class Reader(threading.Thread): class Reader(threading.Thread):
def __init__(self, s, *args, **kwargs): def __init__(self, s, *args, **kwargs):
super(Reader, self).__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._s = s self._s = s
self.__stop = False self.__stop = False

View File

@@ -33,7 +33,7 @@ def logger(event):
for l in drain_serial(p)[::-1]: for l in drain_serial(p)[::-1]:
ns = l.decode('utf8').strip().split(' ') ns = l.decode('utf8').strip().split(' ')
if len(ns) == 4: if len(ns) == 4:
target_rpm, rpm_fast, power, stall_count = [int(n, 16) for n in ns] target_rpm, rpm_fast, power, stall_count = (int(n, 16) for n in ns)
break break
dat = { dat = {

View File

@@ -4,4 +4,4 @@ from panda import Panda
if __name__ == "__main__": if __name__ == "__main__":
for p in Panda.list(): for p in Panda.list():
pp = Panda(p) pp = Panda(p)
print("%s: %s" % (pp.get_serial()[0], pp.get_version())) print(f"{pp.get_serial()[0]}: {pp.get_version()}")

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from typing import Optional
from panda import Panda from panda import Panda
@@ -8,7 +7,7 @@ if __name__ == "__main__":
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_gmlan(bus=2) p.set_gmlan(bus=2)
#p.can_send(0xaaa, b"\x00\x00", bus=3) #p.can_send(0xaaa, b"\x00\x00", bus=3)
last_add: Optional[int] = None last_add: int | None = None
while True: while True:
ret = p.can_recv() ret = p.can_recv()
if len(ret) > 0: if len(ret) > 0:

View File

@@ -24,7 +24,7 @@ def test_send_recv(p, panda_jungle):
saturation_pct = (comp_kbps / speed) * 100.0 saturation_pct = (comp_kbps / speed) * 100.0
assert 80 < saturation_pct < 100 assert 80 < saturation_pct < 100
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, {:6.2f}%".format(bus, speed, comp_kbps, saturation_pct)) print(f"two pandas bus {bus}, 100 messages at speed {speed:4d}, comp speed is {comp_kbps:7.2f}, {saturation_pct:6.2f}%")
# Run tests in both directions # Run tests in both directions
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
@@ -59,7 +59,7 @@ def test_latency(p, panda_jungle):
r_echo = p_send.can_recv() r_echo = p_send.can_recv()
if len(r) == 0 or len(r_echo) == 0: if len(r) == 0 or len(r_echo) == 0:
print("r: {}, r_echo: {}".format(r, r_echo)) print(f"r: {r}, r_echo: {r_echo}")
assert len(r) == 1 assert len(r) == 1
assert len(r_echo) == 1 assert len(r_echo) == 1

View File

@@ -1,6 +1,5 @@
import time import time
import random import random
from typing import Optional
def get_random_can_messages(n): def get_random_can_messages(n):
@@ -52,7 +51,7 @@ def time_many_sends(p, bus, p_recv=None, msg_count=100, two_pandas=False, msg_le
return comp_kbps return comp_kbps
def clear_can_buffers(panda, speed: Optional[int] = None): def clear_can_buffers(panda, speed: int | None = None):
if speed is not None: if speed is not None:
for bus in range(3): for bus in range(3):
panda.set_can_speed_kbps(bus, speed) panda.set_can_speed_kbps(bus, speed)

View File

@@ -1,6 +1,6 @@
import os import os
from cffi import FFI from cffi import FFI
from typing import Any, List, Protocol from typing import Any, Protocol
from panda import LEN_TO_DLC from panda import LEN_TO_DLC
from panda.tests.libpanda.safety_helpers import PandaSafety, setup_safety_helpers from panda.tests.libpanda.safety_helpers import PandaSafety, setup_safety_helpers
@@ -64,7 +64,7 @@ class CANPacket:
returned: int returned: int
extended: int extended: int
addr: int addr: int
data: List[int] data: list[int]
class Panda(PandaSafety, Protocol): class Panda(PandaSafety, Protocol):
# CAN # CAN

View File

@@ -5,7 +5,7 @@ import time
import struct import struct
import itertools import itertools
import threading import threading
from typing import Any, Union, List from typing import Any
from panda import Panda from panda import Panda
@@ -16,7 +16,7 @@ if JUNGLE:
# Generate unique messages # Generate unique messages
NUM_MESSAGES_PER_BUS = 10000 NUM_MESSAGES_PER_BUS = 10000
messages = [bytes(struct.pack("Q", i)) for i in range(NUM_MESSAGES_PER_BUS)] messages = [bytes(struct.pack("Q", i)) for i in range(NUM_MESSAGES_PER_BUS)]
tx_messages = list(itertools.chain.from_iterable(([[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] for msg in messages))) tx_messages = list(itertools.chain.from_iterable([[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] for msg in messages))
def flood_tx(panda): def flood_tx(panda):
print('Sending!') print('Sending!')
@@ -35,7 +35,7 @@ def flood_tx(panda):
if __name__ == "__main__": if __name__ == "__main__":
serials = Panda.list() serials = Panda.list()
receiver: Union[Panda, PandaJungle] receiver: Panda | PandaJungle
if JUNGLE: if JUNGLE:
sender = Panda() sender = Panda()
receiver = PandaJungle() receiver = PandaJungle()
@@ -52,7 +52,7 @@ if __name__ == "__main__":
threading.Thread(target=flood_tx, args=(sender,)).start() threading.Thread(target=flood_tx, args=(sender,)).start()
# Receive as much as we can, and stop when there hasn't been anything for a second # Receive as much as we can, and stop when there hasn't been anything for a second
rx: List[Any] = [] rx: list[Any] = []
old_len = 0 old_len = 0
last_change = time.monotonic() last_change = time.monotonic()
while time.monotonic() - last_change < 1: while time.monotonic() - last_change < 1:

View File

@@ -12,23 +12,23 @@ if __name__ == "__main__":
print('Microsoft OS String Descriptor') print('Microsoft OS String Descriptor')
dat = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, length[0]) dat = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, length[0])
if DEBUG: if DEBUG:
print('LEN: {}'.format(hex(length[0]))) print(f'LEN: {hex(length[0])}')
hexdump("".join(map(chr, dat))) hexdump("".join(map(chr, dat)))
ms_vendor_code = dat[16] ms_vendor_code = dat[16]
if DEBUG: if DEBUG:
print('MS_VENDOR_CODE: {}'.format(hex(length[0]))) print(f'MS_VENDOR_CODE: {hex(length[0])}')
print('\nMicrosoft Compatible ID Feature Descriptor') print('\nMicrosoft Compatible ID Feature Descriptor')
length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, 1) length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, 1)
if DEBUG: if DEBUG:
print('LEN: {}'.format(hex(length[0]))) print(f'LEN: {hex(length[0])}')
dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, length[0]) dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, length[0])
hexdump("".join(map(chr, dat))) hexdump("".join(map(chr, dat)))
print('\nMicrosoft Extended Properties Feature Descriptor') print('\nMicrosoft Extended Properties Feature Descriptor')
length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, 1) length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, 1)
if DEBUG: if DEBUG:
print('LEN: {}'.format(hex(length[0]))) print(f'LEN: {hex(length[0])}')
dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, length[0]) dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, length[0])
hexdump("".join(map(chr, dat))) hexdump("".join(map(chr, dat)))

View File

@@ -3,7 +3,7 @@ import abc
import unittest import unittest
import importlib import importlib
import numpy as np import numpy as np
from typing import Callable, Dict, List, Optional, Tuple from collections.abc import Callable
from opendbc.can.packer import CANPacker # pylint: disable=import-error from opendbc.can.packer import CANPacker # pylint: disable=import-error
from panda import ALTERNATIVE_EXPERIENCE from panda import ALTERNATIVE_EXPERIENCE
@@ -77,7 +77,7 @@ class PandaSafetyTestBase(unittest.TestCase):
def _generic_limit_safety_check(self, msg_function: MessageFunction, min_allowed_value: float, max_allowed_value: float, def _generic_limit_safety_check(self, msg_function: MessageFunction, min_allowed_value: float, max_allowed_value: float,
min_possible_value: float, max_possible_value: float, test_delta: float = 1, inactive_value: float = 0, min_possible_value: float, max_possible_value: float, test_delta: float = 1, inactive_value: float = 0,
msg_allowed = True, additional_setup: Optional[Callable[[float], None]] = None): msg_allowed = True, additional_setup: Callable[[float], None] | None = None):
""" """
Enforces that a signal within a message is only allowed to be sent within a specific range, min_allowed_value -> max_allowed_value. Enforces that a signal within a message is only allowed to be sent within a specific range, min_allowed_value -> max_allowed_value.
Tests the range of min_possible_value -> max_possible_value with a delta of test_delta. Tests the range of min_possible_value -> max_possible_value with a delta of test_delta.
@@ -245,13 +245,13 @@ class LongitudinalAccelSafetyTest(PandaSafetyTestBase, abc.ABC):
class LongitudinalGasBrakeSafetyTest(PandaSafetyTestBase, abc.ABC): class LongitudinalGasBrakeSafetyTest(PandaSafetyTestBase, abc.ABC):
MIN_BRAKE: int = 0 MIN_BRAKE: int = 0
MAX_BRAKE: Optional[int] = None MAX_BRAKE: int | None = None
MAX_POSSIBLE_BRAKE: Optional[int] = None MAX_POSSIBLE_BRAKE: int | None = None
MIN_GAS: int = 0 MIN_GAS: int = 0
MAX_GAS: Optional[int] = None MAX_GAS: int | None = None
INACTIVE_GAS = 0 INACTIVE_GAS = 0
MAX_POSSIBLE_GAS: Optional[int] = None MAX_POSSIBLE_GAS: int | None = None
def test_gas_brake_limits_correct(self): def test_gas_brake_limits_correct(self):
self.assertIsNotNone(self.MAX_POSSIBLE_BRAKE) self.assertIsNotNone(self.MAX_POSSIBLE_BRAKE)
@@ -665,9 +665,9 @@ class MotorTorqueSteeringSafetyTest(TorqueSteeringSafetyTestBase, abc.ABC):
class AngleSteeringSafetyTest(PandaSafetyTestBase): class AngleSteeringSafetyTest(PandaSafetyTestBase):
DEG_TO_CAN: float DEG_TO_CAN: float
ANGLE_RATE_BP: List[float] ANGLE_RATE_BP: list[float]
ANGLE_RATE_UP: List[float] # windup limit ANGLE_RATE_UP: list[float] # windup limit
ANGLE_RATE_DOWN: List[float] # unwind limit ANGLE_RATE_DOWN: list[float] # unwind limit
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
@@ -771,14 +771,14 @@ class AngleSteeringSafetyTest(PandaSafetyTestBase):
class PandaSafetyTest(PandaSafetyTestBase): class PandaSafetyTest(PandaSafetyTestBase):
TX_MSGS: Optional[List[List[int]]] = None TX_MSGS: list[list[int]] | None = None
SCANNED_ADDRS = [*range(0x800), # Entire 11-bit CAN address space SCANNED_ADDRS = [*range(0x800), # Entire 11-bit CAN address space
*range(0x18DA00F1, 0x18DB00F1, 0x100), # 29-bit UDS physical addressing *range(0x18DA00F1, 0x18DB00F1, 0x100), # 29-bit UDS physical addressing
*range(0x18DB00F1, 0x18DC00F1, 0x100), # 29-bit UDS functional addressing *range(0x18DB00F1, 0x18DC00F1, 0x100), # 29-bit UDS functional addressing
*range(0x3300, 0x3400), # Honda *range(0x3300, 0x3400), # Honda
0x10400060, 0x104c006c] # GMLAN (exceptions, range/format unclear) 0x10400060, 0x104c006c] # GMLAN (exceptions, range/format unclear)
FWD_BLACKLISTED_ADDRS: Dict[int, List[int]] = {} # {bus: [addr]} FWD_BLACKLISTED_ADDRS: dict[int, list[int]] = {} # {bus: [addr]}
FWD_BUS_LOOKUP: Dict[int, int] = {} FWD_BUS_LOOKUP: dict[int, int] = {}
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
@@ -895,9 +895,9 @@ class PandaSafetyTest(PandaSafetyTestBase):
@add_regen_tests @add_regen_tests
class PandaCarSafetyTest(PandaSafetyTest): class PandaCarSafetyTest(PandaSafetyTest):
STANDSTILL_THRESHOLD: Optional[float] = None STANDSTILL_THRESHOLD: float | None = None
GAS_PRESSED_THRESHOLD = 0 GAS_PRESSED_THRESHOLD = 0
RELAY_MALFUNCTION_ADDRS: Optional[Dict[int, Tuple[int, ...]]] = None RELAY_MALFUNCTION_ADDRS: dict[int, tuple[int, ...]] | None = None
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):

View File

@@ -1,4 +1,3 @@
from typing import Tuple
import unittest import unittest
import panda.tests.safety.common as common import panda.tests.safety.common as common
@@ -77,8 +76,8 @@ class HyundaiButtonBase:
class HyundaiLongitudinalBase(common.LongitudinalAccelSafetyTest): class HyundaiLongitudinalBase(common.LongitudinalAccelSafetyTest):
# pylint: disable=no-member,abstract-method # pylint: disable=no-member,abstract-method
DISABLED_ECU_UDS_MSG: Tuple[int, int] DISABLED_ECU_UDS_MSG: tuple[int, int]
DISABLED_ECU_ACTUATION_MSG: Tuple[int, int] DISABLED_ECU_ACTUATION_MSG: tuple[int, int]
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import unittest import unittest
from typing import Dict, List
from panda import Panda from panda import Panda
from panda.tests.libpanda import libpanda_py from panda.tests.libpanda import libpanda_py
import panda.tests.safety.common as common import panda.tests.safety.common as common
@@ -147,8 +146,8 @@ class TestGmAscmSafety(GmLongitudinalBase, TestGmSafetyBase):
[0xA1, 1], [0x306, 1], [0x308, 1], [0x310, 1], # obs bus [0xA1, 1], [0x306, 1], [0x308, 1], [0x310, 1], # obs bus
[0x315, 2], # ch bus [0x315, 2], # ch bus
[0x104c006c, 3], [0x10400060, 3]] # gmlan [0x104c006c, 3], [0x10400060, 3]] # gmlan
FWD_BLACKLISTED_ADDRS: Dict[int, List[int]] = {} FWD_BLACKLISTED_ADDRS: dict[int, list[int]] = {}
FWD_BUS_LOOKUP: Dict[int, int] = {} FWD_BUS_LOOKUP: dict[int, int] = {}
BRAKE_BUS = 2 BRAKE_BUS = 2
MAX_GAS = 3072 MAX_GAS = 3072

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import unittest import unittest
import numpy as np import numpy as np
from typing import Optional
from panda import Panda from panda import Panda
from panda.tests.libpanda import libpanda_py from panda.tests.libpanda import libpanda_py
@@ -170,9 +169,9 @@ class HondaPcmEnableBase(common.PandaCarSafetyTest):
class HondaBase(common.PandaCarSafetyTest): class HondaBase(common.PandaCarSafetyTest):
MAX_BRAKE = 255 MAX_BRAKE = 255
PT_BUS: Optional[int] = None # must be set when inherited PT_BUS: int | None = None # must be set when inherited
STEER_BUS: Optional[int] = None # must be set when inherited STEER_BUS: int | None = None # must be set when inherited
BUTTONS_BUS: Optional[int] = None # must be set when inherited, tx on this bus, rx on PT_BUS BUTTONS_BUS: int | None = None # must be set when inherited, tx on this bus, rx on PT_BUS
STANDSTILL_THRESHOLD = 0 STANDSTILL_THRESHOLD = 0
RELAY_MALFUNCTION_ADDRS = {0: (0xE4, 0x194)} # STEERING_CONTROL RELAY_MALFUNCTION_ADDRS = {0: (0xE4, 0x194)} # STEERING_CONTROL